2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright 2017 Cavium, Inc.
6 #include "test_pipeline_common.h"
8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
/* Number of event queues needed in ATQ (all-types queue) mode:
 * one queue per available ethdev. */
10 static __rte_always_inline int
11 pipeline_atq_nb_event_queues(struct evt_options *opt)
15 return rte_eth_dev_count_avail();
18 typedef int (*pipeline_atq_worker_t)(void *arg);
/* Single-stage worker, internal Tx port: dequeue one event at a time
 * and transmit it directly from the worker via pipeline_event_tx().
 * NOTE(review): excerpt — some loop-body lines are not visible here. */
20 static __rte_noinline int
21 pipeline_atq_worker_single_stage_tx(void *arg)
23 PIPELINE_WORKER_SINGLE_STAGE_INIT;
25 while (t->done == false) {
	/* Non-blocking dequeue of a single event. */
26 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
33 pipeline_event_tx(dev, port, &ev);
/* Single-stage worker without internal Tx port: redirect each event to
 * the per-ethdev single-link Tx event queue (t->tx_evqueue_id, indexed
 * by the mbuf's ingress port) and enqueue it atomically so the Tx
 * adapter service transmits it. */
40 static __rte_noinline int
41 pipeline_atq_worker_single_stage_fwd(void *arg)
43 PIPELINE_WORKER_SINGLE_STAGE_INIT;
44 const uint8_t *tx_queue = t->tx_evqueue_id;
46 while (t->done == false) {
47 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
54 ev.queue_id = tx_queue[ev.mbuf->port];
55 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
56 pipeline_event_enqueue(dev, port, &ev);
/* Single-stage burst worker, internal Tx port: dequeue a burst, pin
 * each mbuf to Tx queue 0, then transmit the whole burst. */
63 static __rte_noinline int
64 pipeline_atq_worker_single_stage_burst_tx(void *arg)
66 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
68 while (t->done == false) {
69 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
77 for (i = 0; i < nb_rx; i++) {
	/* Prefetch the next event's mbuf while processing this one. */
78 rte_prefetch0(ev[i + 1].mbuf);
79 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
82 pipeline_event_tx_burst(dev, port, ev, nb_rx);
83 w->processed_pkts += nb_rx;
/* Single-stage burst worker without internal Tx port: for each event
 * in the burst, set Tx queue 0 on the mbuf, redirect to the per-ethdev
 * Tx event queue, and enqueue the burst atomically for the Tx adapter. */
89 static __rte_noinline int
90 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
92 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
93 const uint8_t *tx_queue = t->tx_evqueue_id;
95 while (t->done == false) {
96 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
104 for (i = 0; i < nb_rx; i++) {
	/* Prefetch the next event's mbuf while processing this one. */
105 rte_prefetch0(ev[i + 1].mbuf);
106 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
107 ev[i].queue_id = tx_queue[ev[i].mbuf->port];
108 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
111 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
112 w->processed_pkts += nb_rx;
/* Single-stage vector worker, internal Tx port: events carry event
 * vectors (ev.vec); transmit the vector and account all its packets. */
118 static __rte_noinline int
119 pipeline_atq_worker_single_stage_tx_vector(void *arg)
121 PIPELINE_WORKER_SINGLE_STAGE_INIT;
125 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
	/* Capture the element count before Tx hands off the vector. */
131 vector_sz = ev.vec->nb_elem;
132 pipeline_event_tx_vector(dev, port, &ev);
133 w->processed_pkts += vector_sz;
/* Single-stage vector worker without internal Tx port: redirect the
 * vector event to the per-ethdev Tx event queue (indexed by the
 * vector's port) and enqueue it atomically for the Tx adapter. */
139 static __rte_noinline int
140 pipeline_atq_worker_single_stage_fwd_vector(void *arg)
142 PIPELINE_WORKER_SINGLE_STAGE_INIT;
143 const uint8_t *tx_queue = t->tx_evqueue_id;
147 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
	/* Capture the element count before the vector is enqueued away. */
154 vector_sz = ev.vec->nb_elem;
155 ev.queue_id = tx_queue[ev.vec->port];
157 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
158 pipeline_event_enqueue(dev, port, &ev);
159 w->processed_pkts += vector_sz;
/* Single-stage burst vector worker, internal Tx port: dequeue a burst
 * of vector events, pin each vector to Tx queue 0, transmit the burst,
 * and account the summed element counts. */
165 static __rte_noinline int
166 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
168 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
173 rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
180 for (i = 0; i < nb_rx; i++) {
181 vector_sz += ev[i].vec->nb_elem;
182 ev[i].vec->queue = 0;
185 pipeline_event_tx_burst(dev, port, ev, nb_rx);
186 w->processed_pkts += vector_sz;
/* Single-stage burst vector worker without internal Tx port: redirect
 * each vector event to the per-ethdev Tx event queue, pin it to Tx
 * queue 0, and enqueue the burst atomically for the Tx adapter. */
192 static __rte_noinline int
193 pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
195 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
196 const uint8_t *tx_queue = t->tx_evqueue_id;
201 rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
209 for (i = 0; i < nb_rx; i++) {
210 ev[i].queue_id = tx_queue[ev[i].vec->port];
211 ev[i].vec->queue = 0;
212 vector_sz += ev[i].vec->nb_elem;
213 pipeline_fwd_event_vector(&ev[i],
214 RTE_SCHED_TYPE_ATOMIC);
217 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
218 w->processed_pkts += vector_sz;
/* Multi-stage worker, internal Tx port. All stages share the same ATQ
 * queue; the current stage index lives in ev.sub_event_type. The last
 * stage transmits, earlier stages forward with that stage's sched type. */
224 static __rte_noinline int
225 pipeline_atq_worker_multi_stage_tx(void *arg)
227 PIPELINE_WORKER_MULTI_STAGE_INIT;
229 while (t->done == false) {
230 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
	/* Derive the stage this event is currently in. */
237 cq_id = ev.sub_event_type % nb_stages;
239 if (cq_id == last_queue) {
240 pipeline_event_tx(dev, port, &ev);
246 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
247 pipeline_event_enqueue(dev, port, &ev);
/* Multi-stage worker without internal Tx port: on the last stage,
 * redirect the event to the per-ethdev Tx event queue with atomic
 * scheduling; otherwise forward to the next stage's sched type. */
253 static __rte_noinline int
254 pipeline_atq_worker_multi_stage_fwd(void *arg)
256 PIPELINE_WORKER_MULTI_STAGE_INIT;
257 const uint8_t *tx_queue = t->tx_evqueue_id;
259 while (t->done == false) {
260 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
	/* Derive the stage this event is currently in. */
267 cq_id = ev.sub_event_type % nb_stages;
269 if (cq_id == last_queue) {
270 ev.queue_id = tx_queue[ev.mbuf->port];
271 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
275 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
278 pipeline_event_enqueue(dev, port, &ev);
/* Multi-stage burst worker, internal Tx port: events on the last stage
 * are transmitted immediately and their slot marked RTE_EVENT_OP_RELEASE
 * so the trailing burst enqueue drops them; other events advance to the
 * next stage (sub_event_type++). */
284 static __rte_noinline int
285 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
287 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
289 while (t->done == false) {
290 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
298 for (i = 0; i < nb_rx; i++) {
	/* Prefetch the next event's mbuf while processing this one. */
299 rte_prefetch0(ev[i + 1].mbuf);
300 cq_id = ev[i].sub_event_type % nb_stages;
302 if (cq_id == last_queue) {
303 pipeline_event_tx(dev, port, &ev[i]);
304 ev[i].op = RTE_EVENT_OP_RELEASE;
309 ev[i].sub_event_type++;
310 pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
313 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
/* Multi-stage burst worker without internal Tx port: last-stage events
 * are redirected to the per-ethdev Tx event queue (atomic sched);
 * earlier stages advance via sub_event_type++ and forward with the
 * stage's configured sched type. The whole burst is re-enqueued. */
319 static __rte_noinline int
320 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
322 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
323 const uint8_t *tx_queue = t->tx_evqueue_id;
325 while (t->done == false) {
326 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
334 for (i = 0; i < nb_rx; i++) {
	/* Prefetch the next event's mbuf while processing this one. */
335 rte_prefetch0(ev[i + 1].mbuf);
336 cq_id = ev[i].sub_event_type % nb_stages;
338 if (cq_id == last_queue) {
340 ev[i].queue_id = tx_queue[ev[i].mbuf->port];
341 pipeline_fwd_event(&ev[i],
342 RTE_SCHED_TYPE_ATOMIC);
344 ev[i].sub_event_type++;
345 pipeline_fwd_event(&ev[i],
346 sched_type_list[cq_id]);
350 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
/* Multi-stage vector worker, internal Tx port: last-stage vectors are
 * transmitted (element count captured before Tx for accounting);
 * earlier stages forward with the stage's sched type. */
356 static __rte_noinline int
357 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
359 PIPELINE_WORKER_MULTI_STAGE_INIT;
363 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
370 cq_id = ev.sub_event_type % nb_stages;
372 if (cq_id == last_queue) {
373 vector_sz = ev.vec->nb_elem;
374 pipeline_event_tx_vector(dev, port, &ev);
375 w->processed_pkts += vector_sz;
380 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
381 pipeline_event_enqueue(dev, port, &ev);
/* Multi-stage vector worker without internal Tx port: last-stage
 * vectors go to the per-ethdev Tx event queue (atomic sched) and are
 * accounted; earlier stages forward with the stage's sched type. */
387 static __rte_noinline int
388 pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
390 PIPELINE_WORKER_MULTI_STAGE_INIT;
391 const uint8_t *tx_queue = t->tx_evqueue_id;
395 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
402 cq_id = ev.sub_event_type % nb_stages;
404 if (cq_id == last_queue) {
405 ev.queue_id = tx_queue[ev.vec->port];
407 vector_sz = ev.vec->nb_elem;
408 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
409 pipeline_event_enqueue(dev, port, &ev);
410 w->processed_pkts += vector_sz;
413 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
414 pipeline_event_enqueue(dev, port, &ev);
/* Multi-stage burst vector worker, internal Tx port: last-stage
 * vectors are transmitted and their slot marked RTE_EVENT_OP_RELEASE so
 * the trailing burst enqueue drops them; other vectors advance to the
 * next stage (sub_event_type++). */
421 static __rte_noinline int
422 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
424 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
429 rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
436 for (i = 0; i < nb_rx; i++) {
437 cq_id = ev[i].sub_event_type % nb_stages;
439 if (cq_id == last_queue) {
	/* Capture the element count before Tx hands off the vector. */
440 vector_sz = ev[i].vec->nb_elem;
441 pipeline_event_tx_vector(dev, port, &ev[i]);
442 ev[i].op = RTE_EVENT_OP_RELEASE;
443 w->processed_pkts += vector_sz;
447 ev[i].sub_event_type++;
448 pipeline_fwd_event_vector(&ev[i],
449 sched_type_list[cq_id]);
452 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
/* Multi-stage burst vector worker without internal Tx port: last-stage
 * vectors are redirected to the per-ethdev Tx event queue, pinned to Tx
 * queue 0, and accounted; earlier stages advance via sub_event_type++.
 * The whole burst is then re-enqueued. */
458 static __rte_noinline int
459 pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
461 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
462 const uint8_t *tx_queue = t->tx_evqueue_id;
467 rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
474 for (i = 0; i < nb_rx; i++) {
475 cq_id = ev[i].sub_event_type % nb_stages;
477 if (cq_id == last_queue) {
478 vector_sz = ev[i].vec->nb_elem;
479 ev[i].queue_id = tx_queue[ev[i].vec->port];
480 ev[i].vec->queue = 0;
481 pipeline_fwd_event_vector(
482 &ev[i], RTE_SCHED_TYPE_ATOMIC);
483 w->processed_pkts += vector_sz;
485 ev[i].sub_event_type++;
486 pipeline_fwd_event_vector(
487 &ev[i], sched_type_list[cq_id]);
491 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
/* Dispatch the lcore to the matching worker loop. The lookup tables
 * are indexed [ena_vector][burst][internal_port]; the single- vs
 * multi-stage table is chosen by opt->nb_stages. Panics on an index
 * combination with no registered worker. */
498 worker_wrapper(void *arg)
500 struct worker_data *w = arg;
501 struct evt_options *opt = w->t->opt;
502 const bool burst = evt_has_burst_mode(w->dev_id);
503 const bool internal_port = w->t->internal_port;
504 const uint8_t nb_stages = opt->nb_stages;
505 /* Table index order: [vector][burst][internal_port]. */
506 const pipeline_atq_worker_t
507 pipeline_atq_worker_single_stage[2][2][2] = {
508 [0][0][0] = pipeline_atq_worker_single_stage_fwd,
509 [0][0][1] = pipeline_atq_worker_single_stage_tx,
510 [0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
511 [0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
512 [1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
513 [1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
514 [1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
515 [1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
517 const pipeline_atq_worker_t
518 pipeline_atq_worker_multi_stage[2][2][2] = {
519 [0][0][0] = pipeline_atq_worker_multi_stage_fwd,
520 [0][0][1] = pipeline_atq_worker_multi_stage_tx,
521 [0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
522 [0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
523 [1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
524 [1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
525 [1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
526 [1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
530 return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
531 [internal_port])(arg);
533 return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
534 [internal_port])(arg);
536 rte_panic("invalid worker\n");
/* Launch worker_wrapper on all configured worker lcores. */
540 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
542 return pipeline_launch_lcores(test, opt, worker_wrapper);
/* Configure the eventdev for the ATQ pipeline test: one ALL_TYPES
 * queue per ethdev, plus (when there is no internal Tx port) one
 * SINGLE_LINK queue per ethdev for the Tx adapter; then set up worker
 * ports, Rx/Tx adapters, scheduler service, links, and start all
 * devices and adapters. */
546 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
551 uint8_t queue, is_prod;
552 uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
553 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
554 uint8_t nb_worker_queues = 0;
555 uint8_t tx_evport_id = 0;
557 struct rte_event_dev_info info;
558 struct test_pipeline *t = evt_test_priv(test);
560 nb_ports = evt_nr_active_lcores(opt->wlcores);
561 nb_queues = rte_eth_dev_count_avail();
563 memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
564 memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
565 /* One extra queue per ethdev for the Tx adapter (no internal port). */
566 if (!t->internal_port) {
567 RTE_ETH_FOREACH_DEV(prod) {
568 tx_evqueue_id[prod] = nb_queues;
573 rte_event_dev_info_get(opt->dev_id, &info);
575 ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
577 evt_err("failed to configure eventdev %d", opt->dev_id);
581 struct rte_event_queue_conf q_conf = {
582 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
583 .nb_atomic_flows = opt->nb_flows,
584 .nb_atomic_order_sequences = opt->nb_flows,
586 /* Queue configuration: ALL_TYPES by default, SINGLE_LINK for the
 * reserved Tx adapter queues; remaining queues go to the workers. */
587 for (queue = 0; queue < nb_queues; queue++) {
588 q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
590 if (!t->internal_port) {
592 RTE_ETH_FOREACH_DEV(prod) {
593 if (queue == tx_evqueue_id[prod]) {
594 q_conf.event_queue_cfg =
595 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
601 queue_arr[nb_worker_queues] = queue;
606 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
608 evt_err("failed to setup queue=%d", queue);
	/* Clamp the requested dequeue depth to the device limit. */
613 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
614 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
616 /* Worker port configuration. */
617 const struct rte_event_port_conf p_conf = {
618 .dequeue_depth = opt->wkr_deq_dep,
619 .enqueue_depth = info.max_event_port_dequeue_depth,
620 .new_event_threshold = info.max_num_events,
623 if (!t->internal_port)
624 ret = pipeline_event_port_setup(test, opt, queue_arr,
625 nb_worker_queues, p_conf);
627 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
634 * The pipelines are setup in the following manner:
636 * eth_dev_count = 2, nb_stages = 2, atq mode
638 * eth0, eth1 have Internal port capability :
642 * event queue pipelines:
646 * q0, q1 are configured as ATQ so, all the different stages can
647 * be enqueued on the same queue.
649 * eth0, eth1 use Tx adapters service core :
653 * event queue pipelines:
654 * eth0 -> q0 -> q2 -> Tx
655 * eth1 -> q1 -> q3 -> Tx
657 * q0, q1 are configured as stated above.
658 * q2, q3 configured as SINGLE_LINK.
660 ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
663 ret = pipeline_event_tx_adapter_setup(opt, p_conf);
	/* Devices without distributed scheduling need a service lcore. */
667 if (!evt_has_distributed_sched(opt->dev_id)) {
669 rte_event_dev_service_id_get(opt->dev_id, &service_id);
670 ret = evt_service_setup(service_id);
672 evt_err("No service lcore found to run event dev.");
677 /* Link each reserved tx_evqueue_id to its Tx adapter event port. */
678 if (!t->internal_port) {
679 RTE_ETH_FOREACH_DEV(prod) {
680 ret = rte_event_eth_tx_adapter_event_port_get(prod,
683 evt_err("Unable to get Tx adapter[%d]", prod);
687 if (rte_event_port_link(opt->dev_id, tx_evport_id,
688 &tx_evqueue_id[prod],
690 evt_err("Unable to link Tx adptr[%d] evprt[%d]",
697 ret = rte_event_dev_start(opt->dev_id);
699 evt_err("failed to start eventdev %d", opt->dev_id);
704 RTE_ETH_FOREACH_DEV(prod) {
705 ret = rte_eth_dev_start(prod);
707 evt_err("Ethernet dev [%d] failed to start."
708 " Using synthetic producer", prod);
713 RTE_ETH_FOREACH_DEV(prod) {
714 ret = rte_event_eth_rx_adapter_start(prod);
716 evt_err("Rx adapter[%d] start failed", prod);
720 ret = rte_event_eth_tx_adapter_start(prod);
722 evt_err("Tx adapter[%d] start failed", prod);
	/* Publish the Tx queue map so the fwd workers can use it. */
727 memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
/* Dump the test options along with the computed queue count. */
734 pipeline_atq_opt_dump(struct evt_options *opt)
736 pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
/* Validate the test options against the computed queue count. */
740 pipeline_atq_opt_check(struct evt_options *opt)
742 return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
/* Check the eventdev has enough queues/ports for this test and that it
 * supports all-types queues (required for ATQ mode). */
746 pipeline_atq_capability_check(struct evt_options *opt)
748 struct rte_event_dev_info dev_info;
750 rte_event_dev_info_get(opt->dev_id, &dev_info);
751 if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
752 dev_info.max_event_ports <
753 evt_nr_active_lcores(opt->wlcores)) {
754 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
755 pipeline_atq_nb_event_queues(opt),
756 dev_info.max_event_queues,
757 evt_nr_active_lcores(opt->wlcores),
758 dev_info.max_event_ports);
760 if (!evt_has_all_types_queue(opt->dev_id))
/* Test-ops vtable registered with the eventdev test framework. */
766 static const struct evt_test_ops pipeline_atq = {
767 .cap_check = pipeline_atq_capability_check,
768 .opt_check = pipeline_atq_opt_check,
769 .opt_dump = pipeline_atq_opt_dump,
770 .test_setup = pipeline_test_setup,
771 .mempool_setup = pipeline_mempool_setup,
772 .ethdev_setup = pipeline_ethdev_setup,
773 .eventdev_setup = pipeline_atq_eventdev_setup,
774 .launch_lcores = pipeline_atq_launch_lcores,
775 .eventdev_destroy = pipeline_eventdev_destroy,
776 .mempool_destroy = pipeline_mempool_destroy,
777 .ethdev_destroy = pipeline_ethdev_destroy,
778 .test_result = pipeline_test_result,
779 .test_destroy = pipeline_test_destroy,
782 EVT_TEST_REGISTER(pipeline_atq);