]> git.droids-corp.org - dpdk.git/commitdiff
app/eventdev: clean up worker state before exit
authorPavan Nikhilesh <pbhagavatula@marvell.com>
Fri, 13 May 2022 16:07:15 +0000 (21:37 +0530)
committerJerin Jacob <jerinj@marvell.com>
Tue, 17 May 2022 14:43:12 +0000 (16:43 +0200)
Event ports are configured to implicitly release any scheduler contexts
currently held on the next call to rte_event_dequeue_burst().
A worker core might still hold a scheduling context during exit, as the
next call to rte_event_dequeue_burst() is never made.
This might lead to a deadlock, depending on the worker exit timing,
especially when there are very few flows.

Add a cleanup function that releases any scheduling contexts still held
by the worker, using RTE_EVENT_OP_RELEASE.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@marvell.com>
Acked-by: Jerin Jacob <jerinj@marvell.com>
app/test-eventdev/test_perf_atq.c
app/test-eventdev/test_perf_common.c
app/test-eventdev/test_perf_common.h
app/test-eventdev/test_perf_queue.c
app/test-eventdev/test_pipeline_atq.c
app/test-eventdev/test_pipeline_common.c
app/test-eventdev/test_pipeline_common.h
app/test-eventdev/test_pipeline_queue.c

index bac3ea602fbe39c16f300aaf3083efb0bf9e2593..5a0b190384fca85fa8cbc1091778677161e316e4 100644 (file)
@@ -37,13 +37,14 @@ atq_fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
 static int
 perf_atq_worker(void *arg, const int enable_fwd_latency)
 {
-       PERF_WORKER_INIT;
+       uint16_t enq = 0, deq = 0;
        struct rte_event ev;
+       PERF_WORKER_INIT;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -78,24 +79,29 @@ perf_atq_worker(void *arg, const int enable_fwd_latency)
                                         bufs, sz, cnt);
                } else {
                        atq_fwd_event(&ev, sched_type_list, nb_stages);
-                       while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
-                               rte_pause();
+                       do {
+                               enq = rte_event_enqueue_burst(dev, port, &ev,
+                                                             1);
+                       } while (!enq && !t->done);
                }
        }
+
+       perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
        return 0;
 }
 
 static int
 perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
 {
-       PERF_WORKER_INIT;
-       uint16_t i;
        /* +1 to avoid prefetch out of array check */
        struct rte_event ev[BURST_SIZE + 1];
+       uint16_t enq = 0, nb_rx = 0;
+       PERF_WORKER_INIT;
+       uint16_t i;
 
        while (t->done == false) {
-               uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -146,14 +152,15 @@ perf_atq_worker_burst(void *arg, const int enable_fwd_latency)
                        }
                }
 
-               uint16_t enq;
-
                enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-               while (enq < nb_rx) {
+               while ((enq < nb_rx) && !t->done) {
                        enq += rte_event_enqueue_burst(dev, port,
                                                        ev + enq, nb_rx - enq);
                }
        }
+
+       perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
        return 0;
 }
 
index 4cf16b4267786632535b2156111ab28deaee8184..b51a1004256b6b45d4b8326541515aabec0d1b12 100644 (file)
@@ -985,6 +985,23 @@ perf_opt_dump(struct evt_options *opt, uint8_t nb_queues)
        evt_dump("prod_enq_burst_sz", "%d", opt->prod_enq_burst_sz);
 }
 
+void
+perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+                   uint8_t port_id, struct rte_event events[], uint16_t nb_enq,
+                   uint16_t nb_deq)
+{
+       int i;
+
+       if (nb_deq) {
+               for (i = nb_enq; i < nb_deq; i++)
+                       rte_mempool_put(pool, events[i].event_ptr);
+
+               for (i = 0; i < nb_deq; i++)
+                       events[i].op = RTE_EVENT_OP_RELEASE;
+               rte_event_enqueue_burst(dev_id, port_id, events, nb_deq);
+       }
+}
+
 void
 perf_eventdev_destroy(struct evt_test *test, struct evt_options *opt)
 {
index e504bb1df9bac795d9a9d6b8183c89fd910ed6b9..f6bfc73be0a75c4d84e978b1ff099146717ecd4e 100644 (file)
@@ -184,5 +184,8 @@ void perf_cryptodev_destroy(struct evt_test *test, struct evt_options *opt);
 void perf_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
 void perf_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
 void perf_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void perf_worker_cleanup(struct rte_mempool *const pool, uint8_t dev_id,
+                        uint8_t port_id, struct rte_event events[],
+                        uint16_t nb_enq, uint16_t nb_deq);
 
 #endif /* _TEST_PERF_COMMON_ */
index 108f1742a7239f5372d0f12fb0c244f2eb4d4669..b498cacef6571f6f9b503288d50d69ec7eb67c4c 100644 (file)
@@ -39,13 +39,14 @@ fwd_event(struct rte_event *const ev, uint8_t *const sched_type_list,
 static int
 perf_queue_worker(void *arg, const int enable_fwd_latency)
 {
-       PERF_WORKER_INIT;
+       uint16_t enq = 0, deq = 0;
        struct rte_event ev;
+       PERF_WORKER_INIT;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -80,24 +81,29 @@ perf_queue_worker(void *arg, const int enable_fwd_latency)
                                        &ev, w, bufs, sz, cnt);
                } else {
                        fwd_event(&ev, sched_type_list, nb_stages);
-                       while (rte_event_enqueue_burst(dev, port, &ev, 1) != 1)
-                               rte_pause();
+                       do {
+                               enq = rte_event_enqueue_burst(dev, port, &ev,
+                                                             1);
+                       } while (!enq && !t->done);
                }
        }
