From 52548a6df510e811956e88e82b77c60b88ce0fd7 Mon Sep 17 00:00:00 2001 From: Pavan Nikhilesh Date: Wed, 10 Jan 2018 16:40:04 +0530 Subject: [PATCH] examples/eventdev: add non burst mode generic worker Currently, worker uses burst dequeue and burst enqueue to forward events. Add a non burst mode based on the event dev capabilities. Signed-off-by: Pavan Nikhilesh Acked-by: Harry van Haaren --- examples/eventdev_pipeline_sw_pmd/main.c | 12 +- .../pipeline_worker_generic.c | 126 +++++++++++++++++- 2 files changed, 133 insertions(+), 5 deletions(-) diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index f260944fd4..357584d9f8 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -365,8 +365,16 @@ static void do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id) { RTE_SET_USED(nb_ethdev); - RTE_SET_USED(eventdev_id); - set_worker_generic_setup_data(&fdata->cap, 1); + uint8_t burst = 0; + + struct rte_event_dev_info eventdev_info; + memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); + + rte_event_dev_info_get(eventdev_id, &eventdev_info); + burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 : + 0; + + set_worker_generic_setup_data(&fdata->cap, burst); } static void diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c index d1b0e1db1b..f4523902b8 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c @@ -6,6 +6,59 @@ #include "pipeline_common.h" +static __rte_always_inline int +worker_generic(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev_id = data->dev_id; + uint8_t port_id = data->port_id; + size_t sent = 0, received = 0; + unsigned int lcore_id = rte_lcore_id(); + + while (!fdata->done) { + + if (fdata->cap.scheduler) + fdata->cap.scheduler(lcore_id); + + if (!fdata->worker_core[lcore_id]) { + rte_pause(); + continue; + } + + const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id, + &ev, 1, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received++; + + /* The first worker stage does classification */ + if (ev.queue_id == cdata.qid[0]) + ev.flow_id = ev.mbuf->hash.rss + % cdata.num_fids; + + ev.queue_id = cdata.next_qid[ev.queue_id]; + ev.op = RTE_EVENT_OP_FORWARD; + ev.sched_type = cdata.queue_type; + + work(ev.mbuf); + + while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1) + rte_pause(); + sent++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu TX=%zu\n", + rte_lcore_id(), received, sent); + + return 0; +} + static int worker_generic_burst(void *arg) { @@ -66,6 +119,69 @@ worker_generic_burst(void *arg) return 0; } +static __rte_always_inline int +consumer(void) +{ + const uint64_t freq_khz = rte_get_timer_hz() / 1000; + struct rte_event packet; + + static uint64_t received; + static uint64_t last_pkts; + static uint64_t last_time; + static uint64_t start_time; + int i; + uint8_t dev_id = cons_data.dev_id; + uint8_t port_id = cons_data.port_id; + + do { + uint16_t n = rte_event_dequeue_burst(dev_id, port_id, + &packet, 1, 0); + + if (n == 0) { + for (i = 0; i < rte_eth_dev_count(); i++) + rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]); + return 0; + } + if (start_time == 0) + last_time = start_time = rte_get_timer_cycles(); + + received++; + uint8_t outport = packet.mbuf->port; + + rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport], + packet.mbuf); + + if (cons_data.release) + rte_event_enqueue_burst(dev_id, port_id, + &packet, n); + + /* Print out mpps every 1<22 packets */ + if (!cdata.quiet && received >= last_pkts + (1<<22)) { + const uint64_t now = rte_get_timer_cycles(); + const uint64_t total_ms = (now - start_time) / freq_khz; + const uint64_t delta_ms = (now - last_time) / freq_khz; + uint64_t delta_pkts = received - last_pkts; + + printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, " + "avg %.3f mpps [current %.3f mpps]\n", + __func__, + received, + total_ms, + received / (total_ms * 1000.0), + delta_pkts / (delta_ms * 1000.0)); + last_pkts = received; + last_time = now; + } + + cdata.num_packets--; + if (cdata.num_packets <= 0) + fdata->done = 1; + /* Be stuck in this loop if single. */ + } while (!fdata->done && fdata->tx_single); + + return 0; +} + static __rte_always_inline int consumer_burst(void) { @@ -430,9 +546,13 @@ generic_opt_check(void) void set_worker_generic_setup_data(struct setup_data *caps, bool burst) { - RTE_SET_USED(burst); - caps->consumer = consumer_burst; - caps->worker = worker_generic_burst; + if (burst) { + caps->consumer = consumer_burst; + caps->worker = worker_generic_burst; + } else { + caps->consumer = consumer; + caps->worker = worker_generic; + } caps->adptr_setup = init_rx_adapter; caps->scheduler = schedule_devices; -- 2.20.1