ethdev: introduce Rx queue offloads API
[dpdk.git] / examples / distributor / main.c
index 96d6454..87603d0 100644 (file)
 #include <rte_debug.h>
 #include <rte_prefetch.h>
 #include <rte_distributor.h>
 #include <rte_debug.h>
 #include <rte_prefetch.h>
 #include <rte_distributor.h>
+#include <rte_pause.h>
 
 
-#define RX_RING_SIZE 256
+#define RX_RING_SIZE 512
 #define TX_RING_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
 #define TX_RING_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
-#define BURST_SIZE 32
+#define MBUF_CACHE_SIZE 128
+#define BURST_SIZE 64
 #define SCHED_RX_RING_SZ 8192
 #define SCHED_TX_RING_SZ 65536
 #define SCHED_RX_RING_SZ 8192
 #define SCHED_TX_RING_SZ 65536
-#define RTE_RING_SZ 1024
+#define BURST_SIZE_TX 32
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
 
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
@@ -137,6 +138,8 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
        const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
        int retval;
        uint16_t q;
        const uint16_t rxRings = 1, txRings = rte_lcore_count() - 1;
        int retval;
        uint16_t q;
+       uint16_t nb_rxd = RX_RING_SIZE;
+       uint16_t nb_txd = TX_RING_SIZE;
 
        if (port >= rte_eth_dev_count())
                return -1;
 
        if (port >= rte_eth_dev_count())
                return -1;
@@ -145,8 +148,12 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
        if (retval != 0)
                return retval;
 
        if (retval != 0)
                return retval;
 
+       retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
+       if (retval != 0)
+               return retval;
+
        for (q = 0; q < rxRings; q++) {
        for (q = 0; q < rxRings; q++) {
-               retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
+               retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
                                                rte_eth_dev_socket_id(port),
                                                NULL, mbuf_pool);
                if (retval < 0)
                                                rte_eth_dev_socket_id(port),
                                                NULL, mbuf_pool);
                if (retval < 0)
@@ -154,7 +161,7 @@ port_init(uint8_t port, struct rte_mempool *mbuf_pool)
        }
 
        for (q = 0; q < txRings; q++) {
        }
 
        for (q = 0; q < txRings; q++) {
-               retval = rte_eth_tx_queue_setup(port, q, TX_RING_SIZE,
+               retval = rte_eth_tx_queue_setup(port, q, nb_txd,
                                                rte_eth_dev_socket_id(port),
                                                NULL);
                if (retval < 0)
                                                rte_eth_dev_socket_id(port),
                                                NULL);
                if (retval < 0)
@@ -206,6 +213,7 @@ lcore_rx(struct lcore_params *p)
        const uint8_t nb_ports = rte_eth_dev_count();
        const int socket_id = rte_socket_id();
        uint8_t port;
        const uint8_t nb_ports = rte_eth_dev_count();
        const int socket_id = rte_socket_id();
        uint8_t port;
+       struct rte_mbuf *bufs[BURST_SIZE*2];
 
        for (port = 0; port < nb_ports; port++) {
                /* skip ports that are not enabled */
 
        for (port = 0; port < nb_ports; port++) {
                /* skip ports that are not enabled */
@@ -229,7 +237,6 @@ lcore_rx(struct lcore_params *p)
                                port = 0;
                        continue;
                }
                                port = 0;
                        continue;
                }
-               struct rte_mbuf *bufs[BURST_SIZE*2];
                const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                BURST_SIZE);
                if (unlikely(nb_rx == 0)) {
                const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                BURST_SIZE);
                if (unlikely(nb_rx == 0)) {
@@ -257,7 +264,7 @@ lcore_rx(struct lcore_params *p)
 
                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
 
                struct rte_ring *tx_ring = p->dist_tx_ring;
                uint16_t sent = rte_ring_enqueue_burst(tx_ring,
-                               (void *)bufs, nb_ret);
+                               (void *)bufs, nb_ret, NULL);
 #else
                uint16_t nb_ret = nb_rx;
                /*
 #else
                uint16_t nb_ret = nb_rx;
                /*
@@ -268,11 +275,12 @@ lcore_rx(struct lcore_params *p)
                /* struct rte_ring *out_ring = p->dist_tx_ring; */
 
                uint16_t sent = rte_ring_enqueue_burst(out_ring,
                /* struct rte_ring *out_ring = p->dist_tx_ring; */
 
                uint16_t sent = rte_ring_enqueue_burst(out_ring,
-                               (void *)bufs, nb_ret);
+                               (void *)bufs, nb_ret, NULL);
 #endif
 
                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
 #endif
 
                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
