From c47f70e17d8c6d24e0f679a542002bf7991e0add Mon Sep 17 00:00:00 2001 From: Pavan Nikhilesh Date: Wed, 10 Jan 2018 16:40:07 +0530 Subject: [PATCH] examples/eventdev: add burst for thread safe pipeline Add burst mode worker pipeline when Tx is multi thread safe. Signed-off-by: Pavan Nikhilesh Acked-by: Harry van Haaren --- .../pipeline_worker_tx.c | 74 ++++++++++++++++++- 1 file changed, 72 insertions(+), 2 deletions(-) diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c index 397b1013fb..419d8d4107 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c @@ -22,6 +22,19 @@ worker_event_enqueue(const uint8_t dev, const uint8_t port, rte_pause(); } +static __rte_always_inline void +worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, + struct rte_event *ev, const uint16_t nb_rx) +{ + uint16_t enq; + + enq = rte_event_enqueue_burst(dev, port, ev, nb_rx); + while (enq < nb_rx) { + enq += rte_event_enqueue_burst(dev, port, + ev + enq, nb_rx - enq); + } +} + static __rte_always_inline void worker_tx_pkt(struct rte_mbuf *mbuf) { @@ -81,6 +94,61 @@ worker_do_tx(void *arg) return 0; } +static int +worker_do_tx_burst(void *arg) +{ + struct rte_event ev[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev = data->dev_id; + uint8_t port = data->port_id; + uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, + ev, BATCH_SIZE, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + const uint8_t cq_id = ev[i].queue_id % cdata.num_stages; + + if (cq_id >= lst_qid) { + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev[i].mbuf); + tx++; + ev[i].op = RTE_EVENT_OP_RELEASE; + continue; + } + ev[i].queue_id = (cq_id == lst_qid) ? + cdata.next_qid[ev[i].queue_id] : + ev[i].queue_id; + + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } else { + ev[i].queue_id = cdata.next_qid[ev[i].queue_id]; + worker_fwd_event(&ev[i], cdata.queue_type); + } + work(); + } + worker_event_enqueue_burst(dev, port, ev, nb_rx); + + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + static int setup_eventdev_worker_tx(struct cons_data *cons_data, struct worker_data *worker_data) @@ -412,8 +480,10 @@ worker_tx_opt_check(void) void set_worker_tx_setup_data(struct setup_data *caps, bool burst) { - RTE_SET_USED(burst); - caps->worker = worker_do_tx; + if (burst) + caps->worker = worker_do_tx_burst; + else + caps->worker = worker_do_tx; memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); -- 2.20.1