fa16636ad00e2246c8798ff785be357b5ecb444a
dpdk.git: examples/eventdev_pipeline_sw_pmd/main.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2016-2017 Intel Corporation
 */

#include <getopt.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>     /* atoi(), atol(), exit(), EXIT_FAILURE */
#include <string.h>     /* strlen() */
#include <ctype.h>      /* isdigit(), isupper(), isblank(), isxdigit() */
#include <inttypes.h>   /* PRIu64, PRIi64, PRIx8 */
#include <errno.h>      /* ESRCH, ENOTSUP */
#include <signal.h>
#include <sched.h>
#include <stdbool.h>

#include <rte_eal.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_launch.h>
#include <rte_malloc.h>
#include <rte_random.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#include <rte_ether.h>  /* ether_addr_copy(), struct ether_hdr */
#include <rte_atomic.h> /* rte_atomic32_cmpset(), rte_atomic32_clear() */
#include <rte_eventdev.h>
#include <rte_service.h>
#define MAX_NUM_STAGES 8
#define BATCH_SIZE 16
#define MAX_NUM_CORE 64

struct prod_data {
        uint8_t dev_id;
        uint8_t port_id;
        int32_t qid;
        unsigned int num_nic_ports;
} __rte_cache_aligned;

struct cons_data {
        uint8_t dev_id;
        uint8_t port_id;
} __rte_cache_aligned;

static struct prod_data prod_data;
static struct cons_data cons_data;

struct worker_data {
        uint8_t dev_id;
        uint8_t port_id;
} __rte_cache_aligned;

struct fastpath_data {
        volatile int done;
        uint32_t rx_lock;
        uint32_t tx_lock;
        uint32_t sched_lock;
        uint32_t evdev_service_id;
        bool rx_single;
        bool tx_single;
        bool sched_single;
        unsigned int rx_core[MAX_NUM_CORE];
        unsigned int tx_core[MAX_NUM_CORE];
        unsigned int sched_core[MAX_NUM_CORE];
        unsigned int worker_core[MAX_NUM_CORE];
        struct rte_eth_dev_tx_buffer *tx_buf[RTE_MAX_ETHPORTS];
};

static struct fastpath_data *fdata;

struct config_data {
        unsigned int active_cores;
        unsigned int num_workers;
        int64_t num_packets;
        unsigned int num_fids;
        int queue_type;
        int worker_cycles;
        int enable_queue_priorities;
        int quiet;
        int dump_dev;
        int dump_dev_signal;
        unsigned int num_stages;
        unsigned int worker_cq_depth;
        int16_t next_qid[MAX_NUM_STAGES+2];
        int16_t qid[MAX_NUM_STAGES];
};

static struct config_data cdata = {
        .num_packets = (1L << 25), /* do ~32M packets */
        .num_fids = 512,
        .queue_type = RTE_SCHED_TYPE_ATOMIC,
        .next_qid = {-1},
        .qid = {-1},
        .num_stages = 1,
        .worker_cq_depth = 16
};

static bool
core_in_use(unsigned int lcore_id) {
        return (fdata->rx_core[lcore_id] || fdata->sched_core[lcore_id] ||
                fdata->tx_core[lcore_id] || fdata->worker_core[lcore_id]);
}

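/*
 * TX buffer error callback: invoked when a buffered burst could not be
 * fully transmitted. It simply spins on rte_eth_tx_burst() until every
 * unsent mbuf is out; the NIC port is recovered from the userdata
 * pointer registered in init_ports() below.
 */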
static void
eth_tx_buffer_retry(struct rte_mbuf **pkts, uint16_t unsent,
                        void *userdata)
{
        int port_id = (uintptr_t) userdata;
        unsigned int _sent = 0;

        do {
                /* Note: hard-coded TX queue */
                _sent += rte_eth_tx_burst(port_id, 0, &pkts[_sent],
                                          unsent - _sent);
        } while (_sent != unsent);
}

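/*
 * Consumer: dequeues events from the consumer's eventdev port (linked to
 * the final SINGLE_LINK queue in setup_eventdev()) and buffers each mbuf
 * for NIC TX on the port recorded in mbuf->port, printing a rough
 * throughput figure as it goes. The static counters are safe only
 * because the tx_lock/tx_single logic in schedule_devices() guarantees
 * that a single core runs consumer() at any given time.
 */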
static int
consumer(void)
{
        const uint64_t freq_khz = rte_get_timer_hz() / 1000;
        struct rte_event packets[BATCH_SIZE];

        static uint64_t received;
        static uint64_t last_pkts;
        static uint64_t last_time;
        static uint64_t start_time;
        unsigned int i, j;
        uint8_t dev_id = cons_data.dev_id;
        uint8_t port_id = cons_data.port_id;

        uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
                        packets, RTE_DIM(packets), 0);

        if (n == 0) {
                for (j = 0; j < rte_eth_dev_count(); j++)
                        rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
                return 0;
        }
        if (start_time == 0)
                last_time = start_time = rte_get_timer_cycles();

        received += n;
        for (i = 0; i < n; i++) {
                uint8_t outport = packets[i].mbuf->port;
                rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
                                packets[i].mbuf);
        }

        /* Print out mpps every 1<<22 packets */
        if (!cdata.quiet && received >= last_pkts + (1<<22)) {
                const uint64_t now = rte_get_timer_cycles();
                const uint64_t total_ms = (now - start_time) / freq_khz;
                const uint64_t delta_ms = (now - last_time) / freq_khz;
                uint64_t delta_pkts = received - last_pkts;

                printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
                        "avg %.3f mpps [current %.3f mpps]\n",
                                received,
                                total_ms,
                                received / (total_ms * 1000.0),
                                delta_pkts / (delta_ms * 1000.0));
                last_pkts = received;
                last_time = now;
        }

        cdata.num_packets -= n;
        if (cdata.num_packets <= 0)
                fdata->done = 1;

        return 0;
}

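/*
 * Producer: polls one NIC RX queue per call (round-robining across the
 * available ports), wraps each received mbuf in an RTE_EVENT_OP_NEW
 * event and injects it into the first pipeline queue. The NIC's RSS
 * hash seeds flow_id; the first worker stage folds it into the
 * [0, num_fids) range.
 */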
static int
producer(void)
{
        static uint8_t eth_port;
        struct rte_mbuf *mbufs[BATCH_SIZE+2];
        struct rte_event ev[BATCH_SIZE+2];
        uint32_t i, num_ports = prod_data.num_nic_ports;
        int32_t qid = prod_data.qid;
        uint8_t dev_id = prod_data.dev_id;
        uint8_t port_id = prod_data.port_id;
        uint32_t prio_idx = 0;

        const uint16_t nb_rx = rte_eth_rx_burst(eth_port, 0, mbufs, BATCH_SIZE);
        if (++eth_port == num_ports)
                eth_port = 0;
        if (nb_rx == 0) {
                rte_pause();
                return 0;
        }

        for (i = 0; i < nb_rx; i++) {
                ev[i].flow_id = mbufs[i]->hash.rss;
                ev[i].op = RTE_EVENT_OP_NEW;
                ev[i].sched_type = cdata.queue_type;
                ev[i].queue_id = qid;
                ev[i].event_type = RTE_EVENT_TYPE_ETHDEV;
                ev[i].sub_event_type = 0;
                ev[i].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
                ev[i].mbuf = mbufs[i];
                RTE_SET_USED(prio_idx);
        }

        const int nb_tx = rte_event_enqueue_burst(dev_id, port_id, ev, nb_rx);
        if (nb_tx != nb_rx) {
                for (i = nb_tx; i < nb_rx; i++)
                        rte_pktmbuf_free(mbufs[i]);
        }

        return 0;
}

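/*
 * Run whichever of the RX/sched/TX roles this lcore owns. Each role is
 * protected by a 32-bit spin flag claimed with rte_atomic32_cmpset(lock,
 * 0, 1) and released with rte_atomic32_clear(); when exactly one core
 * owns a role, the corresponding *_single flag (set in parse_app_args())
 * bypasses the atomics entirely. A core that loses the cmpset race
 * simply skips the role this iteration rather than spinning, so worker
 * processing keeps making progress.
 */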
static inline void
schedule_devices(unsigned int lcore_id)
{
        if (fdata->rx_core[lcore_id] && (fdata->rx_single ||
            rte_atomic32_cmpset(&(fdata->rx_lock), 0, 1))) {
                producer();
                rte_atomic32_clear((rte_atomic32_t *)&(fdata->rx_lock));
        }

        if (fdata->sched_core[lcore_id] && (fdata->sched_single ||
            rte_atomic32_cmpset(&(fdata->sched_lock), 0, 1))) {
                rte_service_run_iter_on_app_lcore(fdata->evdev_service_id, 1);
                if (cdata.dump_dev_signal) {
                        rte_event_dev_dump(0, stdout);
                        cdata.dump_dev_signal = 0;
                }
                rte_atomic32_clear((rte_atomic32_t *)&(fdata->sched_lock));
        }

        if (fdata->tx_core[lcore_id] && (fdata->tx_single ||
            rte_atomic32_cmpset(&(fdata->tx_lock), 0, 1))) {
                consumer();
                rte_atomic32_clear((rte_atomic32_t *)&(fdata->tx_lock));
        }
}

static inline void
work(struct rte_mbuf *m)
{
        struct ether_hdr *eth;
        struct ether_addr addr;

        /* change mac addresses on packet (to use mbuf data) */
        /*
         * FIXME Swap mac address properly and also handle the
         * case for both odd and even number of stages that the
         * addresses end up the same at the end of the pipeline
         */
        eth = rte_pktmbuf_mtod(m, struct ether_hdr *);
        ether_addr_copy(&eth->d_addr, &addr);
        ether_addr_copy(&addr, &eth->d_addr);

        /* do a number of cycles of work per packet */
        volatile uint64_t start_tsc = rte_rdtsc();
        while (rte_rdtsc() < start_tsc + cdata.worker_cycles)
                rte_pause();
}

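/*
 * Worker loop, launched on every in-use lcore: it first services any
 * RX/sched/TX roles assigned to this core, then (only on worker cores)
 * dequeues a burst of events, advances each one to the next pipeline
 * queue via cdata.next_qid[], performs the dummy per-packet work, and
 * forwards the burst with RTE_EVENT_OP_FORWARD, retrying the enqueue
 * until the whole burst drains or shutdown is requested.
 */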
static int
worker(void *arg)
{
        struct rte_event events[BATCH_SIZE];

        struct worker_data *data = (struct worker_data *)arg;
        uint8_t dev_id = data->dev_id;
        uint8_t port_id = data->port_id;
        size_t sent = 0, received = 0;
        unsigned int lcore_id = rte_lcore_id();

        while (!fdata->done) {
                uint16_t i;

                schedule_devices(lcore_id);

                if (!fdata->worker_core[lcore_id]) {
                        rte_pause();
                        continue;
                }

                const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
                                events, RTE_DIM(events), 0);

                if (nb_rx == 0) {
                        rte_pause();
                        continue;
                }
                received += nb_rx;

                for (i = 0; i < nb_rx; i++) {

                        /* The first worker stage does classification */
                        if (events[i].queue_id == cdata.qid[0])
                                events[i].flow_id = events[i].mbuf->hash.rss
                                                        % cdata.num_fids;

                        events[i].queue_id = cdata.next_qid[events[i].queue_id];
                        events[i].op = RTE_EVENT_OP_FORWARD;
                        events[i].sched_type = cdata.queue_type;

                        work(events[i].mbuf);
                }
                uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
                                events, nb_rx);
                while (nb_tx < nb_rx && !fdata->done)
                        nb_tx += rte_event_enqueue_burst(dev_id, port_id,
                                                        events + nb_tx,
                                                        nb_rx - nb_tx);
                sent += nb_tx;
        }

        if (!cdata.quiet)
                printf("  worker %u thread done. RX=%zu TX=%zu\n",
                                rte_lcore_id(), received, sent);

        return 0;
}

/*
 * Parse the coremask given as argument (hexadecimal string) and fill
 * the global configuration (core role and core count) with the parsed
 * value.
 */
static int xdigit2val(unsigned char c)
{
        int val;

        if (isdigit(c))
                val = c - '0';
        else if (isupper(c))
                val = c - 'A' + 10;
        else
                val = c - 'a' + 10;
        return val;
}

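/*
 * Worked example: "0xA2" is scanned right to left, one hex digit (4
 * bits) at a time: '2' sets bit 1 and 'A' sets bits 5 and 7, giving
 * mask 0xA2 with count = 3 cores. Leading/trailing blanks and an
 * optional 0x/0X prefix are stripped first; any non-hex character, or
 * an empty or all-zero mask, yields the error value (uint64_t)-1.
 */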
static uint64_t
parse_coremask(const char *coremask)
{
        int i, j, idx = 0;
        unsigned int count = 0;
        char c;
        int val;
        uint64_t mask = 0;
        const int32_t BITS_HEX = 4;

        if (coremask == NULL)
                return -1;
        /* Remove leading and trailing blank characters, and a
         * leading 0x/0X prefix if present.
         */
        while (isblank(*coremask))
                coremask++;
        if (coremask[0] == '0' && ((coremask[1] == 'x')
                || (coremask[1] == 'X')))
                coremask += 2;
        i = strlen(coremask);
        while ((i > 0) && isblank(coremask[i - 1]))
                i--;
        if (i == 0)
                return -1;

        for (i = i - 1; i >= 0 && idx < MAX_NUM_CORE; i--) {
                c = coremask[i];
                if (isxdigit(c) == 0) {
                        /* invalid characters */
                        return -1;
                }
                val = xdigit2val(c);
                for (j = 0; j < BITS_HEX && idx < MAX_NUM_CORE; j++, idx++) {
                        if ((1 << j) & val) {
                                mask |= (1UL << idx);
                                count++;
                        }
                }
        }
        for (; i >= 0; i--)
                if (coremask[i] != '0')
                        return -1;
        if (count == 0)
                return -1;
        return mask;
}

static struct option long_options[] = {
        {"workers", required_argument, 0, 'w'},
        {"packets", required_argument, 0, 'n'},
        {"atomic-flows", required_argument, 0, 'f'},
        {"num_stages", required_argument, 0, 's'},
        {"rx-mask", required_argument, 0, 'r'},
        {"tx-mask", required_argument, 0, 't'},
        {"sched-mask", required_argument, 0, 'e'},
        {"cq-depth", required_argument, 0, 'c'},
        {"work-cycles", required_argument, 0, 'W'},
        {"queue-priority", no_argument, 0, 'P'},
        {"parallel", no_argument, 0, 'p'},
        {"ordered", no_argument, 0, 'o'},
        {"quiet", no_argument, 0, 'q'},
        {"dump", no_argument, 0, 'D'},
        {0, 0, 0, 0}
};

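/*
 * Example invocation (illustrative; adjust the vdev name, EAL options
 * and masks to your system): run NIC RX on core 0, NIC TX on core 1,
 * the scheduler on core 2 and workers on cores 8-15, with a 4-stage
 * pipeline and no packet limit:
 *
 *   ./build/eventdev_pipeline_sw_pmd --vdev event_sw0 -- \
 *           -r1 -t2 -e4 -w FF00 -s4 -n0
 */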
static void
usage(void)
{
        const char *usage_str =
                "  Usage: eventdev_demo [options]\n"
                "  Options:\n"
                "  -n, --packets=N              Send N packets (default ~32M), 0 implies no limit\n"
                "  -f, --atomic-flows=N         Use N random flows from 1 to N (default 512)\n"
                "  -s, --num_stages=N           Use N atomic stages (default 1)\n"
                "  -r, --rx-mask=core mask      Run NIC rx on CPUs in core mask\n"
                "  -w, --workers=core mask      Run worker on CPUs in core mask\n"
                "  -t, --tx-mask=core mask      Run NIC tx on CPUs in core mask\n"
                "  -e  --sched-mask=core mask   Run scheduler on CPUs in core mask\n"
                "  -c  --cq-depth=N             Worker CQ depth (default 16)\n"
                "  -W  --work-cycles=N          Worker cycles (default 0)\n"
                "  -P  --queue-priority         Enable scheduler queue prioritization\n"
                "  -o, --ordered                Use ordered scheduling\n"
                "  -p, --parallel               Use parallel scheduling\n"
                "  -q, --quiet                  Minimize printed output\n"
                "  -D, --dump                   Print detailed statistics before exit"
                "\n";
        fprintf(stderr, "%s", usage_str);
        exit(1);
}

static void
parse_app_args(int argc, char **argv)
{
        /* Parse CLI options */
        int option_index;
        int c;
        opterr = 0;
        uint64_t rx_lcore_mask = 0;
        uint64_t tx_lcore_mask = 0;
        uint64_t sched_lcore_mask = 0;
        uint64_t worker_lcore_mask = 0;
        int i;

        for (;;) {
                c = getopt_long(argc, argv, "r:t:e:c:w:n:f:s:poPqDW:",
                                long_options, &option_index);
                if (c == -1)
                        break;

                int popcnt = 0;
                switch (c) {
                case 'n':
                        cdata.num_packets = (int64_t)atol(optarg);
                        if (cdata.num_packets == 0)
                                cdata.num_packets = INT64_MAX;
                        break;
                case 'f':
                        cdata.num_fids = (unsigned int)atoi(optarg);
                        break;
                case 's':
                        cdata.num_stages = (unsigned int)atoi(optarg);
                        break;
                case 'c':
                        cdata.worker_cq_depth = (unsigned int)atoi(optarg);
                        break;
                case 'W':
                        cdata.worker_cycles = (unsigned int)atoi(optarg);
                        break;
                case 'P':
                        cdata.enable_queue_priorities = 1;
                        break;
                case 'o':
                        cdata.queue_type = RTE_SCHED_TYPE_ORDERED;
                        break;
                case 'p':
                        cdata.queue_type = RTE_SCHED_TYPE_PARALLEL;
                        break;
                case 'q':
                        cdata.quiet = 1;
                        break;
                case 'D':
                        cdata.dump_dev = 1;
                        break;
                case 'w':
                        worker_lcore_mask = parse_coremask(optarg);
                        break;
                case 'r':
                        rx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(rx_lcore_mask);
                        fdata->rx_single = (popcnt == 1);
                        break;
                case 't':
                        tx_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(tx_lcore_mask);
                        fdata->tx_single = (popcnt == 1);
                        break;
                case 'e':
                        sched_lcore_mask = parse_coremask(optarg);
                        popcnt = __builtin_popcountll(sched_lcore_mask);
                        fdata->sched_single = (popcnt == 1);
                        break;
                default:
                        usage();
                }
        }

        if (worker_lcore_mask == 0 || rx_lcore_mask == 0 ||
            sched_lcore_mask == 0 || tx_lcore_mask == 0) {
                printf("Part of the pipeline was not assigned any cores. "
                        "This will stall the pipeline, please check core masks "
                        "(use -h for details on setting core masks):\n"
                        "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
                        "\n\tworkers: %"PRIu64"\n",
                        rx_lcore_mask, tx_lcore_mask, sched_lcore_mask,
                        worker_lcore_mask);
                rte_exit(-1, "Fix core masks\n");
        }
        if (cdata.num_stages == 0 || cdata.num_stages > MAX_NUM_STAGES)
                usage();

        for (i = 0; i < MAX_NUM_CORE; i++) {
                fdata->rx_core[i] = !!(rx_lcore_mask & (1UL << i));
                fdata->tx_core[i] = !!(tx_lcore_mask & (1UL << i));
                fdata->sched_core[i] = !!(sched_lcore_mask & (1UL << i));
                fdata->worker_core[i] = !!(worker_lcore_mask & (1UL << i));

                if (fdata->worker_core[i])
                        cdata.num_workers++;
                if (core_in_use(i))
                        cdata.active_cores++;
        }
}

/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint8_t port, struct rte_mempool *mbuf_pool)
{
        static const struct rte_eth_conf port_conf_default = {
                .rxmode = {
                        .mq_mode = ETH_MQ_RX_RSS,
                        .max_rx_pkt_len = ETHER_MAX_LEN
                },
                .rx_adv_conf = {
                        .rss_conf = {
                                .rss_hf = ETH_RSS_IP |
                                          ETH_RSS_TCP |
                                          ETH_RSS_UDP,
                        }
                }
        };
        const uint16_t rx_rings = 1, tx_rings = 1;
        const uint16_t rx_ring_size = 512, tx_ring_size = 512;
        struct rte_eth_conf port_conf = port_conf_default;
        int retval;
        uint16_t q;

        if (port >= rte_eth_dev_count())
                return -1;

        /* Configure the Ethernet device. */
        retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
        if (retval != 0)
                return retval;

        /* Allocate and set up 1 RX queue per Ethernet port. */
        for (q = 0; q < rx_rings; q++) {
                retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
                                rte_eth_dev_socket_id(port), NULL, mbuf_pool);
                if (retval < 0)
                        return retval;
        }

        /* Allocate and set up 1 TX queue per Ethernet port. */
        for (q = 0; q < tx_rings; q++) {
                retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
                                rte_eth_dev_socket_id(port), NULL);
                if (retval < 0)
                        return retval;
        }

        /* Start the Ethernet port. */
        retval = rte_eth_dev_start(port);
        if (retval < 0)
                return retval;

        /* Display the port MAC address. */
        struct ether_addr addr;
        rte_eth_macaddr_get(port, &addr);
        printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
                           " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
                        (unsigned int)port,
                        addr.addr_bytes[0], addr.addr_bytes[1],
                        addr.addr_bytes[2], addr.addr_bytes[3],
                        addr.addr_bytes[4], addr.addr_bytes[5]);

        /* Enable RX in promiscuous mode for the Ethernet device. */
        rte_eth_promiscuous_enable(port);

        return 0;
}

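/*
 * Create one shared mbuf pool sized at 16K mbufs per NIC port,
 * initialize every port via port_init(), and give each port a
 * 32-packet TX buffer whose error callback (eth_tx_buffer_retry()
 * above) retries until the full burst is sent. The port index is
 * passed to the callback through its userdata pointer as a uintptr_t.
 */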
static int
init_ports(unsigned int num_ports)
{
        uint8_t portid;
        unsigned int i;

        struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
                        /* mbufs */ 16384 * num_ports,
                        /* cache_size */ 512,
                        /* priv_size */ 0,
                        /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
                        rte_socket_id());

        for (portid = 0; portid < num_ports; portid++)
                if (port_init(portid, mp) != 0)
                        rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu8 "\n",
                                        portid);

        for (i = 0; i < num_ports; i++) {
                void *userdata = (void *)(uintptr_t) i;
                fdata->tx_buf[i] =
                        rte_malloc(NULL, RTE_ETH_TX_BUFFER_SIZE(32), 0);
                if (fdata->tx_buf[i] == NULL)
                        rte_panic("Out of memory\n");
                rte_eth_tx_buffer_init(fdata->tx_buf[i], 32);
                rte_eth_tx_buffer_set_err_callback(fdata->tx_buf[i],
                                                   eth_tx_buffer_retry,
                                                   userdata);
        }

        return 0;
}

struct port_link {
        uint8_t queue_id;
        uint8_t priority;
};

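/*
 * Eventdev topology built below: queues 0..num_stages-1 are the
 * load-balanced worker stages and queue num_stages is a SINGLE_LINK
 * queue feeding the TX core. Ports 0..num_workers-1 belong to workers
 * (each linked to every stage queue), port num_workers is the consumer
 * (linked only to the TX queue), and port num_workers+1 is the producer
 * (no links; it only enqueues NEW events).
 */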
static int
setup_eventdev(struct prod_data *prod_data,
                struct cons_data *cons_data,
                struct worker_data *worker_data)
{
        const uint8_t dev_id = 0;
        /* +1 queue is for a SINGLE_LINK TX stage */
        const uint8_t nb_queues = cdata.num_stages + 1;
        /* + 2 is one port for producer and one for consumer */
        const uint8_t nb_ports = cdata.num_workers + 2;
        struct rte_event_dev_config config = {
                        .nb_event_queues = nb_queues,
                        .nb_event_ports = nb_ports,
                        .nb_events_limit  = 4096,
                        .nb_event_queue_flows = 1024,
                        .nb_event_port_dequeue_depth = 128,
                        .nb_event_port_enqueue_depth = 128,
        };
        struct rte_event_port_conf wkr_p_conf = {
                        .dequeue_depth = cdata.worker_cq_depth,
                        .enqueue_depth = 64,
                        .new_event_threshold = 4096,
        };
        struct rte_event_queue_conf wkr_q_conf = {
                        .schedule_type = cdata.queue_type,
                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
                        .nb_atomic_flows = 1024,
                        .nb_atomic_order_sequences = 1024,
        };
        struct rte_event_port_conf tx_p_conf = {
                        .dequeue_depth = 128,
                        .enqueue_depth = 128,
                        .new_event_threshold = 4096,
        };
        const struct rte_event_queue_conf tx_q_conf = {
                        .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
                        .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
        };

        struct port_link worker_queues[MAX_NUM_STAGES];
        struct port_link tx_queue;
        unsigned int i;

        int ret, ndev = rte_event_dev_count();
        if (ndev < 1) {
                printf("%d: No Eventdev Devices Found\n", __LINE__);
                return -1;
        }

        struct rte_event_dev_info dev_info;
        ret = rte_event_dev_info_get(dev_id, &dev_info);
        printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

        if (dev_info.max_event_port_dequeue_depth <
                        config.nb_event_port_dequeue_depth)
                config.nb_event_port_dequeue_depth =
                                dev_info.max_event_port_dequeue_depth;
        if (dev_info.max_event_port_enqueue_depth <
                        config.nb_event_port_enqueue_depth)
                config.nb_event_port_enqueue_depth =
                                dev_info.max_event_port_enqueue_depth;

        ret = rte_event_dev_configure(dev_id, &config);
        if (ret < 0) {
                printf("%d: Error configuring device\n", __LINE__);
                return -1;
        }

        /* Queue creation - one load-balanced queue per pipeline stage */
        printf("  Stages:\n");
        for (i = 0; i < cdata.num_stages; i++) {
                if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
                        printf("%d: error creating qid %d\n", __LINE__, i);
                        return -1;
                }
                cdata.qid[i] = i;
                cdata.next_qid[i] = i+1;
                worker_queues[i].queue_id = i;
                if (cdata.enable_queue_priorities) {
                        /* calculate priority stepping for each stage, leaving
                         * headroom of 1 for the SINGLE_LINK TX below
                         */
                        const uint32_t prio_delta =
                                (RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;

                        /* higher priority for queues closer to tx */
                        wkr_q_conf.priority =
                                RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
                }

                const char *type_str = "Atomic";
                switch (wkr_q_conf.schedule_type) {
                case RTE_SCHED_TYPE_ORDERED:
                        type_str = "Ordered";
                        break;
                case RTE_SCHED_TYPE_PARALLEL:
                        type_str = "Parallel";
                        break;
                }
                printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
                                wkr_q_conf.priority);
        }
        printf("\n");

        /* final queue for sending to TX core */
        if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
                printf("%d: error creating qid %d\n", __LINE__, i);
                return -1;
        }
        tx_queue.queue_id = i;
        tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;

        if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* set up one port per worker, linking to all stage queues */
        for (i = 0; i < cdata.num_workers; i++) {
                struct worker_data *w = &worker_data[i];
                w->dev_id = dev_id;
                if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
                        printf("Error setting up port %d\n", i);
                        return -1;
                }

                uint32_t s;
                for (s = 0; s < cdata.num_stages; s++) {
                        if (rte_event_port_link(dev_id, i,
                                                &worker_queues[s].queue_id,
                                                &worker_queues[s].priority,
                                                1) != 1) {
                                printf("%d: error creating link for port %d\n",
                                                __LINE__, i);
                                return -1;
                        }
                }
                w->port_id = i;
        }

        if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        /* port for consumer, linked to TX queue */
        if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
                printf("Error setting up port %d\n", i);
                return -1;
        }
        if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
                                &tx_queue.priority, 1) != 1) {
                printf("%d: error creating link for port %d\n",
                                __LINE__, i);
                return -1;
        }
        /* port for producer, no links */
        struct rte_event_port_conf rx_p_conf = {
                        .dequeue_depth = 8,
                        .enqueue_depth = 8,
                        .new_event_threshold = 1200,
        };

        if (rx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
                rx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
        if (rx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
                rx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

        if (rte_event_port_setup(dev_id, i + 1, &rx_p_conf) < 0) {
                printf("Error setting up port %d\n", i);
                return -1;
        }

        *prod_data = (struct prod_data){.dev_id = dev_id,
                                        .port_id = i + 1,
                                        .qid = cdata.qid[0] };
        *cons_data = (struct cons_data){.dev_id = dev_id,
                                        .port_id = i };

        ret = rte_event_dev_service_id_get(dev_id,
                                &fdata->evdev_service_id);
        if (ret != -ESRCH && ret != 0) {
                printf("Error getting the service ID for sw eventdev\n");
                return -1;
        }
        rte_service_runstate_set(fdata->evdev_service_id, 1);
        rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
        if (rte_event_dev_start(dev_id) < 0) {
                printf("Error starting eventdev\n");
                return -1;
        }

        return dev_id;
}

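/*
 * SIGINT/SIGTERM request a graceful stop via fdata->done; a second
 * signal while shutdown is already pending force-exits. SIGTSTP dumps
 * eventdev statistics instead of suspending the process.
 */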
static void
signal_handler(int signum)
{
        if (fdata->done)
                rte_exit(1, "Exiting on signal %d\n", signum);
        if (signum == SIGINT || signum == SIGTERM) {
                printf("\n\nSignal %d received, preparing to exit...\n",
                                signum);
                fdata->done = 1;
        }
        if (signum == SIGTSTP)
                rte_event_dev_dump(0, stdout);
}

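/*
 * Read the "port_<N>_rx" xstat for an event port. This named statistic
 * comes from the sw eventdev PMD, so main() only trusts the result
 * when the first lookup does not come back as -ENOTSUP.
 */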
static inline uint64_t
port_stat(int dev_id, int32_t p)
{
        char statname[64];
        snprintf(statname, sizeof(statname), "port_%u_rx", (unsigned int)p);
        return rte_event_dev_xstats_by_name_get(dev_id, statname, NULL);
}

int
main(int argc, char **argv)
{
        struct worker_data *worker_data;
        unsigned int num_ports;
        int lcore_id;
        int err;

        signal(SIGINT, signal_handler);
        signal(SIGTERM, signal_handler);
        signal(SIGTSTP, signal_handler);

        err = rte_eal_init(argc, argv);
        if (err < 0)
                rte_panic("Invalid EAL arguments\n");

        argc -= err;
        argv += err;

        fdata = rte_malloc(NULL, sizeof(struct fastpath_data), 0);
        if (fdata == NULL)
                rte_panic("Out of memory\n");

        /* Parse CLI options */
        parse_app_args(argc, argv);

        num_ports = rte_eth_dev_count();
        if (num_ports == 0)
                rte_panic("No ethernet ports found\n");

        const unsigned int cores_needed = cdata.active_cores;

        if (!cdata.quiet) {
                printf("  Config:\n");
                printf("\tports: %u\n", num_ports);
                printf("\tworkers: %u\n", cdata.num_workers);
                printf("\tpackets: %"PRIi64"\n", cdata.num_packets);
                printf("\tQueue-prio: %u\n", cdata.enable_queue_priorities);
                if (cdata.queue_type == RTE_SCHED_TYPE_ORDERED)
                        printf("\tqid0 type: ordered\n");
                if (cdata.queue_type == RTE_SCHED_TYPE_ATOMIC)
                        printf("\tqid0 type: atomic\n");
                printf("\tCores available: %u\n", rte_lcore_count());
                printf("\tCores used: %u\n", cores_needed);
        }

        if (rte_lcore_count() < cores_needed)
                rte_panic("Too few cores (%d < %d)\n", rte_lcore_count(),
                                cores_needed);

        const unsigned int ndevs = rte_event_dev_count();
        if (ndevs == 0)
                rte_panic("No event devices found. Pass in a --vdev eventdev.\n");
        if (ndevs > 1)
                fprintf(stderr, "Warning: More than one eventdev, using idx 0\n");

        worker_data = rte_calloc(0, cdata.num_workers,
                        sizeof(worker_data[0]), 0);
        if (worker_data == NULL)
                rte_panic("rte_calloc failed\n");

        int dev_id = setup_eventdev(&prod_data, &cons_data, worker_data);
        if (dev_id < 0)
                rte_exit(EXIT_FAILURE, "Error setting up eventdev\n");

        prod_data.num_nic_ports = num_ports;
        init_ports(num_ports);

        int worker_idx = 0;
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id >= MAX_NUM_CORE)
                        break;

                if (!fdata->rx_core[lcore_id] &&
                        !fdata->worker_core[lcore_id] &&
                        !fdata->tx_core[lcore_id] &&
                        !fdata->sched_core[lcore_id])
                        continue;

                if (fdata->rx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Rx, and using eventdev port %u\n",
                                __func__, lcore_id, prod_data.port_id);

                if (fdata->tx_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing NIC Tx, and using eventdev port %u\n",
                                __func__, lcore_id, cons_data.port_id);

                if (fdata->sched_core[lcore_id])
                        printf("[%s()] lcore %d executing scheduler\n",
                                        __func__, lcore_id);

                if (fdata->worker_core[lcore_id])
                        printf(
                                "[%s()] lcore %d executing worker, using eventdev port %u\n",
                                __func__, lcore_id,
                                worker_data[worker_idx].port_id);

                err = rte_eal_remote_launch(worker, &worker_data[worker_idx],
                                            lcore_id);
                if (err) {
                        rte_panic("Failed to launch worker on core %d\n",
                                        lcore_id);
                        continue;
                }
                if (fdata->worker_core[lcore_id])
                        worker_idx++;
        }

        lcore_id = rte_lcore_id();

        if (core_in_use(lcore_id))
                worker(&worker_data[worker_idx++]);

        rte_eal_mp_wait_lcore();

        if (cdata.dump_dev)
                rte_event_dev_dump(dev_id, stdout);

        if (!cdata.quiet && (port_stat(dev_id, worker_data[0].port_id) !=
                        (uint64_t)-ENOTSUP)) {
                printf("\nPort Workload distribution:\n");
                uint32_t i;
                uint64_t tot_pkts = 0;
                uint64_t pkts_per_wkr[RTE_MAX_LCORE] = {0};
                for (i = 0; i < cdata.num_workers; i++) {
                        pkts_per_wkr[i] =
                                port_stat(dev_id, worker_data[i].port_id);
                        tot_pkts += pkts_per_wkr[i];
                }
                for (i = 0; i < cdata.num_workers; i++) {
                        float pc = pkts_per_wkr[i] * 100 /
                                ((float)tot_pkts);
                        printf("worker %i :\t%.1f %% (%"PRIu64" pkts)\n",
                                        i, pc, pkts_per_wkr[i]);
                }

        }

        return 0;
}