+
+       perf_worker_cleanup(pool, dev, port, &ev, enq, deq);
+
        return 0;
 }
 
 static int
 perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
 {
-       PERF_WORKER_INIT;
-       uint16_t i;
        /* +1 to avoid prefetch out of array check */
        struct rte_event ev[BURST_SIZE + 1];
+       uint16_t enq = 0, nb_rx = 0;
+       PERF_WORKER_INIT;
+       uint16_t i;
 
        while (t->done == false) {
-               uint16_t const nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -147,14 +153,16 @@ perf_queue_worker_burst(void *arg, const int enable_fwd_latency)
                        }
                }
 
-               uint16_t enq;
 
                enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-               while (enq < nb_rx) {
+               while (enq < nb_rx && !t->done) {
                        enq += rte_event_enqueue_burst(dev, port,
                                                        ev + enq, nb_rx - enq);
                }
        }
+
+       perf_worker_cleanup(pool, dev, port, ev, enq, nb_rx);
+
        return 0;
 }
 
index 79218502ba32902b81a6e4218cb410f6be7264ce..4b101971277ce957e5a3af48e1a9d774d4cfa2c6 100644 (file)
@@ -21,18 +21,20 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_tx(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
 
-               pipeline_event_tx(dev, port, &ev);
+               deq = pipeline_event_tx(dev, port, &ev, t);
                w->processed_pkts++;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -42,20 +44,22 @@ pipeline_atq_worker_single_stage_fwd(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
 
                ev.queue_id = tx_queue[ev.mbuf->port];
                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
                w->processed_pkts++;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -64,10 +68,10 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_tx(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -79,9 +83,10 @@ pipeline_atq_worker_single_stage_burst_tx(void *arg)
                        rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
                }
 
-               pipeline_event_tx_burst(dev, port, ev, nb_rx);
-               w->processed_pkts += nb_rx;
+               nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
+               w->processed_pkts += nb_tx;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -91,10 +96,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -108,9 +113,10 @@ pipeline_atq_worker_single_stage_burst_fwd(void *arg)
                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
-               w->processed_pkts += nb_rx;
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
+               w->processed_pkts += nb_tx;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -119,19 +125,21 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_tx_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
                vector_sz = ev.vec->nb_elem;
-               pipeline_event_tx_vector(dev, port, &ev);
+               enq = pipeline_event_tx_vector(dev, port, &ev, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -141,12 +149,13 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -155,9 +164,10 @@ pipeline_atq_worker_single_stage_fwd_vector(void *arg)
                ev.queue_id = tx_queue[ev.vec->port];
                ev.vec->queue = 0;
                pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -166,11 +176,11 @@ static __rte_noinline int
 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -182,9 +192,10 @@ pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
                        ev[i].vec->queue = 0;
                }
 
-               pipeline_event_tx_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -194,11 +205,11 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -214,9 +225,10 @@ pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
                                                  RTE_SCHED_TYPE_ATOMIC);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -225,11 +237,12 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -237,15 +250,16 @@ pipeline_atq_worker_multi_stage_tx(void *arg)
                cq_id = ev.sub_event_type % nb_stages;
 
                if (cq_id == last_queue) {
-                       pipeline_event_tx(dev, port, &ev);
+                       enq = pipeline_event_tx(dev, port, &ev, t);
                        w->processed_pkts++;
                        continue;
                }
 
                ev.sub_event_type++;
                pipeline_fwd_event(&ev, sched_type_list[cq_id]);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -255,11 +269,12 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -275,8 +290,9 @@ pipeline_atq_worker_multi_stage_fwd(void *arg)
                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
                }
 
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -285,10 +301,10 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -300,7 +316,7 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
                        cq_id = ev[i].sub_event_type % nb_stages;
 
                        if (cq_id == last_queue) {
-                               pipeline_event_tx(dev, port, &ev[i]);
+                               pipeline_event_tx(dev, port, &ev[i], t);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts++;
                                continue;
@@ -310,8 +326,9 @@ pipeline_atq_worker_multi_stage_burst_tx(void *arg)
                        pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -321,10 +338,10 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -347,8 +364,9 @@ pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
                        }
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -357,12 +375,13 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -371,15 +390,16 @@ pipeline_atq_worker_multi_stage_tx_vector(void *arg)
 
                if (cq_id == last_queue) {
                        vector_sz = ev.vec->nb_elem;
-                       pipeline_event_tx_vector(dev, port, &ev);
+                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
                        w->processed_pkts += vector_sz;
                        continue;
                }
 
                ev.sub_event_type++;
                pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -389,12 +409,13 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -406,14 +427,15 @@ pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
                        ev.vec->queue = 0;
                        vector_sz = ev.vec->nb_elem;
                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                        w->processed_pkts += vector_sz;
                } else {
                        ev.sub_event_type++;
                        pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                }
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -422,11 +444,11 @@ static __rte_noinline int
 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -438,7 +460,7 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
 
                        if (cq_id == last_queue) {
                                vector_sz = ev[i].vec->nb_elem;
-                               pipeline_event_tx_vector(dev, port, &ev[i]);
+                               pipeline_event_tx_vector(dev, port, &ev[i], t);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts += vector_sz;
                                continue;
@@ -449,8 +471,9 @@ pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
                                                  sched_type_list[cq_id]);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -460,11 +483,11 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -488,8 +511,9 @@ pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
                        }
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
index 29b64014d772dfd99f1939a43ebaa2736baa8aed..d8e80903b2152305950ce04c1e8fb884fe117e8e 100644 (file)
@@ -505,6 +505,45 @@ pipeline_event_tx_adapter_setup(struct evt_options *opt,
        return ret;
 }
 
