/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include <stdint.h>
#include <inttypes.h>
#include <unistd.h>
#include <signal.h>
#include <getopt.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>
#include <rte_malloc.h>
#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
#include <rte_pause.h>

#define RX_RING_SIZE 512
#define TX_RING_SIZE 512
#define NUM_MBUFS ((64*1024)-1)
#define MBUF_CACHE_SIZE 128
#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
#define BURST_SIZE_TX 32

#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1

#define ANSI_COLOR_RED "\x1b[31m"
#define ANSI_COLOR_RESET "\x1b[0m"
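
/*
 * Overview: the RX core polls the enabled ports and enqueues packets onto
 * rx_dist_ring; the distributor core pulls them from that ring and hands
 * them to the worker cores through the rte_distributor API; packets handed
 * back by the workers are enqueued onto dist_tx_ring, which the TX core
 * drains and transmits.
 */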

/* mask of enabled ports */
static uint32_t enabled_port_mask;
volatile uint8_t quit_signal;
volatile uint8_t quit_signal_rx;
volatile uint8_t quit_signal_dist;
volatile uint8_t quit_signal_work;
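
/*
 * Shutdown is staged through these flags: the SIGINT handler sets
 * quit_signal_dist, the distributor core then sets quit_signal_work and
 * quit_signal_rx, and the RX core finally sets quit_signal so the TX core
 * exits last.
 */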

static volatile struct app_stats {
	struct {
		uint64_t rx_pkts;
		uint64_t returned_pkts;
		uint64_t enqueued_pkts;
		uint64_t enqdrop_pkts;
	} rx __rte_cache_aligned;
	int pad1 __rte_cache_aligned;

	struct {
		uint64_t in_pkts;
		uint64_t ret_pkts;
		uint64_t sent_pkts;
		uint64_t enqdrop_pkts;
	} dist __rte_cache_aligned;
	int pad2 __rte_cache_aligned;

	struct {
		uint64_t dequeue_pkts;
		uint64_t tx_pkts;
		uint64_t enqdrop_pkts;
	} tx __rte_cache_aligned;
	int pad3 __rte_cache_aligned;

	uint64_t worker_pkts[64] __rte_cache_aligned;

	int pad4 __rte_cache_aligned;

	uint64_t worker_bursts[64][8] __rte_cache_aligned;

	int pad5 __rte_cache_aligned;

	uint64_t port_rx_pkts[64] __rte_cache_aligned;
	uint64_t port_tx_pkts[64] __rte_cache_aligned;
} app_stats;
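
/*
 * The pad members above push each group of counters onto its own cache
 * line, so the RX, distributor, TX and worker threads do not false-share
 * the statistics they update.
 */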

struct app_stats prev_app_stats;

static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.mq_mode = ETH_MQ_RX_RSS,
		.max_rx_pkt_len = ETHER_MAX_LEN,
	},
	.txmode = {
		.mq_mode = ETH_MQ_TX_NONE,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_hf = ETH_RSS_IP | ETH_RSS_UDP |
				ETH_RSS_TCP | ETH_RSS_SCTP,
		}
	},
};
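
/*
 * Per-port staging buffer used by the TX core: mbufs destined for a port
 * are collected here and flushed with rte_eth_tx_burst() once BURST_SIZE_TX
 * packets have accumulated (or when the input ring runs dry).
 */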
struct output_buffer {
	unsigned count;
	struct rte_mbuf *mbufs[BURST_SIZE];
};

static void print_stats(void);

/*
 * Initialises a given port using global settings and with the rx buffers
 * coming from the mbuf_pool passed as parameter
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
	int retval;
	uint16_t q;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;

	if (port >= rte_eth_dev_count())
		return -1;

	retval = rte_eth_dev_configure(port, rxRings, txRings, &port_conf);
	if (retval != 0)
		return retval;

	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;

	for (q = 0; q < rxRings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
						rte_eth_dev_socket_id(port),
						NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}

	for (q = 0; q < txRings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
						rte_eth_dev_socket_id(port),
						NULL);
		if (retval < 0)
			return retval;
	}

	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;

	struct rte_eth_link link;
	rte_eth_link_get_nowait(port, &link);
	while (!link.link_status) {
		printf("Waiting for Link up on port %"PRIu16"\n", port);
		sleep(1);
		rte_eth_link_get_nowait(port, &link);
	}

	if (!link.link_status) {
		printf("Link down on port %"PRIu16"\n", port);
		return 0;
	}

	struct ether_addr addr;
	rte_eth_macaddr_get(port, &addr);
	printf("Port %u MAC: %02"PRIx8" %02"PRIx8" %02"PRIx8
			" %02"PRIx8" %02"PRIx8" %02"PRIx8"\n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);

	rte_eth_promiscuous_enable(port);
	return 0;
}

struct lcore_params {
	unsigned worker_id;
	struct rte_distributor *d;
	struct rte_ring *rx_dist_ring;
	struct rte_ring *dist_tx_ring;
	struct rte_mempool *mem_pool;
};
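
/*
 * RX core: round-robins over the enabled ports, pulling up to BURST_SIZE
 * packets per poll and enqueueing them onto the ring read by the
 * distributor core. The #if 0 block below shows the alternative of running
 * the distributor inline on the RX core instead.
 */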
static int
lcore_rx(struct lcore_params *p)
{
	const uint16_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint16_t port;
	struct rte_mbuf *bufs[BURST_SIZE*2];

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"RX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet RX.\n", rte_lcore_id());
	port = 0;
	while (!quit_signal_rx) {

		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
				BURST_SIZE);
		if (unlikely(nb_rx == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}
		app_stats.rx.rx_pkts += nb_rx;

/*
 * You can run the distributor on the rx core with this code. Returned
 * packets are then sent straight to the tx core.
 */
#if 0
		rte_distributor_process(d, bufs, nb_rx);
		const uint16_t nb_ret = rte_distributor_returned_pkts(d,
				bufs, BURST_SIZE*2);

		app_stats.rx.returned_pkts += nb_ret;
		if (unlikely(nb_ret == 0)) {
			if (++port == nb_ports)
				port = 0;
			continue;
		}

		struct rte_ring *tx_ring = p->dist_tx_ring;
		uint16_t sent = rte_ring_enqueue_burst(tx_ring,
				(void *)bufs, nb_ret, NULL);
#else
		uint16_t nb_ret = nb_rx;
		/*
		 * Swap the following two lines if you want the rx traffic
		 * to go directly to tx, no distribution.
		 */
		struct rte_ring *out_ring = p->rx_dist_ring;
		/* struct rte_ring *out_ring = p->dist_tx_ring; */

		uint16_t sent = rte_ring_enqueue_burst(out_ring,
				(void *)bufs, nb_ret, NULL);
#endif

		app_stats.rx.enqueued_pkts += sent;
		if (unlikely(sent < nb_ret)) {
			app_stats.rx.enqdrop_pkts += nb_ret - sent;
			RTE_LOG_DP(DEBUG, DISTRAPP,
				"%s:Packet loss due to full ring\n", __func__);
			while (sent < nb_ret)
				rte_pktmbuf_free(bufs[sent++]);
		}
		if (++port == nb_ports)
			port = 0;
	}
	/* set worker & tx threads quit flag */
	printf("\nCore %u exiting rx task.\n", rte_lcore_id());
	quit_signal = 1;
	return 0;
}

static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
	unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
			outbuf->mbufs, outbuf->count);
	app_stats.tx.tx_pkts += outbuf->count;

	if (unlikely(nb_tx < outbuf->count)) {
		app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
		do {
			rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
		} while (++nb_tx < outbuf->count);
	}
	outbuf->count = 0;
}

static inline void
flush_all_ports(struct output_buffer *tx_buffers, uint16_t nb_ports)
{
	uint16_t outp;

	for (outp = 0; outp < nb_ports; outp++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << outp)) == 0)
			continue;
		if (tx_buffers[outp].count == 0)
			continue;
		flush_one_port(&tx_buffers[outp], outp);
	}
}
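
/*
 * Distributor core: dequeues bursts from the RX ring, runs them through
 * rte_distributor_process() so the workers can claim them, collects any
 * packets the workers have handed back via rte_distributor_returned_pkts(),
 * and enqueues those onto the ring read by the TX core.
 */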
static int
lcore_distributor(struct lcore_params *p)
{
	struct rte_ring *in_r = p->rx_dist_ring;
	struct rte_ring *out_r = p->dist_tx_ring;
	struct rte_mbuf *bufs[BURST_SIZE * 4];
	struct rte_distributor *d = p->d;

	printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
	while (!quit_signal_dist) {
		const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
				(void *)bufs, BURST_SIZE*1, NULL);
		if (nb_rx) {
			app_stats.dist.in_pkts += nb_rx;

			/* Distribute the packets */
			rte_distributor_process(d, bufs, nb_rx);
			/* Handle returns if any */
			const uint16_t nb_ret =
				rte_distributor_returned_pkts(d,
					bufs, BURST_SIZE*2);

			if (unlikely(nb_ret == 0))
				continue;
			app_stats.dist.ret_pkts += nb_ret;

			uint16_t sent = rte_ring_enqueue_burst(out_r,
					(void *)bufs, nb_ret, NULL);
			app_stats.dist.sent_pkts += sent;
			if (unlikely(sent < nb_ret)) {
				app_stats.dist.enqdrop_pkts += nb_ret - sent;
				RTE_LOG(DEBUG, DISTRAPP,
					"%s:Packet loss due to full out ring\n",
					__func__);
				while (sent < nb_ret)
					rte_pktmbuf_free(bufs[sent++]);
			}
		}
	}
	printf("\nCore %u exiting distributor task.\n", rte_lcore_id());
	quit_signal_work = 1;

	rte_distributor_flush(d);
	/* Unblock any returns so workers can exit */
	rte_distributor_clear_returns(d);
	quit_signal_rx = 1;
	return 0;
}
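
/*
 * TX core: drains the ring written by the distributor, batches the packets
 * per output port in tx_buffers, and flushes a port's buffer either when it
 * reaches BURST_SIZE_TX packets or whenever the ring is momentarily empty.
 */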
static int
lcore_tx(struct rte_ring *in_r)
{
	static struct output_buffer tx_buffers[RTE_MAX_ETHPORTS];
	const uint16_t nb_ports = rte_eth_dev_count();
	const int socket_id = rte_socket_id();
	uint16_t port;

	for (port = 0; port < nb_ports; port++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << port)) == 0)
			continue;

		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) != socket_id)
			printf("WARNING, port %u is on remote NUMA node to "
					"TX thread.\n\tPerformance will not "
					"be optimal.\n", port);
	}

	printf("\nCore %u doing packet TX.\n", rte_lcore_id());
	while (!quit_signal) {

		for (port = 0; port < nb_ports; port++) {
			/* skip ports that are not enabled */
			if ((enabled_port_mask & (1 << port)) == 0)
				continue;

			struct rte_mbuf *bufs[BURST_SIZE_TX];
			const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
					(void *)bufs, BURST_SIZE_TX, NULL);
			app_stats.tx.dequeue_pkts += nb_rx;

			/* if we get no traffic, flush anything we have */
			if (unlikely(nb_rx == 0)) {
				flush_all_ports(tx_buffers, nb_ports);
				continue;
			}

			/* for traffic we receive, queue it up for transmit */
			uint16_t i;
			rte_prefetch_non_temporal((void *)bufs[0]);
			rte_prefetch_non_temporal((void *)bufs[1]);
			rte_prefetch_non_temporal((void *)bufs[2]);
			for (i = 0; i < nb_rx; i++) {
				struct output_buffer *outbuf;
				uint8_t outp;
				rte_prefetch_non_temporal((void *)bufs[i + 3]);
				/*
				 * workers should update in_port to hold the
				 * output port
				 */
				outp = bufs[i]->port;
				/* skip ports that are not enabled */
				if ((enabled_port_mask & (1 << outp)) == 0)
					continue;

				outbuf = &tx_buffers[outp];
				outbuf->mbufs[outbuf->count++] = bufs[i];
				if (outbuf->count == BURST_SIZE_TX)
					flush_one_port(outbuf, outp);
			}
		}
	}
	printf("\nCore %u exiting tx task.\n", rte_lcore_id());
	return 0;
}

static void
int_handler(int sig_num)
{
	printf("Exiting on signal %d\n", sig_num);
	/* set quit flag for rx thread to exit */
	quit_signal_dist = 1;
}
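
/*
 * Stats are printed once a second from the main loop, so every figure below
 * is a per-second delta (in millions of packets) against the previous
 * snapshot held in prev_app_stats.
 */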
static void
print_stats(void)
{
	struct rte_eth_stats eth_stats;
	unsigned int i, j;
	const unsigned int num_workers = rte_lcore_count() - 4;

	for (i = 0; i < rte_eth_dev_count(); i++) {
		rte_eth_stats_get(i, &eth_stats);
		app_stats.port_rx_pkts[i] = eth_stats.ipackets;
		app_stats.port_tx_pkts[i] = eth_stats.opackets;
	}

	printf("\n\nRX Thread:\n");
	for (i = 0; i < rte_eth_dev_count(); i++) {
		printf("Port %u Pktsin : %5.2f\n", i,
				(app_stats.port_rx_pkts[i] -
				prev_app_stats.port_rx_pkts[i])/1000000.0);
		prev_app_stats.port_rx_pkts[i] = app_stats.port_rx_pkts[i];
	}
	printf(" - Received:    %5.2f\n",
			(app_stats.rx.rx_pkts -
			prev_app_stats.rx.rx_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.rx.returned_pkts -
			prev_app_stats.rx.returned_pkts)/1000000.0);
	printf(" - Enqueued:    %5.2f\n",
			(app_stats.rx.enqueued_pkts -
			prev_app_stats.rx.enqueued_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.rx.enqdrop_pkts -
			prev_app_stats.rx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("Distributor thread:\n");
	printf(" - In:          %5.2f\n",
			(app_stats.dist.in_pkts -
			prev_app_stats.dist.in_pkts)/1000000.0);
	printf(" - Returned:    %5.2f\n",
			(app_stats.dist.ret_pkts -
			prev_app_stats.dist.ret_pkts)/1000000.0);
	printf(" - Sent:        %5.2f\n",
			(app_stats.dist.sent_pkts -
			prev_app_stats.dist.sent_pkts)/1000000.0);
	printf(" - Dropped      %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.dist.enqdrop_pkts -
			prev_app_stats.dist.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	printf("TX thread:\n");
	printf(" - Dequeued:    %5.2f\n",
			(app_stats.tx.dequeue_pkts -
			prev_app_stats.tx.dequeue_pkts)/1000000.0);
	for (i = 0; i < rte_eth_dev_count(); i++) {
		printf("Port %u Pktsout: %5.2f\n",
				i, (app_stats.port_tx_pkts[i] -
				prev_app_stats.port_tx_pkts[i])/1000000.0);
		prev_app_stats.port_tx_pkts[i] = app_stats.port_tx_pkts[i];
	}
	printf(" - Transmitted: %5.2f\n",
			(app_stats.tx.tx_pkts -
			prev_app_stats.tx.tx_pkts)/1000000.0);
	printf(" - Dropped:     %s%5.2f%s\n", ANSI_COLOR_RED,
			(app_stats.tx.enqdrop_pkts -
			prev_app_stats.tx.enqdrop_pkts)/1000000.0,
			ANSI_COLOR_RESET);

	prev_app_stats.rx.rx_pkts = app_stats.rx.rx_pkts;
	prev_app_stats.rx.returned_pkts = app_stats.rx.returned_pkts;
	prev_app_stats.rx.enqueued_pkts = app_stats.rx.enqueued_pkts;
	prev_app_stats.rx.enqdrop_pkts = app_stats.rx.enqdrop_pkts;
	prev_app_stats.dist.in_pkts = app_stats.dist.in_pkts;
	prev_app_stats.dist.ret_pkts = app_stats.dist.ret_pkts;
	prev_app_stats.dist.sent_pkts = app_stats.dist.sent_pkts;
	prev_app_stats.dist.enqdrop_pkts = app_stats.dist.enqdrop_pkts;
	prev_app_stats.tx.dequeue_pkts = app_stats.tx.dequeue_pkts;
	prev_app_stats.tx.tx_pkts = app_stats.tx.tx_pkts;
	prev_app_stats.tx.enqdrop_pkts = app_stats.tx.enqdrop_pkts;

	for (i = 0; i < num_workers; i++) {
		printf("Worker %02u Pkts: %5.2f. Bursts(1-8): ", i,
				(app_stats.worker_pkts[i] -
				prev_app_stats.worker_pkts[i])/1000000.0);
		for (j = 0; j < 8; j++) {
			printf("%"PRIu64" ", app_stats.worker_bursts[i][j]);
			app_stats.worker_bursts[i][j] = 0;
		}
		printf("\n");
		prev_app_stats.worker_pkts[i] = app_stats.worker_pkts[i];
	}
}
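
/*
 * Worker core: rte_distributor_get_pkt() hands back the previous burst and
 * returns a fresh one. The short rte_rdtsc() spin (~100 cycles) stands in
 * for real per-packet work, and the port XOR forwards packets between port
 * pairs (0<->1, 2<->3, ...) when more than one port is in use.
 */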
static int
lcore_worker(struct lcore_params *p)
{
	struct rte_distributor *d = p->d;
	const unsigned id = p->worker_id;
	unsigned int num = 0;
	unsigned int i;

	/*
	 * for single port, xor_val will be zero so we won't modify the output
	 * port, otherwise we send traffic from 0 to 1, 2 to 3, and vice versa
	 */
	const unsigned xor_val = (rte_eth_dev_count() > 1);
	struct rte_mbuf *buf[8] __rte_cache_aligned;

	for (i = 0; i < 8; i++)
		buf[i] = NULL;

	app_stats.worker_pkts[p->worker_id] = 1;

	printf("\nCore %u acting as worker core.\n", rte_lcore_id());
	while (!quit_signal_work) {
		num = rte_distributor_get_pkt(d, id, buf, buf, num);
		/* Do a little bit of work for each packet */
		for (i = 0; i < num; i++) {
			uint64_t t = rte_rdtsc() + 100;

			while (rte_rdtsc() < t)
				rte_pause();
			buf[i]->port ^= xor_val;
		}

		app_stats.worker_pkts[p->worker_id] += num;
		if (num > 0)
			app_stats.worker_bursts[p->worker_id][num-1]++;
	}
	return 0;
}

/* display usage */
static void
print_usage(const char *prgname)
{
	printf("%s [EAL options] -- -p PORTMASK\n"
			"  -p PORTMASK: hexadecimal bitmask of ports to configure\n",
			prgname);
}

static int
parse_portmask(const char *portmask)
{
	char *end = NULL;
	unsigned long pm;

	/* parse hexadecimal string */
	pm = strtoul(portmask, &end, 16);
	if ((portmask[0] == '\0') || (end == NULL) || (*end != '\0'))
		return -1;
	if (pm == 0)
		return -1;
	return pm;
}

/* Parse the argument given in the command line of the application */
static int
parse_args(int argc, char **argv)
{
	int opt;
	char **argvopt;
	int option_index;
	char *prgname = argv[0];
	static struct option lgopts[] = {
		{NULL, 0, 0, 0}
	};

	argvopt = argv;

	while ((opt = getopt_long(argc, argvopt, "p:",
			lgopts, &option_index)) != EOF) {
		switch (opt) {
		/* portmask */
		case 'p':
			enabled_port_mask = parse_portmask(optarg);
			if (enabled_port_mask == 0) {
				printf("invalid portmask\n");
				print_usage(prgname);
				return -1;
			}
			break;
		default:
			print_usage(prgname);
			return -1;
		}
	}

	if (optind <= 1) {
		print_usage(prgname);
		return -1;
	}

	argv[optind-1] = prgname;
	optind = 1; /* reset getopt lib */
	return 0;
}

/* Main function, does initialization and calls the per-lcore functions */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	struct rte_distributor *d;
	struct rte_ring *dist_tx_ring;
	struct rte_ring *rx_dist_ring;
	unsigned lcore_id, worker_id = 0;
	unsigned nb_ports;
	uint16_t portid;
	uint16_t nb_ports_available;
	uint64_t t, freq;

	/* catch ctrl-c so we can print on exit */
	signal(SIGINT, int_handler);

	/* init EAL */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
	argc -= ret;
	argv += ret;

	/* parse application arguments (after the EAL ones) */
	ret = parse_args(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");

	if (rte_lcore_count() < 5)
		rte_exit(EXIT_FAILURE, "Error, This application needs at "
				"least 5 logical cores to run:\n"
				"1 lcore for stats (can be core 0)\n"
				"1 lcore for packet RX\n"
				"1 lcore for distribution\n"
				"1 lcore for packet TX\n"
				"and at least 1 lcore for worker threads\n");

	nb_ports = rte_eth_dev_count();
	if (nb_ports == 0)
		rte_exit(EXIT_FAILURE, "Error: no ethernet ports detected\n");
	if (nb_ports != 1 && (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be even, except "
				"when using a single port\n");

	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
			NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
			RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
	nb_ports_available = nb_ports;

	/* initialize all ports */
	for (portid = 0; portid < nb_ports; portid++) {
		/* skip ports that are not enabled */
		if ((enabled_port_mask & (1 << portid)) == 0) {
			printf("\nSkipping disabled port %d\n", portid);
			nb_ports_available--;
			continue;
		}
		/* init port */
		printf("Initializing port %u... done\n", portid);

		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot initialize port %u\n",
					portid);
	}

	if (!nb_ports_available) {
		rte_exit(EXIT_FAILURE,
				"All available ports are disabled. Please set portmask.\n");
	}

	d = rte_distributor_create("PKT_DIST", rte_socket_id(),
			rte_lcore_count() - 4,
			RTE_DIST_ALG_BURST);
	if (d == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");

	/*
	 * scheduler ring is read by the transmitter core, and written to
	 * by scheduler core
	 */
	dist_tx_ring = rte_ring_create("Output_ring", SCHED_TX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (dist_tx_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create output ring\n");

	rx_dist_ring = rte_ring_create("Input_ring", SCHED_RX_RING_SZ,
			rte_socket_id(), RING_F_SC_DEQ | RING_F_SP_ENQ);
	if (rx_dist_ring == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create input ring\n");
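
	/*
	 * Role assignment over the slave lcores: worker_id counts up from 0,
	 * so with N lcores in total the ids 0..N-5 run the worker loop,
	 * id N-4 runs TX, id N-3 the distributor and id N-2 RX, while the
	 * master lcore stays in main() printing statistics.
	 */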
	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (worker_id == rte_lcore_count() - 3) {
			printf("Starting distributor on lcore_id %d\n",
					lcore_id);
			/* distributor core */
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d,
				rx_dist_ring, dist_tx_ring, mbuf_pool};
			rte_eal_remote_launch(
				(lcore_function_t *)lcore_distributor,
				p, lcore_id);
		} else if (worker_id == rte_lcore_count() - 4) {
			printf("Starting tx on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			/* tx core */
			rte_eal_remote_launch((lcore_function_t *)lcore_tx,
					dist_tx_ring, lcore_id);
		} else if (worker_id == rte_lcore_count() - 2) {
			printf("Starting rx on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			/* rx core */
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
					dist_tx_ring, mbuf_pool};
			rte_eal_remote_launch((lcore_function_t *)lcore_rx,
					p, lcore_id);
		} else {
			printf("Starting worker on worker_id %d, lcore_id %d\n",
					worker_id, lcore_id);
			struct lcore_params *p =
					rte_malloc(NULL, sizeof(*p), 0);
			if (!p)
				rte_panic("malloc failure\n");
			*p = (struct lcore_params){worker_id, d, rx_dist_ring,
					dist_tx_ring, mbuf_pool};

			rte_eal_remote_launch((lcore_function_t *)lcore_worker,
					p, lcore_id);
		}
		worker_id++;
	}

	freq = rte_get_timer_hz();
	t = rte_rdtsc() + freq;
	while (!quit_signal_dist) {
		if (t < rte_rdtsc()) {
			print_stats();
			t = rte_rdtsc() + freq;
		}
		usleep(1000);
	}

	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
		if (rte_eal_wait_lcore(lcore_id) < 0)
			return -1;
	}

	print_stats();
	return 0;
}