examples: fix optind reset
[dpdk.git] / examples / distributor / main.c
index 78fe3e9..7b8a759 100644 (file)
 #include <rte_cycles.h>
 #include <rte_malloc.h>
 #include <rte_debug.h>
+#include <rte_prefetch.h>
 #include <rte_distributor.h>
 
 #define RX_RING_SIZE 256
 #define TX_RING_SIZE 512
 #define NUM_MBUFS ((64*1024)-1)
-#define MBUF_DATA_SIZE (2048 + RTE_PKTMBUF_HEADROOM)
 #define MBUF_CACHE_SIZE 250
 #define BURST_SIZE 32
 #define RTE_RING_SZ 1024
 
-/* uncommnet below line to enable debug logs */
-/* #define DEBUG */
-
-#ifdef DEBUG
-#define LOG_LEVEL RTE_LOG_DEBUG
-#define LOG_DEBUG(log_type, fmt, args...) do { \
-       RTE_LOG(DEBUG, log_type, fmt, ##args)           \
-} while (0)
-#else
-#define LOG_LEVEL RTE_LOG_INFO
-#define LOG_DEBUG(log_type, fmt, args...) do {} while (0)
-#endif
-
 #define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
 
 /* mask of enabled ports */
@@ -178,19 +165,25 @@ struct lcore_params {
        struct rte_mempool *mem_pool;
 };
 
-static void
+static int
 quit_workers(struct rte_distributor *d, struct rte_mempool *p)
 {
        const unsigned num_workers = rte_lcore_count() - 2;
        unsigned i;
        struct rte_mbuf *bufs[num_workers];
-       rte_mempool_get_bulk(p, (void *)bufs, num_workers);
+
+       if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
+               printf("line %d: Error getting mbufs from pool\n", __LINE__);
+               return -1;
+       }
 
        for (i = 0; i < num_workers; i++)
                bufs[i]->hash.rss = i << 1;
 
        rte_distributor_process(d, bufs, num_workers);
        rte_mempool_put_bulk(p, (void *)bufs, num_workers);
+
+       return 0;
 }
 
 static int
@@ -228,19 +221,28 @@ lcore_rx(struct lcore_params *p)
                struct rte_mbuf *bufs[BURST_SIZE*2];
                const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
                                BURST_SIZE);
+               if (unlikely(nb_rx == 0)) {
+                       if (++port == nb_ports)
+                               port = 0;
+                       continue;
+               }
                app_stats.rx.rx_pkts += nb_rx;
 
                rte_distributor_process(d, bufs, nb_rx);
                const uint16_t nb_ret = rte_distributor_returned_pkts(d,
                                bufs, BURST_SIZE*2);
                app_stats.rx.returned_pkts += nb_ret;
-               if (unlikely(nb_ret == 0))
+               if (unlikely(nb_ret == 0)) {
+                       if (++port == nb_ports)
+                               port = 0;
                        continue;
+               }
 
                uint16_t sent = rte_ring_enqueue_burst(r, (void *)bufs, nb_ret);
                app_stats.rx.enqueued_pkts += sent;
                if (unlikely(sent < nb_ret)) {
-                       LOG_DEBUG(DISTRAPP, "%s:Packet loss due to full ring\n", __func__);
+                       RTE_LOG_DP(DEBUG, DISTRAPP,
+                               "%s:Packet loss due to full ring\n", __func__);
                        while (sent < nb_ret)
                                rte_pktmbuf_free(bufs[sent++]);
                }
@@ -258,7 +260,8 @@ lcore_rx(struct lcore_params *p)
         * get packets till quit_signal is actually been
         * received and they gracefully shutdown
         */
-       quit_workers(d, mem_pool);
+       if (quit_workers(d, mem_pool) != 0)
+               return -1;
        /* rx thread should quit at last */
        return 0;
 }
@@ -271,7 +274,8 @@ flush_one_port(struct output_buffer *outbuf, uint8_t outp)
        app_stats.tx.tx_pkts += nb_tx;
 
        if (unlikely(nb_tx < outbuf->count)) {
-               LOG_DEBUG(DISTRAPP, "%s:Packet loss with tx_burst\n", __func__);
+               RTE_LOG_DP(DEBUG, DISTRAPP,
+                       "%s:Packet loss with tx_burst\n", __func__);
                do {
                        rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
                } while (++nb_tx < outbuf->count);
@@ -336,13 +340,13 @@ lcore_tx(struct rte_ring *in_r)
 
                        /* for traffic we receive, queue it up for transmit */
                        uint16_t i;
-                       _mm_prefetch(bufs[0], 0);
-                       _mm_prefetch(bufs[1], 0);
-                       _mm_prefetch(bufs[2], 0);
+                       rte_prefetch_non_temporal((void *)bufs[0]);
+                       rte_prefetch_non_temporal((void *)bufs[1]);
+                       rte_prefetch_non_temporal((void *)bufs[2]);
                        for (i = 0; i < nb_rx; i++) {
                                struct output_buffer *outbuf;
                                uint8_t outp;
-                               _mm_prefetch(bufs[i + 3], 0);
+                               rte_prefetch_non_temporal((void *)bufs[i + 3]);
                                /*
                                 * workers should update in_port to hold the
                                 * output port value
@@ -483,7 +487,7 @@ parse_args(int argc, char **argv)
 
        argv[optind-1] = prgname;
 
-       optind = 0; /* reset getopt lib */
+       optind = 1; /* reset getopt lib */
        return 0;
 }
 
@@ -529,8 +533,8 @@ main(int argc, char *argv[])
                                "when using a single port\n");
 
        mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL",
-               NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0, MBUF_DATA_SIZE,
-               rte_socket_id());
+               NUM_MBUFS * nb_ports, MBUF_CACHE_SIZE, 0,
+               RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
        if (mbuf_pool == NULL)
                rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
        nb_ports_available = nb_ports;
@@ -588,7 +592,9 @@ main(int argc, char *argv[])
        }
        /* call lcore_main on master core only */
        struct lcore_params p = { 0, d, output_ring, mbuf_pool};
-       lcore_rx(&p);
+
+       if (lcore_rx(&p) != 0)
+               return -1;
 
        RTE_LCORE_FOREACH_SLAVE(lcore_id) {
                if (rte_eal_wait_lcore(lcore_id) < 0)