examples/eventdev: add burst for thread safe pipeline
Author: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Wed, 10 Jan 2018 11:10:07 +0000 (16:40 +0530)
Committer: Jerin Jacob <jerin.jacob@caviumnetworks.com>
Fri, 19 Jan 2018 15:09:56 +0000 (16:09 +0100)
Add a burst-mode worker pipeline for use when Tx is multi-thread safe.

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c

index 397b101..419d8d4 100644 (file)
@@ -22,6 +22,19 @@ worker_event_enqueue(const uint8_t dev, const uint8_t port,
                rte_pause();
 }
 
+/*
+ * Enqueue a burst of nb_rx events to the given event device port,
+ * retrying the unsent tail until the whole burst has been accepted.
+ */
+static __rte_always_inline void
+worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
+               struct rte_event *ev, const uint16_t nb_rx)
+{
+       uint16_t enq;
+
+       enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+       /*
+        * rte_event_enqueue_burst() may accept fewer than nb_rx events;
+        * busy-retry on the remainder, starting at the first unsent event.
+        */
+       while (enq < nb_rx) {
+               enq += rte_event_enqueue_burst(dev, port,
+                                               ev + enq, nb_rx - enq);
+       }
+}
+
 static __rte_always_inline void
 worker_tx_pkt(struct rte_mbuf *mbuf)
 {
@@ -81,6 +94,61 @@ worker_do_tx(void *arg)
        return 0;
 }
 
+/*
+ * Burst-mode worker loop for the worker-Tx pipeline: dequeue up to
+ * BATCH_SIZE events, transmit packets that have completed the last
+ * stage under atomic scheduling, forward all other events to their
+ * next stage, then enqueue the whole batch back in one burst call.
+ * Runs until fdata->done is set; returns 0.
+ */
+static int
+worker_do_tx_burst(void *arg)
+{
+       struct rte_event ev[BATCH_SIZE];
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev = data->dev_id;
+       uint8_t port = data->port_id;
+       uint8_t lst_qid = cdata.num_stages - 1;  /* index of the last stage */
+       size_t fwd = 0, received = 0, tx = 0;    /* per-thread statistics */
+
+       while (!fdata->done) {
+               uint16_t i;
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
+                               ev, BATCH_SIZE, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received += nb_rx;
+
+               for (i = 0; i < nb_rx; i++) {
+                       /* Map the event's queue id onto its pipeline stage. */
+                       const uint8_t cq_id = ev[i].queue_id % cdata.num_stages;
+
+                       if (cq_id >= lst_qid) {
+                               /*
+                                * Event is at (or beyond) the last stage: if it
+                                * arrived with atomic scheduling, transmit the
+                                * packet here and release the event slot.
+                                */
+                               if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
+                                       worker_tx_pkt(ev[i].mbuf);
+                                       tx++;
+                                       ev[i].op = RTE_EVENT_OP_RELEASE;
+                                       continue;
+                               }
+                               /*
+                                * Otherwise re-forward it as atomic so the Tx
+                                * branch above handles it on the next pass;
+                                * NOTE(review): presumably this serializes Tx
+                                * per flow — confirm against scheduler docs.
+                                */
+                               ev[i].queue_id = (cq_id == lst_qid) ?
+                                       cdata.next_qid[ev[i].queue_id] :
+                                       ev[i].queue_id;
+
+                               worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
+                       } else {
+                               /* Intermediate stage: forward to next queue. */
+                               ev[i].queue_id = cdata.next_qid[ev[i].queue_id];
+                               worker_fwd_event(&ev[i], cdata.queue_type);
+                       }
+                       work();
+               }
+               /* Enqueue the whole processed batch in one burst. */
+               worker_event_enqueue_burst(dev, port, ev, nb_rx);
+
+               fwd += nb_rx;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
+                               rte_lcore_id(), received, fwd, tx);
+
+       return 0;
+}
+
 static int
 setup_eventdev_worker_tx(struct cons_data *cons_data,
                struct worker_data *worker_data)
@@ -412,8 +480,10 @@ worker_tx_opt_check(void)
 void
 set_worker_tx_setup_data(struct setup_data *caps, bool burst)
 {
-       RTE_SET_USED(burst);
-       caps->worker = worker_do_tx;
+       if (burst)
+               caps->worker = worker_do_tx_burst;
+       else
+               caps->worker = worker_do_tx;
 
        memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);