+static void
+pipeline_vector_array_free(struct rte_event events[], uint16_t num)
+{
+       uint16_t i;
+
+       for (i = 0; i < num; i++) {
+               rte_pktmbuf_free_bulk(events[i].vec->mbufs,
+                                     events[i].vec->nb_elem);
+               rte_mempool_put(rte_mempool_from_obj(events[i].vec),
+                               events[i].vec);
+       }
+}
+
+void
+pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+                       uint16_t enq, uint16_t deq)
+{
+       int i;
+
+       if (!(deq - enq))
+               return;
+
+       if (deq) {
+               for (i = enq; i < deq; i++) {
+                       if (ev[i].op == RTE_EVENT_OP_RELEASE)
+                               continue;
+                       if (ev[i].event_type & RTE_EVENT_TYPE_VECTOR)
+                               pipeline_vector_array_free(&ev[i], 1);
+                       else
+                               rte_pktmbuf_free(ev[i].mbuf);
+               }
+
+               for (i = 0; i < deq; i++)
+                       ev[i].op = RTE_EVENT_OP_RELEASE;
+
+               rte_event_enqueue_burst(dev, port, ev, deq);
+       }
+}
+
 void
 pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt)
 {
index c979c33772fa8e5d9866aca5fc1da186d6b65e0b..a6443faea45ba1d5b0aba11def5502c107b02073 100644 (file)
@@ -109,59 +109,80 @@ pipeline_fwd_event_vector(struct rte_event *ev, uint8_t sched)
        ev->sched_type = sched;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_tx(const uint8_t dev, const uint8_t port,
-               struct rte_event * const ev)
+                 struct rte_event *const ev, struct test_pipeline *t)
 {
+       uint8_t enq;
+
        rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
-       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
-               rte_pause();
+       do {
+               enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+       } while (!enq && !t->done);
+
+       return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_tx_vector(const uint8_t dev, const uint8_t port,
-                        struct rte_event *const ev)
+                        struct rte_event *const ev, struct test_pipeline *t)
 {
+       uint8_t enq;
+
        ev->vec->queue = 0;
+       do {
+               enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0);
+       } while (!enq && !t->done);
 
