examples: take promiscuous mode switch result into account

dpdk.git: examples/distributor/main.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include <stdio.h>	/* printf */
#include <stdint.h>
#include <inttypes.h>
#include <stdlib.h>	/* strtoul */
#include <string.h>	/* strerror */
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>
#include <rte_power.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32
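
/*
 * Sizing notes: the two scheduler ring sizes are powers of two, as
 * rte_ring_create() requires (unless RING_F_EXACT_SZ is used), and
 * NUM_MBUFS is 2^n - 1, the recommended size for an rte_mempool.
 */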

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED     "\x1b[31m"
#define ANSI_COLOR_RESET   "\x1b[0m"

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;
unsigned int power_lib_initialised;

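/*
 * Per-stage statistics. Each block of counters is written by a single
 * core; the __rte_cache_aligned pads between blocks keep them on
 * separate cache lines so the writer cores do not false-share.
 */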
static volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t returned_pkts;
		uint64_t enqueued_pkts;
		uint64_t enqdrop_pkts;
	} rx __rte_cache_aligned;
	int pad1 __rte_cache_aligned;

	struct {
		uint64_t in_pkts;
		uint64_t ret_pkts;
		uint64_t sent_pkts;
		uint64_t enqdrop_pkts;
	} dist __rte_cache_aligned;
	int pad2 __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t tx_pkts;
		uint64_t enqdrop_pkts;
	} tx __rte_cache_aligned;
	int pad3 __rte_cache_aligned;

	uint64_t worker_pkts[64] __rte_cache_aligned;

	int pad4 __rte_cache_aligned;

	uint64_t worker_bursts[64][8] __rte_cache_aligned;

	int pad5 __rte_cache_aligned;

	uint64_t port_rx_pkts[64] __rte_cache_aligned;
	uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;

struct app_stats prev_app_stats;

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS,
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
				ETH_RSS_TCP | ETH_RSS_SCTP,
		}
	},
};
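
/*
 * RSS is enabled even though this app sets up a single RX queue: the
 * NIC still writes the computed hash into each mbuf, and the
 * distributor library uses that packet hash as the flow tag that
 * pins a flow to one worker.
 */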

struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[BURST_SIZE];
};

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
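	/*
	 * One RX queue feeds the rx core. A TX queue is created for every
	 * lcore except the main one (rte_lcore_count() - 1), although the
	 * tx core in this app only ever transmits on queue 0.
	 */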
	int retval;
	uint16_t q;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;

	if (!rte_eth_dev_is_valid_port(port))
		return -1;

	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %s\n",
				port, strerror(-retval));
		return retval;
	}

	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;

	port_conf.rx_adv_conf.rss_conf.rss_hf &=
		dev_info.flow_type_rss_offloads;
	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
		printf("Port %u modified RSS hash function based on hardware support, "
			"requested:%#"PRIx64" configured:%#"PRIx64"\n",
			port,
			port_conf_default.rx_adv_conf.rss_conf.rss_hf,
			port_conf.rx_adv_conf.rss_conf.rss_hf);
	}

	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	for (q = 0; q < rxRings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
						rte_eth_dev_socket_id(port),
						NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	for (q = 0; q < txRings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
						rte_eth_dev_socket_id(port),
						&txconf);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	struct rte_eth_link link;
	int wait_sec;

	/*
	 * Poll for link up once a second, giving up after ~9 seconds so a
	 * dead link cannot hang initialisation forever; this also makes the
	 * link-down check below reachable.
	 */
	rte_eth_link_get_nowait(port, &link);
	for (wait_sec = 0; !link.link_status && wait_sec < 9; wait_sec++) {
		printf("Waiting for Link up on port %"PRIu16"\n", port);
		sleep(1);
		rte_eth_link_get_nowait(port, &link);
	}

	if (!link.link_status) {
		printf("Link down on port %"PRIu16"\n", port);
		return 0;
	}

	struct rte_ether_addr addr;
	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

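	/*
	 * Enable promiscuous mode and, per this commit, propagate a failed
	 * switch to the caller instead of ignoring the result.
	 */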
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;

	return 0;
}

struct lcore_params {
	unsigned worker_id;
	struct rte_distributor *d;
	struct rte_ring *rx_dist_ring;
	struct rte_ring *dist_tx_ring;
	struct rte_mempool *mem_pool;
};

static int
lcore_rx(struct lcore_params *p)
{
	const uint16_t nb_ports = rte_eth_dev_count_avail();
	const int socket_id = rte_socket_id();
	uint16_t port;
	struct rte_mbuf *bufs[BURST_SIZE*2];

	RTE_ETH_FOREACH_DEV(port) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"RX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

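	/* Poll the enabled ports round-robin, one RX burst per visit. */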
	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
	port = 0;
	while (!quit_signal_rx) {

		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
				BURST_SIZE);
		if (unlikely(nb_rx == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		app_stats.rx.rx_pkts += nb_rx;

/*
 * You can run the distributor on the rx core with this code. Returned
 * packets are then sent straight to the tx core.
 */
#if 0
		rte_distributor_process(p->d, bufs, nb_rx);
		const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
				bufs, BURST_SIZE*2);

		app_stats.rx.returned_pkts += nb_ret;
		if (unlikely(nb_ret == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		struct rte_ring *tx_ring = p->dist_tx_ring;
		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
				(void *)bufs, nb_ret, NULL);
#else
		uint16_t nb_ret = nb_rx;
		/*
		 * Swap the following two lines if you want the rx traffic
		 * to go directly to tx, no distribution.
		 */
		struct rte_ring *out_ring = p->rx_dist_ring;
		/* struct rte_ring *out_ring = p->dist_tx_ring; */

		uint16_t sent = rte_ring_enqueue_burst(out_ring,
				(void *)bufs, nb_ret, NULL);
#endif

		app_stats.rx.enqueued_pkts += sent;
		if (unlikely(sent < nb_ret)) {
			app_stats.rx.enqdrop_pkts += nb_ret - sent;
			RTE_LOG_DP(DEBUG, DISTRAPP,
				"%s:Packet loss due to full ring\n", __func__);
			while (sent < nb_ret)
				rte_pktmbuf_free(bufs[sent++]);
		}
		if (++port == nb_ports)
			port = 0;
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	/* set tx thread quit flag (workers are stopped via quit_signal_work) */
	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
	quit_signal = 1;
	return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
			outbuf->mbufs, outbuf->count);
	app_stats.tx.tx_pkts += outbuf->count;

	if (unlikely(nb_tx < outbuf->count)) {
		app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
		do {
			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
		} while (++nb_tx < outbuf->count);
	}
	outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers)
{
	uint16_t outp;

	RTE_ETH_FOREACH_DEV(outp) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << outp)) == 0)
			continue;

		if (tx_buffers[outp].count == 0)
			continue;

		flush_one_port(&tx_buffers[outp], outp);
	}
}

static int
lcore_distributor(struct lcore_params *p)
{
	struct rte_ring *in_r = p->rx_dist_ring;
	struct rte_ring *out_r = p->dist_tx_ring;
	struct rte_mbuf *bufs[BURST_SIZE * 4];
	struct rte_distributor *d = p->d;

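	/*
	 * bufs[] is sized at four bursts: after rte_distributor_process()
	 * the same array is reused to collect up to BURST_SIZE*2 packets
	 * from rte_distributor_returned_pkts(), so it needs headroom
	 * beyond the single dequeued burst.
	 */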
	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
	while (!quit_signal_dist) {
		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
				(void *)bufs, BURST_SIZE, NULL);
		if (nb_rx) {
			app_stats.dist.in_pkts += nb_rx;

			/* Distribute the packets */
			rte_distributor_process(d, bufs, nb_rx);
			/* Handle Returns */
			const uint16_t nb_ret =
				rte_distributor_returned_pkts(d,
					bufs, BURST_SIZE*2);

			if (unlikely(nb_ret == 0))
				continue;
			app_stats.dist.ret_pkts += nb_ret;

			uint16_t sent = rte_ring_enqueue_burst(out_r,
					(void *)bufs, nb_ret, NULL);
			app_stats.dist.sent_pkts += sent;
			if (unlikely(sent < nb_ret)) {
				app_stats.dist.enqdrop_pkts += nb_ret - sent;
				RTE_LOG(DEBUG, DISTRAPP,
					"%s:Packet loss due to full out ring\n",
					__func__);
				while (sent < nb_ret)
					rte_pktmbuf_free(bufs[sent++]);
			}
		}
	}
	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
	quit_signal_work = 1;
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	rte_distributor_flush(d);
	/* Unblock any returns so workers can exit */
	rte_distributor_clear_returns(d);
	quit_signal_rx = 1;
	return 0;
}

static int
lcore_tx(struct rte_ring *in_r)
{
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	const int socket_id = rte_socket_id();
	uint16_t port;

	RTE_ETH_FOREACH_DEV(port) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"TX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	while (!quit_signal) {

		RTE_ETH_FOREACH_DEV(port) {
			/* skip ports that are not enabled */
			if ((enabled_port_mask & (1 << port)) == 0)
				continue;

			struct rte_mbuf *bufs[BURST_SIZE_TX];
			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
					(void *)bufs, BURST_SIZE_TX, NULL);
			app_stats.tx.dequeue_pkts += nb_rx;

			/* if we get no traffic, flush anything we have */
			if (unlikely(nb_rx == 0)) {
				flush_all_ports(tx_buffers);
				continue;
			}

			/* for traffic we receive, queue it up for transmit */
			uint16_t i;
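			/*
			 * Software-pipelined prefetch: warm the first three
			 * mbufs, then stay three ahead inside the loop. The
			 * bufs[i + 3] hint can read past the valid entries
			 * (and, on a full burst, past the array itself); the
			 * code tolerates this only because prefetch is a
			 * non-faulting hint.
			 */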
			rte_prefetch_non_temporal((void *)bufs[0]);
			rte_prefetch_non_temporal((void *)bufs[1]);
			rte_prefetch_non_temporal((void *)bufs[2]);
			for (i = 0; i < nb_rx; i++) {
				struct output_buffer *outbuf;
				uint8_t outp;
				rte_prefetch_non_temporal((void *)bufs[i + 3]);
				/*
				 * workers should update in_port to hold the
				 * output port value
				 */
				outp = bufs[i]->port;
				/* skip ports that are not enabled */
				if ((enabled_port_mask & (1 << outp)) == 0)
					continue;

				outbuf = &tx_buffers[outp];
				outbuf->mbufs[outbuf->count++] = bufs[i];
				if (outbuf->count == BURST_SIZE_TX)
					flush_one_port(outbuf, outp);
			}
		}
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
	return 0;
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	/* set quit flag for the distributor thread to start the shutdown */
	quit_signal_dist = 1;
}
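
/*
 * Shutdown cascades through the pipeline: SIGINT sets quit_signal_dist;
 * the distributor core then raises quit_signal_work (stopping workers)
 * and quit_signal_rx; finally the rx core raises quit_signal, which
 * stops the tx core.
 */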

static void
print_stats(void)
{
	struct rte_eth_stats eth_stats;
	unsigned int i, j;
	const unsigned int num_workers = rte_lcore_count() - 4;

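	/*
	 * Every figure below is a delta since the previous call, divided
	 * by 1e6; main() calls print_stats() roughly once a second, so
	 * the numbers read as millions of packets per second.
	 */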
	RTE_ETH_FOREACH_DEV(i) {
		rte_eth_stats_get(i, &eth_stats);
		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
		app_stats.port_tx_pkts[i] = eth_stats.opackets;
	}

	printf("\n\nRX Thread:\n");
	RTE_ETH_FOREACH_DEV(i) {
		printf("Port %u Pktsin : %5.2f\n", i,
				(app_stats.port_rx_pkts[i] -
				prev_app_stats.port_rx_pkts[i])/1000000.0);
		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
	}
	printf(" - Received:    %5.2f\n",
			(app_stats.rx.rx_pkts -
			prev_app_stats.rx.rx_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.rx.returned_pkts -
			prev_app_stats.rx.returned_pkts)/1000000.0);
	printf(" - Enqueued:    %5.2f\n",
			(app_stats.rx.enqueued_pkts -
			prev_app_stats.rx.enqueued_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.rx.enqdrop_pkts -
			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("Distributor thread:\n");
	printf(" - In:          %5.2f\n",
			(app_stats.dist.in_pkts -
			prev_app_stats.dist.in_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.dist.ret_pkts -
			prev_app_stats.dist.ret_pkts)/1000000.0);
	printf(" - Sent:        %5.2f\n",
			(app_stats.dist.sent_pkts -
			prev_app_stats.dist.sent_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.dist.enqdrop_pkts -
			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("TX thread:\n");
	printf(" - Dequeued:    %5.2f\n",
			(app_stats.tx.dequeue_pkts -
			prev_app_stats.tx.dequeue_pkts)/1000000.0);
	RTE_ETH_FOREACH_DEV(i) {
		printf("Port %u Pktsout: %5.2f\n",
				i, (app_stats.port_tx_pkts[i] -
				prev_app_stats.port_tx_pkts[i])/1000000.0);
		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
	}
	printf(" - Transmitted: %5.2f\n",
			(app_stats.tx.tx_pkts -
			prev_app_stats.tx.tx_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.tx.enqdrop_pkts -
			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

	for (i = 0; i < num_workers; i++) {
		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
				(app_stats.worker_pkts[i] -
				prev_app_stats.worker_pkts[i])/1000000.0);
		for (j = 0; j < 8; j++) {
			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
			app_stats.worker_bursts[i][j] = 0;
		}
		printf("\n");
		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
	}
}

static int
lcore_worker(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	const unsigned id = p->worker_id;
	unsigned int num = 0;
	unsigned int i;

	/*
	 * for single port, xor_val will be zero so we won't modify the output
	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
	 */
	const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
	struct rte_mbuf *buf[8] __rte_cache_aligned;

	for (i = 0; i < 8; i++)
		buf[i] = NULL;

	app_stats.worker_pkts[p->worker_id] = 1;

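	/*
	 * rte_distributor_get_pkt() is a combined call: it hands the
	 * previous burst (buf, num) back to the distributor and blocks
	 * until a new burst of up to 8 packets is assigned to this worker.
	 */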
	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
	while (!quit_signal_work) {
		num = rte_distributor_get_pkt(d, id, buf, buf, num);
		/* Do a little bit of work for each packet */
		for (i = 0; i < num; i++) {
			uint64_t t = rte_rdtsc() + 100;

			while (rte_rdtsc() < t)
				rte_pause();
			buf[i]->port ^= xor_val;
		}

		app_stats.worker_pkts[p->worker_id] += num;
		if (num > 0)
			app_stats.worker_bursts[p->worker_id][num-1]++;
	}
	if (power_lib_initialised)
		rte_power_exit(rte_lcore_id());
	rte_free(p);
	return 0;
}

static int
init_power_library(void)
{
	int ret = 0, lcore_id;
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		/* init power management library */
		ret = rte_power_init(lcore_id);
		if (ret) {
			RTE_LOG(ERR, POWER,
				"Library initialization failed on core %u\n",
				lcore_id);
			/*
			 * Return on first failure, we'll fall back
			 * to non-power operation
			 */
			return ret;
		}
	}
	return ret;
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return 0;

	/*
	 * Return 0 (not -1) on error and on an empty mask: the caller
	 * stores the result in an unsigned mask and tests it against 0,
	 * so a negative sentinel would be silently accepted as all-ones.
	 */
	return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
			lgopts, &option_index)) != EOF) {

		switch (opt) {
		/* portmask */
		case 'p':
			enabled_port_mask = parse_portmask(optarg);
			if (enabled_port_mask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;

		default:
			print_usage(prgname);
			return -1;
		}
	}

	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;

	optind = 1; /* reset getopt lib */
	return 0;
}

/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	struct rte_distributor *d;
	struct rte_ring *dist_tx_ring;
	struct rte_ring *rx_dist_ring;
	struct rte_power_core_capabilities lcore_cap;
	unsigned int lcore_id, worker_id = 0;
	int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
	unsigned nb_ports;
	uint16_t portid;
	uint16_t nb_ports_available;
	uint64_t t, freq;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* init EAL */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;

	/* parse application arguments (after the EAL ones) */
	ret = parse_args(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

	if (rte_lcore_count() < 5)
		rte_exit(EXIT_FAILURE, "Error, This application needs at "
				"least 5 logical cores to run:\n"
				"1 lcore for stats (can be core 0)\n"
				"1 lcore for packet RX\n"
				"1 lcore for distribution\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");
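
	/*
	 * Example invocation (binary name and core/port values are
	 * illustrative, not prescriptive):
	 *   ./distributor_app -l 0-5 -n 4 -- -p 0x3
	 * gives one stats (main), one rx, one distributor and one tx
	 * lcore plus two workers, over the two ports in mask 0x3.
	 */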

	if (init_power_library() == 0)
		power_lib_initialised = 1;

	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
		NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
		RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	nb_ports_available = nb_ports;

	/* initialize all ports */
	RTE_ETH_FOREACH_DEV(portid) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << portid)) == 0) {
			printf("\nSkipping disabled port %d\n", portid);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u...\n", portid);

		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					portid);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
				"All available ports are disabled. Please set portmask.\n");
	}

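	/*
	 * Workers are all lcores minus the four reserved roles: main
	 * (stats), rx, distributor and tx, hence rte_lcore_count() - 4.
	 */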
	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
			rte_lcore_count() - 4,
			RTE_DIST_ALG_BURST);
	if (d == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

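	/*
	 * Both rings are single-producer/single-consumer: each has exactly
	 * one writer core and one reader core, so rte_ring can skip the
	 * multi-producer/multi-consumer synchronisation.
	 */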
	/*
	 * scheduler ring is read by the transmitter core, and written to
	 * by the distributor core
	 */
	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (dist_tx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (rx_dist_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");

	if (power_lib_initialised) {
		/*
		 * Here we'll pre-assign lcore ids to the rx, tx and
		 * distributor workloads if there's a higher frequency
		 * on those cores e.g. if Turbo Boost is enabled.
		 * It's also worth mentioning that it will assign cores in a
		 * specific order, so that if there are fewer than three
		 * available, the higher frequency cores will go to the
		 * distributor first, then rx, then tx.
		 */
		RTE_LCORE_FOREACH_SLAVE(lcore_id) {

			rte_power_get_capabilities(lcore_id, &lcore_cap);

			if (lcore_cap.priority != 1)
				continue;

			if (distr_core_id < 0) {
				distr_core_id = lcore_id;
				printf("Distributor on priority core %d\n",
					lcore_id);
				continue;
			}
			if (rx_core_id < 0) {
				rx_core_id = lcore_id;
				printf("Rx on priority core %d\n",
					lcore_id);
				continue;
			}
			if (tx_core_id < 0) {
				tx_core_id = lcore_id;
				printf("Tx on priority core %d\n",
					lcore_id);
				continue;
			}
		}
	}

	/*
	 * If any of the key workloads are left without an lcore_id
	 * after the high performing core assignment above, pre-assign
	 * them here.
	 */
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (lcore_id == (unsigned int)distr_core_id ||
				lcore_id == (unsigned int)rx_core_id ||
				lcore_id == (unsigned int)tx_core_id)
			continue;
		if (distr_core_id < 0) {
			distr_core_id = lcore_id;
			printf("Distributor on core %d\n", lcore_id);
			continue;
		}
		if (rx_core_id < 0) {
			rx_core_id = lcore_id;
			printf("Rx on core %d\n", lcore_id);
			continue;
		}
		if (tx_core_id < 0) {
			tx_core_id = lcore_id;
			printf("Tx on core %d\n", lcore_id);
			continue;
		}
	}

	printf(" tx id %d, dist id %d, rx id %d\n",
			tx_core_id,
			distr_core_id,
			rx_core_id);

	/*
	 * Kick off all the worker threads first, avoiding the pre-assigned
	 * lcore_ids for tx, rx and distributor workloads.
	 */
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (lcore_id == (unsigned int)distr_core_id ||
				lcore_id == (unsigned int)rx_core_id ||
				lcore_id == (unsigned int)tx_core_id)
			continue;
		printf("Starting thread %d as worker, lcore_id %d\n",
				worker_id, lcore_id);
		struct lcore_params *p =
			rte_malloc(NULL, sizeof(*p), 0);
		if (!p)
			rte_panic("malloc failure\n");
		*p = (struct lcore_params){worker_id++, d, rx_dist_ring,
			dist_tx_ring, mbuf_pool};

		rte_eal_remote_launch((lcore_function_t *)lcore_worker,
				p, lcore_id);
	}

	/* Start tx core */
	rte_eal_remote_launch((lcore_function_t *)lcore_tx,
			dist_tx_ring, tx_core_id);

	/* Start distributor core */
	struct lcore_params *pd =
		rte_malloc(NULL, sizeof(*pd), 0);
	if (!pd)
		rte_panic("malloc failure\n");
	*pd = (struct lcore_params){worker_id++, d,
		rx_dist_ring, dist_tx_ring, mbuf_pool};
	rte_eal_remote_launch(
			(lcore_function_t *)lcore_distributor,
			pd, distr_core_id);

	/* Start rx core */
	struct lcore_params *pr =
		rte_malloc(NULL, sizeof(*pr), 0);
	if (!pr)
		rte_panic("malloc failure\n");
	*pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
		dist_tx_ring, mbuf_pool};
	rte_eal_remote_launch((lcore_function_t *)lcore_rx,
			pr, rx_core_id);

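	/* Main lcore: print stats roughly once a second until shutdown. */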
	freq = rte_get_timer_hz();
	t = rte_rdtsc() + freq;
	while (!quit_signal_dist) {
		if (t < rte_rdtsc()) {
			print_stats();
			t = rte_rdtsc() + freq;
		}
		usleep(1000);
	}

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();

	rte_free(pd);
	rte_free(pr);

	return 0;
}