From: Pavan Nikhilesh Date: Wed, 10 Jan 2018 11:10:08 +0000 (+0530) Subject: examples/eventdev: add all type queue option X-Git-Tag: spdx-start~173 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=afbfb754927928840ac28dfb46c21146a8fd2907;p=dpdk.git examples/eventdev: add all type queue option Added configurable option to make queue type as all type queues i.e. RTE_EVENT_QUEUE_CFG_ALL_TYPES based on event dev capability RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES. This can be enabled by supplying '-a' as a cmdline argument. Signed-off-by: Pavan Nikhilesh Acked-by: Harry van Haaren --- diff --git a/examples/eventdev_pipeline_sw_pmd/main.c b/examples/eventdev_pipeline_sw_pmd/main.c index e07f68eb69..81ba3406cf 100644 --- a/examples/eventdev_pipeline_sw_pmd/main.c +++ b/examples/eventdev_pipeline_sw_pmd/main.c @@ -120,6 +120,7 @@ static struct option long_options[] = { {"parallel", no_argument, 0, 'p'}, {"ordered", no_argument, 0, 'o'}, {"quiet", no_argument, 0, 'q'}, + {"use-atq", no_argument, 0, 'a'}, {"dump", no_argument, 0, 'D'}, {0, 0, 0, 0} }; @@ -143,6 +144,7 @@ usage(void) " -o, --ordered Use ordered scheduling\n" " -p, --parallel Use parallel scheduling\n" " -q, --quiet Minimize printed output\n" + " -a, --use-atq Use all type queues\n" " -D, --dump Print detailed statistics before exit" "\n"; fprintf(stderr, "%s", usage_str); @@ -163,7 +165,7 @@ parse_app_args(int argc, char **argv) int i; for (;;) { - c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:", + c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:paoPqDW:", long_options, &option_index); if (c == -1) break; @@ -196,6 +198,9 @@ parse_app_args(int argc, char **argv) case 'p': cdata.queue_type = RTE_SCHED_TYPE_PARALLEL; break; + case 'a': + cdata.all_type_queues = 1; + break; case 'q': cdata.quiet = 1; break; diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h index e063200505..66553038ca 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_common.h +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_common.h @@ -77,6 +77,7 @@ struct config_data { int quiet; int dump_dev; int dump_dev_signal; + int all_type_queues; unsigned int num_stages; unsigned int worker_cq_depth; unsigned int rx_stride; diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c index 90f87709c3..2c51f4a306 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_generic.c @@ -516,6 +516,11 @@ generic_opt_check(void) memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); rte_event_dev_info_get(0, &eventdev_info); + if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) + rte_exit(EXIT_FAILURE, + "Event dev doesn't support all type queues\n"); + for (i = 0; i < rte_eth_dev_count(); i++) { ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); if (ret) diff --git a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c index 419d8d4107..25eabbd6ca 100644 --- a/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c +++ b/examples/eventdev_pipeline_sw_pmd/pipeline_worker_tx.c @@ -94,6 +94,52 @@ worker_do_tx(void *arg) return 0; } +static int +worker_do_tx_atq(void *arg) +{ + struct rte_event ev; + + struct worker_data *data = (struct worker_data *)arg; + const uint8_t dev = data->dev_id; + const uint8_t port = data->port_id; + const uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + + if (!rte_event_dequeue_burst(dev, port, &ev, 1, 0)) { + rte_pause(); + continue; + } + + received++; + const uint8_t cq_id = ev.sub_event_type % cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev.mbuf); + tx++; + continue; + } + + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + } else { + ev.sub_event_type++; + worker_fwd_event(&ev, cdata.queue_type); + } + work(); + + worker_event_enqueue(dev, port, &ev); + fwd++; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + static int worker_do_tx_burst(void *arg) { @@ -149,17 +195,81 @@ worker_do_tx_burst(void *arg) return 0; } +static int +worker_do_tx_burst_atq(void *arg) +{ + struct rte_event ev[BATCH_SIZE]; + + struct worker_data *data = (struct worker_data *)arg; + uint8_t dev = data->dev_id; + uint8_t port = data->port_id; + uint8_t lst_qid = cdata.num_stages - 1; + size_t fwd = 0, received = 0, tx = 0; + + while (!fdata->done) { + uint16_t i; + + const uint16_t nb_rx = rte_event_dequeue_burst(dev, port, + ev, BATCH_SIZE, 0); + + if (nb_rx == 0) { + rte_pause(); + continue; + } + received += nb_rx; + + for (i = 0; i < nb_rx; i++) { + const uint8_t cq_id = ev[i].sub_event_type % + cdata.num_stages; + + if (cq_id == lst_qid) { + if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { + worker_tx_pkt(ev[i].mbuf); + tx++; + ev[i].op = RTE_EVENT_OP_RELEASE; + continue; + } + + worker_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC); + } else { + ev[i].sub_event_type++; + worker_fwd_event(&ev[i], cdata.queue_type); + } + work(); + } + + worker_event_enqueue_burst(dev, port, ev, nb_rx); + fwd += nb_rx; + } + + if (!cdata.quiet) + printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n", + rte_lcore_id(), received, fwd, tx); + + return 0; +} + static int setup_eventdev_worker_tx(struct cons_data *cons_data, struct worker_data *worker_data) { RTE_SET_USED(cons_data); uint8_t i; + const uint8_t atq = cdata.all_type_queues ? 1 : 0; const uint8_t dev_id = 0; const uint8_t nb_ports = cdata.num_workers; uint8_t nb_slots = 0; - uint8_t nb_queues = rte_eth_dev_count() * cdata.num_stages; - nb_queues += rte_eth_dev_count(); + uint8_t nb_queues = rte_eth_dev_count(); + + /* + * In case where all type queues are not enabled, use queues equal to + * number of stages * eth_dev_count and one extra queue per pipeline + * for Tx. + */ + if (!atq) { + nb_queues *= cdata.num_stages; + nb_queues += rte_eth_dev_count(); + } struct rte_event_dev_config config = { .nb_event_queues = nb_queues, @@ -211,12 +321,19 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, printf(" Stages:\n"); for (i = 0; i < nb_queues; i++) { - uint8_t slot; + if (atq) { - nb_slots = cdata.num_stages + 1; - slot = i % nb_slots; - wkr_q_conf.schedule_type = slot == cdata.num_stages ? - RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + nb_slots = cdata.num_stages; + wkr_q_conf.event_queue_cfg = + RTE_EVENT_QUEUE_CFG_ALL_TYPES; + } else { + uint8_t slot; + + nb_slots = cdata.num_stages + 1; + slot = i % nb_slots; + wkr_q_conf.schedule_type = slot == cdata.num_stages ? + RTE_SCHED_TYPE_ATOMIC : cdata.queue_type; + } if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) { printf("%d: error creating qid %d\n", __LINE__, i); @@ -286,7 +403,7 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, * * This forms two set of queue pipelines 0->1->2->tx and 3->4->5->tx. */ - cdata.rx_stride = nb_slots; + cdata.rx_stride = atq ? 1 : nb_slots; ret = rte_event_dev_service_id_get(dev_id, &fdata->evdev_service_id); if (ret != -ESRCH && ret != 0) { @@ -450,6 +567,11 @@ worker_tx_opt_check(void) memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); rte_event_dev_info_get(0, &eventdev_info); + if (cdata.all_type_queues && !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) + rte_exit(EXIT_FAILURE, + "Event dev doesn't support all type queues\n"); + for (i = 0; i < rte_eth_dev_count(); i++) { ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); if (ret) @@ -477,13 +599,33 @@ worker_tx_opt_check(void) } } +static worker_loop +get_worker_loop_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_burst_atq; + + return worker_do_tx_burst; +} + +static worker_loop +get_worker_loop_non_burst(uint8_t atq) +{ + if (atq) + return worker_do_tx_atq; + + return worker_do_tx; +} + void set_worker_tx_setup_data(struct setup_data *caps, bool burst) { + uint8_t atq = cdata.all_type_queues ? 1 : 0; + if (burst) - caps->worker = worker_do_tx_burst; + caps->worker = get_worker_loop_burst(atq); else - caps->worker = worker_do_tx; + caps->worker = get_worker_loop_non_burst(atq); memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);