-       while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
-               rte_pause();
+       return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 pipeline_event_tx_burst(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev, const uint16_t nb_rx)
+                       struct rte_event *ev, const uint16_t nb_rx,
+                       struct test_pipeline *t)
 {
        uint16_t enq;
 
        enq = rte_event_eth_tx_adapter_enqueue(dev, port, ev, nb_rx, 0);
-       while (enq < nb_rx) {
+       while (enq < nb_rx && !t->done) {
                enq += rte_event_eth_tx_adapter_enqueue(dev, port,
                                ev + enq, nb_rx - enq, 0);
        }
+
+       return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint8_t
 pipeline_event_enqueue(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev)
+                      struct rte_event *ev, struct test_pipeline *t)
 {
-       while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
-               rte_pause();
+       uint8_t enq;
+
+       do {
+               enq = rte_event_enqueue_burst(dev, port, ev, 1);
+       } while (!enq && !t->done);
+
+       return enq;
 }
 
-static __rte_always_inline void
+static __rte_always_inline uint16_t
 pipeline_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-               struct rte_event *ev, const uint16_t nb_rx)
+                            struct rte_event *ev, const uint16_t nb_rx,
+                            struct test_pipeline *t)
 {
        uint16_t enq;
 
        enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
-       while (enq < nb_rx) {
+       while (enq < nb_rx && !t->done) {
                enq += rte_event_enqueue_burst(dev, port,
                                                ev + enq, nb_rx - enq);
        }
+
+       return enq;
 }
 
+
 static inline int
 pipeline_nb_event_ports(struct evt_options *opt)
 {
@@ -188,5 +209,7 @@ void pipeline_eventdev_destroy(struct evt_test *test, struct evt_options *opt);
 void pipeline_ethdev_destroy(struct evt_test *test, struct evt_options *opt);
 void pipeline_ethdev_rx_stop(struct evt_test *test, struct evt_options *opt);
 void pipeline_mempool_destroy(struct evt_test *test, struct evt_options *opt);
+void pipeline_worker_cleanup(uint8_t dev, uint8_t port, struct rte_event ev[],
+                            uint16_t enq, uint16_t deq);
 
 #endif /* _TEST_PIPELINE_COMMON_ */
index 343f8f3b1d52e1c33131ca6db1a12cd477b01e4a..e989396474be677468eb8457336932ad115eb7fd 100644 (file)
@@ -21,24 +21,27 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_tx(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
 
                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                       pipeline_event_tx(dev, port, &ev);
+                       enq = pipeline_event_tx(dev, port, &ev, t);
+                       ev.op = RTE_EVENT_OP_RELEASE;
                        w->processed_pkts++;
                } else {
                        ev.queue_id++;
                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                }
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -48,11 +51,12 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -60,9 +64,10 @@ pipeline_queue_worker_single_stage_fwd(void *arg)
                ev.queue_id = tx_queue[ev.mbuf->port];
                rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
                pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
                w->processed_pkts++;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -71,10 +76,10 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_tx(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -84,17 +89,18 @@ pipeline_queue_worker_single_stage_burst_tx(void *arg)
                for (i = 0; i < nb_rx; i++) {
                        rte_prefetch0(ev[i + 1].mbuf);
                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
-                               pipeline_event_tx(dev, port, &ev[i]);
+                               pipeline_event_tx(dev, port, &ev[i], t);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts++;
                        } else {
                                ev[i].queue_id++;
                                pipeline_fwd_event(&ev[i],
                                                RTE_SCHED_TYPE_ATOMIC);
-                               pipeline_event_enqueue_burst(dev, port, ev,
-                                               nb_rx);
                        }
                }
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -104,10 +110,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -121,9 +127,10 @@ pipeline_queue_worker_single_stage_burst_fwd(void *arg)
                        pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
                w->processed_pkts += nb_rx;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -132,26 +139,29 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_tx_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
 
                if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
                        vector_sz = ev.vec->nb_elem;
-                       pipeline_event_tx_vector(dev, port, &ev);
+                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
+                       ev.op = RTE_EVENT_OP_RELEASE;
                        w->processed_pkts += vector_sz;
                } else {
                        ev.queue_id++;
                        pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                }
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -161,12 +171,13 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -175,9 +186,10 @@ pipeline_queue_worker_single_stage_fwd_vector(void *arg)
                ev.vec->queue = 0;
                vector_sz = ev.vec->nb_elem;
                pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -186,11 +198,11 @@ static __rte_noinline int
 pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -200,7 +212,7 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
                for (i = 0; i < nb_rx; i++) {
                        if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
                                vector_sz = ev[i].vec->nb_elem;
-                               pipeline_event_tx_vector(dev, port, &ev[i]);
+                               pipeline_event_tx_vector(dev, port, &ev[i], t);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts += vector_sz;
                        } else {
@@ -210,8 +222,9 @@ pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
                        }
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -221,11 +234,11 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -241,9 +254,10 @@ pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
                                                  RTE_SCHED_TYPE_ATOMIC);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
                w->processed_pkts += vector_sz;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -253,11 +267,12 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -265,7 +280,8 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
                cq_id = ev.queue_id % nb_stages;
 
                if (ev.queue_id == tx_queue[ev.mbuf->port]) {
-                       pipeline_event_tx(dev, port, &ev);
+                       enq = pipeline_event_tx(dev, port, &ev, t);
+                       ev.op = RTE_EVENT_OP_RELEASE;
                        w->processed_pkts++;
                        continue;
                }
