raw/ifpga: remove virtual devices on close
[dpdk.git] / examples / eventdev_pipeline / pipeline_worker_tx.c
index 473940f..a82e064 100644 (file)
@@ -18,21 +18,22 @@ static __rte_always_inline void
 worker_event_enqueue(const uint8_t dev, const uint8_t port,
                struct rte_event *ev)
 {
-       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+       while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
                rte_pause();
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev, const uint16_t nb_rx)
+                          struct rte_event *ev, const uint16_t nb_rx)
 {
        uint16_t enq;
 
        enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-       while (enq < nb_rx) {
+       while (enq < nb_rx && !fdata->done)
                enq += rte_event_enqueue_burst(dev, port,
                                                ev + enq, nb_rx - enq);
-       }
+
+       return enq;
 }
 
 static __rte_always_inline void
@@ -40,7 +41,8 @@ worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
 {
        exchange_mac(ev->mbuf);
        rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+              !fdata->done)
                rte_pause();
 }
 
@@ -76,6 +78,11 @@ worker_do_tx_single(void *arg)
                }
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -111,6 +118,11 @@ worker_do_tx_single_atq(void *arg)
                }
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -126,11 +138,10 @@ worker_do_tx_single_burst(void *arg)
        const uint8_t dev = data->dev_id;
        const uint8_t port = data->port_id;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t nb_tx = 0, nb_rx = 0, i;
 
        while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -153,10 +164,12 @@ worker_do_tx_single_burst(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -172,11 +185,10 @@ worker_do_tx_single_burst_atq(void *arg)
        const uint8_t dev = data->dev_id;
        const uint8_t port = data->port_id;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -197,10 +209,12 @@ worker_do_tx_single_burst_atq(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -251,6 +265,11 @@ worker_do_tx(void *arg)
                fwd++;
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -297,6 +316,11 @@ worker_do_tx_atq(void *arg)
                fwd++;
        }
 
+       if (ev.u64) {
+               ev.op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev, port, &ev, 1);
+       }
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -314,11 +338,10 @@ worker_do_tx_burst(void *arg)
        uint8_t port = data->port_id;
        uint8_t lst_qid = cdata.num_stages - 1;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (nb_rx == 0) {
                        rte_pause();
@@ -347,11 +370,13 @@ worker_do_tx_burst(void *arg)
                        }
                        work();
                }
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
 
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);
@@ -369,12 +394,10 @@ worker_do_tx_burst_atq(void *arg)
        uint8_t port = data->port_id;
        uint8_t lst_qid = cdata.num_stages - 1;
        size_t fwd = 0, received = 0, tx = 0;
+       uint16_t i, nb_rx = 0, nb_tx = 0;
 
        while (!fdata->done) {
-               uint16_t i;
-
-               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
-                               ev, BATCH_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
 
                if (nb_rx == 0) {
                        rte_pause();
@@ -402,10 +425,12 @@ worker_do_tx_burst_atq(void *arg)
                        work();
                }
 
-               worker_event_enqueue_burst(dev, port, ev, nb_rx);
-               fwd += nb_rx;
+               nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+               fwd += nb_tx;
        }
 
+       worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
                                rte_lcore_id(), received, fwd, tx);