+                       app_stats.rx.enqdrop_pkts +=  nb_ret - sent;
                        RTE_LOG_DP(DEBUG, DISTRAPP,
                                "%s:Packet loss due to full ring\n", __func__);
                        while (sent < nb_ret)
                        RTE_LOG_DP(DEBUG, DISTRAPP,
                                "%s:Packet loss due to full ring\n", __func__);
                        while (sent < nb_ret)
@@ -290,13 +298,12 @@ lcore_rx(struct lcore_params *p)
 static inline void
 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
 static inline void
 flush_one_port(struct output_buffer *outbuf, uint8_t outp)
 {
-       unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
-                       outbuf->count);
-       app_stats.tx.tx_pkts += nb_tx;
+       unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+                       outbuf->mbufs, outbuf->count);
+       app_stats.tx.tx_pkts += outbuf->count;
 
        if (unlikely(nb_tx < outbuf->count)) {
 
        if (unlikely(nb_tx < outbuf->count)) {
-               RTE_LOG_DP(DEBUG, DISTRAPP,
-                       "%s:Packet loss with tx_burst\n", __func__);
+               app_stats.tx.enqdrop_pkts +=  outbuf->count - nb_tx;
                do {
                        rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
                } while (++nb_tx < outbuf->count);
                do {
                        rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
                } while (++nb_tx < outbuf->count);
@@ -308,6 +315,7 @@ static inline void
 flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
 {
        uint8_t outp;
 flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
 {
        uint8_t outp;
+
        for (outp = 0; outp < nb_ports; outp++) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << outp)) == 0)
        for (outp = 0; outp < nb_ports; outp++) {
                /* skip ports that are not enabled */
                if ((enabled_port_mask & (1 << outp)) == 0)
@@ -333,7 +341,7 @@ lcore_distributor(struct lcore_params *p)
        printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
        while (!quit_signal_dist) {
                const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
        printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
        while (!quit_signal_dist) {
                const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-                               (void *)bufs, BURST_SIZE*1);
+                               (void *)bufs, BURST_SIZE*1, NULL);
                if (nb_rx) {
                        app_stats.dist.in_pkts += nb_rx;
 
                if (nb_rx) {
                        app_stats.dist.in_pkts += nb_rx;
 
@@ -349,7 +357,7 @@ lcore_distributor(struct lcore_params *p)
                        app_stats.dist.ret_pkts += nb_ret;
 
                        uint16_t sent = rte_ring_enqueue_burst(out_r,
                        app_stats.dist.ret_pkts += nb_ret;
 
                        uint16_t sent = rte_ring_enqueue_burst(out_r,
-                                       (void *)bufs, nb_ret);
+                                       (void *)bufs, nb_ret, NULL);
                        app_stats.dist.sent_pkts += sent;
                        if (unlikely(sent < nb_ret)) {
                                app_stats.dist.enqdrop_pkts += nb_ret - sent;
                        app_stats.dist.sent_pkts += sent;
                        if (unlikely(sent < nb_ret)) {
                                app_stats.dist.enqdrop_pkts += nb_ret - sent;
@@ -400,9 +408,9 @@ lcore_tx(struct rte_ring *in_r)
                        if ((enabled_port_mask & (1 << port)) == 0)
                                continue;
 
                        if ((enabled_port_mask & (1 << port)) == 0)
                                continue;
 
-                       struct rte_mbuf *bufs[BURST_SIZE];
+                       struct rte_mbuf *bufs[BURST_SIZE_TX];
                        const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
                        const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
-                                       (void *)bufs, BURST_SIZE);
+                                       (void *)bufs, BURST_SIZE_TX, NULL);
                        app_stats.tx.dequeue_pkts += nb_rx;
 
                        /* if we get no traffic, flush anything we have */
                        app_stats.tx.dequeue_pkts += nb_rx;
 
                        /* if we get no traffic, flush anything we have */
@@ -431,11 +439,12 @@ lcore_tx(struct rte_ring *in_r)
 
                                outbuf = &tx_buffers[outp];
                                outbuf->mbufs[outbuf->count++] = bufs[i];
 
                                outbuf = &tx_buffers[outp];
                                outbuf->mbufs[outbuf->count++] = bufs[i];
-                               if (outbuf->count == BURST_SIZE)
+                               if (outbuf->count == BURST_SIZE_TX)
                                        flush_one_port(outbuf, outp);
                        }
                }
        }
                                        flush_one_port(outbuf, outp);
                        }
                }
        }
