From fa8054c8c889a8aef3b14f0e246f2513aa31dbc9 Mon Sep 17 00:00:00 2001 From: Pavan Nikhilesh Date: Wed, 10 Jan 2018 16:40:06 +0530 Subject: [PATCH] examples/eventdev: add thread safe Tx worker pipeline Add worker pipeline when Tx is multi thread safe. Probe Ethernet dev capabilities and select it it is supported. Signed-off-by: Pavan Nikhilesh Acked-by: Harry van Haaren --- examples/eventdev_pipeline_sw_pmd/Makefile | 1 + examples/eventdev_pipeline_sw_pmd/main.c | 18 +- .../pipeline_common.h | 5 + .../pipeline_worker_tx.c | 425 ++++++++++++++++++ 4 files changed, 447 insertions(+), 2 deletions(-) create mode 100644 examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c diff --git a/examples/eventdev_pipeline_sw_pmd/Makefile b/examples/eventdev_pipeline_sw_pmd/Makefile index f2578da059..c099adff3c 100644 --- a/examples/eventdev_pipeline_sw_pmd/Makefile +++ b/examples/eventdev_pipeline_sw_pmd/Makefile @@ -16,6 +16,7 @@ APP = eventdev_pipeline_sw_pmd # all source are stored in SRCS-y SRCS-y := main.c SRCS-y += pipeline_worker_generic.c +SRCS-y += pipeline_worker_tx.c CFLAGS += -O3 CFLAGS += $(WERROR_FLAGS) diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index 357584d9f8..e07f68eb69 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -364,9 +364,20 @@ init_ports(unsigned int num_ports) static void do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id) { - RTE_SET_USED(nb_ethdev); + int i; + uint8_t mt_unsafe = 0; uint8_t burst = 0; + for (i = 0; i < nb_ethdev; i++) { + struct rte_eth_dev_info dev_info; + memset(&dev_info, 0, sizeof(struct rte_eth_dev_info)); + + rte_eth_dev_info_get(i, &dev_info); + /* Check if it is safe ask worker to tx. */ + mt_unsafe |= !(dev_info.tx_offload_capa & + DEV_TX_OFFLOAD_MT_LOCKFREE); + } + struct rte_event_dev_info eventdev_info; memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); @@ -374,7 +385,10 @@ do_capability_setup(uint16_t nb_ethdev, uint8_t eventdev_id) burst = eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_BURST_MODE ? 1 : 0; - set_worker_generic_setup_data(&fdata->cap, burst); + if (mt_unsafe) + set_worker_generic_setup_data(&fdata->cap, burst); + else + set_worker_tx_setup_data(&fdata->cap, burst); } static void diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h index d58059b78e..e063200505 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h @@ -79,6 +79,10 @@ struct config_data { int dump_dev_signal; unsigned int num_stages; unsigned int worker_cq_depth; + unsigned int rx_stride; + /* Use rx stride value to reduce congestion in entry queue when using + * multiple eth ports by forming multiple event queue pipelines. + */ int16_t next_qid[MAX_NUM_STAGES+2]; int16_t qid[MAX_NUM_STAGES]; uint8_t rx_adapter_id; @@ -144,3 +148,4 @@ schedule_devices(unsigned int lcore_id) } void set_worker_generic_setup_data(struct setup_data *caps, bool burst); +void set_worker_tx_setup_data(struct setup_data *caps, bool burst); diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c new file mode 100644 index 0000000000..397b1013fb --- /dev/null +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c @@ -0,0 +1,425 @@ +/* + * SPDX-License-Identifier: BSD-3-Clause + * Copyright(c) 2010-2014 Intel Corporation + * Copyright 2017 Cavium, Inc. + */ + +#include "pipeline_common.h" + +static __rte_always_inline void +worker_fwd_event(struct rte_event *ev, uint8_t sched) +{ + ev->event_type = RTE_EVENT_TYPE_CPU; + ev->op = RTE_EVENT_OP_FORWARD; + ev->sched_type = sched; +} + +static __rte_always_inline void +worker_event_enqueue(const uint8_t dev, const uint8_t port, + struct rte_event *ev) +{ + while (rte_event_enqueue_burst(dev, port, ev, 1) != 1) + rte_pause(); +} + +static __rte_always_inline void +worker_tx_pkt(struct rte_mbuf *mbuf) +{ + exchange_mac(mbuf); + while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1) + rte_pause(); +} + +/* Multi stage Pipeline Workers */ + +static int +worker_do_tx(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + const uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + const uint8_t cq_id = ev.queue_id % cdata.num_stages; + + if (cq_id >= lst_qid) { + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + ev.queue_id = (cq_id == lst_qid) ? + cdata.next_qid[ev.queue_id] : ev.queue_id; + } else { + ev.queue_id = cdata.next_qid[ev.queue_id]; + worker_fwd_event(&ev, cdata.queue_type); + } + work(); + + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + +static int +setup_eventdev_worker_tx(struct cons_data *cons_data, + struct worker_data *worker_data) +{ + RTE_SET_USED(cons_data); + uint8_t i; + const uint8_t dev_id = 0; + const uint8_t nb_ports = cdata.num_workers; + uint8_t nb_slots = 0; + uint8_t nb_queues = rte_eth_dev_count() * cdata.num_stages; + nb_queues += rte_eth_dev_count(); + + struct rte_event_dev_config config = { + .nb_event_queues = nb_queues, + .nb_event_ports = nb_ports, + .nb_events_limit = 4096, + .nb_event_queue_flows = 1024, + .nb_event_port_dequeue_depth = 128, + .nb_event_port_enqueue_depth = 128, + }; + struct rte_event_port_conf wkr_p_conf = { + .dequeue_depth = cdata.worker_cq_depth, + .enqueue_depth = 64, + .new_event_threshold = 4096, + }; + struct rte_event_queue_conf wkr_q_conf = { + .schedule_type = cdata.queue_type, + .priority = RTE_EVENT_DEV_PRIORITY_NORMAL, + .nb_atomic_flows = 1024, + .nb_atomic_order_sequences = 1024, + }; + + int ret, ndev = rte_event_dev_count(); + + if (ndev < 1) { + printf("%d: No Eventdev Devices Found\n", __LINE__); + return -1; + } + + + struct rte_event_dev_info dev_info; + ret = rte_event_dev_info_get(dev_id, &dev_info); + printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name); + + if (dev_info.max_event_port_dequeue_depth < + config.nb_event_port_dequeue_depth) + config.nb_event_port_dequeue_depth = + dev_info.max_event_port_dequeue_depth; + if (dev_info.max_event_port_enqueue_depth < + config.nb_event_port_enqueue_depth) + config.nb_event_port_enqueue_depth = + dev_info.max_event_port_enqueue_depth; + + ret = rte_event_dev_configure(dev_id, &config); + if (ret < 0) { + printf("%d: Error configuring device\n", __LINE__); + return -1; + } + + printf(" Stages:\n"); + for (i = 0; i < nb_queues; i++) { + + uint8_t slot; + + nb_slots = cdata.num_stages + 1; + slot = i % nb_slots; + wkr_q_conf.schedule_type = slot == cdata.num_stages ? + RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + + if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { + printf("%d: error creating qid %d\n", __LINE__, i); + return -1; + } + cdata.qid[i] = i; + cdata.next_qid[i] = i+1; + if (cdata.enable_queue_priorities) { + const uint32_t prio_delta = + (RTE_EVENT_DEV_PRIORITY_LOWEST) / + nb_slots; + + /* higher priority for queues closer to tx */ + wkr_q_conf.priority = + RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * + (i % nb_slots); + } + + const char *type_str = "Atomic"; + switch (wkr_q_conf.schedule_type) { + case RTE_SCHED_TYPE_ORDERED: + type_str = "Ordered"; + break; + case RTE_SCHED_TYPE_PARALLEL: + type_str = "Parallel"; + break; + } + printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str, + wkr_q_conf.priority); + } + + printf("\n"); + if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth) + wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth; + if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth) + wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth; + + /* set up one port per worker, linking to all stage queues */ + for (i = 0; i < cdata.num_workers; i++) { + struct worker_data *w = &worker_data[i]; + w->dev_id = dev_id; + if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) { + printf("Error setting up port %d\n", i); + return -1; + } + + if (rte_event_port_link(dev_id, i, NULL, NULL, 0) + != nb_queues) { + printf("%d: error creating link for port %d\n", + __LINE__, i); + return -1; + } + w->port_id = i; + } + /* + * Reduce the load on ingress event queue by splitting the traffic + * across multiple event queues. + * for example, nb_stages = 2 and nb_ethdev = 2 then + * + * nb_queues = (2 * 2) + 2 = 6 (non atq) + * rx_stride = 3 + * + * So, traffic is split across queue 0 and queue 3 since queue id for + * rx adapter is chosen * i.e in the above + * case eth port 0, 1 will inject packets into event queue 0, 3 + * respectively. + * + * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. + */ + cdata.rx_stride = nb_slots; + ret = rte_event_dev_service_id_get(dev_id, + &fdata->evdev_service_id); + if (ret != -ESRCH && ret != 0) { + printf("Error getting the service ID\n"); + return -1; + } + rte_service_runstate_set(fdata->evdev_service_id, 1); + rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); + if (rte_event_dev_start(dev_id) < 0) { + printf("Error starting eventdev\n"); + return -1; + } + + return dev_id; +} + + +struct rx_adptr_services { + uint16_t nb_rx_adptrs; + uint32_t *rx_adpt_arr; +}; + +static int32_t +service_rx_adapter(void *arg) +{ + int i; + struct rx_adptr_services *adptr_services = arg; + + for (i = 0; i < adptr_services->nb_rx_adptrs; i++) + rte_service_run_iter_on_app_lcore( + adptr_services->rx_adpt_arr[i], 1); + return 0; +} + +static void +init_rx_adapter(uint16_t nb_ports) +{ + int i; + int ret; + uint8_t evdev_id = 0; + struct rx_adptr_services *adptr_services = NULL; + struct rte_event_dev_info dev_info; + + ret = rte_event_dev_info_get(evdev_id, &dev_info); + adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); + + struct rte_event_port_conf rx_p_conf = { + .dequeue_depth = 8, + .enqueue_depth = 8, + .new_event_threshold = 1200, + }; + + if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) + rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth; + if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) + rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth; + + + struct rte_event_eth_rx_adapter_queue_conf queue_conf = { + .ev.sched_type = cdata.queue_type, + }; + + for (i = 0; i < nb_ports; i++) { + uint32_t cap; + uint32_t service_id; + + ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to create rx adapter[%d]", + cdata.rx_adapter_id); + + ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to get event rx adapter " + "capabilities"); + + queue_conf.ev.queue_id = cdata.rx_stride ? + (i * cdata.rx_stride) + : (uint8_t)cdata.qid[0]; + + ret = rte_event_eth_rx_adapter_queue_add(i, i, -1, &queue_conf); + if (ret) + rte_exit(EXIT_FAILURE, + "Failed to add queues to Rx adapter"); + + + /* Producer needs to be scheduled. */ + if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { + ret = rte_event_eth_rx_adapter_service_id_get(i, + &service_id); + if (ret != -ESRCH && ret != 0) { + rte_exit(EXIT_FAILURE, + "Error getting the service ID for rx adptr\n"); + } + + rte_service_runstate_set(service_id, 1); + rte_service_set_runstate_mapped_check(service_id, 0); + + adptr_services->nb_rx_adptrs++; + adptr_services->rx_adpt_arr = rte_realloc( + adptr_services->rx_adpt_arr, + adptr_services->nb_rx_adptrs * + sizeof(uint32_t), 0); + adptr_services->rx_adpt_arr[ + adptr_services->nb_rx_adptrs - 1] = + service_id; + } + + ret = rte_event_eth_rx_adapter_start(i); + if (ret) + rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", + cdata.rx_adapter_id); + } + + if (adptr_services->nb_rx_adptrs) { + struct rte_service_spec service; + + memset(&service, 0, sizeof(struct rte_service_spec)); + snprintf(service.name, sizeof(service.name), "rx_service"); + service.callback = service_rx_adapter; + service.callback_userdata = (void *)adptr_services; + + int32_t ret = rte_service_component_register(&service, + &fdata->rxadptr_service_id); + if (ret) + rte_exit(EXIT_FAILURE, + "Rx adapter[%d] service register failed", + cdata.rx_adapter_id); + + rte_service_runstate_set(fdata->rxadptr_service_id, 1); + rte_service_component_runstate_set(fdata->rxadptr_service_id, + 1); + rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, + 0); + } else { + memset(fdata->rx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); + rte_free(adptr_services); + } + + if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL && + (dev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) + fdata->cap.scheduler = NULL; + + if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED) + memset(fdata->sched_core, 0, + sizeof(unsigned int) * MAX_NUM_CORE); +} + +static void +worker_tx_opt_check(void) +{ + int i; + int ret; + uint32_t cap = 0; + uint8_t rx_needed = 0; + struct rte_event_dev_info eventdev_info; + + memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); + rte_event_dev_info_get(0, &eventdev_info); + + for (i = 0; i < rte_eth_dev_count(); i++) { + ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); + if (ret) + rte_exit(EXIT_FAILURE, + "failed to get event rx adapter " + "capabilities"); + rx_needed |= + !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); + } + + if (cdata.worker_lcore_mask == 0 || + (rx_needed && cdata.rx_lcore_mask == 0) || + (cdata.sched_lcore_mask == 0 && + !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) { + printf("Core part of pipeline was not assigned any cores. " + "This will stall the pipeline, please check core masks " + "(use -h for details on setting core masks):\n" + "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64 + "\n\tworkers: %"PRIu64"\n", + cdata.rx_lcore_mask, cdata.tx_lcore_mask, + cdata.sched_lcore_mask, + cdata.worker_lcore_mask); + rte_exit(-1, "Fix core masks\n"); + } +} + +void +set_worker_tx_setup_data(struct setup_data *caps, bool burst) +{ + RTE_SET_USED(burst); + caps->worker = worker_do_tx; + + memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); + + caps->check_opt = worker_tx_opt_check; + caps->consumer = NULL; + caps->scheduler = schedule_devices; + caps->evdev_setup = setup_eventdev_worker_tx; + caps->adptr_setup = init_rx_adapter; +} -- 2.20.1