2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright 2017 Cavium, Inc.
6 #include "test_pipeline_common.h"
8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
10 static __rte_always_inline int
11 pipeline_queue_nb_event_queues(struct evt_options *opt)
13 uint16_t eth_count = rte_eth_dev_count_avail();
15 return (eth_count * opt->nb_stages) + eth_count;
19 pipeline_queue_worker_single_stage_tx(void *arg)
21 PIPELINE_WROKER_SINGLE_STAGE_INIT;
23 while (t->done == false) {
24 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
31 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
32 pipeline_tx_pkt(ev.mbuf);
36 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
37 pipeline_event_enqueue(dev, port, &ev);
45 pipeline_queue_worker_single_stage_fwd(void *arg)
47 PIPELINE_WROKER_SINGLE_STAGE_INIT;
48 const uint8_t tx_queue = t->tx_service.queue_id;
50 while (t->done == false) {
51 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
58 ev.queue_id = tx_queue;
59 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
60 pipeline_event_enqueue(dev, port, &ev);
68 pipeline_queue_worker_single_stage_burst_tx(void *arg)
70 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
72 while (t->done == false) {
73 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
81 for (i = 0; i < nb_rx; i++) {
82 rte_prefetch0(ev[i + 1].mbuf);
83 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
85 pipeline_tx_pkt(ev[i].mbuf);
86 ev[i].op = RTE_EVENT_OP_RELEASE;
90 pipeline_fwd_event(&ev[i],
91 RTE_SCHED_TYPE_ATOMIC);
95 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
102 pipeline_queue_worker_single_stage_burst_fwd(void *arg)
104 PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
105 const uint8_t tx_queue = t->tx_service.queue_id;
107 while (t->done == false) {
108 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
116 for (i = 0; i < nb_rx; i++) {
117 rte_prefetch0(ev[i + 1].mbuf);
118 ev[i].queue_id = tx_queue;
119 pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
123 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
131 pipeline_queue_worker_multi_stage_tx(void *arg)
133 PIPELINE_WROKER_MULTI_STAGE_INIT;
134 const uint8_t nb_stages = t->opt->nb_stages + 1;
136 while (t->done == false) {
137 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
144 cq_id = ev.queue_id % nb_stages;
146 if (cq_id >= last_queue) {
147 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
149 pipeline_tx_pkt(ev.mbuf);
153 ev.queue_id += (cq_id == last_queue) ? 1 : 0;
154 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
157 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
160 pipeline_event_enqueue(dev, port, &ev);
166 pipeline_queue_worker_multi_stage_fwd(void *arg)
168 PIPELINE_WROKER_MULTI_STAGE_INIT;
169 const uint8_t nb_stages = t->opt->nb_stages + 1;
170 const uint8_t tx_queue = t->tx_service.queue_id;
172 while (t->done == false) {
173 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
180 cq_id = ev.queue_id % nb_stages;
182 if (cq_id == last_queue) {
183 ev.queue_id = tx_queue;
184 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
188 pipeline_fwd_event(&ev, sched_type_list[cq_id]);
191 pipeline_event_enqueue(dev, port, &ev);
197 pipeline_queue_worker_multi_stage_burst_tx(void *arg)
199 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
200 const uint8_t nb_stages = t->opt->nb_stages + 1;
202 while (t->done == false) {
203 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
211 for (i = 0; i < nb_rx; i++) {
212 rte_prefetch0(ev[i + 1].mbuf);
213 cq_id = ev[i].queue_id % nb_stages;
215 if (cq_id >= last_queue) {
216 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
218 pipeline_tx_pkt(ev[i].mbuf);
219 ev[i].op = RTE_EVENT_OP_RELEASE;
224 ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
225 pipeline_fwd_event(&ev[i],
226 RTE_SCHED_TYPE_ATOMIC);
229 pipeline_fwd_event(&ev[i],
230 sched_type_list[cq_id]);
235 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
241 pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
243 PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
244 const uint8_t nb_stages = t->opt->nb_stages + 1;
245 const uint8_t tx_queue = t->tx_service.queue_id;
247 while (t->done == false) {
248 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
256 for (i = 0; i < nb_rx; i++) {
257 rte_prefetch0(ev[i + 1].mbuf);
258 cq_id = ev[i].queue_id % nb_stages;
260 if (cq_id == last_queue) {
261 ev[i].queue_id = tx_queue;
262 pipeline_fwd_event(&ev[i],
263 RTE_SCHED_TYPE_ATOMIC);
267 pipeline_fwd_event(&ev[i],
268 sched_type_list[cq_id]);
272 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
278 worker_wrapper(void *arg)
280 struct worker_data *w = arg;
281 struct evt_options *opt = w->t->opt;
282 const bool burst = evt_has_burst_mode(w->dev_id);
283 const bool mt_safe = !w->t->mt_unsafe;
284 const uint8_t nb_stages = opt->nb_stages;
287 if (nb_stages == 1) {
288 if (!burst && mt_safe)
289 return pipeline_queue_worker_single_stage_tx(arg);
290 else if (!burst && !mt_safe)
291 return pipeline_queue_worker_single_stage_fwd(arg);
292 else if (burst && mt_safe)
293 return pipeline_queue_worker_single_stage_burst_tx(arg);
294 else if (burst && !mt_safe)
295 return pipeline_queue_worker_single_stage_burst_fwd(
298 if (!burst && mt_safe)
299 return pipeline_queue_worker_multi_stage_tx(arg);
300 else if (!burst && !mt_safe)
301 return pipeline_queue_worker_multi_stage_fwd(arg);
302 else if (burst && mt_safe)
303 return pipeline_queue_worker_multi_stage_burst_tx(arg);
304 else if (burst && !mt_safe)
305 return pipeline_queue_worker_multi_stage_burst_fwd(arg);
308 rte_panic("invalid worker\n");
312 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
314 struct test_pipeline *t = evt_test_priv(test);
317 rte_service_component_runstate_set(t->tx_service.service_id, 1);
318 return pipeline_launch_lcores(test, opt, worker_wrapper);
322 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
327 int nb_stages = opt->nb_stages;
329 struct rte_event_dev_info info;
330 struct test_pipeline *t = evt_test_priv(test);
331 uint8_t tx_evqueue_id = 0;
332 uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
333 uint8_t nb_worker_queues = 0;
335 nb_ports = evt_nr_active_lcores(opt->wlcores);
336 nb_queues = rte_eth_dev_count_avail() * (nb_stages);
338 /* Extra port for Tx service. */
340 tx_evqueue_id = nb_queues;
344 nb_queues += rte_eth_dev_count_avail();
346 rte_event_dev_info_get(opt->dev_id, &info);
348 const struct rte_event_dev_config config = {
349 .nb_event_queues = nb_queues,
350 .nb_event_ports = nb_ports,
351 .nb_events_limit = info.max_num_events,
352 .nb_event_queue_flows = opt->nb_flows,
353 .nb_event_port_dequeue_depth =
354 info.max_event_port_dequeue_depth,
355 .nb_event_port_enqueue_depth =
356 info.max_event_port_enqueue_depth,
358 ret = rte_event_dev_configure(opt->dev_id, &config);
360 evt_err("failed to configure eventdev %d", opt->dev_id);
364 struct rte_event_queue_conf q_conf = {
365 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
366 .nb_atomic_flows = opt->nb_flows,
367 .nb_atomic_order_sequences = opt->nb_flows,
369 /* queue configurations */
370 for (queue = 0; queue < nb_queues; queue++) {
374 slot = queue % (nb_stages + 1);
375 q_conf.schedule_type = slot == nb_stages ?
376 RTE_SCHED_TYPE_ATOMIC :
377 opt->sched_type_list[slot];
379 slot = queue % nb_stages;
381 if (queue == tx_evqueue_id) {
382 q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
383 q_conf.event_queue_cfg =
384 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
386 q_conf.schedule_type =
387 opt->sched_type_list[slot];
388 queue_arr[nb_worker_queues] = queue;
393 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
395 evt_err("failed to setup queue=%d", queue);
400 if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
401 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
403 /* port configuration */
404 const struct rte_event_port_conf p_conf = {
405 .dequeue_depth = opt->wkr_deq_dep,
406 .enqueue_depth = info.max_event_port_dequeue_depth,
407 .new_event_threshold = info.max_num_events,
411 * If tx is multi thread safe then allow workers to do Tx else use Tx
412 * service to Tx packets.
415 ret = pipeline_event_port_setup(test, opt, queue_arr,
416 nb_worker_queues, p_conf);
420 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
421 nb_ports - 1, p_conf);
424 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
430 * The pipelines are setup in the following manner:
432 * eth_dev_count = 2, nb_stages = 2.
434 * Multi thread safe :
438 * event queue pipelines:
439 * eth0 -> q0 -> q1 -> (q2->tx)
440 * eth1 -> q3 -> q4 -> (q5->tx)
442 * q2, q5 configured as ATOMIC
444 * Multi thread unsafe :
448 * event queue pipelines:
450 * } (q4->tx) Tx service
453 * q4 configured as SINGLE_LINK|ATOMIC
455 ret = pipeline_event_rx_adapter_setup(opt,
456 t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf);
460 if (!evt_has_distributed_sched(opt->dev_id)) {
462 rte_event_dev_service_id_get(opt->dev_id, &service_id);
463 ret = evt_service_setup(service_id);
465 evt_err("No service lcore found to run event dev.");
470 ret = rte_event_dev_start(opt->dev_id);
472 evt_err("failed to start eventdev %d", opt->dev_id);
480 pipeline_queue_opt_dump(struct evt_options *opt)
482 pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
486 pipeline_queue_opt_check(struct evt_options *opt)
488 return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
492 pipeline_queue_capability_check(struct evt_options *opt)
494 struct rte_event_dev_info dev_info;
496 rte_event_dev_info_get(opt->dev_id, &dev_info);
497 if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
498 dev_info.max_event_ports <
499 evt_nr_active_lcores(opt->wlcores)) {
500 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
501 pipeline_queue_nb_event_queues(opt),
502 dev_info.max_event_queues,
503 evt_nr_active_lcores(opt->wlcores),
504 dev_info.max_event_ports);
510 static const struct evt_test_ops pipeline_queue = {
511 .cap_check = pipeline_queue_capability_check,
512 .opt_check = pipeline_queue_opt_check,
513 .opt_dump = pipeline_queue_opt_dump,
514 .test_setup = pipeline_test_setup,
515 .mempool_setup = pipeline_mempool_setup,
516 .ethdev_setup = pipeline_ethdev_setup,
517 .eventdev_setup = pipeline_queue_eventdev_setup,
518 .launch_lcores = pipeline_queue_launch_lcores,
519 .eventdev_destroy = pipeline_eventdev_destroy,
520 .mempool_destroy = pipeline_mempool_destroy,
521 .ethdev_destroy = pipeline_ethdev_destroy,
522 .test_result = pipeline_test_result,
523 .test_destroy = pipeline_test_destroy,
526 EVT_TEST_REGISTER(pipeline_queue);