examples/eventdev: support Rx adapter
examples/eventdev_pipeline_sw_pmd/main.c (dpdk.git)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <getopt.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <inttypes.h>
#include <signal.h>
#include <sched.h>
#include <stdbool.h>

#include <rte_eal.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_launch.h>
#include <rte_malloc.h>
#include <rte_random.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_eventdev.h>
#include <rte_event_eth_rx_adapter.h>
#include <rte_service.h>

#define MAX_NUM_STAGES 8
#define BATCH_SIZE 16
#define MAX_NUM_CORE 64

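/*
 * Consumer (TX) port state. The release flag is set when the eventdev
 * advertises RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE, in which case
 * consumer() must enqueue an explicit RTE_EVENT_OP_RELEASE for every
 * event it dequeues (see setup_eventdev()).
 */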
struct cons_data {
        uint8_t dev_id;
        uint8_t port_id;
        uint8_t release;
} __rte_cache_aligned;

static struct cons_data cons_data;

struct worker_data {
        uint8_t dev_id;
        uint8_t port_id;
} __rte_cache_aligned;

struct fastpath_data {
        volatile int done;
        uint32_t tx_lock;
        uint32_t evdev_service_id;
        uint32_t rxadptr_service_id;
        bool rx_single;
        bool tx_single;
        bool sched_single;
        unsigned int rx_core[MAX_NUM_CORE];
        unsigned int tx_core[MAX_NUM_CORE];
        unsigned int sched_core[MAX_NUM_CORE];
        unsigned int worker_core[MAX_NUM_CORE];
        struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
};

static struct fastpath_data *fdata;

struct config_data {
        unsigned int active_cores;
        unsigned int num_workers;
        int64_t num_packets;
        unsigned int num_fids;
        int queue_type;
        int worker_cycles;
        int enable_queue_priorities;
        int quiet;
        int dump_dev;
        int dump_dev_signal;
        unsigned int num_stages;
        unsigned int worker_cq_depth;
        int16_t next_qid[MAX_NUM_STAGES+2];
        int16_t qid[MAX_NUM_STAGES];
        uint8_t rx_adapter_id;
};

static struct config_data cdata = {
        .num_packets = (1L << 25), /* do ~32M packets */
        .num_fids = 512,
        .queue_type = RTE_SCHED_TYPE_ATOMIC,
        .next_qid = {-1},
        .qid = {-1},
        .num_stages = 1,
        .worker_cq_depth = 16
};

static bool
core_in_use(unsigned int lcore_id) {
        return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
                fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}

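/*
 * Error callback registered on each port's TX buffer in init_ports(). It is
 * invoked with the packets that could not be transmitted and retries until
 * all of them are sent; note it spins forever if the TX queue stays full.
 */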
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
                        void *userdata)
{
        int port_id = (uintptr_t) userdata;
        unsigned int _sent = 0;

        do {
                /* Note: hard-coded TX queue */
                _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
                                          unsent - _sent);
        } while (_sent != unsent);
}

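/*
 * TX half of the pipeline: dequeues a burst of events from the single-link
 * TX port, buffers each mbuf towards its output NIC port and, when the
 * device requires it (cons_data.release), enqueues explicit RELEASE
 * operations for the dequeued events. Also tracks packet counts and
 * periodically prints throughput.
 */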
static int
consumer(void)
{
        const uint64_t freq_khz = rte_get_timer_hz() / 1000;
        struct rte_event packets[BATCH_SIZE];

        static uint64_t received;
        static uint64_t last_pkts;
        static uint64_t last_time;
        static uint64_t start_time;
        unsigned int i, j;
        uint8_t dev_id = cons_data.dev_id;
        uint8_t port_id = cons_data.port_id;

        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
                        packets, RTE_DIM(packets), 0);

        if (n == 0) {
                for (j = 0; j < rte_eth_dev_count(); j++)
                        rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
                return 0;
        }
        if (start_time == 0)
                last_time = start_time = rte_get_timer_cycles();

        received += n;
        for (i = 0; i < n; i++) {
                uint8_t outport = packets[i].mbuf->port;
                rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
                                packets[i].mbuf);

                packets[i].op = RTE_EVENT_OP_RELEASE;
        }

        if (cons_data.release) {
                uint16_t nb_tx;

                nb_tx = rte_event_enqueue_burst(dev_id, port_id, packets, n);
                while (nb_tx < n)
                        nb_tx += rte_event_enqueue_burst(dev_id, port_id,
                                                         packets + nb_tx,
                                                         n - nb_tx);
        }

        /* Print out mpps every 1<<22 packets */
        if (!cdata.quiet && received >= last_pkts + (1<<22)) {
                const uint64_t now = rte_get_timer_cycles();
                const uint64_t total_ms = (now - start_time) / freq_khz;
                const uint64_t delta_ms = (now - last_time) / freq_khz;
                uint64_t delta_pkts = received - last_pkts;

                printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
                        "avg %.3f mpps [current %.3f mpps]\n",
                                received,
                                total_ms,
                                received / (total_ms * 1000.0),
                                delta_pkts / (delta_ms * 1000.0));
                last_pkts = received;
                last_time = now;
        }

        cdata.num_packets -= n;
        if (cdata.num_packets <= 0)
                fdata->done = 1;

        return 0;
}

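/*
 * Run the housekeeping services this lcore is responsible for. Both the Rx
 * adapter and the sw eventdev scheduler are DPDK services here; the second
 * argument of rte_service_run_iter_on_app_lcore() requests serialization
 * only when more than one lcore runs the service. The TX path is guarded
 * by a cmpset spinlock unless a single TX core is in use.
 */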
static inline void
schedule_devices(unsigned int lcore_id)
{
        if (fdata->rx_core[lcore_id]) {
                rte_service_run_iter_on_app_lcore(fdata->rxadptr_service_id,
                                !fdata->rx_single);
        }

        if (fdata->sched_core[lcore_id]) {
                rte_service_run_iter_on_app_lcore(fdata->evdev_service_id,
                                !fdata->sched_single);
                if (cdata.dump_dev_signal) {
                        rte_event_dev_dump(0, stdout);
                        cdata.dump_dev_signal = 0;
                }
        }

        if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
            rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
                consumer();
                rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
        }
}

static inline void
work(struct rte_mbuf *m)
{
        struct ether_hdr *eth;
        struct ether_addr addr;

        /* change mac addresses on packet (to use mbuf data) */
        /*
         * FIXME Swap the MAC addresses properly, and handle the case,
         * for both odd and even numbers of stages, where the addresses
         * must end up the same at the end of the pipeline.
         */
        eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
        ether_addr_copy(&eth->d_addr, &addr);
        ether_addr_copy(&addr, &eth->d_addr);

        /* do a number of cycles of work per packet */
        volatile uint64_t start_tsc = rte_rdtsc();
        while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
                rte_pause();
}

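/*
 * Worker loop, run on every launched lcore. Lcores without the worker role
 * only call schedule_devices() and pause. Worker lcores dequeue a burst of
 * events, classify first-stage events into flows using the RSS hash,
 * advance each event to its next stage queue via cdata.next_qid[], perform
 * the per-packet dummy work and forward the burst back to the device.
 */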
static int
worker(void *arg)
{
        struct rte_event events[BATCH_SIZE];

        struct worker_data *data = (struct worker_data *)arg;
        uint8_t dev_id = data->dev_id;
        uint8_t port_id = data->port_id;
        size_t sent = 0, received = 0;
        unsigned int lcore_id = rte_lcore_id();

        while (!fdata->done) {
                uint16_t i;

                schedule_devices(lcore_id);

                if (!fdata->worker_core[lcore_id]) {
                        rte_pause();
                        continue;
                }

                const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
                                events, RTE_DIM(events), 0);

                if (nb_rx == 0) {
                        rte_pause();
                        continue;
                }
                received += nb_rx;

                for (i = 0; i < nb_rx; i++) {

                        /* The first worker stage does classification */
                        if (events[i].queue_id == cdata.qid[0])
                                events[i].flow_id = events[i].mbuf->hash.rss
                                                        % cdata.num_fids;

                        events[i].queue_id = cdata.next_qid[events[i].queue_id];
                        events[i].op = RTE_EVENT_OP_FORWARD;
                        events[i].sched_type = cdata.queue_type;

                        work(events[i].mbuf);
                }
                uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
                                events, nb_rx);
                while (nb_tx < nb_rx && !fdata->done)
                        nb_tx += rte_event_enqueue_burst(dev_id, port_id,
                                                        events + nb_tx,
                                                        nb_rx - nb_tx);
                sent += nb_tx;
        }

        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu TX=%zu\n",
                                rte_lcore_id(), received, sent);

        return 0;
}

/*
 * Parse a coremask given as a hexadecimal string and return it as a bit
 * mask. The caller (parse_app_args()) uses the result to fill the global
 * configuration (core roles and core count).
 */
static int xdigit2val(unsigned char c)
{
        int val;

        if (isdigit(c))
                val = c - '0';
        else if (isupper(c))
                val = c - 'A' + 10;
        else
                val = c - 'a' + 10;
        return val;
}

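/*
 * Worked example: parse_coremask("0xA2") scans the hex digits from least to
 * most significant. '2' (0b0010) sets bit 1; 'A' (0b1010) sets bits 5 and 7.
 * The returned mask is 0xA2, selecting lcores 1, 5 and 7.
 */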
static uint64_t
parse_coremask(const char *coremask)
{
        int i, j, idx = 0;
        unsigned int count = 0;
        char c;
        int val;
        uint64_t mask = 0;
        const int32_t BITS_HEX = 4;

        if (coremask == NULL)
                return -1;
        /* Skip leading and trailing blank characters, and an
         * optional 0x/0X prefix.
         */
        while (isblank(*coremask))
                coremask++;
        if (coremask[0] == '0' && ((coremask[1] == 'x')
                || (coremask[1] == 'X')))
                coremask += 2;
        i = strlen(coremask);
        while ((i > 0) && isblank(coremask[i - 1]))
                i--;
        if (i == 0)
                return -1;

        for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
                c = coremask[i];
                if (isxdigit(c) == 0) {
                        /* invalid characters */
                        return -1;
                }
                val = xdigit2val(c);
                for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
                        if ((1 << j) & val) {
                                mask |= (1UL << idx);
                                count++;
                        }
                }
        }
        for (; i >= 0; i--)
                if (coremask[i] != '0')
                        return -1;
        if (count == 0)
                return -1;
        return mask;
}

static struct option long_options[] = {
        {"workers", required_argument, 0, 'w'},
        {"packets", required_argument, 0, 'n'},
        {"atomic-flows", required_argument, 0, 'f'},
        {"num_stages", required_argument, 0, 's'},
        {"rx-mask", required_argument, 0, 'r'},
        {"tx-mask", required_argument, 0, 't'},
        {"sched-mask", required_argument, 0, 'e'},
        {"cq-depth", required_argument, 0, 'c'},
        {"work-cycles", required_argument, 0, 'W'},
        {"queue-priority", no_argument, 0, 'P'},
        {"parallel", no_argument, 0, 'p'},
        {"ordered", no_argument, 0, 'o'},
        {"quiet", no_argument, 0, 'q'},
        {"dump", no_argument, 0, 'D'},
        {0, 0, 0, 0}
};

static void
usage(void)
{
        const char *usage_str =
                "  Usage: eventdev_pipeline_sw_pmd [options]\n"
                "  Options:\n"
                "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
                "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
                "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
                "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
                "  -w, --workers=core mask      Run worker on CPUs in core mask\n"
                "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
                "  -e, --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
                "  -c, --cq-depth=N             Worker CQ depth (default 16)\n"
                "  -W, --work-cycles=N          Worker cycles (default 0)\n"
                "  -P, --queue-priority         Enable scheduler queue prioritization\n"
                "  -o, --ordered                Use ordered scheduling\n"
                "  -p, --parallel               Use parallel scheduling\n"
                "  -q, --quiet                  Minimize printed output\n"
                "  -D, --dump                   Print detailed statistics before exit"
                "\n";
        fprintf(stderr, "%s", usage_str);
        exit(1);
}

static void
parse_app_args(int argc, char **argv)
{
        /* Parse CLI options */
        int option_index;
        int c;
        opterr = 0;
        uint64_t rx_lcore_mask = 0;
        uint64_t tx_lcore_mask = 0;
        uint64_t sched_lcore_mask = 0;
        uint64_t worker_lcore_mask = 0;
        int i;

        for (;;) {
                c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:",
                                long_options, &option_index);
                if (c == -1)
                        break;

                int popcnt = 0;
                switch (c) {
                case 'n':
                        cdata.num_packets = (int64_t)atol(optarg);
                        if (cdata.num_packets == 0)
                                cdata.num_packets = INT64_MAX;
                        break;
                case 'f':
                        cdata.num_fids = (unsigned int)atoi(optarg);
                        break;
                case 's':
                        cdata.num_stages = (unsigned int)atoi(optarg);
                        break;
                case 'c':
                        cdata.worker_cq_depth = (unsigned int)atoi(optarg);
                        break;
                case 'W':
                        cdata.worker_cycles = (unsigned int)atoi(optarg);
                        break;
                case 'P':
                        cdata.enable_queue_priorities = 1;
                        break;
                case 'o':
                        cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
                        break;
                case 'p':
                        cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
                        break;
                case 'q':
                        cdata.quiet = 1;
                        break;
                case 'D':
                        cdata.dump_dev = 1;
                        break;
                case 'w':
                        worker_lcore_mask = parse_coremask(optarg);
                        break;
                case 'r':
                        rx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(rx_lcore_mask);
                        fdata->rx_single = (popcnt == 1);
                        break;
                case 't':
                        tx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(tx_lcore_mask);
                        fdata->tx_single = (popcnt == 1);
                        break;
                case 'e':
                        sched_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(sched_lcore_mask);
                        fdata->sched_single = (popcnt == 1);
                        break;
                default:
                        usage();
                }
        }

        if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
            sched_lcore_mask == 0 || tx_lcore_mask == 0) {
                printf("A part of the pipeline was not assigned any cores. "
                        "This will stall the pipeline; please check the core "
                        "masks (use -h for details on setting core masks):\n"
                        "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
                        "\n\tworkers: %"PRIu64"\n",
                        rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
                        worker_lcore_mask);
                rte_exit(-1, "Fix core masks\n");
        }
        if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
                usage();

        for (i = 0; i < MAX_NUM_CORE; i++) {
                fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
                fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
                fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
                fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));

                if (fdata->worker_core[i])
                        cdata.num_workers++;
                if (core_in_use(i))
                        cdata.active_cores++;
        }
}

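/*
 * Create a single Rx adapter feeding every ethdev port into the first stage
 * queue (cdata.qid[0]). The sw event PMD has no internal Rx capability, so
 * the adapter is implemented as a service: we fetch its service ID, disable
 * the service-core mapping check, and let the lcores marked in rx_core[]
 * drive it from schedule_devices().
 */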
static inline void
init_rx_adapter(uint16_t nb_ports)
{
        int i;
        int ret;
        uint8_t evdev_id = 0;
        struct rte_event_dev_info dev_info;

        ret = rte_event_dev_info_get(evdev_id, &dev_info);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "failed to get event dev info");

        struct rte_event_port_conf rx_p_conf = {
                .dequeue_depth = 8,
                .enqueue_depth = 8,
                .new_event_threshold = 1200,
        };

        if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
                rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
        if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
                rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;

        ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
                        &rx_p_conf);
        if (ret)
                rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
                                cdata.rx_adapter_id);

        struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
                .ev.sched_type = cdata.queue_type,
                .ev.queue_id = cdata.qid[0],
        };

        for (i = 0; i < nb_ports; i++) {
                uint32_t cap;

                ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
                if (ret)
                        rte_exit(EXIT_FAILURE,
                                        "failed to get event rx adapter "
                                        "capabilities");

                /* rx_queue_id of -1 adds all Rx queues of the port */
                ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
                                -1, &queue_conf);
                if (ret)
                        rte_exit(EXIT_FAILURE,
                                        "Failed to add queues to Rx adapter");
        }

        ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
                                &fdata->rxadptr_service_id);
        if (ret != -ESRCH && ret != 0) {
                rte_exit(EXIT_FAILURE,
                        "Error getting the service ID for Rx adapter\n");
        }
        rte_service_runstate_set(fdata->rxadptr_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

        ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
        if (ret)
                rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
                                cdata.rx_adapter_id);
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
        static const struct rte_eth_conf port_conf_default = {
                .rxmode = {
                        /* RSS fills mbuf->hash.rss, which the first
                         * worker stage uses for flow classification.
                         */
                        .mq_mode = ETH_MQ_RX_RSS,
                        .max_rx_pkt_len = ETHER_MAX_LEN,
                        .ignore_offload_bitfield = 1,
                },
                .rx_adv_conf = {
                        .rss_conf = {
                                .rss_hf = ETH_RSS_IP |
                                          ETH_RSS_TCP |
                                          ETH_RSS_UDP,
                        }
                }
        };
        const uint16_t rx_rings = 1, tx_rings = 1;
        const uint16_t rx_ring_size = 512, tx_ring_size = 512;
        struct rte_eth_conf port_conf = port_conf_default;
        int retval;
        uint16_t q;
        struct rte_eth_dev_info dev_info;
        struct rte_eth_txconf txconf;

        if (port >= rte_eth_dev_count())
                return -1;

        rte_eth_dev_info_get(port, &dev_info);
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
                port_conf.txmode.offloads |=
                        DEV_TX_OFFLOAD_MBUF_FAST_FREE;

        /* Configure the Ethernet device. */
        retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
        if (retval != 0)
                return retval;

        /* Allocate and set up 1 RX queue per Ethernet port. */
        for (q = 0; q < rx_rings; q++) {
                retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
                                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
                if (retval < 0)
                        return retval;
        }

        txconf = dev_info.default_txconf;
        txconf.txq_flags = ETH_TXQ_FLAGS_IGNORE;
        txconf.offloads = port_conf_default.txmode.offloads;
        /* Allocate and set up 1 TX queue per Ethernet port. */
        for (q = 0; q < tx_rings; q++) {
                retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
                                rte_eth_dev_socket_id(port), &txconf);
                if (retval < 0)
                        return retval;
        }

        /* Start the Ethernet port. */
        retval = rte_eth_dev_start(port);
        if (retval < 0)
                return retval;

        /* Display the port MAC address. */
        struct ether_addr addr;
        rte_eth_macaddr_get(port, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                           " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                        (unsigned int)port,
                        addr.addr_bytes[0], addr.addr_bytes[1],
                        addr.addr_bytes[2], addr.addr_bytes[3],
                        addr.addr_bytes[4], addr.addr_bytes[5]);

        /* Enable RX in promiscuous mode for the Ethernet device. */
        rte_eth_promiscuous_enable(port);

        return 0;
}

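/*
 * Create one mempool shared by all ports (16K mbufs per port, cache size
 * 512) and initialize each port with it. Every port also gets a 32-packet
 * TX buffer whose error callback (eth_tx_buffer_retry) retries transmission.
 */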
static int
init_ports(unsigned int num_ports)
{
        uint8_t portid;
        unsigned int i;

        struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
                        /* mbufs */ 16384 * num_ports,
                        /* cache_size */ 512,
                        /* priv_size */ 0,
                        /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
                        rte_socket_id());

        for (portid = 0; portid < num_ports; portid++)
                if (port_init(portid, mp) != 0)
                        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
                                        portid);

        for (i = 0; i < num_ports; i++) {
                void *userdata = (void *)(uintptr_t) i;
                fdata->tx_buf[i] =
                        rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
                if (fdata->tx_buf[i] == NULL)
                        rte_panic("Out of memory\n");
                rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
                rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
                                                   eth_tx_buffer_retry,
                                                   userdata);
        }

        return 0;
}

struct port_link {
        uint8_t queue_id;
        uint8_t priority;
};

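/*
 * Configure the event device with the pipeline topology. For N stages and
 * W workers:
 *
 *   Rx adapter -> qid 0 ... qid N-1 -> qid N (SINGLE_LINK) -> consumer port
 *
 * qid 0..N-1 are load-balanced stage queues linked to all W worker ports;
 * qid N is the TX queue, linked only to the consumer port (port W).
 */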
static int
setup_eventdev(struct cons_data *cons_data,
                struct worker_data *worker_data)
{
        const uint8_t dev_id = 0;
        /* +1 queue is for a SINGLE_LINK TX stage */
        const uint8_t nb_queues = cdata.num_stages + 1;
        /* +1 port for the consumer */
        const uint8_t nb_ports = cdata.num_workers + 1;
        struct rte_event_dev_config config = {
                        .nb_event_queues = nb_queues,
                        .nb_event_ports = nb_ports,
                        .nb_events_limit = 4096,
                        .nb_event_queue_flows = 1024,
                        .nb_event_port_dequeue_depth = 128,
                        .nb_event_port_enqueue_depth = 128,
        };
        struct rte_event_port_conf wkr_p_conf = {
                        .dequeue_depth = cdata.worker_cq_depth,
                        .enqueue_depth = 64,
                        .new_event_threshold = 4096,
        };
        struct rte_event_queue_conf wkr_q_conf = {
                        .schedule_type = cdata.queue_type,
                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                        .nb_atomic_flows = 1024,
                        .nb_atomic_order_sequences = 1024,
        };
        struct rte_event_port_conf tx_p_conf = {
                        .dequeue_depth = 128,
                        .enqueue_depth = 128,
                        .new_event_threshold = 4096,
        };
        const struct rte_event_queue_conf tx_q_conf = {
                        .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
                        .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
        };

        struct port_link worker_queues[MAX_NUM_STAGES];
        uint8_t disable_implicit_release;
        struct port_link tx_queue;
        unsigned int i;

        int ret, ndev = rte_event_dev_count();
        if (ndev < 1) {
                printf("%d: No Eventdev Devices Found\n", __LINE__);
                return -1;
        }

        struct rte_event_dev_info dev_info;
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

        disable_implicit_release = (dev_info.event_dev_cap &
                                    RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

        wkr_p_conf.disable_implicit_release = disable_implicit_release;
        tx_p_conf.disable_implicit_release = disable_implicit_release;

        if (dev_info.max_event_port_dequeue_depth <
                        config.nb_event_port_dequeue_depth)
                config.nb_event_port_dequeue_depth =
                                dev_info.max_event_port_dequeue_depth;
        if (dev_info.max_event_port_enqueue_depth <
                        config.nb_event_port_enqueue_depth)
                config.nb_event_port_enqueue_depth =
                                dev_info.max_event_port_enqueue_depth;

        ret = rte_event_dev_configure(dev_id, &config);
        if (ret < 0) {
                printf("%d: Error configuring device\n", __LINE__);
                return -1;
        }

        /* Queue creation - one load-balanced queue per pipeline stage */
        printf("  Stages:\n");
        for (i = 0; i < cdata.num_stages; i++) {
                if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
                        printf("%d: error creating qid %u\n", __LINE__, i);
                        return -1;
                }
                cdata.qid[i] = i;
                cdata.next_qid[i] = i+1;
                worker_queues[i].queue_id = i;
                if (cdata.enable_queue_priorities) {
                        /* calculate priority stepping for each stage, leaving
                         * headroom of 1 for the SINGLE_LINK TX below
                         */
                        const uint32_t prio_delta =
                                (RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;

                        /* higher priority for queues closer to tx */
                        wkr_q_conf.priority =
                                RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
                }

                const char *type_str = "Atomic";
                switch (wkr_q_conf.schedule_type) {
                case RTE_SCHED_TYPE_ORDERED:
                        type_str = "Ordered";
                        break;
                case RTE_SCHED_TYPE_PARALLEL:
                        type_str = "Parallel";
                        break;
                }
                printf("\tStage %u, Type %s\tPriority = %d\n", i, type_str,
                                wkr_q_conf.priority);
        }
        printf("\n");

        /* final queue for sending to TX core */
        if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
                printf("%d: error creating qid %u\n", __LINE__, i);
                return -1;
        }
        tx_queue.queue_id = i;
        tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;

        if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* set up one port per worker, linking to all stage queues */
        for (i = 0; i < cdata.num_workers; i++) {
                struct worker_data *w = &worker_data[i];
                w->dev_id = dev_id;
                if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
                        printf("Error setting up port %u\n", i);
                        return -1;
                }

                uint32_t s;
                for (s = 0; s < cdata.num_stages; s++) {
                        if (rte_event_port_link(dev_id, i,
                                                &worker_queues[s].queue_id,
                                                &worker_queues[s].priority,
                                                1) != 1) {
                                printf("%d: error creating link for port %u\n",
                                                __LINE__, i);
                                return -1;
                        }
                }
                w->port_id = i;
        }

        if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* port for consumer, linked to TX queue */
        if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
                printf("Error setting up port %u\n", i);
                return -1;
        }
        if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
                                &tx_queue.priority, 1) != 1) {
                printf("%d: error creating link for port %u\n",
                                __LINE__, i);
                return -1;
        }
        *cons_data = (struct cons_data){.dev_id = dev_id,
                                        .port_id = i,
                                        .release = disable_implicit_release };

        ret = rte_event_dev_service_id_get(dev_id,
                                &fdata->evdev_service_id);
        if (ret != -ESRCH && ret != 0) {
                printf("Error getting the service ID for sw eventdev\n");
                return -1;
        }
        rte_service_runstate_set(fdata->evdev_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
        if (rte_event_dev_start(dev_id) < 0) {
                printf("Error starting eventdev\n");
                return -1;
        }

        return dev_id;
}

static void
signal_handler(int signum)
{
        if (fdata->done)
                rte_exit(1, "Exiting on signal %d\n", signum);
        if (signum == SIGINT || signum == SIGTERM) {
                printf("\n\nSignal %d received, preparing to exit...\n",
                                signum);
                fdata->done = 1;
        }
        if (signum == SIGTSTP)
                rte_event_dev_dump(0, stdout);
}

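/*
 * Read the number of events received on an event port through the xstats
 * interface; the sw PMD names this statistic "port_<id>_rx". On devices
 * without xstats support the call returns -ENOTSUP cast to uint64_t, which
 * main() checks for before printing the workload distribution.
 */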
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
        char statname[64];
        snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
        return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}

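/*
 * Example invocation (illustrative; adjust the coremasks and vdev to your
 * system):
 *
 *   ./eventdev_pipeline_sw_pmd --vdev event_sw0 -- -r 0x1 -t 0x1 -e 0x2 \
 *              -w 0xc -s 2 -n 0
 *
 * runs NIC Rx and Tx on lcore 0, the scheduler on lcore 1 and workers on
 * lcores 2 and 3, with a two-stage pipeline and no packet-count limit.
 */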
int
main(int argc, char **argv)
{
        struct worker_data *worker_data;
        unsigned int num_ports;
        int lcore_id;
        int err;

        signal(SIGINT, signal_handler);
        signal(SIGTERM, signal_handler);
        signal(SIGTSTP, signal_handler);

        err = rte_eal_init(argc, argv);
        if (err < 0)
                rte_panic("Invalid EAL arguments\n");

        argc -= err;
        argv += err;

        fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
        if (fdata == NULL)
                rte_panic("Out of memory\n");

        /* Parse CLI options */
        parse_app_args(argc, argv);

        num_ports = rte_eth_dev_count();
        if (num_ports == 0)
                rte_panic("No ethernet ports found\n");

        const unsigned int cores_needed = cdata.active_cores;

        if (!cdata.quiet) {
                printf("  Config:\n");
                printf("\tports: %u\n", num_ports);
                printf("\tworkers: %u\n", cdata.num_workers);
                printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
                printf("\tQueue-prio: %d\n", cdata.enable_queue_priorities);
                if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
                        printf("\tqid0 type: ordered\n");
                if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
                        printf("\tqid0 type: atomic\n");
                printf("\tCores available: %u\n", rte_lcore_count());
                printf("\tCores used: %u\n", cores_needed);
        }

        if (rte_lcore_count() < cores_needed)
                rte_panic("Too few cores (%u < %u)\n", rte_lcore_count(),
                                cores_needed);

        const unsigned int ndevs = rte_event_dev_count();
        if (ndevs == 0)
                rte_panic("No event devices found. Pass in a --vdev eventdev.\n");
        if (ndevs > 1)
                fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");

        worker_data = rte_calloc(0, cdata.num_workers,
                        sizeof(worker_data[0]), 0);
        if (worker_data == NULL)
                rte_panic("rte_calloc failed\n");

        int dev_id = setup_eventdev(&cons_data, worker_data);
        if (dev_id < 0)
                rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

        init_ports(num_ports);
        init_rx_adapter(num_ports);

        int worker_idx = 0;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id >= MAX_NUM_CORE)
                        break;

                if (!fdata->rx_core[lcore_id] &&
                        !fdata->worker_core[lcore_id] &&
                        !fdata->tx_core[lcore_id] &&
                        !fdata->sched_core[lcore_id])
                        continue;

                if (fdata->rx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Rx\n",
                                __func__, lcore_id);

                if (fdata->tx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
                                __func__, lcore_id, cons_data.port_id);

                if (fdata->sched_core[lcore_id])
                        printf("[%s()] lcore %d executing scheduler\n",
                                        __func__, lcore_id);

                if (fdata->worker_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing worker, using eventdev port %u\n",
                                __func__, lcore_id,
                                worker_data[worker_idx].port_id);

                err = rte_eal_remote_launch(worker, &worker_data[worker_idx],
                                            lcore_id);
                if (err) {
                        rte_panic("Failed to launch worker on core %d\n",
                                        lcore_id);
                        continue;
                }
                if (fdata->worker_core[lcore_id])
                        worker_idx++;
        }

        lcore_id = rte_lcore_id();

        if (core_in_use(lcore_id))
                worker(&worker_data[worker_idx++]);

        rte_eal_mp_wait_lcore();

        if (cdata.dump_dev)
                rte_event_dev_dump(dev_id, stdout);

        if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
                        (uint64_t)-ENOTSUP)) {
                printf("\nPort Workload distribution:\n");
                uint32_t i;
                uint64_t tot_pkts = 0;
                uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
                for (i = 0; i < cdata.num_workers; i++) {
                        pkts_per_wkr[i] =
                                port_stat(dev_id, worker_data[i].port_id);
                        tot_pkts += pkts_per_wkr[i];
                }
                for (i = 0; i < cdata.num_workers; i++) {
                        float pc = pkts_per_wkr[i] * 100 /
                                ((float)tot_pkts);
                        printf("worker %u:\t%.1f %% (%"PRIu64" pkts)\n",
                                        i, pc, pkts_per_wkr[i]);
                }

        }

        return 0;
}