8c4a8feec0c2d19c77c53808b33ff640065f76be
[dpdk.git] / examples / distributor / main.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2017 Intel Corporation
3  */
4
#include <errno.h>
#include <getopt.h>
#include <inttypes.h>
#include <signal.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>
#include <rte_power.h>
19 #include <rte_power.h>
20
/* Descriptor counts requested for each port's RX and TX queues. */
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024

/* mbuf pool sizing: NUM_MBUFS is multiplied by the port count at creation. */
#define NUM_MBUFS (64 * 1024 - 1)
#define MBUF_CACHE_SIZE 128

/* Burst sizes: BURST_SIZE for RX / staging buffers, BURST_SIZE_TX for TX. */
#define BURST_SIZE 64
#define BURST_SIZE_TX 32

/* Software ring sizes: RX -> distributor, and distributor -> TX. */
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536

/* Log type used by this application's RTE_LOG / RTE_LOG_DP calls. */
#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

/* ANSI escape sequences used to highlight drop counters in print_stats(). */
#define ANSI_COLOR_RED     "\x1b[31m"
#define ANSI_COLOR_RESET   "\x1b[0m"
34
/* mask of enabled ports: bit N set means port N is used */
static uint32_t enabled_port_mask;

/*
 * Shutdown flags. The SIGINT handler sets quit_signal_dist; the distributor
 * then sets quit_signal_work and quit_signal_rx, and the RX thread finally
 * sets quit_signal to stop the TX thread.
 */
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
/*
 * Written from int_handler(). The C standard only defines a signal
 * handler's stores to static objects of type volatile sig_atomic_t (or
 * lock-free atomics), hence this type rather than uint8_t.
 */
volatile sig_atomic_t quit_signal_dist;
volatile uint8_t quit_signal_work;

