2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright 2017 Cavium, Inc.
6 #include "test_pipeline_common.h"
8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
/*
 * Number of event queues needed for the ATQ (all-types-queue) pipeline:
 * one per available ethdev, as returned by rte_eth_dev_count_avail().
 * NOTE(review): this extract is missing intermediate lines (braces etc.);
 * verify against the full source.
 */
10 static __rte_always_inline int
11 pipeline_atq_nb_event_queues(struct evt_options *opt)
15 return rte_eth_dev_count_avail();
/* Signature shared by all worker loops below; selected in worker_wrapper(). */
18 typedef int (*pipeline_atq_worker_t)(void *arg);
/*
 * Single-stage worker, device with internal Tx port capability:
 * dequeue one event at a time and transmit it directly with
 * pipeline_event_tx(); flush in-flight events on exit via
 * pipeline_worker_cleanup().
 * NOTE(review): intermediate lines (loop braces, deq==0 handling) are
 * missing from this extract — confirm against the full source.
 */
20 static __rte_noinline int
21 pipeline_atq_worker_single_stage_tx(void *arg)
23 PIPELINE_WORKER_SINGLE_STAGE_INIT;
24 uint8_t enq = 0, deq = 0;
26 while (t->done == false) {
27 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
34 deq = pipeline_event_tx(dev, port, &ev, t);
37 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Single-stage worker, no internal Tx port: redirect each dequeued event
 * to the per-ethdev Tx adapter queue (t->tx_evqueue_id[mbuf->port]),
 * forward it as ATOMIC and enqueue it back to the event device.
 */
42 static __rte_noinline int
43 pipeline_atq_worker_single_stage_fwd(void *arg)
45 PIPELINE_WORKER_SINGLE_STAGE_INIT;
46 const uint8_t *tx_queue = t->tx_evqueue_id;
47 uint8_t enq = 0, deq = 0;
49 while (t->done == false) {
50 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
/* Route to the single-link queue serviced by the Tx adapter. */
57 ev.queue_id = tx_queue[ev.mbuf->port];
58 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
59 enq = pipeline_event_enqueue(dev, port, &ev, t);
62 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Burst variant of the single-stage Tx worker: dequeue up to BURST_SIZE
 * events, prefetch the next mbuf, pin Tx to ethdev queue 0 and transmit
 * the whole burst. processed_pkts is credited with the Tx count.
 */
67 static __rte_noinline int
68 pipeline_atq_worker_single_stage_burst_tx(void *arg)
70 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
71 uint16_t nb_rx = 0, nb_tx = 0;
73 while (t->done == false) {
74 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
81 for (i = 0; i < nb_rx; i++) {
/* Prefetch the next event's mbuf to hide memory latency. */
82 rte_prefetch0(ev[i + 1].mbuf);
83 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
86 nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
87 w->processed_pkts += nb_tx;
89 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Burst variant of the single-stage fwd worker: for each dequeued event,
 * set Tx queue 0 on the mbuf, retarget the event at the per-port Tx
 * adapter queue and forward it ATOMIC; then enqueue the burst back.
 */
94 static __rte_noinline int
95 pipeline_atq_worker_single_stage_burst_fwd(void *arg)
97 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
98 const uint8_t *tx_queue = t->tx_evqueue_id;
99 uint16_t nb_rx = 0, nb_tx = 0;
101 while (t->done == false) {
102 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
109 for (i = 0; i < nb_rx; i++) {
110 rte_prefetch0(ev[i + 1].mbuf);
111 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
112 ev[i].queue_id = tx_queue[ev[i].mbuf->port];
113 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
116 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
117 w->processed_pkts += nb_tx;
119 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Vector-event variant of the single-stage Tx worker: each event carries
 * a vector of packets (ev.vec); processed_pkts is credited with
 * ev.vec->nb_elem before the vector is transmitted.
 * NOTE(review): the surrounding while-loop header is missing from this
 * extract — confirm against the full source.
 */
124 static __rte_noinline int
125 pipeline_atq_worker_single_stage_tx_vector(void *arg)
127 PIPELINE_WORKER_SINGLE_STAGE_INIT;
128 uint8_t enq = 0, deq = 0;
132 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
/* Record the size before Tx — the vector is consumed by the tx call. */
138 vector_sz = ev.vec->nb_elem;
139 enq = pipeline_event_tx_vector(dev, port, &ev, t);
140 w->processed_pkts += vector_sz;
142 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Vector-event variant of the single-stage fwd worker: retarget the
 * vector event at the Tx adapter queue for its source port
 * (ev.vec->port), forward ATOMIC and enqueue it back.
 */
147 static __rte_noinline int
148 pipeline_atq_worker_single_stage_fwd_vector(void *arg)
150 PIPELINE_WORKER_SINGLE_STAGE_INIT;
151 const uint8_t *tx_queue = t->tx_evqueue_id;
152 uint8_t enq = 0, deq = 0;
156 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
163 vector_sz = ev.vec->nb_elem;
164 ev.queue_id = tx_queue[ev.vec->port];
166 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
167 enq = pipeline_event_enqueue(dev, port, &ev, t);
168 w->processed_pkts += vector_sz;
170 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Burst + vector variant of the single-stage Tx worker: accumulate the
 * packet count across all vectors in the burst, pin each vector to
 * ethdev queue 0 and transmit the burst.
 */
175 static __rte_noinline int
176 pipeline_atq_worker_single_stage_burst_tx_vector(void *arg)
178 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
179 uint16_t nb_rx = 0, nb_tx = 0;
183 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
190 for (i = 0; i < nb_rx; i++) {
191 vector_sz += ev[i].vec->nb_elem;
192 ev[i].vec->queue = 0;
195 nb_tx = pipeline_event_tx_burst(dev, port, ev, nb_rx, t);
196 w->processed_pkts += vector_sz;
198 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Burst + vector variant of the single-stage fwd worker: retarget each
 * vector event at its per-port Tx adapter queue, pin ethdev queue 0,
 * forward ATOMIC and enqueue the burst back to the event device.
 */
203 static __rte_noinline int
204 pipeline_atq_worker_single_stage_burst_fwd_vector(void *arg)
206 PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
207 const uint8_t *tx_queue = t->tx_evqueue_id;
208 uint16_t nb_rx = 0, nb_tx = 0;
212 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
220 for (i = 0; i < nb_rx; i++) {
221 ev[i].queue_id = tx_queue[ev[i].vec->port];
222 ev[i].vec->queue = 0;
223 vector_sz += ev[i].vec->nb_elem;
224 pipeline_fwd_event_vector(&ev[i],
225 RTE_SCHED_TYPE_ATOMIC);
228 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
229 w->processed_pkts += vector_sz;
231 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Multi-stage worker with internal Tx port. The current stage is derived
 * from ev.sub_event_type (cq_id = sub_event_type % nb_stages): on the
 * last stage the event is transmitted; otherwise it is forwarded to the
 * next stage's scheduling type and re-enqueued on the same ATQ queue.
 * NOTE(review): the sub_event_type increment on the non-last-stage path
 * is not visible in this extract — confirm against the full source.
 */
236 static __rte_noinline int
237 pipeline_atq_worker_multi_stage_tx(void *arg)
239 PIPELINE_WORKER_MULTI_STAGE_INIT;
240 uint8_t enq = 0, deq = 0;
242 while (t->done == false) {
243 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
250 cq_id = ev.sub_event_type % nb_stages;
252 if (cq_id == last_queue) {
253 enq = pipeline_event_tx(dev, port, &ev, t);
259 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
260 enq = pipeline_event_enqueue(dev, port, &ev, t);
262 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Multi-stage worker without internal Tx port: on the last stage the
 * event is retargeted at the per-port Tx adapter queue and forwarded
 * ATOMIC; earlier stages are forwarded with the stage's scheduling type.
 * A single enqueue at the bottom serves both paths.
 */
267 static __rte_noinline int
268 pipeline_atq_worker_multi_stage_fwd(void *arg)
270 PIPELINE_WORKER_MULTI_STAGE_INIT;
271 const uint8_t *tx_queue = t->tx_evqueue_id;
272 uint8_t enq = 0, deq = 0;
274 while (t->done == false) {
275 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
282 cq_id = ev.sub_event_type % nb_stages;
284 if (cq_id == last_queue) {
285 ev.queue_id = tx_queue[ev.mbuf->port];
286 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
290 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
293 enq = pipeline_event_enqueue(dev, port, &ev, t);
295 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Burst variant of the multi-stage Tx worker. Last-stage events are
 * transmitted individually inside the loop and then marked
 * RTE_EVENT_OP_RELEASE so the trailing burst enqueue releases their
 * scheduling contexts instead of re-enqueuing them; other events advance
 * a stage (sub_event_type++) and are forwarded.
 */
300 static __rte_noinline int
301 pipeline_atq_worker_multi_stage_burst_tx(void *arg)
303 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
304 uint16_t nb_rx = 0, nb_tx = 0;
306 while (t->done == false) {
307 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
314 for (i = 0; i < nb_rx; i++) {
315 rte_prefetch0(ev[i + 1].mbuf);
316 cq_id = ev[i].sub_event_type % nb_stages;
318 if (cq_id == last_queue) {
319 pipeline_event_tx(dev, port, &ev[i], t);
320 ev[i].op = RTE_EVENT_OP_RELEASE;
325 ev[i].sub_event_type++;
326 pipeline_fwd_event(&ev[i], sched_type_list[cq_id]);
329 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
331 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Burst variant of the multi-stage fwd worker: last-stage events are
 * retargeted at the per-port Tx adapter queue and forwarded ATOMIC;
 * other events advance a stage (sub_event_type++) and are forwarded with
 * that stage's scheduling type. The whole burst is enqueued at once.
 */
336 static __rte_noinline int
337 pipeline_atq_worker_multi_stage_burst_fwd(void *arg)
339 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
340 const uint8_t *tx_queue = t->tx_evqueue_id;
341 uint16_t nb_rx = 0, nb_tx = 0;
343 while (t->done == false) {
344 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
351 for (i = 0; i < nb_rx; i++) {
352 rte_prefetch0(ev[i + 1].mbuf);
353 cq_id = ev[i].sub_event_type % nb_stages;
355 if (cq_id == last_queue) {
357 ev[i].queue_id = tx_queue[ev[i].mbuf->port];
358 pipeline_fwd_event(&ev[i],
359 RTE_SCHED_TYPE_ATOMIC);
361 ev[i].sub_event_type++;
362 pipeline_fwd_event(&ev[i],
363 sched_type_list[cq_id]);
367 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
369 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Vector-event variant of the multi-stage Tx worker: on the last stage
 * the vector is transmitted and processed_pkts credited with its
 * nb_elem; earlier stages forward the vector to the next scheduling
 * type and re-enqueue.
 */
374 static __rte_noinline int
375 pipeline_atq_worker_multi_stage_tx_vector(void *arg)
377 PIPELINE_WORKER_MULTI_STAGE_INIT;
378 uint8_t enq = 0, deq = 0;
382 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
389 cq_id = ev.sub_event_type % nb_stages;
391 if (cq_id == last_queue) {
/* Record the size before Tx — the vector is consumed by the tx call. */
392 vector_sz = ev.vec->nb_elem;
393 enq = pipeline_event_tx_vector(dev, port, &ev, t);
394 w->processed_pkts += vector_sz;
399 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
400 enq = pipeline_event_enqueue(dev, port, &ev, t);
402 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Vector-event variant of the multi-stage fwd worker: last-stage vectors
 * are retargeted at the per-port Tx adapter queue, forwarded ATOMIC and
 * counted; earlier stages forward with the stage's scheduling type.
 */
407 static __rte_noinline int
408 pipeline_atq_worker_multi_stage_fwd_vector(void *arg)
410 PIPELINE_WORKER_MULTI_STAGE_INIT;
411 const uint8_t *tx_queue = t->tx_evqueue_id;
412 uint8_t enq = 0, deq = 0;
416 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
423 cq_id = ev.sub_event_type % nb_stages;
425 if (cq_id == last_queue) {
426 ev.queue_id = tx_queue[ev.vec->port];
428 vector_sz = ev.vec->nb_elem;
429 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
430 enq = pipeline_event_enqueue(dev, port, &ev, t);
431 w->processed_pkts += vector_sz;
434 pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
435 enq = pipeline_event_enqueue(dev, port, &ev, t);
438 pipeline_worker_cleanup(dev, port, &ev, enq, deq);
/*
 * Burst + vector variant of the multi-stage Tx worker: last-stage
 * vectors are transmitted in-loop, counted, and marked
 * RTE_EVENT_OP_RELEASE so the trailing burst enqueue releases their
 * contexts; other vectors advance a stage and are forwarded.
 */
443 static __rte_noinline int
444 pipeline_atq_worker_multi_stage_burst_tx_vector(void *arg)
446 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
447 uint16_t nb_rx = 0, nb_tx = 0;
451 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
458 for (i = 0; i < nb_rx; i++) {
459 cq_id = ev[i].sub_event_type % nb_stages;
461 if (cq_id == last_queue) {
462 vector_sz = ev[i].vec->nb_elem;
463 pipeline_event_tx_vector(dev, port, &ev[i], t);
464 ev[i].op = RTE_EVENT_OP_RELEASE;
465 w->processed_pkts += vector_sz;
469 ev[i].sub_event_type++;
470 pipeline_fwd_event_vector(&ev[i],
471 sched_type_list[cq_id]);
474 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
476 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Burst + vector variant of the multi-stage fwd worker: last-stage
 * vectors are retargeted at the per-port Tx adapter queue (ethdev queue
 * pinned to 0), forwarded ATOMIC and counted; other vectors advance a
 * stage and are forwarded with that stage's scheduling type.
 */
481 static __rte_noinline int
482 pipeline_atq_worker_multi_stage_burst_fwd_vector(void *arg)
484 PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
485 const uint8_t *tx_queue = t->tx_evqueue_id;
486 uint16_t nb_rx = 0, nb_tx = 0;
490 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
497 for (i = 0; i < nb_rx; i++) {
498 cq_id = ev[i].sub_event_type % nb_stages;
500 if (cq_id == last_queue) {
501 vector_sz = ev[i].vec->nb_elem;
502 ev[i].queue_id = tx_queue[ev[i].vec->port];
503 ev[i].vec->queue = 0;
504 pipeline_fwd_event_vector(
505 &ev[i], RTE_SCHED_TYPE_ATOMIC);
506 w->processed_pkts += vector_sz;
508 ev[i].sub_event_type++;
509 pipeline_fwd_event_vector(
510 &ev[i], sched_type_list[cq_id]);
514 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
516 pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
/*
 * Per-lcore entry point: dispatch to the right worker loop using three
 * runtime properties as table indices — [ena_vector][burst][internal_port].
 * A single-stage table and a multi-stage table exist; which one is used
 * presumably depends on opt->nb_stages (the selecting condition is not
 * visible in this extract — confirm against the full source).
 */
522 worker_wrapper(void *arg)
524 struct worker_data *w = arg;
525 struct evt_options *opt = w->t->opt;
526 const bool burst = evt_has_burst_mode(w->dev_id);
527 const bool internal_port = w->t->internal_port;
528 const uint8_t nb_stages = opt->nb_stages;
529 /*vector/burst/internal_port*/
530 const pipeline_atq_worker_t
531 pipeline_atq_worker_single_stage[2][2][2] = {
532 [0][0][0] = pipeline_atq_worker_single_stage_fwd,
533 [0][0][1] = pipeline_atq_worker_single_stage_tx,
534 [0][1][0] = pipeline_atq_worker_single_stage_burst_fwd,
535 [0][1][1] = pipeline_atq_worker_single_stage_burst_tx,
536 [1][0][0] = pipeline_atq_worker_single_stage_fwd_vector,
537 [1][0][1] = pipeline_atq_worker_single_stage_tx_vector,
538 [1][1][0] = pipeline_atq_worker_single_stage_burst_fwd_vector,
539 [1][1][1] = pipeline_atq_worker_single_stage_burst_tx_vector,
541 const pipeline_atq_worker_t
542 pipeline_atq_worker_multi_stage[2][2][2] = {
543 [0][0][0] = pipeline_atq_worker_multi_stage_fwd,
544 [0][0][1] = pipeline_atq_worker_multi_stage_tx,
545 [0][1][0] = pipeline_atq_worker_multi_stage_burst_fwd,
546 [0][1][1] = pipeline_atq_worker_multi_stage_burst_tx,
547 [1][0][0] = pipeline_atq_worker_multi_stage_fwd_vector,
548 [1][0][1] = pipeline_atq_worker_multi_stage_tx_vector,
549 [1][1][0] = pipeline_atq_worker_multi_stage_burst_fwd_vector,
550 [1][1][1] = pipeline_atq_worker_multi_stage_burst_tx_vector,
554 return (pipeline_atq_worker_single_stage[opt->ena_vector][burst]
555 [internal_port])(arg);
557 return (pipeline_atq_worker_multi_stage[opt->ena_vector][burst]
558 [internal_port])(arg);
/* Unreachable unless neither table applies — defensive panic. */
560 rte_panic("invalid worker\n");
/* Launch worker_wrapper() on all configured worker lcores. */
564 pipeline_atq_launch_lcores(struct evt_test *test, struct evt_options *opt)
566 return pipeline_launch_lcores(test, opt, worker_wrapper);
/*
 * Configure the event device for the ATQ pipeline test:
 *  - one ATQ event queue per ethdev, plus (when the device lacks an
 *    internal Tx port) one SINGLE_LINK queue per ethdev for the Tx
 *    adapter;
 *  - one event port per worker lcore;
 *  - Rx/Tx adapters per ethdev, service core if scheduling is not
 *    distributed, then start the event device, ethdevs and adapters.
 * NOTE(review): many intermediate lines (error checks, loop bodies,
 * returns) are missing from this extract — confirm control flow against
 * the full source.
 */
570 pipeline_atq_eventdev_setup(struct evt_test *test, struct evt_options *opt)
575 uint8_t queue, is_prod;
576 uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
577 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
578 uint8_t nb_worker_queues = 0;
579 uint8_t tx_evport_id = 0;
581 struct rte_event_dev_info info;
582 struct test_pipeline *t = evt_test_priv(test);
584 nb_ports = evt_nr_active_lcores(opt->wlcores);
585 nb_queues = rte_eth_dev_count_avail();
587 memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
588 memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
589 /* One queue for Tx adapter per port */
590 if (!t->internal_port) {
591 RTE_ETH_FOREACH_DEV(prod) {
/* Reserve extra queue ids (after the worker queues) for Tx adapters. */
592 tx_evqueue_id[prod] = nb_queues;
597 rte_event_dev_info_get(opt->dev_id, &info);
599 ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
601 evt_err("failed to configure eventdev %d", opt->dev_id);
605 struct rte_event_queue_conf q_conf = {
606 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
607 .nb_atomic_flows = opt->nb_flows,
608 .nb_atomic_order_sequences = opt->nb_flows,
610 /* queue configurations */
611 for (queue = 0; queue < nb_queues; queue++) {
/* Default to ALL_TYPES (ATQ); Tx adapter queues become SINGLE_LINK. */
612 q_conf.event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES;
614 if (!t->internal_port) {
616 RTE_ETH_FOREACH_DEV(prod) {
617 if (queue == tx_evqueue_id[prod]) {
618 q_conf.event_queue_cfg =
619 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
/* Track the worker-facing (non-Tx) queues for port linking. */
625 queue_arr[nb_worker_queues] = queue;
630 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
632 evt_err("failed to setup queue=%d", queue);
/* Clamp requested dequeue depth to what the device supports. */
637 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
638 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
640 /* port configuration */
641 const struct rte_event_port_conf p_conf = {
642 .dequeue_depth = opt->wkr_deq_dep,
643 .enqueue_depth = info.max_event_port_dequeue_depth,
644 .new_event_threshold = info.max_num_events,
647 if (!t->internal_port)
648 ret = pipeline_event_port_setup(test, opt, queue_arr,
649 nb_worker_queues, p_conf);
651 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
658 * The pipelines are setup in the following manner:
660 * eth_dev_count = 2, nb_stages = 2, atq mode
662 * eth0, eth1 have Internal port capability :
666 * event queue pipelines:
670 * q0, q1 are configured as ATQ so, all the different stages can
671 * be enqueued on the same queue.
673 * eth0, eth1 use Tx adapters service core :
677 * event queue pipelines:
678 * eth0 -> q0 -> q2 -> Tx
679 * eth1 -> q1 -> q3 -> Tx
681 * q0, q1 are configured as stated above.
682 * q2, q3 configured as SINGLE_LINK.
684 ret = pipeline_event_rx_adapter_setup(opt, 1, p_conf);
687 ret = pipeline_event_tx_adapter_setup(opt, p_conf);
/* Without distributed scheduling a service core must drive the device. */
691 if (!evt_has_distributed_sched(opt->dev_id)) {
693 rte_event_dev_service_id_get(opt->dev_id, &service_id);
694 ret = evt_service_setup(service_id);
696 evt_err("No service lcore found to run event dev.");
701 /* Connect the tx_evqueue_id to the Tx adapter port */
702 if (!t->internal_port) {
703 RTE_ETH_FOREACH_DEV(prod) {
704 ret = rte_event_eth_tx_adapter_event_port_get(prod,
707 evt_err("Unable to get Tx adapter[%d]", prod);
711 if (rte_event_port_link(opt->dev_id, tx_evport_id,
712 &tx_evqueue_id[prod],
714 evt_err("Unable to link Tx adptr[%d] evprt[%d]",
721 ret = rte_event_dev_start(opt->dev_id);
723 evt_err("failed to start eventdev %d", opt->dev_id);
728 RTE_ETH_FOREACH_DEV(prod) {
729 ret = rte_eth_dev_start(prod);
731 evt_err("Ethernet dev [%d] failed to start."
732 " Using synthetic producer", prod);
737 RTE_ETH_FOREACH_DEV(prod) {
738 ret = rte_event_eth_rx_adapter_start(prod);
740 evt_err("Rx adapter[%d] start failed", prod);
744 ret = rte_event_eth_tx_adapter_start(prod);
746 evt_err("Tx adapter[%d] start failed", prod);
/* Publish the Tx queue map so the fwd workers can route events. */
751 memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
/* Dump the test options along with the computed event-queue count. */
758 pipeline_atq_opt_dump(struct evt_options *opt)
760 pipeline_opt_dump(opt, pipeline_atq_nb_event_queues(opt));
/* Validate the test options against the computed event-queue count. */
764 pipeline_atq_opt_check(struct evt_options *opt)
766 return pipeline_opt_check(opt, pipeline_atq_nb_event_queues(opt));
/*
 * Check the event device can host the test: enough event queues for the
 * pipeline, enough ports for the worker lcores, and (per the check at
 * the bottom) support for all-types queues. Return value lines are not
 * visible in this extract — confirm against the full source.
 */
770 pipeline_atq_capability_check(struct evt_options *opt)
772 struct rte_event_dev_info dev_info;
774 rte_event_dev_info_get(opt->dev_id, &dev_info);
775 if (dev_info.max_event_queues < pipeline_atq_nb_event_queues(opt) ||
776 dev_info.max_event_ports <
777 evt_nr_active_lcores(opt->wlcores)) {
778 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
779 pipeline_atq_nb_event_queues(opt),
780 dev_info.max_event_queues,
781 evt_nr_active_lcores(opt->wlcores),
782 dev_info.max_event_ports);
784 if (!evt_has_all_types_queue(opt->dev_id))
/*
 * Test-ops vtable wiring the ATQ pipeline test into the testeventdev
 * framework; registered below via EVT_TEST_REGISTER().
 */
790 static const struct evt_test_ops pipeline_atq = {
791 .cap_check = pipeline_atq_capability_check,
792 .opt_check = pipeline_atq_opt_check,
793 .opt_dump = pipeline_atq_opt_dump,
794 .test_setup = pipeline_test_setup,
795 .mempool_setup = pipeline_mempool_setup,
796 .ethdev_setup = pipeline_ethdev_setup,
797 .eventdev_setup = pipeline_atq_eventdev_setup,
798 .launch_lcores = pipeline_atq_launch_lcores,
799 .ethdev_rx_stop = pipeline_ethdev_rx_stop,
800 .eventdev_destroy = pipeline_eventdev_destroy,
801 .mempool_destroy = pipeline_mempool_destroy,
802 .ethdev_destroy = pipeline_ethdev_destroy,
803 .test_result = pipeline_test_result,
804 .test_destroy = pipeline_test_destroy,
807 EVT_TEST_REGISTER(pipeline_atq);