X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=examples%2Feventdev_pipeline%2Fpipeline_worker_tx.c;h=85eb075fc8714c023868711ccab4540861e62d6b;hb=23d25f1a40175343c8a27d539d47fecef477ecf5;hp=fc98128ece03ebd15d6cc21db93b3c00e112a92a;hpb=8728ccf37615904cf23fb8763895b05c9a3c6b0c;p=dpdk.git diff --git a/examples/eventdev_pipeline/pipeline_worker_tx.c b/examples/eventdev_pipeline/pipeline_worker_tx.c index fc98128ece..85eb075fc8 100644 --- a/examples/eventdev_pipeline/pipeline_worker_tx.c +++ b/examples/eventdev_pipeline/pipeline_worker_tx.c @@ -36,10 +36,11 @@ worker_event_enqueue_burst(const uint8_t dev, const uint8_t port, } static __rte_always_inline void -worker_tx_pkt(struct rte_mbuf *mbuf) +worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev) { - exchange_mac(mbuf); - while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1) + exchange_mac(ev->mbuf); + rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0); + while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1)) rte_pause(); } @@ -64,15 +65,15 @@ worker_do_tx_single(void *arg) received++; if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev.mbuf); + worker_tx_pkt(dev, port, &ev); tx++; - continue; + } else { + work(); + ev.queue_id++; + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + worker_event_enqueue(dev, port, &ev); + fwd++; } - work(); - ev.queue_id++; - worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); - worker_event_enqueue(dev, port, &ev); - fwd++; } if (!cdata.quiet) @@ -100,14 +101,14 @@ worker_do_tx_single_atq(void *arg) received++; if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev.mbuf); + worker_tx_pkt(dev, port, &ev); tx++; - continue; + } else { + work(); + worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); + worker_event_enqueue(dev, port, &ev); + fwd++; } - work(); - worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC); - worker_event_enqueue(dev, port, &ev); - fwd++; } if (!cdata.quiet) @@ -141,7 +142,7 @@ worker_do_tx_single_burst(void *arg) rte_prefetch0(ev[i + 1].mbuf); if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev[i].mbuf); + worker_tx_pkt(dev, port, &ev[i]); ev[i].op = RTE_EVENT_OP_RELEASE; tx++; @@ -188,7 +189,7 @@ worker_do_tx_single_burst_atq(void *arg) rte_prefetch0(ev[i + 1].mbuf); if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev[i].mbuf); + worker_tx_pkt(dev, port, &ev[i]); ev[i].op = RTE_EVENT_OP_RELEASE; tx++; } else @@ -232,7 +233,7 @@ worker_do_tx(void *arg) if (cq_id >= lst_qid) { if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev.mbuf); + worker_tx_pkt(dev, port, &ev); tx++; continue; } @@ -280,7 +281,7 @@ worker_do_tx_atq(void *arg) if (cq_id == lst_qid) { if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev.mbuf); + worker_tx_pkt(dev, port, &ev); tx++; continue; } @@ -330,7 +331,7 @@ worker_do_tx_burst(void *arg) if (cq_id >= lst_qid) { if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev[i].mbuf); + worker_tx_pkt(dev, port, &ev[i]); tx++; ev[i].op = RTE_EVENT_OP_RELEASE; continue; @@ -387,7 +388,7 @@ worker_do_tx_burst_atq(void *arg) if (cq_id == lst_qid) { if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) { - worker_tx_pkt(ev[i].mbuf); + worker_tx_pkt(dev, port, &ev[i]); tx++; ev[i].op = RTE_EVENT_OP_RELEASE; continue; @@ -413,16 +414,14 @@ worker_do_tx_burst_atq(void *arg) } static int -setup_eventdev_worker_tx(struct cons_data *cons_data, - struct worker_data *worker_data) +setup_eventdev_worker_tx_enq(struct worker_data *worker_data) { - RTE_SET_USED(cons_data); uint8_t i; const uint8_t atq = cdata.all_type_queues ? 1 : 0; const uint8_t dev_id = 0; const uint8_t nb_ports = cdata.num_workers; uint8_t nb_slots = 0; - uint8_t nb_queues = rte_eth_dev_count(); + uint8_t nb_queues = rte_eth_dev_count_avail(); /* * In case where all type queues are not enabled, use queues equal to @@ -431,7 +430,7 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, */ if (!atq) { nb_queues *= cdata.num_stages; - nb_queues += rte_eth_dev_count(); + nb_queues += rte_eth_dev_count_avail(); } struct rte_event_dev_config config = { @@ -575,10 +574,9 @@ setup_eventdev_worker_tx(struct cons_data *cons_data, } rte_service_runstate_set(fdata->evdev_service_id, 1); rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0); - if (rte_event_dev_start(dev_id) < 0) { - printf("Error starting eventdev\n"); - return -1; - } + + if (rte_event_dev_start(dev_id) < 0) + rte_exit(EXIT_FAILURE, "Error starting eventdev"); return dev_id; } @@ -602,7 +600,7 @@ service_rx_adapter(void *arg) } static void -init_rx_adapter(uint16_t nb_ports) +init_adapters(uint16_t nb_ports) { int i; int ret; @@ -613,17 +611,18 @@ init_rx_adapter(uint16_t nb_ports) ret = rte_event_dev_info_get(evdev_id, &dev_info); adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0); - struct rte_event_port_conf rx_p_conf = { - .dequeue_depth = 8, - .enqueue_depth = 8, - .new_event_threshold = 1200, + struct rte_event_port_conf adptr_p_conf = { + .dequeue_depth = cdata.worker_cq_depth, + .enqueue_depth = 64, + .new_event_threshold = 4096, }; - if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) - rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth; - if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) - rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth; - + if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth) + adptr_p_conf.dequeue_depth = + dev_info.max_event_port_dequeue_depth; + if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth) + adptr_p_conf.enqueue_depth = + dev_info.max_event_port_enqueue_depth; struct rte_event_eth_rx_adapter_queue_conf queue_conf; memset(&queue_conf, 0, sizeof(queue_conf)); @@ -633,11 +632,11 @@ init_rx_adapter(uint16_t nb_ports) uint32_t cap; uint32_t service_id; - ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf); + ret = rte_event_eth_rx_adapter_create(i, evdev_id, + &adptr_p_conf); if (ret) rte_exit(EXIT_FAILURE, - "failed to create rx adapter[%d]", - cdata.rx_adapter_id); + "failed to create rx adapter[%d]", i); ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap); if (ret) @@ -654,7 +653,6 @@ init_rx_adapter(uint16_t nb_ports) rte_exit(EXIT_FAILURE, "Failed to add queues to Rx adapter"); - /* Producer needs to be scheduled. */ if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) { ret = rte_event_eth_rx_adapter_service_id_get(i, @@ -680,9 +678,29 @@ init_rx_adapter(uint16_t nb_ports) ret = rte_event_eth_rx_adapter_start(i); if (ret) rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed", - cdata.rx_adapter_id); + i); } + /* We already know that Tx adapter has INTERNAL port cap*/ + ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id, + &adptr_p_conf); + if (ret) + rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]", + cdata.tx_adapter_id); + + for (i = 0; i < nb_ports; i++) { + ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i, + -1); + if (ret) + rte_exit(EXIT_FAILURE, + "Failed to add queues to Tx adapter"); + } + + ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id); + if (ret) + rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed", + cdata.tx_adapter_id); + if (adptr_services->nb_rx_adptrs) { struct rte_service_spec service; @@ -695,8 +713,7 @@ init_rx_adapter(uint16_t nb_ports) &fdata->rxadptr_service_id); if (ret) rte_exit(EXIT_FAILURE, - "Rx adapter[%d] service register failed", - cdata.rx_adapter_id); + "Rx adapter service register failed"); rte_service_runstate_set(fdata->rxadptr_service_id, 1); rte_service_component_runstate_set(fdata->rxadptr_service_id, @@ -708,23 +725,19 @@ init_rx_adapter(uint16_t nb_ports) rte_free(adptr_services); } - if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL && - (dev_info.event_dev_cap & + if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)) fdata->cap.scheduler = NULL; - - if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED) - memset(fdata->sched_core, 0, - sizeof(unsigned int) * MAX_NUM_CORE); } static void -worker_tx_opt_check(void) +worker_tx_enq_opt_check(void) { int i; int ret; uint32_t cap = 0; uint8_t rx_needed = 0; + uint8_t sched_needed = 0; struct rte_event_dev_info eventdev_info; memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info)); @@ -734,32 +747,38 @@ worker_tx_opt_check(void) RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES)) rte_exit(EXIT_FAILURE, "Event dev doesn't support all type queues\n"); + sched_needed = !(eventdev_info.event_dev_cap & + RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED); RTE_ETH_FOREACH_DEV(i) { ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap); if (ret) rte_exit(EXIT_FAILURE, - "failed to get event rx adapter " - "capabilities"); + "failed to get event rx adapter capabilities"); rx_needed |= !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT); } if (cdata.worker_lcore_mask == 0 || (rx_needed && cdata.rx_lcore_mask == 0) || - (cdata.sched_lcore_mask == 0 && - !(eventdev_info.event_dev_cap & - RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) { + (sched_needed && cdata.sched_lcore_mask == 0)) { printf("Core part of pipeline was not assigned any cores. " "This will stall the pipeline, please check core masks " "(use -h for details on setting core masks):\n" - "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64 - "\n\tworkers: %"PRIu64"\n", - cdata.rx_lcore_mask, cdata.tx_lcore_mask, - cdata.sched_lcore_mask, - cdata.worker_lcore_mask); + "\trx: %"PRIu64"\n\tsched: %"PRIu64 + "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask, + cdata.sched_lcore_mask, cdata.worker_lcore_mask); rte_exit(-1, "Fix core masks\n"); } + + if (!sched_needed) + memset(fdata->sched_core, 0, + sizeof(unsigned int) * MAX_NUM_CORE); + if (!rx_needed) + memset(fdata->rx_core, 0, + sizeof(unsigned int) * MAX_NUM_CORE); + + memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); } static worker_loop @@ -821,18 +840,15 @@ get_worker_multi_stage(bool burst) } void -set_worker_tx_setup_data(struct setup_data *caps, bool burst) +set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst) { if (cdata.num_stages == 1) caps->worker = get_worker_single_stage(burst); else caps->worker = get_worker_multi_stage(burst); - memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE); - - caps->check_opt = worker_tx_opt_check; - caps->consumer = NULL; + caps->check_opt = worker_tx_enq_opt_check; caps->scheduler = schedule_devices; - caps->evdev_setup = setup_eventdev_worker_tx; - caps->adptr_setup = init_rx_adapter; + caps->evdev_setup = setup_eventdev_worker_tx_enq; + caps->adptr_setup = init_adapters; }