/* Set to 1 when rte_power_init() succeeded on every worker lcore. */
unsigned int power_lib_initialised;
42
43 static volatile struct app_stats {
44         struct {
45                 uint64_t rx_pkts;
46                 uint64_t returned_pkts;
47                 uint64_t enqueued_pkts;
48                 uint64_t enqdrop_pkts;
49         } rx __rte_cache_aligned;
50         int pad1 __rte_cache_aligned;
51
52         struct {
53                 uint64_t in_pkts;
54                 uint64_t ret_pkts;
55                 uint64_t sent_pkts;
56                 uint64_t enqdrop_pkts;
57         } dist __rte_cache_aligned;
58         int pad2 __rte_cache_aligned;
59
60         struct {
61                 uint64_t dequeue_pkts;
62                 uint64_t tx_pkts;
63                 uint64_t enqdrop_pkts;
64         } tx __rte_cache_aligned;
65         int pad3 __rte_cache_aligned;
66
67         uint64_t worker_pkts[64] __rte_cache_aligned;
68
69         int pad4 __rte_cache_aligned;
70
71         uint64_t worker_bursts[64][8] __rte_cache_aligned;
72
73         int pad5 __rte_cache_aligned;
74
75         uint64_t port_rx_pkts[64] __rte_cache_aligned;
76         uint64_t port_tx_pkts[64] __rte_cache_aligned;
77 } app_stats;
78
79 struct app_stats prev_app_stats;
80
81 static const struct rte_eth_conf port_conf_default = {
82         .rxmode = {
83                 .mq_mode = ETH_MQ_RX_RSS,
84         },
85         .txmode = {
86                 .mq_mode = ETH_MQ_TX_NONE,
87         },
88         .rx_adv_conf = {
89                 .rss_conf = {
90                         .rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
91                                 ETH_RSS_TCP | ETH_RSS_SCTP,
92                 }
93         },
94 };
95
96 struct output_buffer {
97         unsigned count;
98         struct rte_mbuf *mbufs[BURST_SIZE];
99 };
100
101 static void print_stats(void);
102
103 /*
104  * Initialises a given port using global settings and with the rx buffers
105  * coming from the mbuf_pool passed as parameter
106  */
107 static inline int
108 port_init(uint16_t port, struct rte_mempool *mbuf_pool)
109 {
110         struct rte_eth_conf port_conf = port_conf_default;
111         const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
112         int retval;
113         uint16_t q;
114         uint16_t nb_rxd = RX_RING_SIZE;
115         uint16_t nb_txd = TX_RING_SIZE;
116         struct rte_eth_dev_info dev_info;
117         struct rte_eth_txconf txconf;
118
119         if (!rte_eth_dev_is_valid_port(port))
120                 return -1;
121
122         retval = rte_eth_dev_info_get(port, &dev_info);
123         if (retval != 0) {
124                 printf("Error during getting device (port %u) info: %s\n",
125                                 port, strerror(-retval));
126                 return retval;
127         }
128
129         if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
130                 port_conf.txmode.offloads |=
131                         DEV_TX_OFFLOAD_MBUF_FAST_FREE;
132
133         port_conf.rx_adv_conf.rss_conf.rss_hf &=
134                 dev_info.flow_type_rss_offloads;
135         if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
136                         port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
137                 printf("Port %u modified RSS hash function based on hardware support,"
138                         "requested:%#"PRIx64" configured:%#"PRIx64"\n",
139                         port,
140                         port_conf_default.rx_adv_conf.rss_conf.rss_hf,
141                         port_conf.rx_adv_conf.rss_conf.rss_hf);
142         }
143
144         retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
145         if (retval != 0)
146                 return retval;
147
148         retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
149         if (retval != 0)
150                 return retval;
151
152         for (q = 0; q < rxRings; q++) {
153                 retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
154                                                 rte_eth_dev_socket_id(port),
155                                                 NULL, mbuf_pool);
156                 if (retval < 0)
157                         return retval;
158         }
159
160         txconf = dev_info.default_txconf;
161         txconf.offloads = port_conf.txmode.offloads;
162         for (q = 0; q < txRings; q++) {
163                 retval = rte_eth_tx_queue_setup(port, q, nb_txd,
164                                                 rte_eth_dev_socket_id(port),
165                                                 &txconf);
166                 if (retval < 0)
167                         return retval;
168         }
169
170         retval = rte_eth_dev_start(port);
171         if (retval < 0)
172                 return retval;
173
174         struct rte_eth_link link;
175         do {
176                 retval = rte_eth_link_get_nowait(port, &link);
177                 if (retval < 0) {
178                         printf("Failed link get (port %u): %s\n",
179                                 port, rte_strerror(-retval));
180                         return retval;
181                 } else if (link.link_status)
182                         break;
183
184                 printf("Waiting for Link up on port %"PRIu16"\n", port);
185                 sleep(1);
186         } while (!link.link_status);
187
188         if (!link.link_status) {
189                 printf("Link down on port %"PRIu16"\n", port);
190                 return 0;
191         }
192
193         struct rte_ether_addr addr;
194         retval = rte_eth_macaddr_get(port, &addr);
195         if (retval < 0) {
196                 printf("Failed to get MAC address (port %u): %s\n",
197                                 port, rte_strerror(-retval));
198                 return retval;
199         }
200
201         printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
202                         " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
203                         port, RTE_ETHER_ADDR_BYTES(&addr));
204
205         retval = rte_eth_promiscuous_enable(port);
206         if (retval != 0)
207                 return retval;
208
209         return 0;
210 }
211
/* Arguments handed to the RX, distributor and worker lcore functions. */
struct lcore_params {
	unsigned int worker_id;          /* worker index used by lcore_worker() */
	struct rte_distributor *d;       /* distributor instance */
	struct rte_ring *rx_dist_ring;   /* RX -> distributor ring */
	struct rte_ring *dist_tx_ring;   /* distributor -> TX ring */
	struct rte_mempool *mem_pool;    /* presumably the mbuf pool; not read in this view */
};
219
220 static int
221 lcore_rx(struct lcore_params *p)
222 {
223         const uint16_t nb_ports = rte_eth_dev_count_avail();
224         const int socket_id = rte_socket_id();
225         uint16_t port;
226         struct rte_mbuf *bufs[BURST_SIZE*2];
227
228         RTE_ETH_FOREACH_DEV(port) {
229                 /* skip ports that are not enabled */
230                 if ((enabled_port_mask & (1 << port)) == 0)
231                         continue;
232
233                 if (rte_eth_dev_socket_id(port) > 0 &&
234                                 rte_eth_dev_socket_id(port) != socket_id)
235                         printf("WARNING, port %u is on remote NUMA node to "
236                                         "RX thread.\n\tPerformance will not "
237                                         "be optimal.\n", port);
238         }
239
240         printf("\nCore %u doing packet RX.\n", rte_lcore_id());
241         port = 0;
242         while (!quit_signal_rx) {
243
244                 /* skip ports that are not enabled */
245                 if ((enabled_port_mask & (1 << port)) == 0) {
246                         if (++port == nb_ports)
247                                 port = 0;
248                         continue;
249                 }
250                 const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
251                                 BURST_SIZE);
252                 if (unlikely(nb_rx == 0)) {
253                         if (++port == nb_ports)
254                                 port = 0;
255                         continue;
256                 }
257                 app_stats.rx.rx_pkts += nb_rx;
258
259 /*
260  * You can run the distributor on the rx core with this code. Returned
261  * packets are then send straight to the tx core.
262  */
263 #if 0
264         rte_distributor_process(d, bufs, nb_rx);
265         const uint16_t nb_ret = rte_distributor_returned_pktsd,
266                         bufs, BURST_SIZE*2);
267
268                 app_stats.rx.returned_pkts += nb_ret;
269                 if (unlikely(nb_ret == 0)) {
270                         if (++port == nb_ports)
271                                 port = 0;
272                         continue;
273                 }
274
275                 struct rte_ring *tx_ring = p->dist_tx_ring;
276                 uint16_t sent = rte_ring_enqueue_burst(tx_ring,
277                                 (void *)bufs, nb_ret, NULL);
278 #else
279                 uint16_t nb_ret = nb_rx;
280                 /*
281                  * Swap the following two lines if you want the rx traffic
282                  * to go directly to tx, no distribution.
283                  */
284                 struct rte_ring *out_ring = p->rx_dist_ring;
285                 /* struct rte_ring *out_ring = p->dist_tx_ring; */
286
287                 uint16_t sent = rte_ring_enqueue_burst(out_ring,
288                                 (void *)bufs, nb_ret, NULL);
289 #endif
290
291                 app_stats.rx.enqueued_pkts += sent;
292                 if (unlikely(sent < nb_ret)) {
293                         app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
294                         RTE_LOG_DP(DEBUG, DISTRAPP,
295                                 "%s:Packet loss due to full ring\n", __func__);
296                         while (sent < nb_ret)
297                                 rte_pktmbuf_free(bufs[sent++]);
298                 }
299                 if (++port == nb_ports)
300                         port = 0;
301         }
302         if (power_lib_initialised)
303                 rte_power_exit(rte_lcore_id());
304         /* set worker & tx threads quit flag */
305         printf("\nCore %u exiting rx task.\n", rte_lcore_id());
306         quit_signal = 1;
307         return 0;
308 }
309
310 static inline void
311 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
312 {
313         unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
314                         outbuf->mbufs, outbuf->count);
315         app_stats.tx.tx_pkts += outbuf->count;
316
317         if (unlikely(nb_tx < outbuf->count)) {
318                 app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
319                 do {
320                         rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
321                 } while (++nb_tx < outbuf->count);
322         }
323         outbuf->count = 0;
324 }
325
326 static inline void
327 flush_all_ports(struct output_buffer *tx_buffers)
328 {
329         uint16_t outp;
330
331         RTE_ETH_FOREACH_DEV(outp) {
332                 /* skip ports that are not enabled */
333                 if ((enabled_port_mask & (1 << outp)) == 0)
334                         continue;
335
336                 if (tx_buffers[outp].count == 0)
337                         continue;
338
339                 flush_one_port(&tx_buffers[outp], outp);
340         }
341 }
342
343
344
345 static int
346 lcore_distributor(struct lcore_params *p)
347 {
348         struct rte_ring *in_r = p->rx_dist_ring;
349         struct rte_ring *out_r = p->dist_tx_ring;
350         struct rte_mbuf *bufs[BURST_SIZE * 4];
351         struct rte_distributor *d = p->d;
352
353         printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
354         while (!quit_signal_dist) {
355                 const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
356                                 (void *)bufs, BURST_SIZE*1, NULL);
357                 if (nb_rx) {
358                         app_stats.dist.in_pkts += nb_rx;
359
360                         /* Distribute the packets */
361                         rte_distributor_process(d, bufs, nb_rx);
362                         /* Handle Returns */
363                         const uint16_t nb_ret =
364                                 rte_distributor_returned_pkts(d,
365                                         bufs, BURST_SIZE*2);
366
367                         if (unlikely(nb_ret == 0))
368                                 continue;
369                         app_stats.dist.ret_pkts += nb_ret;
370
371                         uint16_t sent = rte_ring_enqueue_burst(out_r,
372                                         (void *)bufs, nb_ret, NULL);
373                         app_stats.dist.sent_pkts += sent;
374                         if (unlikely(sent < nb_ret)) {
375                                 app_stats.dist.enqdrop_pkts += nb_ret - sent;
376                                 RTE_LOG(DEBUG, DISTRAPP,
377                                         "%s:Packet loss due to full out ring\n",
378                                         __func__);
379                                 while (sent < nb_ret)
380                                         rte_pktmbuf_free(bufs[sent++]);
381                         }
382                 }
383         }
384         printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
385         quit_signal_work = 1;
386         if (power_lib_initialised)
387                 rte_power_exit(rte_lcore_id());
388         rte_distributor_flush(d);
389         /* Unblock any returns so workers can exit */
390         rte_distributor_clear_returns(d);
391         quit_signal_rx = 1;
392         return 0;
393 }
394
395
396 static int
397 lcore_tx(struct rte_ring *in_r)
398 {
399         static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
400         const int socket_id = rte_socket_id();
401         uint16_t port;
402
403         RTE_ETH_FOREACH_DEV(port) {
404                 /* skip ports that are not enabled */
405                 if ((enabled_port_mask & (1 << port)) == 0)
406                         continue;
407
408                 if (rte_eth_dev_socket_id(port) > 0 &&
409                                 rte_eth_dev_socket_id(port) != socket_id)
410                         printf("WARNING, port %u is on remote NUMA node to "
411                                         "TX thread.\n\tPerformance will not "
412                                         "be optimal.\n", port);
413         }
414
415         printf("\nCore %u doing packet TX.\n", rte_lcore_id());
416         while (!quit_signal) {
417
418                 RTE_ETH_FOREACH_DEV(port) {
419                         /* skip ports that are not enabled */
420                         if ((enabled_port_mask & (1 << port)) == 0)
421                                 continue;
422
423                         struct rte_mbuf *bufs[BURST_SIZE_TX];
424                         const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
425                                         (void *)bufs, BURST_SIZE_TX, NULL);
426                         app_stats.tx.dequeue_pkts += nb_rx;
427
428                         /* if we get no traffic, flush anything we have */
429                         if (unlikely(nb_rx == 0)) {
430                                 flush_all_ports(tx_buffers);
431                                 continue;
432                         }
433
434                         /* for traffic we receive, queue it up for transmit */
435                         uint16_t i;
436                         rte_prefetch_non_temporal((void *)bufs[0]);
437                         rte_prefetch_non_temporal((void *)bufs[1]);
438                         rte_prefetch_non_temporal((void *)bufs[2]);
439                         for (i = 0; i < nb_rx; i++) {
440                                 struct output_buffer *outbuf;
441                                 uint8_t outp;
442                                 rte_prefetch_non_temporal((void *)bufs[i + 3]);
443                                 /*
444                                  * workers should update in_port to hold the
445                                  * output port value
446                                  */
447                                 outp = bufs[i]->port;
448                                 /* skip ports that are not enabled */
449                                 if ((enabled_port_mask & (1 << outp)) == 0)
450                                         continue;
451
452                                 outbuf = &tx_buffers[outp];
453                                 outbuf->mbufs[outbuf->count++] = bufs[i];
454                                 if (outbuf->count == BURST_SIZE_TX)
455                                         flush_one_port(outbuf, outp);
456                         }
457                 }
458         }
459         if (power_lib_initialised)
460                 rte_power_exit(rte_lcore_id());
461         printf("\nCore %u exiting tx task.\n", rte_lcore_id());
462         return 0;
463 }
464
465 static void
466 int_handler(int sig_num)
467 {
468         printf("Exiting on signal %d\n", sig_num);
469         /* set quit flag for rx thread to exit */
470         quit_signal_dist = 1;
471 }
472
473 static void
474 print_stats(void)
475 {
476         struct rte_eth_stats eth_stats;
477         unsigned int i, j;
478         const unsigned int num_workers = rte_lcore_count() - 4;
479
480         RTE_ETH_FOREACH_DEV(i) {
481                 rte_eth_stats_get(i, &eth_stats);
482                 app_stats.port_rx_pkts[i] = eth_stats.ipackets;
483                 app_stats.port_tx_pkts[i] = eth_stats.opackets;
484         }
485
486         printf("\n\nRX Thread:\n");
487         RTE_ETH_FOREACH_DEV(i) {
488                 printf("Port %u Pktsin : %5.2f\n", i,
489                                 (app_stats.port_rx_pkts[i] -
490                                 prev_app_stats.port_rx_pkts[i])/1000000.0);
491                 prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
492         }
493         printf(" - Received:    %5.2f\n",
494                         (app_stats.rx.rx_pkts -
495                         prev_app_stats.rx.rx_pkts)/1000000.0);
496         printf(" - Returned:    %5.2f\n",
497                         (app_stats.rx.returned_pkts -
498                         prev_app_stats.rx.returned_pkts)/1000000.0);
499         printf(" - Enqueued:    %5.2f\n",
500                         (app_stats.rx.enqueued_pkts -
501                         prev_app_stats.rx.enqueued_pkts)/1000000.0);
502         printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
503                         (app_stats.rx.enqdrop_pkts -
504                         prev_app_stats.rx.enqdrop_pkts)/1000000.0,
505                         ANSI_COLOR_RESET);
506
507         printf("Distributor thread:\n");
508         printf(" - In:          %5.2f\n",
509                         (app_stats.dist.in_pkts -
510                         prev_app_stats.dist.in_pkts)/1000000.0);
511         printf(" - Returned:    %5.2f\n",
512                         (app_stats.dist.ret_pkts -
513                         prev_app_stats.dist.ret_pkts)/1000000.0);
514         printf(" - Sent:        %5.2f\n",
515                         (app_stats.dist.sent_pkts -
516                         prev_app_stats.dist.sent_pkts)/1000000.0);
517         printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
518                         (app_stats.dist.enqdrop_pkts -
519                         prev_app_stats.dist.enqdrop_pkts)/1000000.0,
520                         ANSI_COLOR_RESET);
521
522         printf("TX thread:\n");
523         printf(" - Dequeued:    %5.2f\n",
524                         (app_stats.tx.dequeue_pkts -
525                         prev_app_stats.tx.dequeue_pkts)/1000000.0);
526         RTE_ETH_FOREACH_DEV(i) {
527                 printf("Port %u Pktsout: %5.2f\n",
528                                 i, (app_stats.port_tx_pkts[i] -
529                                 prev_app_stats.port_tx_pkts[i])/1000000.0);
530                 prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
531         }
532         printf(" - Transmitted: %5.2f\n",
533                         (app_stats.tx.tx_pkts -
534                         prev_app_stats.tx.tx_pkts)/1000000.0);
535         printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
536                         (app_stats.tx.enqdrop_pkts -
537                         prev_app_stats.tx.enqdrop_pkts)/1000000.0,
538                         ANSI_COLOR_RESET);
539
540         prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
541         prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
542         prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
543         prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
544         prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
545         prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
546         prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
547         prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
548         prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
549         prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
550         prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;
551
552         for (i = 0; i < num_workers; i++) {
553                 printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
554                                 (app_stats.worker_pkts[i] -
555                                 prev_app_stats.worker_pkts[i])/1000000.0);
556                 for (j = 0; j < 8; j++) {
557                         printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
558                         app_stats.worker_bursts[i][j] = 0;
559                 }
560                 printf("\n");
561                 prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
562         }
563 }
564
565 static int
566 lcore_worker(struct lcore_params *p)
567 {
568         struct rte_distributor *d = p->d;
569         const unsigned id = p->worker_id;
570         unsigned int num = 0;
571         unsigned int i;
572
573         /*
574          * for single port, xor_val will be zero so we won't modify the output
575          * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
576          */
577         const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
578         struct rte_mbuf *buf[8] __rte_cache_aligned;
579
580         for (i = 0; i < 8; i++)
581                 buf[i] = NULL;
582
583         app_stats.worker_pkts[p->worker_id] = 1;
584
585         printf("\nCore %u acting as worker core.\n", rte_lcore_id());
586         while (!quit_signal_work) {
587                 num = rte_distributor_get_pkt(d, id, buf, buf, num);
588                 /* Do a little bit of work for each packet */
589                 for (i = 0; i < num; i++) {
590                         uint64_t t = rte_rdtsc()+100;
591
592                         while (rte_rdtsc() < t)
593                                 rte_pause();
594                         buf[i]->port ^= xor_val;
595                 }
596
597                 app_stats.worker_pkts[p->worker_id] += num;
598                 if (num > 0)
599                         app_stats.worker_bursts[p->worker_id][num-1]++;
600         }
601         if (power_lib_initialised)
602                 rte_power_exit(rte_lcore_id());
603         rte_free(p);
604         return 0;
605 }
606
607 static int
608 init_power_library(void)
609 {
610         int ret = 0, lcore_id;
611         RTE_LCORE_FOREACH_WORKER(lcore_id) {
612                 /* init power management library */
613                 ret = rte_power_init(lcore_id);
614                 if (ret) {
615                         RTE_LOG(ERR, POWER,
616                                 "Library initialization failed on core %u\n",
617                                 lcore_id);
618                         /*
619                          * Return on first failure, we'll fall back
620                          * to non-power operation
621                          */
622                         return ret;
623                 }
624         }
625         return ret;
626 }
627
/* Print command-line usage for the application options (after "--"). */
static void
print_usage(const char *prgname)
{
	static const char usage_fmt[] =
		"%s [EAL options] -- -p PORTMASK\n"
		"  -p PORTMASK: hexadecimal bitmask of ports to configure\n";

	printf(usage_fmt, prgname);
}
636
/*
 * Parse a hexadecimal port mask such as "3" or "0x3".
 *
 * Returns the mask. Returns 0, which the caller treats as invalid, when:
 *  - the string is NULL or empty;
 *  - it has trailing non-hex characters;
 *  - strtoul() reports an error (e.g. ERANGE);
 *  - the value has bits above bit 31, which would be silently lost in the
 *    32-bit enabled_port_mask.
 */
