app/eventdev: clean up worker state before exit
[dpdk.git] / app / test-eventdev / test_pipeline_queue.c
1 /*
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright 2017 Cavium, Inc.
4  */
5
6 #include "test_pipeline_common.h"
7
8 /* See http://doc.dpdk.org/guides/tools/testeventdev.html for test details */
9
10 static __rte_always_inline int
11 pipeline_queue_nb_event_queues(struct evt_options *opt)
12 {
13         uint16_t eth_count = rte_eth_dev_count_avail();
14
15         return (eth_count * opt->nb_stages) + eth_count;
16 }
17
/* Signature shared by every worker loop in this file: opaque argument in, status out. */
typedef int (*pipeline_queue_worker_t)(void *);
19
20 static __rte_noinline int
21 pipeline_queue_worker_single_stage_tx(void *arg)
22 {
23         PIPELINE_WORKER_SINGLE_STAGE_INIT;
24         uint8_t enq = 0, deq = 0;
25
26         while (t->done == false) {
27                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
28
29                 if (!deq) {
30                         rte_pause();
31                         continue;
32                 }
33
34                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
35                         enq = pipeline_event_tx(dev, port, &ev, t);
36                         ev.op = RTE_EVENT_OP_RELEASE;
37                         w->processed_pkts++;
38                 } else {
39                         ev.queue_id++;
40                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
41                         enq = pipeline_event_enqueue(dev, port, &ev, t);
42                 }
43         }
44         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
45
46         return 0;
47 }
48
49 static __rte_noinline int
50 pipeline_queue_worker_single_stage_fwd(void *arg)
51 {
52         PIPELINE_WORKER_SINGLE_STAGE_INIT;
53         const uint8_t *tx_queue = t->tx_evqueue_id;
54         uint8_t enq = 0, deq = 0;
55
56         while (t->done == false) {
57                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
58
59                 if (!deq) {
60                         rte_pause();
61                         continue;
62                 }
63
64                 ev.queue_id = tx_queue[ev.mbuf->port];
65                 rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
66                 pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
67                 enq = pipeline_event_enqueue(dev, port, &ev, t);
68                 w->processed_pkts++;
69         }
70         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
71
72         return 0;
73 }
74
75 static __rte_noinline int
76 pipeline_queue_worker_single_stage_burst_tx(void *arg)
77 {
78         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
79         uint16_t nb_rx = 0, nb_tx = 0;
80
81         while (t->done == false) {
82                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
83
84                 if (!nb_rx) {
85                         rte_pause();
86                         continue;
87                 }
88
89                 for (i = 0; i < nb_rx; i++) {
90                         rte_prefetch0(ev[i + 1].mbuf);
91                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
92                                 pipeline_event_tx(dev, port, &ev[i], t);
93                                 ev[i].op = RTE_EVENT_OP_RELEASE;
94                                 w->processed_pkts++;
95                         } else {
96                                 ev[i].queue_id++;
97                                 pipeline_fwd_event(&ev[i],
98                                                 RTE_SCHED_TYPE_ATOMIC);
99                         }
100                 }
101                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
102         }
103         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
104
105         return 0;
106 }
107
108 static __rte_noinline int
109 pipeline_queue_worker_single_stage_burst_fwd(void *arg)
110 {
111         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
112         const uint8_t *tx_queue = t->tx_evqueue_id;
113         uint16_t nb_rx = 0, nb_tx = 0;
114
115         while (t->done == false) {
116                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
117
118                 if (!nb_rx) {
119                         rte_pause();
120                         continue;
121                 }
122
123                 for (i = 0; i < nb_rx; i++) {
124                         rte_prefetch0(ev[i + 1].mbuf);
125                         ev[i].queue_id = tx_queue[ev[i].mbuf->port];
126                         rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
127                         pipeline_fwd_event(&ev[i], RTE_SCHED_TYPE_ATOMIC);
128                 }
129
130                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
131                 w->processed_pkts += nb_rx;
132         }
133         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
134
135         return 0;
136 }
137
138 static __rte_noinline int
139 pipeline_queue_worker_single_stage_tx_vector(void *arg)
140 {
141         PIPELINE_WORKER_SINGLE_STAGE_INIT;
142         uint8_t enq = 0, deq = 0;
143         uint16_t vector_sz;
144
145         while (!t->done) {
146                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
147
148                 if (!deq) {
149                         rte_pause();
150                         continue;
151                 }
152
153                 if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
154                         vector_sz = ev.vec->nb_elem;
155                         enq = pipeline_event_tx_vector(dev, port, &ev, t);
156                         ev.op = RTE_EVENT_OP_RELEASE;
157                         w->processed_pkts += vector_sz;
158                 } else {
159                         ev.queue_id++;
160                         pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
161                         enq = pipeline_event_enqueue(dev, port, &ev, t);
162                 }
163         }
164         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
165
166         return 0;
167 }
168
169 static __rte_noinline int
170 pipeline_queue_worker_single_stage_fwd_vector(void *arg)
171 {
172         PIPELINE_WORKER_SINGLE_STAGE_INIT;
173         const uint8_t *tx_queue = t->tx_evqueue_id;
174         uint8_t enq = 0, deq = 0;
175         uint16_t vector_sz;
176
177         while (!t->done) {
178                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
179
180                 if (!deq) {
181                         rte_pause();
182                         continue;
183                 }
184
185                 ev.queue_id = tx_queue[ev.vec->port];
186                 ev.vec->queue = 0;
187                 vector_sz = ev.vec->nb_elem;
188                 pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
189                 enq = pipeline_event_enqueue(dev, port, &ev, t);
190                 w->processed_pkts += vector_sz;
191         }
192         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
193
194         return 0;
195 }
196
197 static __rte_noinline int
198 pipeline_queue_worker_single_stage_burst_tx_vector(void *arg)
199 {
200         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
201         uint16_t nb_rx = 0, nb_tx = 0;
202         uint16_t vector_sz;
203
204         while (!t->done) {
205                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
206
207                 if (!nb_rx) {
208                         rte_pause();
209                         continue;
210                 }
211
212                 for (i = 0; i < nb_rx; i++) {
213                         if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
214                                 vector_sz = ev[i].vec->nb_elem;
215                                 pipeline_event_tx_vector(dev, port, &ev[i], t);
216                                 ev[i].op = RTE_EVENT_OP_RELEASE;
217                                 w->processed_pkts += vector_sz;
218                         } else {
219                                 ev[i].queue_id++;
220                                 pipeline_fwd_event_vector(
221                                         &ev[i], RTE_SCHED_TYPE_ATOMIC);
222                         }
223                 }
224
225                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
226         }
227         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
228
229         return 0;
230 }
231
232 static __rte_noinline int
233 pipeline_queue_worker_single_stage_burst_fwd_vector(void *arg)
234 {
235         PIPELINE_WORKER_SINGLE_STAGE_BURST_INIT;
236         const uint8_t *tx_queue = t->tx_evqueue_id;
237         uint16_t nb_rx = 0, nb_tx = 0;
238         uint16_t vector_sz;
239
240         while (!t->done) {
241                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
242
243                 if (!nb_rx) {
244                         rte_pause();
245                         continue;
246                 }
247
248                 vector_sz = 0;
249                 for (i = 0; i < nb_rx; i++) {
250                         ev[i].queue_id = tx_queue[ev[i].vec->port];
251                         ev[i].vec->queue = 0;
252                         vector_sz += ev[i].vec->nb_elem;
253                         pipeline_fwd_event_vector(&ev[i],
254                                                   RTE_SCHED_TYPE_ATOMIC);
255                 }
256
257                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
258                 w->processed_pkts += vector_sz;
259         }
260         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
261
262         return 0;
263 }
264
265 static __rte_noinline int
266 pipeline_queue_worker_multi_stage_tx(void *arg)
267 {
268         PIPELINE_WORKER_MULTI_STAGE_INIT;
269         const uint8_t *tx_queue = t->tx_evqueue_id;
270         uint8_t enq = 0, deq = 0;
271
272         while (t->done == false) {
273                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
274
275                 if (!deq) {
276                         rte_pause();
277                         continue;
278                 }
279
280                 cq_id = ev.queue_id % nb_stages;
281
282                 if (ev.queue_id == tx_queue[ev.mbuf->port]) {
283                         enq = pipeline_event_tx(dev, port, &ev, t);
284                         ev.op = RTE_EVENT_OP_RELEASE;
285                         w->processed_pkts++;
286                         continue;
287                 }
288
289                 ev.queue_id++;
290                 pipeline_fwd_event(&ev, cq_id != last_queue ?
291                                 sched_type_list[cq_id] :
292                                 RTE_SCHED_TYPE_ATOMIC);
293                 enq = pipeline_event_enqueue(dev, port, &ev, t);
294         }
295         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
296
297         return 0;
298 }
299
300 static __rte_noinline int
301 pipeline_queue_worker_multi_stage_fwd(void *arg)
302 {
303         PIPELINE_WORKER_MULTI_STAGE_INIT;
304         const uint8_t *tx_queue = t->tx_evqueue_id;
305         uint8_t enq = 0, deq = 0;
306
307         while (t->done == false) {
308                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
309
310                 if (!deq) {
311                         rte_pause();
312                         continue;
313                 }
314
315                 cq_id = ev.queue_id % nb_stages;
316
317                 if (cq_id == last_queue) {
318                         ev.queue_id = tx_queue[ev.mbuf->port];
319                         rte_event_eth_tx_adapter_txq_set(ev.mbuf, 0);
320                         pipeline_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
321                         enq = pipeline_event_enqueue(dev, port, &ev, t);
322                         w->processed_pkts++;
323                 } else {
324                         ev.queue_id++;
325                         pipeline_fwd_event(&ev, sched_type_list[cq_id]);
326                         enq = pipeline_event_enqueue(dev, port, &ev, t);
327                 }
328         }
329         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
330
331         return 0;
332 }
333
334 static __rte_noinline int
335 pipeline_queue_worker_multi_stage_burst_tx(void *arg)
336 {
337         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
338         const uint8_t *tx_queue = t->tx_evqueue_id;
339         uint16_t nb_rx = 0, nb_tx = 0;
340
341         while (t->done == false) {
342                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
343
344                 if (!nb_rx) {
345                         rte_pause();
346                         continue;
347                 }
348
349                 for (i = 0; i < nb_rx; i++) {
350                         rte_prefetch0(ev[i + 1].mbuf);
351                         cq_id = ev[i].queue_id % nb_stages;
352
353                         if (ev[i].queue_id == tx_queue[ev[i].mbuf->port]) {
354                                 pipeline_event_tx(dev, port, &ev[i], t);
355                                 ev[i].op = RTE_EVENT_OP_RELEASE;
356                                 w->processed_pkts++;
357                                 continue;
358                         }
359
360                         ev[i].queue_id++;
361                         pipeline_fwd_event(&ev[i], cq_id != last_queue ?
362                                         sched_type_list[cq_id] :
363                                         RTE_SCHED_TYPE_ATOMIC);
364                 }
365                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
366         }
367         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
368
369         return 0;
370 }
371
372 static __rte_noinline int
373 pipeline_queue_worker_multi_stage_burst_fwd(void *arg)
374 {
375         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
376         const uint8_t *tx_queue = t->tx_evqueue_id;
377         uint16_t nb_rx = 0, nb_tx = 0;
378
379         while (t->done == false) {
380                 uint16_t processed_pkts = 0;
381                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
382
383                 if (!nb_rx) {
384                         rte_pause();
385                         continue;
386                 }
387
388                 for (i = 0; i < nb_rx; i++) {
389                         rte_prefetch0(ev[i + 1].mbuf);
390                         cq_id = ev[i].queue_id % nb_stages;
391
392                         if (cq_id == last_queue) {
393                                 ev[i].queue_id = tx_queue[ev[i].mbuf->port];
394                                 rte_event_eth_tx_adapter_txq_set(ev[i].mbuf, 0);
395                                 pipeline_fwd_event(&ev[i],
396                                                 RTE_SCHED_TYPE_ATOMIC);
397                                 processed_pkts++;
398                         } else {
399                                 ev[i].queue_id++;
400                                 pipeline_fwd_event(&ev[i],
401                                                 sched_type_list[cq_id]);
402                         }
403                 }
404
405                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
406                 w->processed_pkts += processed_pkts;
407         }
408         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
409
410         return 0;
411 }
412
413 static __rte_noinline int
414 pipeline_queue_worker_multi_stage_tx_vector(void *arg)
415 {
416         PIPELINE_WORKER_MULTI_STAGE_INIT;
417         const uint8_t *tx_queue = t->tx_evqueue_id;
418         uint8_t enq = 0, deq = 0;
419         uint16_t vector_sz;
420
421         while (!t->done) {
422                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
423
424                 if (!deq) {
425                         rte_pause();
426                         continue;
427                 }
428
429                 cq_id = ev.queue_id % nb_stages;
430
431                 if (ev.queue_id == tx_queue[ev.vec->port]) {
432                         vector_sz = ev.vec->nb_elem;
433                         enq = pipeline_event_tx_vector(dev, port, &ev, t);
434                         w->processed_pkts += vector_sz;
435                         ev.op = RTE_EVENT_OP_RELEASE;
436                         continue;
437                 }
438
439                 ev.queue_id++;
440                 pipeline_fwd_event_vector(&ev, cq_id != last_queue
441                                                        ? sched_type_list[cq_id]
442                                                        : RTE_SCHED_TYPE_ATOMIC);
443                 enq = pipeline_event_enqueue(dev, port, &ev, t);
444         }
445         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
446
447         return 0;
448 }
449
450 static __rte_noinline int
451 pipeline_queue_worker_multi_stage_fwd_vector(void *arg)
452 {
453         PIPELINE_WORKER_MULTI_STAGE_INIT;
454         const uint8_t *tx_queue = t->tx_evqueue_id;
455         uint8_t enq = 0, deq = 0;
456         uint16_t vector_sz;
457
458         while (!t->done) {
459                 deq = rte_event_dequeue_burst(dev, port, &ev, 1, 0);
460
461                 if (!deq) {
462                         rte_pause();
463                         continue;
464                 }
465
466                 cq_id = ev.queue_id % nb_stages;
467
468                 if (cq_id == last_queue) {
469                         vector_sz = ev.vec->nb_elem;
470                         ev.queue_id = tx_queue[ev.vec->port];
471                         pipeline_fwd_event_vector(&ev, RTE_SCHED_TYPE_ATOMIC);
472                         w->processed_pkts += vector_sz;
473                 } else {
474                         ev.queue_id++;
475                         pipeline_fwd_event_vector(&ev, sched_type_list[cq_id]);
476                 }
477
478                 enq = pipeline_event_enqueue(dev, port, &ev, t);
479         }
480         pipeline_worker_cleanup(dev, port, &ev, enq, deq);
481
482         return 0;
483 }
484
485 static __rte_noinline int
486 pipeline_queue_worker_multi_stage_burst_tx_vector(void *arg)
487 {
488         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
489         const uint8_t *tx_queue = t->tx_evqueue_id;
490         uint16_t nb_rx = 0, nb_tx = 0;
491         uint16_t vector_sz;
492
493         while (!t->done) {
494                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
495
496                 if (!nb_rx) {
497                         rte_pause();
498                         continue;
499                 }
500
501                 for (i = 0; i < nb_rx; i++) {
502                         cq_id = ev[i].queue_id % nb_stages;
503
504                         if (ev[i].queue_id == tx_queue[ev[i].vec->port]) {
505                                 vector_sz = ev[i].vec->nb_elem;
506                                 pipeline_event_tx_vector(dev, port, &ev[i], t);
507                                 ev[i].op = RTE_EVENT_OP_RELEASE;
508                                 w->processed_pkts += vector_sz;
509                                 continue;
510                         }
511
512                         ev[i].queue_id++;
513                         pipeline_fwd_event_vector(
514                                 &ev[i], cq_id != last_queue
515                                                 ? sched_type_list[cq_id]
516                                                 : RTE_SCHED_TYPE_ATOMIC);
517                 }
518
519                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
520         }
521         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
522
523         return 0;
524 }
525
526 static __rte_noinline int
527 pipeline_queue_worker_multi_stage_burst_fwd_vector(void *arg)
528 {
529         PIPELINE_WORKER_MULTI_STAGE_BURST_INIT;
530         const uint8_t *tx_queue = t->tx_evqueue_id;
531         uint16_t nb_rx = 0, nb_tx = 0;
532         uint16_t vector_sz;
533
534         while (!t->done) {
535                 nb_rx = rte_event_dequeue_burst(dev, port, ev, BURST_SIZE, 0);
536
537                 if (!nb_rx) {
538                         rte_pause();
539                         continue;
540                 }
541
542                 for (i = 0; i < nb_rx; i++) {
543                         cq_id = ev[i].queue_id % nb_stages;
544
545                         if (cq_id == last_queue) {
546                                 ev[i].queue_id = tx_queue[ev[i].vec->port];
547                                 vector_sz = ev[i].vec->nb_elem;
548                                 pipeline_fwd_event_vector(
549                                         &ev[i], RTE_SCHED_TYPE_ATOMIC);
550                                 w->processed_pkts += vector_sz;
551                         } else {
552                                 ev[i].queue_id++;
553                                 pipeline_fwd_event_vector(
554                                         &ev[i], sched_type_list[cq_id]);
555                         }
556                 }
557
558                 nb_tx = pipeline_event_enqueue_burst(dev, port, ev, nb_rx, t);
559         }
560         pipeline_worker_cleanup(dev, port, ev, nb_tx, nb_rx);
561
562         return 0;
563 }
564
565 static int
566 worker_wrapper(void *arg)
567 {
568         struct worker_data *w  = arg;
569         struct evt_options *opt = w->t->opt;
570         const bool burst = evt_has_burst_mode(w->dev_id);
571         const bool internal_port = w->t->internal_port;
572         const uint8_t nb_stages = opt->nb_stages;
573         /*vector/burst/internal_port*/
574         const pipeline_queue_worker_t
575         pipeline_queue_worker_single_stage[2][2][2] = {
576                 [0][0][0] = pipeline_queue_worker_single_stage_fwd,
577                 [0][0][1] = pipeline_queue_worker_single_stage_tx,
578                 [0][1][0] = pipeline_queue_worker_single_stage_burst_fwd,
579                 [0][1][1] = pipeline_queue_worker_single_stage_burst_tx,
580                 [1][0][0] = pipeline_queue_worker_single_stage_fwd_vector,
581                 [1][0][1] = pipeline_queue_worker_single_stage_tx_vector,
582                 [1][1][0] = pipeline_queue_worker_single_stage_burst_fwd_vector,
583                 [1][1][1] = pipeline_queue_worker_single_stage_burst_tx_vector,
584         };
585         const pipeline_queue_worker_t
586         pipeline_queue_worker_multi_stage[2][2][2] = {
587                 [0][0][0] = pipeline_queue_worker_multi_stage_fwd,
588                 [0][0][1] = pipeline_queue_worker_multi_stage_tx,
589                 [0][1][0] = pipeline_queue_worker_multi_stage_burst_fwd,
590                 [0][1][1] = pipeline_queue_worker_multi_stage_burst_tx,
591                 [1][0][0] = pipeline_queue_worker_multi_stage_fwd_vector,
592                 [1][0][1] = pipeline_queue_worker_multi_stage_tx_vector,
593                 [1][1][0] = pipeline_queue_worker_multi_stage_burst_fwd_vector,
594                 [1][1][1] = pipeline_queue_worker_multi_stage_burst_tx_vector,
595         };
596
597         if (nb_stages == 1)
598                 return (pipeline_queue_worker_single_stage[opt->ena_vector]
599                                                           [burst]
600                                                           [internal_port])(arg);
601         else
602                 return (pipeline_queue_worker_multi_stage[opt->ena_vector]
603                                                          [burst]
604                                                          [internal_port])(arg);
605
606         rte_panic("invalid worker\n");
607 }
608
609 static int
610 pipeline_queue_launch_lcores(struct evt_test *test, struct evt_options *opt)
611 {
612         return pipeline_launch_lcores(test, opt, worker_wrapper);
613 }
614
615 static int
616 pipeline_queue_eventdev_setup(struct evt_test *test, struct evt_options *opt)
617 {
618         int ret;
619         int nb_ports;
620         int nb_queues;
621         int nb_stages = opt->nb_stages;
622         uint8_t queue;
623         uint8_t tx_evport_id = 0;
624         uint8_t tx_evqueue_id[RTE_MAX_ETHPORTS];
625         uint8_t queue_arr[RTE_EVENT_MAX_QUEUES_PER_DEV];
626         uint8_t nb_worker_queues = 0;
627         uint16_t prod = 0;
628         struct rte_event_dev_info info;
629         struct test_pipeline *t = evt_test_priv(test);
630
631         nb_ports = evt_nr_active_lcores(opt->wlcores);
632         nb_queues = rte_eth_dev_count_avail() * (nb_stages);
633
634         /* One queue for Tx adapter per port */
635         nb_queues += rte_eth_dev_count_avail();
636
637         memset(tx_evqueue_id, 0, sizeof(uint8_t) * RTE_MAX_ETHPORTS);
638         memset(queue_arr, 0, sizeof(uint8_t) * RTE_EVENT_MAX_QUEUES_PER_DEV);
639
640         rte_event_dev_info_get(opt->dev_id, &info);
641         ret = evt_configure_eventdev(opt, nb_queues, nb_ports);
642         if (ret) {
643                 evt_err("failed to configure eventdev %d", opt->dev_id);
644                 return ret;
645         }
646
647         struct rte_event_queue_conf q_conf = {
648                         .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
649                         .nb_atomic_flows = opt->nb_flows,
650                         .nb_atomic_order_sequences = opt->nb_flows,
651         };
652         /* queue configurations */
653         for (queue = 0; queue < nb_queues; queue++) {
654                 uint8_t slot;
655
656                 q_conf.event_queue_cfg = 0;
657                 slot = queue % (nb_stages + 1);
658                 if (slot == nb_stages) {
659                         q_conf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
660                         if (!t->internal_port) {
661                                 q_conf.event_queue_cfg =
662                                         RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
663                         }
664                         tx_evqueue_id[prod++] = queue;
665                 } else {
666                         q_conf.schedule_type = opt->sched_type_list[slot];
667                         queue_arr[nb_worker_queues] = queue;
668                         nb_worker_queues++;
669                 }
670
671                 ret = rte_event_queue_setup(opt->dev_id, queue, &q_conf);
672                 if (ret) {
673                         evt_err("failed to setup queue=%d", queue);
674                         return ret;
675                 }
676         }
677
678         if (opt->wkr_deq_dep > info.max_event_port_dequeue_depth)
679                 opt->wkr_deq_dep = info.max_event_port_dequeue_depth;
680
681         /* port configuration */
682         const struct rte_event_port_conf p_conf = {
683                         .dequeue_depth = opt->wkr_deq_dep,
684                         .enqueue_depth = info.max_event_port_dequeue_depth,
685                         .new_event_threshold = info.max_num_events,
686         };
687
688         if (!t->internal_port) {
689                 ret = pipeline_event_port_setup(test, opt, queue_arr,
690                                 nb_worker_queues, p_conf);
691                 if (ret)
692                         return ret;
693         } else
694                 ret = pipeline_event_port_setup(test, opt, NULL, nb_queues,
695                                 p_conf);
696
697         if (ret)
698                 return ret;
699         /*
700          * The pipelines are setup in the following manner:
701          *
702          * eth_dev_count = 2, nb_stages = 2.
703          *
704          *      queues = 6
705          *      stride = 3
706          *
707          *      event queue pipelines:
708          *      eth0 -> q0 -> q1 -> (q2->tx)
709          *      eth1 -> q3 -> q4 -> (q5->tx)
710          *
711          *      q2, q5 configured as ATOMIC | SINGLE_LINK
712          *
713          */
714         ret = pipeline_event_rx_adapter_setup(opt, nb_stages + 1, p_conf);
715         if (ret)
716                 return ret;
717
718         ret = pipeline_event_tx_adapter_setup(opt, p_conf);
719         if (ret)
720                 return ret;
721
722         if (!evt_has_distributed_sched(opt->dev_id)) {
723                 uint32_t service_id;
724                 rte_event_dev_service_id_get(opt->dev_id, &service_id);
725                 ret = evt_service_setup(service_id);
726                 if (ret) {
727                         evt_err("No service lcore found to run event dev.");
728                         return ret;
729                 }
730         }
731
732         /* Connect the tx_evqueue_id to the Tx adapter port */
733         if (!t->internal_port) {
734                 RTE_ETH_FOREACH_DEV(prod) {
735                         ret = rte_event_eth_tx_adapter_event_port_get(prod,
736                                         &tx_evport_id);
737                         if (ret) {
738                                 evt_err("Unable to get Tx adptr[%d] evprt[%d]",
739                                                 prod, tx_evport_id);
740                                 return ret;
741                         }
742
743                         if (rte_event_port_link(opt->dev_id, tx_evport_id,
744                                                 &tx_evqueue_id[prod],
745                                                 NULL, 1) != 1) {
746                                 evt_err("Unable to link Tx adptr[%d] evprt[%d]",
747                                                 prod, tx_evport_id);
748                                 return ret;
749                         }
750                 }
751         }
752
753         ret = rte_event_dev_start(opt->dev_id);
754         if (ret) {
755                 evt_err("failed to start eventdev %d", opt->dev_id);
756                 return ret;
757         }
758
759
760         RTE_ETH_FOREACH_DEV(prod) {
761                 ret = rte_eth_dev_start(prod);
762                 if (ret) {
763                         evt_err("Ethernet dev [%d] failed to start."
764                                         " Using synthetic producer", prod);
765                         return ret;
766                 }
767
768         }
769
770         RTE_ETH_FOREACH_DEV(prod) {
771                 ret = rte_event_eth_rx_adapter_start(prod);
772                 if (ret) {
773                         evt_err("Rx adapter[%d] start failed", prod);
774                         return ret;
775                 }
776
777                 ret = rte_event_eth_tx_adapter_start(prod);
778                 if (ret) {
779                         evt_err("Tx adapter[%d] start failed", prod);
780                         return ret;
781                 }
782         }
783
784         memcpy(t->tx_evqueue_id, tx_evqueue_id, sizeof(uint8_t) *
785                         RTE_MAX_ETHPORTS);
786
787         return 0;
788 }
789
/*
 * opt_dump hook: pass the options to pipeline_opt_dump() together with the
 * number of event queues this test needs.
 */
