examples/eventdev: move common data into pipeline common
[dpdk.git] examples/eventdev_pipeline_sw_pmd/main.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <getopt.h>
#include <stdint.h>
#include <stdio.h>
#include <signal.h>
#include <sched.h>

#include "pipeline_common.h"

struct config_data cdata = {
        .num_packets = (1L << 25), /* do ~32M packets */
        .num_fids = 512,
        .queue_type = RTE_SCHED_TYPE_ATOMIC,
        .next_qid = {-1},
        .qid = {-1},
        .num_stages = 1,
        .worker_cq_depth = 16
};

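/*
 * Note: the designated initializers above set only element 0 of
 * next_qid and qid; C zero-initializes the remaining entries. Both
 * arrays are populated with the real queue IDs in setup_eventdev().
 */
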
static bool
core_in_use(unsigned int lcore_id) {
        return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
                fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}

/*
 * Error callback for the port TX buffers: busy-retry on the (hard-coded)
 * TX queue 0 until every unsent packet has gone out.
 */
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
                        void *userdata)
{
        int port_id = (uintptr_t) userdata;
        unsigned int _sent = 0;

        do {
                /* Note: hard-coded TX queue */
                _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
                                          unsent - _sent);
        } while (_sent != unsent);
}

static int
consumer(void)
{
        const uint64_t freq_khz = rte_get_timer_hz() / 1000;
        struct rte_event packets[BATCH_SIZE];

        static uint64_t received;
        static uint64_t last_pkts;
        static uint64_t last_time;
        static uint64_t start_time;
        unsigned int i, j;
        uint8_t dev_id = cons_data.dev_id;
        uint8_t port_id = cons_data.port_id;

        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
                        packets, RTE_DIM(packets), 0);

        if (n == 0) {
                for (j = 0; j < rte_eth_dev_count(); j++)
                        rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
                return 0;
        }
        if (start_time == 0)
                last_time = start_time = rte_get_timer_cycles();

        received += n;
        for (i = 0; i < n; i++) {
                uint8_t outport = packets[i].mbuf->port;
                rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
                                packets[i].mbuf);

                packets[i].op = RTE_EVENT_OP_RELEASE;
        }

        if (cons_data.release) {
                uint16_t nb_tx;

                nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
                while (nb_tx < n)
                        nb_tx += rte_event_enqueue_burst(dev_id, port_id,
                                                         packets + nb_tx,
                                                         n - nb_tx);
        }

        /* Print out mpps every 1<<22 packets */
        if (!cdata.quiet && received >= last_pkts + (1<<22)) {
                const uint64_t now = rte_get_timer_cycles();
                const uint64_t total_ms = (now - start_time) / freq_khz;
                const uint64_t delta_ms = (now - last_time) / freq_khz;
                uint64_t delta_pkts = received - last_pkts;

                printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
                        "avg %.3f mpps [current %.3f mpps]\n",
                                received,
                                total_ms,
                                received / (total_ms * 1000.0),
                                delta_pkts / (delta_ms * 1000.0));
                last_pkts = received;
                last_time = now;
        }

        cdata.num_packets -= n;
        if (cdata.num_packets <= 0)
                fdata->done = 1;

        return 0;
}

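/*
 * The rates above come out in mpps because packet counts are divided by
 * (milliseconds * 1000.0): e.g. 33,554,432 packets over 2,000 ms gives
 * 33554432 / (2000 * 1000.0) ~= 16.8 mpps.
 */
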
static inline void
schedule_devices(unsigned int lcore_id)
{
        if (fdata->rx_core[lcore_id]) {
                rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
                                !fdata->rx_single);
        }

        if (fdata->sched_core[lcore_id]) {
                rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
                                !fdata->sched_single);
                if (cdata.dump_dev_signal) {
                        rte_event_dev_dump(0, stdout);
                        cdata.dump_dev_signal = 0;
                }
        }

        if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
            rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
                consumer();
                rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
        }
}

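/*
 * When several lcores carry the TX role, the rte_atomic32_cmpset() on
 * fdata->tx_lock above lets exactly one of them run consumer() at a
 * time; with a single TX core (fdata->tx_single) the lock is skipped.
 */
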
static inline void
work(struct rte_mbuf *m)
{
        struct ether_hdr *eth;
        struct ether_addr addr;

        /* change mac addresses on packet (to use mbuf data) */
        /*
         * FIXME Swap mac address properly and also handle the
         * case for both odd and even number of stages that the
         * addresses end up the same at the end of the pipeline
         */
        eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
        ether_addr_copy(&eth->d_addr, &addr);
        ether_addr_copy(&addr, &eth->d_addr);

        /* do a number of cycles of work per packet */
        volatile uint64_t start_tsc = rte_rdtsc();
        while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
                rte_pause();
}

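/*
 * A proper swap, per the FIXME above, would exchange source and
 * destination rather than copy d_addr onto itself. A minimal sketch:
 *
 *      struct ether_addr tmp;
 *
 *      ether_addr_copy(&eth->d_addr, &tmp);
 *      ether_addr_copy(&eth->s_addr, &eth->d_addr);
 *      ether_addr_copy(&tmp, &eth->s_addr);
 *
 * With an odd number of stages each doing this swap, the addresses
 * would still leave the pipeline reversed, which is the second half of
 * the FIXME.
 */
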
static int
worker(void *arg)
{
        struct rte_event events[BATCH_SIZE];

        struct worker_data *data = (struct worker_data *)arg;
        uint8_t dev_id = data->dev_id;
        uint8_t port_id = data->port_id;
        size_t sent = 0, received = 0;
        unsigned int lcore_id = rte_lcore_id();

        while (!fdata->done) {
                uint16_t i;

                schedule_devices(lcore_id);

                if (!fdata->worker_core[lcore_id]) {
                        rte_pause();
                        continue;
                }

                const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
                                events, RTE_DIM(events), 0);

                if (nb_rx == 0) {
                        rte_pause();
                        continue;
                }
                received += nb_rx;

                for (i = 0; i < nb_rx; i++) {

                        /* The first worker stage does classification */
                        if (events[i].queue_id == cdata.qid[0])
                                events[i].flow_id = events[i].mbuf->hash.rss
                                                        % cdata.num_fids;

                        events[i].queue_id = cdata.next_qid[events[i].queue_id];
                        events[i].op = RTE_EVENT_OP_FORWARD;
                        events[i].sched_type = cdata.queue_type;

                        work(events[i].mbuf);
                }
                uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
                                events, nb_rx);
                while (nb_tx < nb_rx && !fdata->done)
                        nb_tx += rte_event_enqueue_burst(dev_id, port_id,
                                                        events + nb_tx,
                                                        nb_rx - nb_tx);
                sent += nb_tx;
        }

        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu TX=%zu\n",
                                rte_lcore_id(), received, sent);

        return 0;
}

/*
 * Parse the coremask given as argument (hexadecimal string) and fill
 * the global configuration (core role and core count) with the parsed
 * value.
 */
static int xdigit2val(unsigned char c)
{
        int val;

        if (isdigit(c))
                val = c - '0';
        else if (isupper(c))
                val = c - 'A' + 10;
        else
                val = c - 'a' + 10;
        return val;
}

static uint64_t
parse_coremask(const char *coremask)
{
        int i, j, idx = 0;
        unsigned int count = 0;
        char c;
        int val;
        uint64_t mask = 0;
        const int32_t BITS_HEX = 4;

        if (coremask == NULL)
                return -1;
        /* Skip leading and trailing blanks, and a 0x/0X prefix
         * if present.
         */
        while (isblank(*coremask))
                coremask++;
        if (coremask[0] == '0' && ((coremask[1] == 'x')
                || (coremask[1] == 'X')))
                coremask += 2;
        i = strlen(coremask);
        while ((i > 0) && isblank(coremask[i - 1]))
                i--;
        if (i == 0)
                return -1;

        for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
                c = coremask[i];
                if (isxdigit(c) == 0) {
                        /* invalid characters */
                        return -1;
                }
                val = xdigit2val(c);
                for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
                        if ((1 << j) & val) {
                                mask |= (1UL << idx);
                                count++;
                        }
                }
        }
        for (; i >= 0; i--)
                if (coremask[i] != '0')
                        return -1;
        if (count == 0)
                return -1;
        return mask;
}

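/*
 * Worked example: "0xA5" parses to mask 0xA5 (binary 1010 0101), i.e.
 * cores 0, 2, 5 and 7, with count = 4. Hex digits are consumed from the
 * least-significant end, four mask bits per character.
 */
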
static struct option long_options[] = {
        {"workers", required_argument, 0, 'w'},
        {"packets", required_argument, 0, 'n'},
        {"atomic-flows", required_argument, 0, 'f'},
        {"num_stages", required_argument, 0, 's'},
        {"rx-mask", required_argument, 0, 'r'},
        {"tx-mask", required_argument, 0, 't'},
        {"sched-mask", required_argument, 0, 'e'},
        {"cq-depth", required_argument, 0, 'c'},
        {"work-cycles", required_argument, 0, 'W'},
        {"queue-priority", no_argument, 0, 'P'},
        {"parallel", no_argument, 0, 'p'},
        {"ordered", no_argument, 0, 'o'},
        {"quiet", no_argument, 0, 'q'},
        {"dump", no_argument, 0, 'D'},
        {0, 0, 0, 0}
};

static void
usage(void)
{
        const char *usage_str =
                "  Usage: eventdev_demo [options]\n"
                "  Options:\n"
                "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
                "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
                "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
                "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
                "  -w, --worker-mask=core mask  Run worker on CPUs in core mask\n"
                "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
                "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
                "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
                "  -W  --work-cycles=N          Worker cycles (default 0)\n"
                "  -P  --queue-priority         Enable scheduler queue prioritization\n"
                "  -o, --ordered                Use ordered scheduling\n"
                "  -p, --parallel               Use parallel scheduling\n"
                "  -q, --quiet                  Minimize printed output\n"
                "  -D, --dump                   Print detailed statistics before exit"
                "\n";
        fprintf(stderr, "%s", usage_str);
        exit(1);
}

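/*
 * Example invocation (illustrative core layout; assumes the sw eventdev
 * vdev is available):
 *
 *      ./eventdev_pipeline_sw_pmd --vdev event_sw0 -- \
 *              -r 0x1 -t 0x2 -e 0x4 -w 0xf0 -s 3
 *
 * This runs NIC Rx on core 0, Tx on core 1, the scheduler on core 2 and
 * four workers on cores 4-7, through a three-stage atomic pipeline.
 */
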
static void
parse_app_args(int argc, char **argv)
{
        /* Parse cli options */
        int option_index;
        int c;
        opterr = 0;
        uint64_t rx_lcore_mask = 0;
        uint64_t tx_lcore_mask = 0;
        uint64_t sched_lcore_mask = 0;
        uint64_t worker_lcore_mask = 0;
        int i;

        for (;;) {
                c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:",
                                long_options, &option_index);
                if (c == -1)
                        break;

                int popcnt = 0;
                switch (c) {
                case 'n':
                        cdata.num_packets = (int64_t)atol(optarg);
                        if (cdata.num_packets == 0)
                                cdata.num_packets = INT64_MAX;
                        break;
                case 'f':
                        cdata.num_fids = (unsigned int)atoi(optarg);
                        break;
                case 's':
                        cdata.num_stages = (unsigned int)atoi(optarg);
                        break;
                case 'c':
                        cdata.worker_cq_depth = (unsigned int)atoi(optarg);
                        break;
                case 'W':
                        cdata.worker_cycles = (unsigned int)atoi(optarg);
                        break;
                case 'P':
                        cdata.enable_queue_priorities = 1;
                        break;
                case 'o':
                        cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
                        break;
                case 'p':
                        cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
                        break;
                case 'q':
                        cdata.quiet = 1;
                        break;
                case 'D':
                        cdata.dump_dev = 1;
                        break;
                case 'w':
                        worker_lcore_mask = parse_coremask(optarg);
                        break;
                case 'r':
                        rx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(rx_lcore_mask);
                        fdata->rx_single = (popcnt == 1);
                        break;
                case 't':
                        tx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(tx_lcore_mask);
                        fdata->tx_single = (popcnt == 1);
                        break;
                case 'e':
                        sched_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(sched_lcore_mask);
                        fdata->sched_single = (popcnt == 1);
                        break;
                default:
                        usage();
                }
        }

        if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
            sched_lcore_mask == 0 || tx_lcore_mask == 0) {
                printf("A part of the pipeline was not assigned any cores. "
                        "This will stall the pipeline; please check the core "
                        "masks (use -h for details on setting core masks):\n"
                        "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
                        "\n\tworkers: %"PRIu64"\n",
                        rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
                        worker_lcore_mask);
                rte_exit(-1, "Fix core masks\n");
        }
        if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
                usage();

        for (i = 0; i < MAX_NUM_CORE; i++) {
                fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
                fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
                fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
                fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));

                if (fdata->worker_core[i])
                        cdata.num_workers++;
                if (core_in_use(i))
                        cdata.active_cores++;
        }
}

static inline void
init_rx_adapter(uint16_t nb_ports)
{
        int i;
        int ret;
        uint8_t evdev_id = 0;
        struct rte_event_dev_info dev_info;

        ret = rte_event_dev_info_get(evdev_id, &dev_info);

        struct rte_event_port_conf rx_p_conf = {
                .dequeue_depth = 8,
                .enqueue_depth = 8,
                .new_event_threshold = 1200,
        };

        if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
                rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
        if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
                rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;

        ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
                        &rx_p_conf);
        if (ret)
                rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
                                cdata.rx_adapter_id);

        struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
                .ev.sched_type = cdata.queue_type,
                .ev.queue_id = cdata.qid[0],
        };

        for (i = 0; i < nb_ports; i++) {
                uint32_t cap;

                ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
                if (ret)
                        rte_exit(EXIT_FAILURE,
                                        "failed to get event rx adapter "
                                        "capabilities");

                ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
                                -1, &queue_conf);
                if (ret)
                        rte_exit(EXIT_FAILURE,
                                        "Failed to add queues to Rx adapter");
        }

        ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
                                &fdata->rxadptr_service_id);
        if (ret != -ESRCH && ret != 0) {
                rte_exit(EXIT_FAILURE,
                        "Error getting the service ID for Rx adapter\n");
        }
        rte_service_runstate_set(fdata->rxadptr_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

        ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
        if (ret)
                rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
                                cdata.rx_adapter_id);
}

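/*
 * Note that no service core is registered here: runstate_mapped_check is
 * disabled for the adapter, and its service ID is instead driven
 * explicitly from the app lcores via rte_service_run_iter_on_app_lcore()
 * in schedule_devices(). The sw eventdev below is handled the same way.
 */
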
/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
        static const struct rte_eth_conf port_conf_default = {
                .rxmode = {
                        .mq_mode = ETH_MQ_RX_RSS,
                        .max_rx_pkt_len = ETHER_MAX_LEN,
                        .ignore_offload_bitfield = 1,
                },
                .rx_adv_conf = {
                        .rss_conf = {
                                .rss_hf = ETH_RSS_IP |
                                          ETH_RSS_TCP |
                                          ETH_RSS_UDP,
                        }
                }
        };
        const uint16_t rx_rings = 1, tx_rings = 1;
        const uint16_t rx_ring_size = 512, tx_ring_size = 512;
        struct rte_eth_conf port_conf = port_conf_default;
        int retval;
        uint16_t q;
        struct rte_eth_dev_info dev_info;
        struct rte_eth_txconf txconf;

        if (port >= rte_eth_dev_count())
                return -1;

        rte_eth_dev_info_get(port, &dev_info);
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
                port_conf.txmode.offloads |=
                        DEV_TX_OFFLOAD_MBUF_FAST_FREE;

        /* Configure the Ethernet device. */
        retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
        if (retval != 0)
                return retval;

        /* Allocate and set up 1 RX queue per Ethernet port. */
        for (q = 0; q < rx_rings; q++) {
                retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
                                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
                if (retval < 0)
                        return retval;
        }

        txconf = dev_info.default_txconf;
        txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
        txconf.offloads = port_conf_default.txmode.offloads;
        /* Allocate and set up 1 TX queue per Ethernet port. */
        for (q = 0; q < tx_rings; q++) {
                retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
                                rte_eth_dev_socket_id(port), &txconf);
                if (retval < 0)
                        return retval;
        }

        /* Start the Ethernet port. */
        retval = rte_eth_dev_start(port);
        if (retval < 0)
                return retval;

        /* Display the port MAC address. */
        struct ether_addr addr;
        rte_eth_macaddr_get(port, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                           " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                        (unsigned int)port,
                        addr.addr_bytes[0], addr.addr_bytes[1],
                        addr.addr_bytes[2], addr.addr_bytes[3],
                        addr.addr_bytes[4], addr.addr_bytes[5]);

        /* Enable RX in promiscuous mode for the Ethernet device. */
        rte_eth_promiscuous_enable(port);

        return 0;
}

static int
init_ports(unsigned int num_ports)
{
        uint8_t portid;
        unsigned int i;

        struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
                        /* mbufs */ 16384 * num_ports,
                        /* cache_size */ 512,
                        /* priv_size */ 0,
                        /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
                        rte_socket_id());

        for (portid = 0; portid < num_ports; portid++)
                if (port_init(portid, mp) != 0)
                        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
                                        portid);

        for (i = 0; i < num_ports; i++) {
                void *userdata = (void *)(uintptr_t) i;
                fdata->tx_buf[i] =
                        rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
                if (fdata->tx_buf[i] == NULL)
                        rte_panic("Out of memory\n");
                rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
                rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
                                                   eth_tx_buffer_retry,
                                                   userdata);
        }

        return 0;
}

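/*
 * Each port gets a 32-packet TX buffer. Full buffers drain through
 * rte_eth_tx_buffer() in consumer(); partially filled ones are flushed
 * there whenever an event dequeue returns empty.
 */
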
struct port_link {
        uint8_t queue_id;
        uint8_t priority;
};

static int
setup_eventdev(struct cons_data *cons_data,
                struct worker_data *worker_data)
{
        const uint8_t dev_id = 0;
        /* +1 queue for the SINGLE_LINK TX stage */
        const uint8_t nb_queues = cdata.num_stages + 1;
        /* +1 port for the consumer */
        const uint8_t nb_ports = cdata.num_workers + 1;
        struct rte_event_dev_config config = {
                        .nb_event_queues = nb_queues,
                        .nb_event_ports = nb_ports,
                        .nb_events_limit  = 4096,
                        .nb_event_queue_flows = 1024,
                        .nb_event_port_dequeue_depth = 128,
                        .nb_event_port_enqueue_depth = 128,
        };
        struct rte_event_port_conf wkr_p_conf = {
                        .dequeue_depth = cdata.worker_cq_depth,
                        .enqueue_depth = 64,
                        .new_event_threshold = 4096,
        };
        struct rte_event_queue_conf wkr_q_conf = {
                        .schedule_type = cdata.queue_type,
                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                        .nb_atomic_flows = 1024,
                        .nb_atomic_order_sequences = 1024,
        };
        struct rte_event_port_conf tx_p_conf = {
                        .dequeue_depth = 128,
                        .enqueue_depth = 128,
                        .new_event_threshold = 4096,
        };
        const struct rte_event_queue_conf tx_q_conf = {
                        .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
                        .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
        };

        struct port_link worker_queues[MAX_NUM_STAGES];
        uint8_t disable_implicit_release;
        struct port_link tx_queue;
        unsigned int i;

        int ret, ndev = rte_event_dev_count();
        if (ndev < 1) {
                printf("%d: No Eventdev Devices Found\n", __LINE__);
                return -1;
        }

        struct rte_event_dev_info dev_info;
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

        disable_implicit_release = (dev_info.event_dev_cap &
                                    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

        wkr_p_conf.disable_implicit_release = disable_implicit_release;
        tx_p_conf.disable_implicit_release = disable_implicit_release;

        if (dev_info.max_event_port_dequeue_depth <
                        config.nb_event_port_dequeue_depth)
                config.nb_event_port_dequeue_depth =
                                dev_info.max_event_port_dequeue_depth;
        if (dev_info.max_event_port_enqueue_depth <
                        config.nb_event_port_enqueue_depth)
                config.nb_event_port_enqueue_depth =
                                dev_info.max_event_port_enqueue_depth;

        ret = rte_event_dev_configure(dev_id, &config);
        if (ret < 0) {
                printf("%d: Error configuring device\n", __LINE__);
                return -1;
        }

        /* Queue creation: one load-balanced queue per pipeline stage */
        printf("  Stages:\n");
        for (i = 0; i < cdata.num_stages; i++) {
                if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
                        printf("%d: error creating qid %d\n", __LINE__, i);
                        return -1;
                }
                cdata.qid[i] = i;
                cdata.next_qid[i] = i+1;
                worker_queues[i].queue_id = i;
                if (cdata.enable_queue_priorities) {
                        /* calculate priority stepping for each stage, leaving
                         * headroom of 1 for the SINGLE_LINK TX below
                         */
                        const uint32_t prio_delta =
                                (RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;

                        /* higher priority for queues closer to tx */
                        wkr_q_conf.priority =
                                RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
                }
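                /*
                 * Worked example of the stepping: with 3 stages nb_queues
                 * is 4, so prio_delta = (255 - 1) / 4 = 63 and stages 0..2
                 * get priorities 255, 192 and 129. Lower values mean
                 * higher priority in eventdev, so stages nearer TX win.
                 */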

                const char *type_str = "Atomic";
                switch (wkr_q_conf.schedule_type) {
                case RTE_SCHED_TYPE_ORDERED:
                        type_str = "Ordered";
                        break;
                case RTE_SCHED_TYPE_PARALLEL:
                        type_str = "Parallel";
                        break;
                }
                printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
                                wkr_q_conf.priority);
        }
        printf("\n");

        /* final queue for sending to TX core */
        if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
                printf("%d: error creating qid %d\n", __LINE__, i);
                return -1;
        }
        tx_queue.queue_id = i;
        tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;

        if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* set up one port per worker, linking to all stage queues */
        for (i = 0; i < cdata.num_workers; i++) {
                struct worker_data *w = &worker_data[i];
                w->dev_id = dev_id;
                if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
                        printf("Error setting up port %d\n", i);
                        return -1;
                }

                uint32_t s;
                for (s = 0; s < cdata.num_stages; s++) {
                        if (rte_event_port_link(dev_id, i,
                                                &worker_queues[s].queue_id,
                                                &worker_queues[s].priority,
                                                1) != 1) {
                                printf("%d: error creating link for port %d\n",
                                                __LINE__, i);
                                return -1;
                        }
                }
                w->port_id = i;
        }

        if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* port for consumer, linked to TX queue */
        if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
                printf("Error setting up port %d\n", i);
                return -1;
        }
        if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
                                &tx_queue.priority, 1) != 1) {
                printf("%d: error creating link for port %d\n",
                                __LINE__, i);
                return -1;
        }
        *cons_data = (struct cons_data){.dev_id = dev_id,
                                        .port_id = i,
                                        .release = disable_implicit_release };

        ret = rte_event_dev_service_id_get(dev_id,
                                &fdata->evdev_service_id);
        if (ret != -ESRCH && ret != 0) {
                printf("Error getting the service ID for sw eventdev\n");
                return -1;
        }
        rte_service_runstate_set(fdata->evdev_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
        if (rte_event_dev_start(dev_id) < 0) {
                printf("Error starting eventdev\n");
                return -1;
        }

        return dev_id;
}

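/*
 * Resulting topology, as configured above: for an N-stage pipeline with
 * W workers, queues 0..N-1 are the load-balanced worker stages and
 * queue N is the SINGLE_LINK TX queue; ports 0..W-1 are worker ports
 * linked to every stage queue, and port W is the consumer port linked
 * only to the TX queue.
 */
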
static void
signal_handler(int signum)
{
        if (fdata->done)
                rte_exit(1, "Exiting on signal %d\n", signum);
        if (signum == SIGINT || signum == SIGTERM) {
                printf("\n\nSignal %d received, preparing to exit...\n",
                                signum);
                fdata->done = 1;
        }
        if (signum == SIGTSTP)
                rte_event_dev_dump(0, stdout);
}

static inline uint64_t
port_stat(int dev_id, int32_t p)
{
        char statname[64];
        snprintf(statname, sizeof(statname), "port_%u_rx", p);
        return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}

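/*
 * "port_N_rx" is an xstat name provided by the sw eventdev PMD; on a
 * PMD without it the call yields (uint64_t)-ENOTSUP, which main()
 * checks before printing the workload distribution.
 */
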
int
main(int argc, char **argv)
{
        struct worker_data *worker_data;
        unsigned int num_ports;
        int lcore_id;
        int err;

        signal(SIGINT, signal_handler);
        signal(SIGTERM, signal_handler);
        signal(SIGTSTP, signal_handler);

        err = rte_eal_init(argc, argv);
        if (err < 0)
                rte_panic("Invalid EAL arguments\n");

        argc -= err;
        argv += err;

        fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
        if (fdata == NULL)
                rte_panic("Out of memory\n");

        /* Parse cli options */
        parse_app_args(argc, argv);

        num_ports = rte_eth_dev_count();
        if (num_ports == 0)
                rte_panic("No ethernet ports found\n");

        const unsigned int cores_needed = cdata.active_cores;

        if (!cdata.quiet) {
                printf("  Config:\n");
                printf("\tports: %u\n", num_ports);
                printf("\tworkers: %u\n", cdata.num_workers);
                printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
                printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
                if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
                        printf("\tqid0 type: ordered\n");
                if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
                        printf("\tqid0 type: atomic\n");
                printf("\tCores available: %u\n", rte_lcore_count());
                printf("\tCores used: %u\n", cores_needed);
        }

        if (rte_lcore_count() < cores_needed)
                rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
                                cores_needed);

        const unsigned int ndevs = rte_event_dev_count();
        if (ndevs == 0)
                rte_panic("No eventdev devs found. Pass in a --vdev eventdev.\n");
        if (ndevs > 1)
                fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");

        worker_data = rte_calloc(0, cdata.num_workers,
                        sizeof(worker_data[0]), 0);
        if (worker_data == NULL)
                rte_panic("rte_calloc failed\n");

        int dev_id = setup_eventdev(&cons_data, worker_data);
        if (dev_id < 0)
                rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

        init_ports(num_ports);
        init_rx_adapter(num_ports);

        int worker_idx = 0;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id >= MAX_NUM_CORE)
                        break;

                if (!fdata->rx_core[lcore_id] &&
                        !fdata->worker_core[lcore_id] &&
                        !fdata->tx_core[lcore_id] &&
                        !fdata->sched_core[lcore_id])
                        continue;

                if (fdata->rx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Rx\n",
                                __func__, lcore_id);

                if (fdata->tx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
                                __func__, lcore_id, cons_data.port_id);

                if (fdata->sched_core[lcore_id])
                        printf("[%s()] lcore %d executing scheduler\n",
                                        __func__, lcore_id);

                if (fdata->worker_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing worker, using eventdev port %u\n",
                                __func__, lcore_id,
                                worker_data[worker_idx].port_id);

                err = rte_eal_remote_launch(worker, &worker_data[worker_idx],
                                            lcore_id);
                if (err) {
                        rte_panic("Failed to launch worker on core %d\n",
                                        lcore_id);
                        continue;
                }
                if (fdata->worker_core[lcore_id])
                        worker_idx++;
        }

        lcore_id = rte_lcore_id();

        if (core_in_use(lcore_id))
                worker(&worker_data[worker_idx++]);

        rte_eal_mp_wait_lcore();

        if (cdata.dump_dev)
                rte_event_dev_dump(dev_id, stdout);

        if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
                        (uint64_t)-ENOTSUP)) {
                printf("\nPort Workload distribution:\n");
                uint32_t i;
                uint64_t tot_pkts = 0;
                uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
                for (i = 0; i < cdata.num_workers; i++) {
                        pkts_per_wkr[i] =
                                port_stat(dev_id, worker_data[i].port_id);
                        tot_pkts += pkts_per_wkr[i];
                }
                for (i = 0; i < cdata.num_workers; i++) {
                        float pc = pkts_per_wkr[i] * 100 /
                                ((float)tot_pkts);
                        printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
                                        i, pc, pkts_per_wkr[i]);
                }
        }

        return 0;
}
957 }