+       printf("\nCore %u exiting tx task.\n", rte_lcore_id());
        return 0;
 }
 
        return 0;
 }
 
@@ -557,6 +566,8 @@ lcore_worker(struct lcore_params *p)
        for (i = 0; i < 8; i++)
                buf[i] = NULL;
 
        for (i = 0; i < 8; i++)
                buf[i] = NULL;
 
+       app_stats.worker_pkts[p->worker_id] = 1;
+
        printf("\nCore %u acting as worker core.\n", rte_lcore_id());
        while (!quit_signal_work) {
                num = rte_distributor_get_pkt(d, id, buf, buf, num);
        printf("\nCore %u acting as worker core.\n", rte_lcore_id());
        while (!quit_signal_work) {
                num = rte_distributor_get_pkt(d, id, buf, buf, num);
@@ -568,6 +579,10 @@ lcore_worker(struct lcore_params *p)
                                rte_pause();
                        buf[i]->port ^= xor_val;
                }
                                rte_pause();
                        buf[i]->port ^= xor_val;
                }
+
+               app_stats.worker_pkts[p->worker_id] += num;
+               if (num > 0)
+                       app_stats.worker_bursts[p->worker_id][num-1]++;
        }
        return 0;
 }
        }
        return 0;
 }
@@ -672,9 +687,10 @@ main(int argc, char *argv[])
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
        if (ret < 0)
                rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
 
-       if (rte_lcore_count() < 4)
+       if (rte_lcore_count() < 5)
                rte_exit(EXIT_FAILURE, "Error, This application needs at "
                rte_exit(EXIT_FAILURE, "Error, This application needs at "
-                               "least 4 logical cores to run:\n"
+                               "least 5 logical cores to run:\n"
+                               "1 lcore for stats (can be core 0)\n"
                                "1 lcore for packet RX\n"
                                "1 lcore for distribution\n"
                                "1 lcore for packet TX\n"
                                "1 lcore for packet RX\n"
                                "1 lcore for distribution\n"
                                "1 lcore for packet TX\n"
@@ -716,7 +732,7 @@ main(int argc, char *argv[])
        }
 
        d = rte_distributor_create("PKT_DIST", rte_socket_id(),
        }
 
        d = rte_distributor_create("PKT_DIST", rte_socket_id(),
-                       rte_lcore_count() - 3,
+                       rte_lcore_count() - 4,
                        RTE_DIST_ALG_BURST);
        if (d == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
                        RTE_DIST_ALG_BURST);
        if (d == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
@@ -755,7 +771,21 @@ main(int argc, char *argv[])
                        /* tx core */
                        rte_eal_remote_launch((lcore_function_t *)lcore_tx,
                                        dist_tx_ring, lcore_id);
                        /* tx core */
                        rte_eal_remote_launch((lcore_function_t *)lcore_tx,
                                        dist_tx_ring, lcore_id);
+               } else if (worker_id == rte_lcore_count() - 2) {
+                       printf("Starting rx on worker_id %d, lcore_id %d\n",
+                                       worker_id, lcore_id);
+                       /* rx core */
+                       struct lcore_params *p =
+                                       rte_malloc(NULL, sizeof(*p), 0);
+                       if (!p)
+                               rte_panic("malloc failure\n");
+                       *p = (struct lcore_params){worker_id, d, rx_dist_ring,
+                                       dist_tx_ring, mbuf_pool};
+                       rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+                                       p, lcore_id);
                } else {
                } else {
+                       printf("Starting worker on worker_id %d, lcore_id %d\n",
+                                       worker_id, lcore_id);
                        struct lcore_params *p =
                                        rte_malloc(NULL, sizeof(*p), 0);
                        if (!p)
                        struct lcore_params *p =
                                        rte_malloc(NULL, sizeof(*p), 0);
                        if (!p)
@@ -768,11 +798,6 @@ main(int argc, char *argv[])
                }
                worker_id++;
        }
                }
                worker_id++;
        }
-       /* call lcore_main on master core only */
-       struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
-
-       if (lcore_rx(&p) != 0)
-               return -1;
 
        freq = rte_get_timer_hz();
        t = rte_rdtsc() + freq;
 
        freq = rte_get_timer_hz();
        t = rte_rdtsc() + freq;