raw/ifpga: fix file handle leak
[dpdk.git] / examples / eventdev_pipeline / pipeline_worker_tx.c
index a0f40c2..a82e064 100644 (file)
@@ -18,21 +18,22 @@ static __rte_always_inline void
 worker_event_enqueue(const uint8_t dev, const uint8_t port,
                struct rte_event *ev)
 {
-       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+       while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
                rte_pause();
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev, const uint16_t nb_rx)
+                          struct rte_event *ev, const uint16_t nb_rx)
 {
        uint16_t enq;
 
        enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-       while (enq < nb_rx) {
+       while (enq < nb_rx && !fdata->done)
                enq += rte_event_enqueue_burst(dev, port,
                                                ev + enq, nb_rx - enq);
-       }
+
+       return enq;
 }
 
 static __rte_always_inline void
@@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
        exchange_mac(ev->mbuf);
        rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+              !fdata->done)
                rte_pause();
 }
 
@@ -76,6 +78,11 @@ worker_do_tx_single(void *arg)
                }
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg)
                }
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg)
        const uint8_t dev = data->dev_id;
        const uint8_t port = data->port_id;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t nb_tx = 0, nb_rx = 0, i;
 
        while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg)
        const uint8_t dev = data->dev_id;
        const uint8_t port = data->port_id;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -251,6 +265,11 @@ worker_do_tx(void *arg)
                fwd++;
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg)
                fwd++;
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg)
        uint8_t port = data->port_id;
        uint8_t lst_qid = cdata.num_stages - 1;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (nb_rx == 0) {
                        rte_pause();
@@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg)
                        }
                        work();
                }
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
 
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg)
        uint8_t port = data->port_id;
        uint8_t lst_qid = cdata.num_stages - 1;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (nb_rx == 0) {
                        rte_pause();
@@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -436,6 +461,7 @@ setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
        struct rte_event_dev_config config = {
                        .nb_event_queues = nb_queues,
                        .nb_event_ports = nb_ports,
+                       .nb_single_link_event_port_queues = 0,
                        .nb_events_limit  = 4096,
                        .nb_event_queue_flows = 1024,
                        .nb_event_port_dequeue_depth = 128,
@@ -445,6 +471,7 @@ setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
                        .dequeue_depth = cdata.worker_cq_depth,
                        .enqueue_depth = 64,
                        .new_event_threshold = 4096,
+                       .event_port_cfg = RTE_EVENT_PORT_CFG_HINT_WORKER,
        };
        struct rte_event_queue_conf wkr_q_conf = {
                        .schedule_type = cdata.queue_type,
@@ -603,6 +630,129 @@ service_rx_adapter(void *arg)
        return 0;
 }
 
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+       struct rte_eth_rxconf rx_conf;
+       static const struct rte_eth_conf port_conf_default = {
+               .rxmode = {
+                       .mq_mode = RTE_ETH_MQ_RX_RSS,
+               },
+               .rx_adv_conf = {
+                       .rss_conf = {
+                               .rss_hf = RTE_ETH_RSS_IP |
+                                         RTE_ETH_RSS_TCP |
+                                         RTE_ETH_RSS_UDP,
+                       }
+               }
+       };
+       const uint16_t rx_rings = 1, tx_rings = 1;
+       const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+       struct rte_eth_conf port_conf = port_conf_default;
+       int retval;
+       uint16_t q;
+       struct rte_eth_dev_info dev_info;
+       struct rte_eth_txconf txconf;
+
+       if (!rte_eth_dev_is_valid_port(port))
+               return -1;
+
+       retval = rte_eth_dev_info_get(port, &dev_info);
+       if (retval != 0) {
+               printf("Error during getting device (port %u) info: %s\n",
+                               port, strerror(-retval));
+               return retval;
+       }
+
+       if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+               port_conf.txmode.offloads |=
+                       RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+       rx_conf = dev_info.default_rxconf;
+       rx_conf.offloads = port_conf.rxmode.offloads;
+
+       port_conf.rx_adv_conf.rss_conf.rss_hf &=
+               dev_info.flow_type_rss_offloads;
+       if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
+                       port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
+               printf("Port %u modified RSS hash function based on hardware support,"
+                       "requested:%#"PRIx64" configured:%#"PRIx64"\n",
+                       port,
+                       port_conf_default.rx_adv_conf.rss_conf.rss_hf,
+                       port_conf.rx_adv_conf.rss_conf.rss_hf);
+       }
+
+       /* Configure the Ethernet device. */
+       retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+       if (retval != 0)
+               return retval;
+
+       /* Allocate and set up 1 RX queue per Ethernet port. */
+       for (q = 0; q < rx_rings; q++) {
+               retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+                               rte_eth_dev_socket_id(port), &rx_conf,
+                               mbuf_pool);
+               if (retval < 0)
+                       return retval;
+       }
+
+       txconf = dev_info.default_txconf;
+       txconf.offloads = port_conf_default.txmode.offloads;
+       /* Allocate and set up 1 TX queue per Ethernet port. */
+       for (q = 0; q < tx_rings; q++) {
+               retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+                               rte_eth_dev_socket_id(port), &txconf);
+               if (retval < 0)
+                       return retval;
+       }
+
+       /* Display the port MAC address. */
+       struct rte_ether_addr addr;
+       retval = rte_eth_macaddr_get(port, &addr);
+       if (retval != 0) {
+               printf("Failed to get MAC address (port %u): %s\n",
+                               port, rte_strerror(-retval));
+               return retval;
+       }
+
+       printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+                       " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+                       (unsigned int)port, RTE_ETHER_ADDR_BYTES(&addr));
+
+       /* Enable RX in promiscuous mode for the Ethernet device. */
+       retval = rte_eth_promiscuous_enable(port);
+       if (retval != 0)
+               return retval;
+
+       return 0;
+}
+
+static int
+init_ports(uint16_t num_ports)
+{
+       uint16_t portid;
+
+       if (!cdata.num_mbuf)
+               cdata.num_mbuf = 16384 * num_ports;
+
+       struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+                       /* mbufs */ cdata.num_mbuf,
+                       /* cache_size */ 512,
+                       /* priv_size*/ 0,
+                       /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+                       rte_socket_id());
+
+       RTE_ETH_FOREACH_DEV(portid)
+               if (port_init(portid, mp) != 0)
+                       rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
+                                       portid);
+
+       return 0;
+}
+
 static void
 init_adapters(uint16_t nb_ports)
 {
@@ -619,8 +769,10 @@ init_adapters(uint16_t nb_ports)
                .dequeue_depth = cdata.worker_cq_depth,
                .enqueue_depth = 64,
                .new_event_threshold = 4096,
+               .event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
        };
 
+       init_ports(nb_ports);
        if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
                adptr_p_conf.new_event_threshold = dev_info.max_num_events;
        if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)