/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>
#include <rte_power.h>

#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32

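/*
 * Note: the two scheduler ring sizes above are powers of two, as
 * rte_ring_create() requires for rings created without the
 * RING_F_EXACT_SZ flag.
 */
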
#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED     "\x1b[31m"
#define ANSI_COLOR_RESET   "\x1b[0m"

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;
unsigned int power_lib_initialised;

static volatile struct app_stats {
        struct {
                uint64_t rx_pkts;
                uint64_t returned_pkts;
                uint64_t enqueued_pkts;
                uint64_t enqdrop_pkts;
        } rx __rte_cache_aligned;
        int pad1 __rte_cache_aligned;

        struct {
                uint64_t in_pkts;
                uint64_t ret_pkts;
                uint64_t sent_pkts;
                uint64_t enqdrop_pkts;
        } dist __rte_cache_aligned;
        int pad2 __rte_cache_aligned;

        struct {
                uint64_t dequeue_pkts;
                uint64_t tx_pkts;
                uint64_t enqdrop_pkts;
        } tx __rte_cache_aligned;
        int pad3 __rte_cache_aligned;

        uint64_t worker_pkts[64] __rte_cache_aligned;

        int pad4 __rte_cache_aligned;

        uint64_t worker_bursts[64][8] __rte_cache_aligned;

        int pad5 __rte_cache_aligned;

        uint64_t port_rx_pkts[64] __rte_cache_aligned;
        uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;
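
/*
 * The pad fields and the __rte_cache_aligned markers above keep each group
 * of counters on its own cache line, so the rx, distributor, tx and worker
 * lcores do not false-share cache lines when updating their stats.
 */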

struct app_stats prev_app_stats;

static const struct rte_eth_conf port_conf_default = {
        .rxmode = {
                .mq_mode = ETH_MQ_RX_RSS,
                .max_rx_pkt_len = ETHER_MAX_LEN,
        },
        .txmode = {
                .mq_mode = ETH_MQ_TX_NONE,
        },
        .rx_adv_conf = {
                .rss_conf = {
                        .rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
                                ETH_RSS_TCP | ETH_RSS_SCTP,
                }
        },
};
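
/*
 * RSS is enabled so the NIC computes a per-packet hash on reception; the
 * distributor library then reads its flow tag from the mbuf hash field
 * (hash.usr, which overlays the RSS hash), keeping all packets of a flow
 * on the same worker.
 */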

struct output_buffer {
        unsigned count;
        struct rte_mbuf *mbufs[BURST_SIZE];
};
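
/*
 * Each port gets one output_buffer: lcore_tx() stages mbufs here and hands
 * them to rte_eth_tx_burst() in batches (see flush_one_port() below).
 */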

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
        struct rte_eth_conf port_conf = port_conf_default;
        const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
        int retval;
        uint16_t q;
        uint16_t nb_rxd = RX_RING_SIZE;
        uint16_t nb_txd = TX_RING_SIZE;
        struct rte_eth_dev_info dev_info;
        struct rte_eth_txconf txconf;

        if (!rte_eth_dev_is_valid_port(port))
                return -1;

        rte_eth_dev_info_get(port, &dev_info);
        if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
                port_conf.txmode.offloads |=
                        DEV_TX_OFFLOAD_MBUF_FAST_FREE;

        port_conf.rx_adv_conf.rss_conf.rss_hf &=
                dev_info.flow_type_rss_offloads;
        if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
                        port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
                printf("Port %u modified RSS hash function based on hardware support,"
                        " requested:%#"PRIx64" configured:%#"PRIx64"\n",
                        port,
                        port_conf_default.rx_adv_conf.rss_conf.rss_hf,
                        port_conf.rx_adv_conf.rss_conf.rss_hf);
        }

        retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
        if (retval != 0)
                return retval;

        retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
        if (retval != 0)
                return retval;

        for (q = 0; q < rxRings; q++) {
                retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
                                                rte_eth_dev_socket_id(port),
                                                NULL, mbuf_pool);
                if (retval < 0)
                        return retval;
        }

        txconf = dev_info.default_txconf;
        txconf.offloads = port_conf.txmode.offloads;
        for (q = 0; q < txRings; q++) {
                retval = rte_eth_tx_queue_setup(port, q, nb_txd,
                                                rte_eth_dev_socket_id(port),
                                                &txconf);
                if (retval < 0)
                        return retval;
        }

        retval = rte_eth_dev_start(port);
        if (retval < 0)
                return retval;

        struct rte_eth_link link;
        rte_eth_link_get_nowait(port, &link);
        while (!link.link_status) {
                printf("Waiting for Link up on port %"PRIu16"\n", port);
                sleep(1);
                rte_eth_link_get_nowait(port, &link);
        }
        struct ether_addr addr;
        rte_eth_macaddr_get(port, &addr);
        printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
                        " %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
                        port,
                        addr.addr_bytes[0], addr.addr_bytes[1],
                        addr.addr_bytes[2], addr.addr_bytes[3],
                        addr.addr_bytes[4], addr.addr_bytes[5]);

        rte_eth_promiscuous_enable(port);

        return 0;
}

struct lcore_params {
        unsigned worker_id;
        struct rte_distributor *d;
        struct rte_ring *rx_dist_ring;
        struct rte_ring *dist_tx_ring;
        struct rte_mempool *mem_pool;
};
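
/*
 * Overall pipeline: lcore_rx() pulls packets from the NICs and enqueues
 * them on rx_dist_ring; lcore_distributor() dequeues them, spreads them
 * across the lcore_worker() threads, and enqueues the returned packets on
 * dist_tx_ring; lcore_tx() drains that ring out to the ports.
 */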

static int
lcore_rx(struct lcore_params *p)
{
        const uint16_t nb_ports = rte_eth_dev_count_avail();
        const int socket_id = rte_socket_id();
        uint16_t port;
        struct rte_mbuf *bufs[BURST_SIZE*2];

        RTE_ETH_FOREACH_DEV(port) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0)
                        continue;

                if (rte_eth_dev_socket_id(port) > 0 &&
                                rte_eth_dev_socket_id(port) != socket_id)
                        printf("WARNING, port %u is on remote NUMA node to "
                                        "RX thread.\n\tPerformance will not "
                                        "be optimal.\n", port);
        }

        printf("\nCore %u doing packet RX.\n", rte_lcore_id());
        port = 0;
        while (!quit_signal_rx) {

                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }
                const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                BURST_SIZE);
                if (unlikely(nb_rx == 0)) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }
                app_stats.rx.rx_pkts += nb_rx;

/*
 * You can run the distributor on the rx core with this code. Returned
 * packets are then sent straight to the tx core.
 */
#if 0
                rte_distributor_process(p->d, bufs, nb_rx);
                const uint16_t nb_ret = rte_distributor_returned_pkts(p->d,
                                bufs, BURST_SIZE*2);

                app_stats.rx.returned_pkts += nb_ret;
                if (unlikely(nb_ret == 0)) {
                        if (++port == nb_ports)
                                port = 0;
                        continue;
                }

                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
                                (void *)bufs, nb_ret, NULL);
#else
                uint16_t nb_ret = nb_rx;
                /*
                 * Swap the following two lines if you want the rx traffic
                 * to go directly to tx, no distribution.
                 */
                struct rte_ring *out_ring = p->rx_dist_ring;
                /* struct rte_ring *out_ring = p->dist_tx_ring; */

                uint16_t sent = rte_ring_enqueue_burst(out_ring,
                                (void *)bufs, nb_ret, NULL);
#endif

                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
                        app_stats.rx.enqdrop_pkts += nb_ret - sent;
                        RTE_LOG_DP(DEBUG, DISTRAPP,
                                "%s:Packet loss due to full ring\n", __func__);
                        while (sent < nb_ret)
                                rte_pktmbuf_free(bufs[sent++]);
                }
                if (++port == nb_ports)
                        port = 0;
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        /* set tx thread quit flag */
        printf("\nCore %u exiting rx task.\n", rte_lcore_id());
        quit_signal = 1;
        return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
        unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
                        outbuf->mbufs, outbuf->count);
        app_stats.tx.tx_pkts += outbuf->count;

        if (unlikely(nb_tx < outbuf->count)) {
                app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
                do {
                        rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
                } while (++nb_tx < outbuf->count);
        }
        outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers)
{
        uint16_t outp;

        RTE_ETH_FOREACH_DEV(outp) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << outp)) == 0)
                        continue;

                if (tx_buffers[outp].count == 0)
                        continue;

                flush_one_port(&tx_buffers[outp], outp);
        }
}

static int
lcore_distributor(struct lcore_params *p)
{
        struct rte_ring *in_r = p->rx_dist_ring;
        struct rte_ring *out_r = p->dist_tx_ring;
        struct rte_mbuf *bufs[BURST_SIZE * 4];
        struct rte_distributor *d = p->d;

        printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
        while (!quit_signal_dist) {
                const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
                                (void *)bufs, BURST_SIZE, NULL);
                if (nb_rx) {
                        app_stats.dist.in_pkts += nb_rx;

                        /* Distribute the packets */
                        rte_distributor_process(d, bufs, nb_rx);
                        /* Handle returns */
                        const uint16_t nb_ret =
                                rte_distributor_returned_pkts(d,
                                        bufs, BURST_SIZE*2);

                        if (unlikely(nb_ret == 0))
                                continue;
                        app_stats.dist.ret_pkts += nb_ret;

                        uint16_t sent = rte_ring_enqueue_burst(out_r,
                                        (void *)bufs, nb_ret, NULL);
                        app_stats.dist.sent_pkts += sent;
                        if (unlikely(sent < nb_ret)) {
                                app_stats.dist.enqdrop_pkts += nb_ret - sent;
                                RTE_LOG(DEBUG, DISTRAPP,
                                        "%s:Packet loss due to full out ring\n",
                                        __func__);
                                while (sent < nb_ret)
                                        rte_pktmbuf_free(bufs[sent++]);
                        }
                }
        }
        printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
        quit_signal_work = 1;
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        rte_distributor_flush(d);
        /* Unblock any returns so workers can exit */
        rte_distributor_clear_returns(d);
        quit_signal_rx = 1;
        return 0;
}

static int
lcore_tx(struct rte_ring *in_r)
{
        static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
        const int socket_id = rte_socket_id();
        uint16_t port;

        RTE_ETH_FOREACH_DEV(port) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << port)) == 0)
                        continue;

                if (rte_eth_dev_socket_id(port) > 0 &&
                                rte_eth_dev_socket_id(port) != socket_id)
                        printf("WARNING, port %u is on remote NUMA node to "
                                        "TX thread.\n\tPerformance will not "
                                        "be optimal.\n", port);
        }

        printf("\nCore %u doing packet TX.\n", rte_lcore_id());
        while (!quit_signal) {

                RTE_ETH_FOREACH_DEV(port) {
                        /* skip ports that are not enabled */
                        if ((enabled_port_mask & (1 << port)) == 0)
                                continue;

                        struct rte_mbuf *bufs[BURST_SIZE_TX];
                        const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
                                        (void *)bufs, BURST_SIZE_TX, NULL);
                        app_stats.tx.dequeue_pkts += nb_rx;

                        /* if we get no traffic, flush anything we have */
                        if (unlikely(nb_rx == 0)) {
                                flush_all_ports(tx_buffers);
                                continue;
                        }

                        /* for traffic we receive, queue it up for transmit */
                        uint16_t i;
                        for (i = 0; i < 3 && i < nb_rx; i++)
                                rte_prefetch_non_temporal((void *)bufs[i]);
                        for (i = 0; i < nb_rx; i++) {
                                struct output_buffer *outbuf;
                                uint8_t outp;
                                /* only prefetch slots that were filled */
                                if (i + 3 < nb_rx)
                                        rte_prefetch_non_temporal(
                                                        (void *)bufs[i + 3]);
                                /*
                                 * workers should update the mbuf port field
                                 * to hold the output port value
                                 */
                                outp = bufs[i]->port;
                                /* skip ports that are not enabled */
                                if ((enabled_port_mask & (1 << outp)) == 0)
                                        continue;

                                outbuf = &tx_buffers[outp];
                                outbuf->mbufs[outbuf->count++] = bufs[i];
                                if (outbuf->count == BURST_SIZE_TX)
                                        flush_one_port(outbuf, outp);
                        }
                }
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        printf("\nCore %u exiting tx task.\n", rte_lcore_id());
        return 0;
}

static void
int_handler(int sig_num)
{
        printf("Exiting on signal %d\n", sig_num);
        /* set quit flag for distributor thread to exit */
        quit_signal_dist = 1;
}
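
/*
 * Shutdown sequencing: the signal handler above stops the distributor
 * core, which in turn raises quit_signal_work (workers) and quit_signal_rx
 * (rx core); the rx core then raises quit_signal to stop the tx core.
 */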

static void
print_stats(void)
{
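        /*
         * Packet counters below are printed as deltas since the previous
         * call, divided by 1e6; main() calls this roughly once per second,
         * so they read as millions of packets per second.
         */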
        struct rte_eth_stats eth_stats;
        unsigned int i, j;
        const unsigned int num_workers = rte_lcore_count() - 4;

        RTE_ETH_FOREACH_DEV(i) {
                rte_eth_stats_get(i, &eth_stats);
                app_stats.port_rx_pkts[i] = eth_stats.ipackets;
                app_stats.port_tx_pkts[i] = eth_stats.opackets;
        }

        printf("\n\nRX Thread:\n");
        RTE_ETH_FOREACH_DEV(i) {
                printf("Port %u Pktsin : %5.2f\n", i,
                                (app_stats.port_rx_pkts[i] -
                                prev_app_stats.port_rx_pkts[i])/1000000.0);
                prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
        }
        printf(" - Received:    %5.2f\n",
                        (app_stats.rx.rx_pkts -
                        prev_app_stats.rx.rx_pkts)/1000000.0);
        printf(" - Returned:    %5.2f\n",
                        (app_stats.rx.returned_pkts -
                        prev_app_stats.rx.returned_pkts)/1000000.0);
        printf(" - Enqueued:    %5.2f\n",
                        (app_stats.rx.enqueued_pkts -
                        prev_app_stats.rx.enqueued_pkts)/1000000.0);
        printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.rx.enqdrop_pkts -
                        prev_app_stats.rx.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        printf("Distributor thread:\n");
        printf(" - In:          %5.2f\n",
                        (app_stats.dist.in_pkts -
                        prev_app_stats.dist.in_pkts)/1000000.0);
        printf(" - Returned:    %5.2f\n",
                        (app_stats.dist.ret_pkts -
                        prev_app_stats.dist.ret_pkts)/1000000.0);
        printf(" - Sent:        %5.2f\n",
                        (app_stats.dist.sent_pkts -
                        prev_app_stats.dist.sent_pkts)/1000000.0);
        printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.dist.enqdrop_pkts -
                        prev_app_stats.dist.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        printf("TX thread:\n");
        printf(" - Dequeued:    %5.2f\n",
                        (app_stats.tx.dequeue_pkts -
                        prev_app_stats.tx.dequeue_pkts)/1000000.0);
        RTE_ETH_FOREACH_DEV(i) {
                printf("Port %u Pktsout: %5.2f\n",
                                i, (app_stats.port_tx_pkts[i] -
                                prev_app_stats.port_tx_pkts[i])/1000000.0);
                prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
        }
        printf(" - Transmitted: %5.2f\n",
                        (app_stats.tx.tx_pkts -
                        prev_app_stats.tx.tx_pkts)/1000000.0);
        printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
                        (app_stats.tx.enqdrop_pkts -
                        prev_app_stats.tx.enqdrop_pkts)/1000000.0,
                        ANSI_COLOR_RESET);

        prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
        prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
        prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
        prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
        prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
        prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
        prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
        prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
        prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
        prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
        prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

        for (i = 0; i < num_workers; i++) {
                printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
                                (app_stats.worker_pkts[i] -
                                prev_app_stats.worker_pkts[i])/1000000.0);
                for (j = 0; j < 8; j++) {
                        printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
                        app_stats.worker_bursts[i][j] = 0;
                }
                printf("\n");
                prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
        }
}

static int
lcore_worker(struct lcore_params *p)
{
        struct rte_distributor *d = p->d;
        const unsigned id = p->worker_id;
        unsigned int num = 0;
        unsigned int i;

        /*
         * for single port, xor_val will be zero so we won't modify the output
         * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
         */
        const unsigned xor_val = (rte_eth_dev_count_avail() > 1);
        struct rte_mbuf *buf[8] __rte_cache_aligned;

        for (i = 0; i < 8; i++)
                buf[i] = NULL;

        app_stats.worker_pkts[p->worker_id] = 1;

        printf("\nCore %u acting as worker core.\n", rte_lcore_id());
        while (!quit_signal_work) {
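                /*
                 * Hand back the previous burst of processed packets (num
                 * entries in buf) and receive a new burst from the
                 * distributor in the same call.
                 */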
                num = rte_distributor_get_pkt(d, id, buf, buf, num);
                /* Do a little bit of work for each packet */
                for (i = 0; i < num; i++) {
                        uint64_t t = rte_rdtsc() + 100;

                        while (rte_rdtsc() < t)
                                rte_pause();
                        buf[i]->port ^= xor_val;
                }

                app_stats.worker_pkts[p->worker_id] += num;
                if (num > 0)
                        app_stats.worker_bursts[p->worker_id][num-1]++;
        }
        if (power_lib_initialised)
                rte_power_exit(rte_lcore_id());
        rte_free(p);
        return 0;
}

static int
init_power_library(void)
{
        int ret = 0, lcore_id;

        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                /* init power management library */
                ret = rte_power_init(lcore_id);
                if (ret) {
                        RTE_LOG(ERR, POWER,
                                "Library initialization failed on core %u\n",
                                lcore_id);
                        /*
                         * Return on the first failure; we'll fall back
                         * to non-power operation
                         */
                        return ret;
                }
        }
        return ret;
}

/* display usage */
static void
print_usage(const char *prgname)
{
        printf("%s [EAL options] -- -p PORTMASK\n"
                        "  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
                        prgname);
}
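
/*
 * Example invocation (illustrative only; the binary path, lcore and port
 * numbers depend on the build and the machine): run with six lcores and
 * enable ports 0 and 1:
 *
 *   ./build/distributor -l 0-5 -n 4 -- -p 0x3
 */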

static int
parse_portmask(const char *portmask)
{
        char *end = NULL;
        unsigned long pm;

        /* parse hexadecimal string */
        pm = strtoul(portmask, &end, 16);
        if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
                return -1;

        if (pm == 0)
                return -1;

        return pm;
}

/* Parse the arguments given on the command line of the application */
static int
parse_args(int argc, char **argv)
{
        int opt;
        char **argvopt;
        int option_index;
        char *prgname = argv[0];
        static struct option lgopts[] = {
                {NULL, 0, 0, 0}
        };

        argvopt = argv;

        while ((opt = getopt_long(argc, argvopt, "p:",
                        lgopts, &option_index)) != EOF) {

                switch (opt) {
                /* portmask */
                case 'p': {
                        /*
                         * parse_portmask() returns -1 on error; validate it
                         * in a signed local before storing it in the
                         * unsigned mask.
                         */
                        int pm = parse_portmask(optarg);

                        if (pm < 0) {
                                printf("invalid portmask\n");
                                print_usage(prgname);
                                return -1;
                        }
                        enabled_port_mask = pm;
                        break;
                }

                default:
                        print_usage(prgname);
                        return -1;
                }
        }

        if (optind <= 1) {
                print_usage(prgname);
                return -1;
        }

        argv[optind-1] = prgname;

        optind = 1; /* reset getopt lib */
        return 0;
}

/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
        struct rte_mempool *mbuf_pool;
        struct rte_distributor *d;
        struct rte_ring *dist_tx_ring;
        struct rte_ring *rx_dist_ring;
        struct rte_power_core_capabilities lcore_cap;
        unsigned int lcore_id, worker_id = 0;
        int distr_core_id = -1, rx_core_id = -1, tx_core_id = -1;
        unsigned nb_ports;
        uint16_t portid;
        uint16_t nb_ports_available;
        uint64_t t, freq;

        /* catch ctrl-c so we can print on exit */
        signal(SIGINT, int_handler);

        /* init EAL */
        int ret = rte_eal_init(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
        argc -= ret;
        argv += ret;

        /* parse application arguments (after the EAL ones) */
        ret = parse_args(argc, argv);
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

        if (rte_lcore_count() < 5)
                rte_exit(EXIT_FAILURE, "Error: this application needs at "
                                "least 5 logical cores to run:\n"
                                "1 lcore for stats (can be core 0)\n"
                                "1 lcore for packet RX\n"
                                "1 lcore for distribution\n"
                                "1 lcore for packet TX\n"
                                "and at least 1 lcore for worker threads\n");

        if (init_power_library() == 0)
                power_lib_initialised = 1;

        nb_ports = rte_eth_dev_count_avail();
        if (nb_ports == 0)
                rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
        if (nb_ports != 1 && (nb_ports & 1))
                rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
                                "when using a single port\n");

        mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
                NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
                RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
        if (mbuf_pool == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
        nb_ports_available = nb_ports;

        /* initialize all ports */
        RTE_ETH_FOREACH_DEV(portid) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << portid)) == 0) {
                        printf("\nSkipping disabled port %d\n", portid);
                        nb_ports_available--;
                        continue;
                }
                /* init port */
                printf("Initializing port %u...\n", portid);

                if (port_init(portid, mbuf_pool) != 0)
                        rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
                                        portid);
        }

        if (!nb_ports_available) {
                rte_exit(EXIT_FAILURE,
                                "All available ports are disabled. Please set portmask.\n");
        }

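        /*
         * Four lcores are reserved for the stats (main), rx, distributor
         * and tx roles; everything else becomes a worker, hence the
         * "rte_lcore_count() - 4" worker count below.
         */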
        d = rte_distributor_create("PKT_DIST", rte_socket_id(),
                        rte_lcore_count() - 4,
                        RTE_DIST_ALG_BURST);
        if (d == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

        /*
         * scheduler ring is read by the transmitter core, and written to
         * by scheduler core
         */
        dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (dist_tx_ring == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

        rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
                        rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
        if (rx_dist_ring == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create input ring\n");

        if (power_lib_initialised) {
                /*
                 * Here we'll pre-assign lcore ids to the rx, tx and
                 * distributor workloads if there's higher frequency
                 * on those cores e.g. if Turbo Boost is enabled.
                 * It's also worth mentioning that it will assign cores in a
                 * specific order, so that if there are fewer than three
                 * available, the higher frequency cores will go to the
                 * distributor first, then rx, then tx.
                 */
                RTE_LCORE_FOREACH_SLAVE(lcore_id) {

                        rte_power_get_capabilities(lcore_id, &lcore_cap);

                        if (lcore_cap.priority != 1)
                                continue;

                        if (distr_core_id < 0) {
                                distr_core_id = lcore_id;
                                printf("Distributor on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                        if (rx_core_id < 0) {
                                rx_core_id = lcore_id;
                                printf("Rx on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                        if (tx_core_id < 0) {
                                tx_core_id = lcore_id;
                                printf("Tx on priority core %d\n",
                                        lcore_id);
                                continue;
                        }
                }
        }

        /*
         * If any of the key workloads are left without an lcore_id
         * after the high performing core assignment above, pre-assign
         * them here.
         */
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id == (unsigned int)distr_core_id ||
                                lcore_id == (unsigned int)rx_core_id ||
                                lcore_id == (unsigned int)tx_core_id)
                        continue;
                if (distr_core_id < 0) {
                        distr_core_id = lcore_id;
                        printf("Distributor on core %d\n", lcore_id);
                        continue;
                }
                if (rx_core_id < 0) {
                        rx_core_id = lcore_id;
                        printf("Rx on core %d\n", lcore_id);
                        continue;
                }
                if (tx_core_id < 0) {
                        tx_core_id = lcore_id;
                        printf("Tx on core %d\n", lcore_id);
                        continue;
                }
        }

        printf(" tx id %d, dist id %d, rx id %d\n",
                        tx_core_id,
                        distr_core_id,
                        rx_core_id);

        /*
         * Kick off all the worker threads first, avoiding the pre-assigned
         * lcore_ids for tx, rx and distributor workloads.
         */
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (lcore_id == (unsigned int)distr_core_id ||
                                lcore_id == (unsigned int)rx_core_id ||
                                lcore_id == (unsigned int)tx_core_id)
                        continue;
                printf("Starting thread %d as worker, lcore_id %d\n",
                                worker_id, lcore_id);
                struct lcore_params *p =
                        rte_malloc(NULL, sizeof(*p), 0);
                if (!p)
                        rte_panic("malloc failure\n");
                *p = (struct lcore_params){worker_id++, d, rx_dist_ring,
                        dist_tx_ring, mbuf_pool};

                rte_eal_remote_launch((lcore_function_t *)lcore_worker,
                                p, lcore_id);
        }

        /* Start tx core */
        rte_eal_remote_launch((lcore_function_t *)lcore_tx,
                        dist_tx_ring, tx_core_id);

        /* Start distributor core */
        struct lcore_params *pd =
                rte_malloc(NULL, sizeof(*pd), 0);
        if (!pd)
                rte_panic("malloc failure\n");
        *pd = (struct lcore_params){worker_id++, d,
                rx_dist_ring, dist_tx_ring, mbuf_pool};
        rte_eal_remote_launch(
                        (lcore_function_t *)lcore_distributor,
                        pd, distr_core_id);

        /* Start rx core */
        struct lcore_params *pr =
                rte_malloc(NULL, sizeof(*pr), 0);
        if (!pr)
                rte_panic("malloc failure\n");
        *pr = (struct lcore_params){worker_id++, d, rx_dist_ring,
                dist_tx_ring, mbuf_pool};
        rte_eal_remote_launch((lcore_function_t *)lcore_rx,
                        pr, rx_core_id);

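        /*
         * The main lcore acts as the stats core: print stats roughly once
         * per second, timed off the TSC (freq cycles per second).
         */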
        freq = rte_get_timer_hz();
        t = rte_rdtsc() + freq;
        while (!quit_signal_dist) {
                if (t < rte_rdtsc()) {
                        print_stats();
                        t = rte_rdtsc() + freq;
                }
                usleep(1000);
        }

        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (rte_eal_wait_lcore(lcore_id) < 0)
                        return -1;
        }

        print_stats();

        rte_free(pd);
        rte_free(pr);

        return 0;
}