examples/eventdev: add non burst mode generic worker
author     Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Wed, 10 Jan 2018 11:10:04 +0000 (16:40 +0530)
committer  Jerin Jacob <jerin.jacob@caviumnetworks.com>
Fri, 19 Jan 2018 15:09:56 +0000 (16:09 +0100)
Currently, the worker uses burst dequeue and burst enqueue to forward events.
Add non-burst worker and consumer paths, selected at startup based on the
event device capabilities (RTE_EVENT_DEV_CAP_BURST_MODE).
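
In short, the capability check added to do_capability_setup() in main.c decides
which loops get registered; a simplified sketch of that logic (names as in this
patch, details as in the diff below):

        struct rte_event_dev_info info;

        memset(&info, 0, sizeof(info));
        rte_event_dev_info_get(eventdev_id, &info);

        /* Burst-capable device: keep the burst worker/consumer loops,
         * otherwise register the new single-event variants.
         */
        set_worker_generic_setup_data(&fdata->cap,
                !!(info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE));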

Signed-off-by: Pavan Nikhilesh <pbhagavatula@caviumnetworks.com>
Acked-by: Harry van Haaren <harry.van.haaren@intel.com>
examples/eventdev_pipeline_sw_pmd/main.c
examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c

diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c
index f260944..357584d 100644
--- a/examples/eventdev_pipeline_sw_pmd/main.c
+++ b/examples/eventdev_pipeline_sw_pmd/main.c
@@ -365,8 +365,16 @@ static void
 do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id)
 {
        RTE_SET_USED(nb_ethdev);
-       RTE_SET_USED(eventdev_id);
-       set_worker_generic_setup_data(&fdata->cap, 1);
+       uint8_t burst = 0;
+
+       struct rte_event_dev_info eventdev_info;
+       memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
+
+       rte_event_dev_info_get(eventdev_id, &eventdev_info);
+       burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 :
+               0;
+
+       set_worker_generic_setup_data(&fdata->cap, burst);
 }
 
 static void
diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
index d1b0e1d..f452390 100644
--- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
+++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c
@@ -6,6 +6,59 @@
 
 #include "pipeline_common.h"
 
+static __rte_always_inline int
+worker_generic(void *arg)
+{
+       struct rte_event ev;
+
+       struct worker_data *data = (struct worker_data *)arg;
+       uint8_t dev_id = data->dev_id;
+       uint8_t port_id = data->port_id;
+       size_t sent = 0, received = 0;
+       unsigned int lcore_id = rte_lcore_id();
+
+       while (!fdata->done) {
+
+               if (fdata->cap.scheduler)
+                       fdata->cap.scheduler(lcore_id);
+
+               if (!fdata->worker_core[lcore_id]) {
+                       rte_pause();
+                       continue;
+               }
+
+               const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
+                               &ev, 1, 0);
+
+               if (nb_rx == 0) {
+                       rte_pause();
+                       continue;
+               }
+               received++;
+
+               /* The first worker stage does classification */
+               if (ev.queue_id == cdata.qid[0])
+                       ev.flow_id = ev.mbuf->hash.rss
+                                               % cdata.num_fids;
+
+               ev.queue_id = cdata.next_qid[ev.queue_id];
+               ev.op = RTE_EVENT_OP_FORWARD;
+               ev.sched_type = cdata.queue_type;
+
+               work(ev.mbuf);
+
+               while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
+                       rte_pause();
+               sent++;
+       }
+
+       if (!cdata.quiet)
+               printf("  worker %u thread done. RX=%zu TX=%zu\n",
+                               rte_lcore_id(), received, sent);
+
+       return 0;
+}
+
 static int
 worker_generic_burst(void *arg)
 {
@@ -66,6 +119,69 @@ worker_generic_burst(void *arg)
        return 0;
 }
 
+static __rte_always_inline int
+consumer(void)
+{
+       const uint64_t freq_khz = rte_get_timer_hz() / 1000;
+       struct rte_event packet;
+
+       static uint64_t received;
+       static uint64_t last_pkts;
+       static uint64_t last_time;
+       static uint64_t start_time;
+       int i;
+       uint8_t dev_id = cons_data.dev_id;
+       uint8_t port_id = cons_data.port_id;
+
+       do {
+               uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
+                               &packet, 1, 0);
+
+               if (n == 0) {
+                       for (i = 0; i < rte_eth_dev_count(); i++)
+                               rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
+                       return 0;
+               }
+               if (start_time == 0)
+                       last_time = start_time = rte_get_timer_cycles();
+
+               received++;
+               uint8_t outport = packet.mbuf->port;
+
+               rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
+                               packet.mbuf);
+
+               if (cons_data.release)
+                       rte_event_enqueue_burst(dev_id, port_id,
+                                                               &packet, n);
+
+               /* Print out mpps every 1<<22 packets */
+               if (!cdata.quiet && received >= last_pkts + (1<<22)) {
+                       const uint64_t now = rte_get_timer_cycles();
+                       const uint64_t total_ms = (now - start_time) / freq_khz;
+                       const uint64_t delta_ms = (now - last_time) / freq_khz;
+                       uint64_t delta_pkts = received - last_pkts;
+
+                       printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
+                                       "avg %.3f mpps [current %.3f mpps]\n",
+                                       __func__,
+                                       received,
+                                       total_ms,
+                                       received / (total_ms * 1000.0),
+                                       delta_pkts / (delta_ms * 1000.0));
+                       last_pkts = received;
+                       last_time = now;
+               }
+
+               cdata.num_packets--;
+               if (cdata.num_packets <= 0)
+                       fdata->done = 1;
+       /* If this is a dedicated single TX core, stay in this loop until done. */
+       } while (!fdata->done && fdata->tx_single);
+
+       return 0;
+}
+
 static __rte_always_inline int
 consumer_burst(void)
 {
@@ -430,9 +546,13 @@ generic_opt_check(void)
 void
 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
 {
-       RTE_SET_USED(burst);
-       caps->consumer = consumer_burst;
-       caps->worker = worker_generic_burst;
+       if (burst) {
+               caps->consumer = consumer_burst;
+               caps->worker = worker_generic_burst;
+       } else {
+               caps->consumer = consumer;
+               caps->worker = worker_generic;
+       }
 
        caps->adptr_setup = init_rx_adapter;
        caps->scheduler = schedule_devices;