@@ -274,8 +290,9 @@ pipeline_queue_worker_multi_stage_tx(void *arg)
                pipeline_fwd_event(&ev, cq_id != last_queue ?
                                sched_type_list[cq_id] :
                                RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -285,11 +302,12 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
 
        while (t->done == false) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -300,14 +318,15 @@ pipeline_queue_worker_multi_stage_fwd(void *arg)
                        ev.queue_id = tx_queue[ev.mbuf->port];
                        rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
                        pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                        w->processed_pkts++;
                } else {
                        ev.queue_id++;
                        pipeline_fwd_event(&ev, sched_type_list[cq_id]);
-                       pipeline_event_enqueue(dev, port, &ev);
+                       enq = pipeline_event_enqueue(dev, port, &ev, t);
                }
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -317,10 +336,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -332,7 +351,8 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
                        cq_id = ev[i].queue_id % nb_stages;
 
                        if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
-                               pipeline_event_tx(dev, port, &ev[i]);
+                               pipeline_event_tx(dev, port, &ev[i], t);
+                               ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts++;
                                continue;
                        }
@@ -341,9 +361,10 @@ pipeline_queue_worker_multi_stage_burst_tx(void *arg)
                        pipeline_fwd_event(&ev[i], cq_id != last_queue ?
                                        sched_type_list[cq_id] :
                                        RTE_SCHED_TYPE_ATOMIC);
-                       pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
                }
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -353,11 +374,11 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
 
        while (t->done == false) {
                uint16_t processed_pkts = 0;
-               uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
-                               BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -381,9 +402,10 @@ pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
                        }
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
                w->processed_pkts += processed_pkts;
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -393,12 +415,13 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -407,8 +430,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
 
                if (ev.queue_id == tx_queue[ev.vec->port]) {
                        vector_sz = ev.vec->nb_elem;
-                       pipeline_event_tx_vector(dev, port, &ev);
+                       enq = pipeline_event_tx_vector(dev, port, &ev, t);
                        w->processed_pkts += vector_sz;
+                       ev.op = RTE_EVENT_OP_RELEASE;
                        continue;
                }
 
@@ -416,8 +440,9 @@ pipeline_queue_worker_multi_stage_tx_vector(void *arg)
                pipeline_fwd_event_vector(&ev, cq_id != last_queue
                                                       ? sched_type_list[cq_id]
                                                       : RTE_SCHED_TYPE_ATOMIC);
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -427,12 +452,13 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint8_t enq = 0, deq = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
+               deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
 
-               if (!event) {
+               if (!deq) {
                        rte_pause();
                        continue;
                }
@@ -449,8 +475,9 @@ pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
                        pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
                }
 
-               pipeline_event_enqueue(dev, port, &ev);
+               enq = pipeline_event_enqueue(dev, port, &ev, t);
        }
+       pipeline_worker_cleanup(dev, port, &ev, enq, deq);
 
        return 0;
 }
@@ -460,11 +487,11 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -476,7 +503,7 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
 
                        if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
                                vector_sz = ev[i].vec->nb_elem;
-                               pipeline_event_tx_vector(dev, port, &ev[i]);
+                               pipeline_event_tx_vector(dev, port, &ev[i], t);
                                ev[i].op = RTE_EVENT_OP_RELEASE;
                                w->processed_pkts += vector_sz;
                                continue;
@@ -489,8 +516,9 @@ pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
                                                : RTE_SCHED_TYPE_ATOMIC);
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }
@@ -500,11 +528,11 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
 {
        PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
        const uint8_t *tx_queue = t->tx_evqueue_id;
+       uint16_t nb_rx = 0, nb_tx = 0;
        uint16_t vector_sz;
 
        while (!t->done) {
-               uint16_t nb_rx =
-                       rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
+               nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
 
                if (!nb_rx) {
                        rte_pause();
@@ -527,8 +555,9 @@ pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
                        }
                }
 
-               pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
+               nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
        }
+       pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
 
        return 0;
 }