ca5f4578ec39955d7737c6aa19d50a2270131746
[dpdk.git] / app / test-eventdev / test_pipeline_queue.c
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5
6 #include "test_pipeline_common.h"
7
8 /* See http://dpdk.org/doc/guides/tools/testeventdev.html for test details */
9
10 static __rte_always_inline int
11 pipeline_queue_nb_event_queues(struct evt_options *opt)
12 {
13         uint16_t eth_count = rte_eth_dev_count_avail();
14
15         return (eth_count * opt->nb_stages) + eth_count;
16 }
17
18 static int
19 pipeline_queue_worker_single_stage_tx(void *arg)
20 {
21         PIPELINE_WROKER_SINGLE_STAGE_INIT;
22
23         while (t->done == false) {
24                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
25
26                 if (!event) {
27                         rte_pause();
28                         continue;
29                 }
30
31                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
32                         pipeline_tx_pkt(ev.mbuf);
33                         w->processed_pkts++;
34                 } else {
35                         ev.queue_id++;
36                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
37                         pipeline_event_enqueue(dev, port, &ev);
38                 }
39         }
40
41         return 0;
42 }
43
44 static int
45 pipeline_queue_worker_single_stage_fwd(void *arg)
46 {
47         PIPELINE_WROKER_SINGLE_STAGE_INIT;
48         const uint8_t tx_queue = t->tx_service.queue_id;
49
50         while (t->done == false) {
51                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
52
53                 if (!event) {
54                         rte_pause();
55                         continue;
56                 }
57
58                 ev.queue_id = tx_queue;
59                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
60                 pipeline_event_enqueue(dev, port, &ev);
61                 w->processed_pkts++;
62         }
63
64         return 0;
65 }
66
67 static int
68 pipeline_queue_worker_single_stage_burst_tx(void *arg)
69 {
70         PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
71
72         while (t->done == false) {
73                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
74                                 BURST_SIZE, 0);
75
76                 if (!nb_rx) {
77                         rte_pause();
78                         continue;
79                 }
80
81                 for (i = 0; i < nb_rx; i++) {
82                         rte_prefetch0(ev[i + 1].mbuf);
83                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
84
85                                 pipeline_tx_pkt(ev[i].mbuf);
86                                 ev[i].op = RTE_EVENT_OP_RELEASE;
87                                 w->processed_pkts++;
88                         } else {
89                                 ev[i].queue_id++;
90                                 pipeline_fwd_event(&ev[i],
91                                                 RTE_SCHED_TYPE_ATOMIC);
92                         }
93                 }
94
95                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
96         }
97
98         return 0;
99 }
100
101 static int
102 pipeline_queue_worker_single_stage_burst_fwd(void *arg)
103 {
104         PIPELINE_WROKER_SINGLE_STAGE_BURST_INIT;
105         const uint8_t tx_queue = t->tx_service.queue_id;
106
107         while (t->done == false) {
108                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
109                                 BURST_SIZE, 0);
110
111                 if (!nb_rx) {
112                         rte_pause();
113                         continue;
114                 }
115
116                 for (i = 0; i < nb_rx; i++) {
117                         rte_prefetch0(ev[i + 1].mbuf);
118                         ev[i].queue_id = tx_queue;
119                         pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
120                         w->processed_pkts++;
121                 }
122
123                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
124         }
125
126         return 0;
127 }
128
129
130 static int
131 pipeline_queue_worker_multi_stage_tx(void *arg)
132 {
133         PIPELINE_WROKER_MULTI_STAGE_INIT;
134         const uint8_t nb_stages = t->opt->nb_stages + 1;
135
136         while (t->done == false) {
137                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
138
139                 if (!event) {
140                         rte_pause();
141                         continue;
142                 }
143
144                 cq_id = ev.queue_id % nb_stages;
145
146                 if (cq_id >= last_queue) {
147                         if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
148
149                                 pipeline_tx_pkt(ev.mbuf);
150                                 w->processed_pkts++;
151                                 continue;
152                         }
153                         ev.queue_id += (cq_id == last_queue) ? 1 : 0;
154                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
155                 } else {
156                         ev.queue_id++;
157                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
158                 }
159
160                 pipeline_event_enqueue(dev, port, &ev);
161         }
162         return 0;
163 }
164
165 static int
166 pipeline_queue_worker_multi_stage_fwd(void *arg)
167 {
168         PIPELINE_WROKER_MULTI_STAGE_INIT;
169         const uint8_t nb_stages = t->opt->nb_stages + 1;
170         const uint8_t tx_queue = t->tx_service.queue_id;
171
172         while (t->done == false) {
173                 uint16_t event = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
174
175                 if (!event) {
176                         rte_pause();
177                         continue;
178                 }
179
180                 cq_id = ev.queue_id % nb_stages;
181
182                 if (cq_id == last_queue) {
183                         ev.queue_id = tx_queue;
184                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
185                         w->processed_pkts++;
186                 } else {
187                         ev.queue_id++;
188                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
189                 }
190
191                 pipeline_event_enqueue(dev, port, &ev);
192         }
193         return 0;
194 }
195
196 static int
197 pipeline_queue_worker_multi_stage_burst_tx(void *arg)
198 {
199         PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
200         const uint8_t nb_stages = t->opt->nb_stages + 1;
201
202         while (t->done == false) {
203                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
204                                 BURST_SIZE, 0);
205
206                 if (!nb_rx) {
207                         rte_pause();
208                         continue;
209                 }
210
211                 for (i = 0; i < nb_rx; i++) {
212                         rte_prefetch0(ev[i + 1].mbuf);
213                         cq_id = ev[i].queue_id % nb_stages;
214
215                         if (cq_id >= last_queue) {
216                                 if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
217
218                                         pipeline_tx_pkt(ev[i].mbuf);
219                                         ev[i].op = RTE_EVENT_OP_RELEASE;
220                                         w->processed_pkts++;
221                                         continue;
222                                 }
223
224                                 ev[i].queue_id += (cq_id == last_queue) ? 1 : 0;
225                                 pipeline_fwd_event(&ev[i],
226                                                 RTE_SCHED_TYPE_ATOMIC);
227                         } else {
228                                 ev[i].queue_id++;
229                                 pipeline_fwd_event(&ev[i],
230                                                 sched_type_list[cq_id]);
231                         }
232
233                 }
234
235                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
236         }
237         return 0;
238 }
239
240 static int
241 pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
242 {
243         PIPELINE_WROKER_MULTI_STAGE_BURST_INIT;
244         const uint8_t nb_stages = t->opt->nb_stages + 1;
245         const uint8_t tx_queue = t->tx_service.queue_id;
246
247         while (t->done == false) {
248                 uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
249                                 BURST_SIZE, 0);
250
251                 if (!nb_rx) {
252                         rte_pause();
253                         continue;
254                 }
255
256                 for (i = 0; i < nb_rx; i++) {
257                         rte_prefetch0(ev[i + 1].mbuf);
258                         cq_id = ev[i].queue_id % nb_stages;
259
260                         if (cq_id == last_queue) {
261                                 ev[i].queue_id = tx_queue;
262                                 pipeline_fwd_event(&ev[i],
263                                                 RTE_SCHED_TYPE_ATOMIC);
264                                 w->processed_pkts++;
265                         } else {
266                                 ev[i].queue_id++;
267                                 pipeline_fwd_event(&ev[i],
268                                                 sched_type_list[cq_id]);
269                         }
270                 }
271
272                 pipeline_event_enqueue_burst(dev, port, ev, nb_rx);
273         }
274         return 0;
275 }
276
277 static int
278 worker_wrapper(void *arg)
279 {
280         struct worker_data *w  = arg;
281         struct evt_options *opt = w->t->opt;
282         const bool burst = evt_has_burst_mode(w->dev_id);
283         const bool mt_safe = !w->t->mt_unsafe;
284         const uint8_t nb_stages = opt->nb_stages;
285         RTE_SET_USED(opt);
286
287         if (nb_stages == 1) {
288                 if (!burst && mt_safe)
289                         return pipeline_queue_worker_single_stage_tx(arg);
290                 else if (!burst && !mt_safe)
291                         return pipeline_queue_worker_single_stage_fwd(arg);
292                 else if (burst && mt_safe)
293                         return pipeline_queue_worker_single_stage_burst_tx(arg);
294                 else if (burst && !mt_safe)
295                         return pipeline_queue_worker_single_stage_burst_fwd(
296                                         arg);
297         } else {
298                 if (!burst && mt_safe)
299                         return pipeline_queue_worker_multi_stage_tx(arg);
300                 else if (!burst && !mt_safe)
301                         return pipeline_queue_worker_multi_stage_fwd(arg);
302                 else if (burst && mt_safe)
303                         return pipeline_queue_worker_multi_stage_burst_tx(arg);
304                 else if (burst && !mt_safe)
305                         return pipeline_queue_worker_multi_stage_burst_fwd(arg);
306
307         }
308         rte_panic("invalid worker\n");
309 }
310
311 static int
312 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
313 {
314         struct test_pipeline *t = evt_test_priv(test);
315
316         if (t->mt_unsafe)
317                 rte_service_component_runstate_set(t->tx_service.service_id, 1);
318         return pipeline_launch_lcores(test, opt, worker_wrapper);
319 }
320
321 static int
322 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
323 {
324         int ret;
325         int nb_ports;
326         int nb_queues;
327         int nb_stages = opt->nb_stages;
328         uint8_t queue;
329         struct rte_event_dev_info info;
330         struct test_pipeline *t = evt_test_priv(test);
331         uint8_t tx_evqueue_id = 0;
332         uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
333         uint8_t nb_worker_queues = 0;
334
335         nb_ports = evt_nr_active_lcores(opt->wlcores);
336         nb_queues = rte_eth_dev_count_avail() * (nb_stages);
337
338         /* Extra port for Tx service. */
339         if (t->mt_unsafe) {
340                 tx_evqueue_id = nb_queues;
341                 nb_ports++;
342                 nb_queues++;
343         } else
344                 nb_queues += rte_eth_dev_count_avail();
345
346         rte_event_dev_info_get(opt->dev_id, &info);
347
348         const struct rte_event_dev_config config = {
349                         .nb_event_queues = nb_queues,
350                         .nb_event_ports = nb_ports,
351                         .nb_events_limit  = info.max_num_events,
352                         .nb_event_queue_flows = opt->nb_flows,
353                         .nb_event_port_dequeue_depth =
354                                 info.max_event_port_dequeue_depth,
355                         .nb_event_port_enqueue_depth =
356                                 info.max_event_port_enqueue_depth,
357         };
358         ret = rte_event_dev_configure(opt->dev_id, &config);
359         if (ret) {
360                 evt_err("failed to configure eventdev %d", opt->dev_id);
361                 return ret;
362         }
363
364         struct rte_event_queue_conf q_conf = {
365                         .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
366                         .nb_atomic_flows = opt->nb_flows,
367                         .nb_atomic_order_sequences = opt->nb_flows,
368         };
369         /* queue configurations */
370         for (queue = 0; queue < nb_queues; queue++) {
371                 uint8_t slot;
372
373                 if (!t->mt_unsafe) {
374                         slot = queue % (nb_stages + 1);
375                         q_conf.schedule_type = slot == nb_stages ?
376                                 RTE_SCHED_TYPE_ATOMIC :
377                                 opt->sched_type_list[slot];
378                 } else {
379                         slot = queue % nb_stages;
380
381                         if (queue == tx_evqueue_id) {
382                                 q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
383                                 q_conf.event_queue_cfg =
384                                         RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
385                         } else {
386                                 q_conf.schedule_type =
387                                         opt->sched_type_list[slot];
388                                 queue_arr[nb_worker_queues] = queue;
389                                 nb_worker_queues++;
390                         }
391                 }
392
393                 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
394                 if (ret) {
395                         evt_err("failed to setup queue=%d", queue);
396                         return ret;
397                 }
398         }
399
400         if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
401                 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
402
403         /* port configuration */
404         const struct rte_event_port_conf p_conf = {
405                         .dequeue_depth = opt->wkr_deq_dep,
406                         .enqueue_depth = info.max_event_port_dequeue_depth,
407                         .new_event_threshold = info.max_num_events,
408         };
409
410         /*
411          * If tx is multi thread safe then allow workers to do Tx else use Tx
412          * service to Tx packets.
413          */
414         if (t->mt_unsafe) {
415                 ret = pipeline_event_port_setup(test, opt, queue_arr,
416                                 nb_worker_queues, p_conf);
417                 if (ret)
418                         return ret;
419
420                 ret = pipeline_event_tx_service_setup(test, opt, tx_evqueue_id,
421                                 nb_ports - 1, p_conf);
422
423         } else
424                 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
425                                 p_conf);
426
427         if (ret)
428                 return ret;
429         /*
430          * The pipelines are setup in the following manner:
431          *
432          * eth_dev_count = 2, nb_stages = 2.
433          *
434          * Multi thread safe :
435          *      queues = 6
436          *      stride = 3
437          *
438          *      event queue pipelines:
439          *      eth0 -> q0 -> q1 -> (q2->tx)
440          *      eth1 -> q3 -> q4 -> (q5->tx)
441          *
442          *      q2, q5 configured as ATOMIC
443          *
444          * Multi thread unsafe :
445          *      queues = 5
446          *      stride = 2
447          *
448          *      event queue pipelines:
449          *      eth0 -> q0 -> q1
450          *                      } (q4->tx) Tx service
451          *      eth1 -> q2 -> q3
452          *
453          *      q4 configured as SINGLE_LINK|ATOMIC
454          */
455         ret = pipeline_event_rx_adapter_setup(opt,
456                         t->mt_unsafe ? nb_stages : nb_stages + 1, p_conf);
457         if (ret)
458                 return ret;
459
460         if (!evt_has_distributed_sched(opt->dev_id)) {
461                 uint32_t service_id;
462                 rte_event_dev_service_id_get(opt->dev_id, &service_id);
463                 ret = evt_service_setup(service_id);
464                 if (ret) {
465                         evt_err("No service lcore found to run event dev.");
466                         return ret;
467                 }
468         }
469
470         ret = rte_event_dev_start(opt->dev_id);
471         if (ret) {
472                 evt_err("failed to start eventdev %d", opt->dev_id);
473                 return ret;
474         }
475
476         return 0;
477 }
478
479 static void
480 pipeline_queue_opt_dump(struct evt_options *opt)
481 {
482         pipeline_opt_dump(opt, pipeline_queue_nb_event_queues(opt));
483 }
484
485 static int
486 pipeline_queue_opt_check(struct evt_options *opt)
487 {
488         return pipeline_opt_check(opt, pipeline_queue_nb_event_queues(opt));
489 }
490
491 static bool
492 pipeline_queue_capability_check(struct evt_options *opt)
493 {
494         struct rte_event_dev_info dev_info;
495
496         rte_event_dev_info_get(opt->dev_id, &dev_info);
497         if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
498                         dev_info.max_event_ports <
499                         evt_nr_active_lcores(opt->wlcores)) {
500                 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
501                         pipeline_queue_nb_event_queues(opt),
502                         dev_info.max_event_queues,
503                         evt_nr_active_lcores(opt->wlcores),
504                         dev_info.max_event_ports);
505         }
506
507         return true;
508 }
509
510 static const struct evt_test_ops pipeline_queue =  {
511         .cap_check          = pipeline_queue_capability_check,
512         .opt_check          = pipeline_queue_opt_check,
513         .opt_dump           = pipeline_queue_opt_dump,
514         .test_setup         = pipeline_test_setup,
515         .mempool_setup      = pipeline_mempool_setup,
516         .ethdev_setup       = pipeline_ethdev_setup,
517         .eventdev_setup     = pipeline_queue_eventdev_setup,
518         .launch_lcores      = pipeline_queue_launch_lcores,
519         .eventdev_destroy   = pipeline_eventdev_destroy,
520         .mempool_destroy    = pipeline_mempool_destroy,
521         .ethdev_destroy     = pipeline_ethdev_destroy,
522         .test_result        = pipeline_test_result,
523         .test_destroy       = pipeline_test_destroy,
524 };
525
526 EVT_TEST_REGISTER(pipeline_queue);