static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	if (portmask == NULL || portmask[0] == '\0')
		return 0;

	/* parse hexadecimal string */
	errno = 0;
	pm = strtoul(portmask, &end, 16);
	if (errno != 0 || end == NULL || *end != '\0')
		return 0;

	/* enabled_port_mask is 32 bits wide: reject masks it cannot hold */
	if (pm > UINT32_MAX)
		return 0;

	return pm;
}
650
651 /* Parse the argument given in the command line of the application */
652 static int
653 parse_args(int argc, char **argv)
654 {
655         int opt;
656         char **argvopt;
657         int option_index;
658         char *prgname = argv[0];
659         static struct option lgopts[] = {
660                 {NULL, 0, 0, 0}
661         };
662
663         argvopt = argv;
664
665         while ((opt = getopt_long(argc, argvopt, "p:",
666                         lgopts, &option_index)) != EOF) {
667
668                 switch (opt) {
669                 /* portmask */
670                 case 'p':
671                         enabled_port_mask = parse_portmask(optarg);
672                         if (enabled_port_mask == 0) {
673                                 printf("invalid portmask\n");
674                                 print_usage(prgname);
675                                 return -1;
676                         }
677                         break;
678
679                 default:
680                         print_usage(prgname);
681                         return -1;
682                 }
683         }
684
685         if (optind <= 1) {
686                 print_usage(prgname);
687                 return -1;
688         }
689
690         argv[optind-1] = prgname;
691
692         optind = 1; /* reset getopt lib */
693         return 0;
694 }
695
696 /* Main function, does initialization and calls the per-lcore functions */
697 int
698 main(int argc, char *argv[])
699 {
700         struct rte_mempool *mbuf_pool;
701         struct rte_distributor *d;
702         struct rte_ring *dist_tx_ring;
703         struct rte_ring *rx_dist_ring;
704         struct rte_power_core_capabilities lcore_cap;
705         unsigned int lcore_id, worker_id = 0;
706         int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
707         unsigned nb_ports;
708         uint16_t portid;
709         uint16_t nb_ports_available;
710         uint64_t t, freq;
711
712         /* catch ctrl-c so we can print on exit */
713         signal(SIGINT, int_handler);
714
715         /* init EAL */
716         int ret = rte_eal_init(argc, argv);
717         if (ret < 0)
718                 rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
719         argc -= ret;
720         argv += ret;
721
722         /* parse application arguments (after the EAL ones) */
723         ret = parse_args(argc, argv);
724         if (ret < 0)
725                 rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
726
727         if (rte_lcore_count() < 5)
728                 rte_exit(EXIT_FAILURE, "Error, This application needs at "
729                                 "least 5 logical cores to run:\n"
730                                 "1 lcore for stats (can be core 0)\n"
731                                 "1 lcore for packet RX\n"
732                                 "1 lcore for distribution\n"
733                                 "1 lcore for packet TX\n"
734                                 "and at least 1 lcore for worker threads\n");
735
736         if (init_power_library() == 0)
737                 power_lib_initialised = 1;
738
739         nb_ports = rte_eth_dev_count_avail();
740         if (nb_ports == 0)
741                 rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
742         if (nb_ports != 1 && (nb_ports & 1))
743                 rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
744                                 "when using a single port\n");
745
746         mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
747                 NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
748                 RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
749         if (mbuf_pool == NULL)
750                 rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
751         nb_ports_available = nb_ports;
752
753         /* initialize all ports */
754         RTE_ETH_FOREACH_DEV(portid) {
755                 /* skip ports that are not enabled */
756                 if ((enabled_port_mask & (1 << portid)) == 0) {
757                         printf("\nSkipping disabled port %d\n", portid);
758                         nb_ports_available--;
759                         continue;
760                 }
761                 /* init port */
762                 printf("Initializing port %u... done\n", portid);
763
764                 if (port_init(portid, mbuf_pool) != 0)
765                         rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
766                                         portid);
767         }
768
769         if (!nb_ports_available) {
770                 rte_exit(EXIT_FAILURE,
771                                 "All available ports are disabled. Please set portmask.\n");
772         }
773
774         d = rte_distributor_create("PKT_DIST", rte_socket_id(),
775                         rte_lcore_count() - 4,
776                         RTE_DIST_ALG_BURST);
777         if (d == NULL)
778                 rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
779
780         /*
781          * scheduler ring is read by the transmitter core, and written to
782          * by scheduler core
783          */
784         dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
785                         rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
786         if (dist_tx_ring == NULL)
787                 rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
788
789         rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
790                         rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
791         if (rx_dist_ring == NULL)
792                 rte_exit(EXIT_FAILURE, "Cannot create output ring\n");
793
794         if (power_lib_initialised) {
795                 /*
796                  * Here we'll pre-assign lcore ids to the rx, tx and
797                  * distributor workloads if there's higher frequency
798                  * on those cores e.g. if Turbo Boost is enabled.
799                  * It's also worth mentioning that it will assign cores in a
800                  * specific order, so that if there's less than three
801                  * available, the higher frequency cores will go to the
802                  * distributor first, then rx, then tx.
803                  */
804                 RTE_LCORE_FOREACH_WORKER(lcore_id) {
805
806                         rte_power_get_capabilities(lcore_id, &lcore_cap);
807
808                         if (lcore_cap.priority != 1)
809                                 continue;
810
811                         if (distr_core_id < 0) {
812                                 distr_core_id = lcore_id;
813                                 printf("Distributor on priority core %d\n",
814                                         lcore_id);
815                                 continue;
816                         }
817                         if (rx_core_id < 0) {
818                                 rx_core_id = lcore_id;
819                                 printf("Rx on priority core %d\n",
820                                         lcore_id);
821                                 continue;
822                         }
823                         if (tx_core_id < 0) {
824                                 tx_core_id = lcore_id;
825                                 printf("Tx on priority core %d\n",
826                                         lcore_id);
827                                 continue;
828                         }
829                 }
830         }
831
832         /*
833          * If there's any of the key workloads left without an lcore_id
834          * after the high performing core assignment above, pre-assign
835          * them here.
836          */
837         RTE_LCORE_FOREACH_WORKER(lcore_id) {
838                 if (lcore_id == (unsigned int)distr_core_id ||
839                                 lcore_id == (unsigned int)rx_core_id ||
840                                 lcore_id == (unsigned int)tx_core_id)
841                         continue;
842                 if (distr_core_id < 0) {
843                         distr_core_id = lcore_id;
844                         printf("Distributor on core %d\n", lcore_id);
845                         continue;
846                 }
847                 if (rx_core_id < 0) {
848                         rx_core_id = lcore_id;
849                         printf("Rx on core %d\n", lcore_id);
850                         continue;
851                 }
852                 if (tx_core_id < 0) {
853                         tx_core_id = lcore_id;
854                         printf("Tx on core %d\n", lcore_id);
855                         continue;
856                 }
857         }
858
859         printf(" tx id %d, dist id %d, rx id %d\n",
860                         tx_core_id,
861                         distr_core_id,
862                         rx_core_id);
863
864         /*
865          * Kick off all the worker threads first, avoiding the pre-assigned
866          * lcore_ids for tx, rx and distributor workloads.
867          */
868         RTE_LCORE_FOREACH_WORKER(lcore_id) {
869                 if (lcore_id == (unsigned int)distr_core_id ||
870                                 lcore_id == (unsigned int)rx_core_id ||
871                                 lcore_id == (unsigned int)tx_core_id)
872                         continue;
873                 printf("Starting thread %d as worker, lcore_id %d\n",
874                                 worker_id, lcore_id);
875                 struct lcore_params *p =
876                         rte_malloc(NULL, sizeof(*p), 0);
877                 if (!p)
878                         rte_panic("malloc failure\n");
879                 *p = (struct lcore_params){worker_id++, d, rx_dist_ring,
880                         dist_tx_ring, mbuf_pool};
881
882                 rte_eal_remote_launch((lcore_function_t *)lcore_worker,
883                                 p, lcore_id);
884         }
885
886         /* Start tx core */
887         rte_eal_remote_launch((lcore_function_t *)lcore_tx,
888                         dist_tx_ring, tx_core_id);
889
890         /* Start distributor core */
891         struct lcore_params *pd =
892                 rte_malloc(NULL, sizeof(*pd), 0);
893         if (!pd)
894                 rte_panic("malloc failure\n");
895         *pd = (struct lcore_params){worker_id++, d,
896                 rx_dist_ring, dist_tx_ring, mbuf_pool};
897         rte_eal_remote_launch(
898                         (lcore_function_t *)lcore_distributor,
899                         pd, distr_core_id);
900
901         /* Start rx core */
902         struct lcore_params *pr =
903                 rte_malloc(NULL, sizeof(*pr), 0);
904         if (!pr)
905                 rte_panic("malloc failure\n");
906         *pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
907                 dist_tx_ring, mbuf_pool};
908         rte_eal_remote_launch((lcore_function_t *)lcore_rx,
909                         pr, rx_core_id);
910
911         freq = rte_get_timer_hz();
912         t = rte_rdtsc() + freq;
913         while (!quit_signal_dist) {
914                 if (t < rte_rdtsc()) {
915                         print_stats();
916                         t = rte_rdtsc() + freq;
917                 }
918                 usleep(1000);
919         }
920
921         RTE_LCORE_FOREACH_WORKER(lcore_id) {
922                 if (rte_eal_wait_lcore(lcore_id) < 0)
923                         return -1;
924         }
925
926         print_stats();
927
928         rte_free(pd);
929         rte_free(pr);
930
931         /* clean up the EAL */
932         rte_eal_cleanup();
933
934         return 0;
935 }