static void
pipeline_queue_opt_dump(struct evt_options *opt)
{
	const int nb_queues = pipeline_queue_nb_event_queues(opt);

	pipeline_opt_dump(opt, nb_queues);
}
795
/*
 * opt_check hook: validate the options through pipeline_opt_check(), given
 * the number of event queues this test needs. Returns its result unchanged.
 */
static int
pipeline_queue_opt_check(struct evt_options *opt)
{
	const int nb_queues = pipeline_queue_nb_event_queues(opt);

	return pipeline_opt_check(opt, nb_queues);
}
801
802 static bool
803 pipeline_queue_capability_check(struct evt_options *opt)
804 {
805         struct rte_event_dev_info dev_info;
806
807         rte_event_dev_info_get(opt->dev_id, &dev_info);
808         if (dev_info.max_event_queues < pipeline_queue_nb_event_queues(opt) ||
809                         dev_info.max_event_ports <
810                         evt_nr_active_lcores(opt->wlcores)) {
811                 evt_err("not enough eventdev queues=%d/%d or ports=%d/%d",
812                         pipeline_queue_nb_event_queues(opt),
813                         dev_info.max_event_queues,
814                         evt_nr_active_lcores(opt->wlcores),
815                         dev_info.max_event_ports);
816         }
817
818         return true;
819 }
820
821 static const struct evt_test_ops pipeline_queue =  {
822         .cap_check          = pipeline_queue_capability_check,
823         .opt_check          = pipeline_queue_opt_check,
824         .opt_dump           = pipeline_queue_opt_dump,
825         .test_setup         = pipeline_test_setup,
826         .mempool_setup      = pipeline_mempool_setup,
827         .ethdev_setup       = pipeline_ethdev_setup,
828         .eventdev_setup     = pipeline_queue_eventdev_setup,
829         .launch_lcores      = pipeline_queue_launch_lcores,
830         .ethdev_rx_stop     = pipeline_ethdev_rx_stop,
831         .eventdev_destroy   = pipeline_eventdev_destroy,
832         .mempool_destroy    = pipeline_mempool_destroy,
833         .ethdev_destroy     = pipeline_ethdev_destroy,
834         .test_result        = pipeline_test_result,
835         .test_destroy       = pipeline_test_destroy,
836 };
837
838 EVT_TEST_REGISTER(pipeline_queue);