examples/ip_reassembly: various updates
authorIntel <intel.com>
Mon, 22 Jul 2013 22:00:00 +0000 (00:00 +0200)
committerThomas Monjalon <thomas.monjalon@6wind.com>
Tue, 17 Sep 2013 12:16:10 +0000 (14:16 +0200)
- postpone calls to rte_pktmbuf_free() when a mbuf is not used anymore
- add some tx statistics

Signed-off-by: Intel
examples/ip_reassembly/ipv4_frag_tbl.h
examples/ip_reassembly/ipv4_rsmbl.h
examples/ip_reassembly/main.c

index 33b493b..e234a0e 100644 (file)
@@ -203,9 +203,10 @@ ipv4_frag_lookup(struct ipv4_frag_tbl *tbl,
 }
 
 static inline void
-ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl,  struct ipv4_frag_pkt *fp)
+ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+       struct ipv4_frag_pkt *fp)
 {
-       ipv4_frag_free(fp);
+       ipv4_frag_free(fp, dr);
        IPV4_FRAG_KEY_INVALIDATE(&fp->key);
        TAILQ_REMOVE(&tbl->lru, fp, lru);
        tbl->use_entries--;
@@ -224,10 +225,10 @@ ipv4_frag_tbl_add(struct ipv4_frag_tbl *tbl,  struct ipv4_frag_pkt *fp,
 }
 
 static inline void
-ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl,  struct ipv4_frag_pkt *fp,
-       uint64_t tms)
+ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+       struct ipv4_frag_pkt *fp, uint64_t tms)
 {
-       ipv4_frag_free(fp);
+       ipv4_frag_free(fp, dr);
        ipv4_frag_reset(fp, tms);
        TAILQ_REMOVE(&tbl->lru, fp, lru);
        TAILQ_INSERT_TAIL(&tbl->lru, fp, lru);
@@ -240,8 +241,8 @@ ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl,  struct ipv4_frag_pkt *fp,
  * If the entry is stale, then free and reuse it.
  */
 static inline struct ipv4_frag_pkt *
-ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key,
-       uint64_t tms)
+ipv4_frag_find(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+       const struct ipv4_frag_key *key, uint64_t tms)
 {
        struct ipv4_frag_pkt *pkt, *free, *stale, *lru;
        uint64_t max_cycles;
@@ -260,7 +261,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key,
 
                /*timed-out entry, free and invalidate it*/
                if (stale != NULL) {
-                       ipv4_frag_tbl_del(tbl, stale);
+                       ipv4_frag_tbl_del(tbl, dr, stale);
                        free = stale;
 
                /*
@@ -272,7 +273,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key,
                                tbl->max_entries <= tbl->use_entries) {
                        lru = TAILQ_FIRST(&tbl->lru);
                        if (max_cycles + lru->start < tms) {
-                               ipv4_frag_tbl_del(tbl,  lru);
+                               ipv4_frag_tbl_del(tbl, dr, lru);
                        } else {
                                free = NULL;
                                IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat,
@@ -292,7 +293,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key,
         * and reuse it.
         */
        } else if (max_cycles + pkt->start < tms) {
-               ipv4_frag_tbl_reuse(tbl,  pkt, tms);
+               ipv4_frag_tbl_reuse(tbl, dr, pkt, tms);
        }
 
        IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat, fail_total, (pkt == NULL));
index bc94030..01cb95b 100644 (file)
@@ -69,7 +69,7 @@ struct ipv4_frag_key {
 
 #define        IPV4_FRAG_KEY_CMP(k1, k2)       \
        (((k1)->src_dst ^ (k2)->src_dst) | ((k1)->id ^ (k2)->id))
-       
+
 
 /*
  * Fragmented packet to reassemble.
@@ -85,6 +85,14 @@ struct ipv4_frag_pkt {
        struct ipv4_frag     frags[MAX_FRAG_NUM];
 } __rte_cache_aligned;
 
+
+struct ipv4_frag_death_row {
+       uint32_t cnt;
+       struct rte_mbuf *row[MAX_PKT_BURST * (MAX_FRAG_NUM + 1)];
+};
+
+#define        IPV4_FRAG_MBUF2DR(dr, mb)       ((dr)->row[(dr)->cnt++] = (mb))
+
 /* logging macros. */
 
 #ifdef IPV4_FRAG_DEBUG
@@ -112,18 +120,42 @@ ipv4_frag_reset(struct ipv4_frag_pkt *fp, uint64_t tms)
 }
 
 static inline void
-ipv4_frag_free(struct ipv4_frag_pkt *fp)
+ipv4_frag_free(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr)
 {
-       uint32_t i;
+       uint32_t i, k;
 
+       k = dr->cnt;
        for (i = 0; i != fp->last_idx; i++) {
                if (fp->frags[i].mb != NULL) {
-                       rte_pktmbuf_free(fp->frags[i].mb);
+                       dr->row[k++] = fp->frags[i].mb;
                        fp->frags[i].mb = NULL;
                }
        }
 
        fp->last_idx = 0;
+       dr->cnt = k;
+}
+
+static inline void
+ipv4_frag_free_death_row(struct ipv4_frag_death_row *dr, uint32_t prefetch)
+{
+       uint32_t i, k, n;
+
+       k = RTE_MIN(prefetch, dr->cnt);
+       n = dr->cnt;
+
+       for (i = 0; i != k; i++) 
+               rte_prefetch0(dr->row[i]);
+
+       for (i = 0; i != n - k; i++) {
+               rte_prefetch0(dr->row[i + k]);
+               rte_pktmbuf_free(dr->row[i]);
+       }
+
+       for (; i != n; i++)
+               rte_pktmbuf_free(dr->row[i]);
+
+       dr->cnt = 0;
 }
 
 /*
@@ -214,8 +246,8 @@ ipv4_frag_reassemble(const struct ipv4_frag_pkt *fp)
 }
 
 static inline struct rte_mbuf *
-ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb,
-       uint16_t ofs, uint16_t len, uint16_t more_frags)
+ipv4_frag_process(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr,
+       struct rte_mbuf *mb, uint16_t ofs, uint16_t len, uint16_t more_frags)
 {
        uint32_t idx;
 
@@ -259,9 +291,9 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb,
                        fp->frags[LAST_FRAG_IDX].len);
 
                /* free all fragments, invalidate the entry. */
-               ipv4_frag_free(fp);
+               ipv4_frag_free(fp, dr);
                IPV4_FRAG_KEY_INVALIDATE(&fp->key);
-               rte_pktmbuf_free(mb);
+               IPV4_FRAG_MBUF2DR(dr, mb);
 
                return (NULL);
        }
@@ -300,7 +332,7 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb,
                        fp->frags[LAST_FRAG_IDX].len);
 
                /* free associated resources. */
-               ipv4_frag_free(fp);
+               ipv4_frag_free(fp, dr);
        }
 
        /* we are done with that entry, invalidate it. */
@@ -331,8 +363,9 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb,
  *   - not all fragments of the packet are collected yet.
  */
 static inline struct rte_mbuf *
-ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms,
-       struct ipv4_hdr *ip_hdr, uint16_t ip_ofs, uint16_t ip_flag)
+ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+       struct rte_mbuf *mb, uint64_t tms, struct ipv4_hdr *ip_hdr,
+       uint16_t ip_ofs, uint16_t ip_flag)
 {
        struct ipv4_frag_pkt *fp;
        struct ipv4_frag_key key;
@@ -358,8 +391,8 @@ ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms,
                tbl->use_entries);
 
        /* try to find/add entry into the fragment's table. */
-       if ((fp = ipv4_frag_find(tbl, &key, tms)) == NULL) {
-               rte_pktmbuf_free(mb);
+       if ((fp = ipv4_frag_find(tbl, dr, &key, tms)) == NULL) {
+               IPV4_FRAG_MBUF2DR(dr, mb);
                return (NULL);
        }
 
@@ -374,7 +407,7 @@ ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms,
                
 
        /* process the fragmented packet. */
-       mb = ipv4_frag_process(fp, mb, ip_ofs, ip_len, ip_flag);
+       mb = ipv4_frag_process(fp, dr, mb, ip_ofs, ip_len, ip_flag);
        ipv4_frag_inuse(tbl, fp);
 
        IPV4_FRAG_LOG(DEBUG, "%s:%d:\n"
index e18dfe8..291fa8d 100644 (file)
@@ -74,9 +74,7 @@
 #include <rte_tcp.h>
 #include <rte_udp.h>
 #include <rte_string_fns.h>
-
 #include "main.h"
-#include "ipv4_rsmbl.h"
 
 #define APP_LOOKUP_EXACT_MATCH          0
 #define APP_LOOKUP_LPM                  1
 #error "APP_LOOKUP_METHOD set to incorrect value"
 #endif
 
+#define MAX_PKT_BURST 32
+
+#include "ipv4_rsmbl.h"
+
 #ifndef IPv6_BYTES
 #define IPv6_BYTES_FMT "%02x%02x:%02x%02x:%02x%02x:%02x%02x:"\
                        "%02x%02x:%02x%02x:%02x%02x:%02x%02x"
@@ -156,7 +158,6 @@ static uint32_t max_flow_ttl = DEF_FLOW_TTL;
 #define TX_HTHRESH 0  /**< Default values of TX host threshold reg. */
 #define TX_WTHRESH 0  /**< Default values of TX write-back threshold reg. */
 
-#define MAX_PKT_BURST 32
 #define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */
 
 #define NB_SOCKETS 8
@@ -169,6 +170,7 @@ static uint32_t max_flow_ttl = DEF_FLOW_TTL;
  */
 #define RTE_TEST_RX_DESC_DEFAULT 128
 #define RTE_TEST_TX_DESC_DEFAULT 512
+
 static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
 static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
 
@@ -181,8 +183,10 @@ static int promiscuous_on = 0; /**< Ports set in promiscuous mode off by default
 static int numa_on = 1; /**< NUMA is enabled by default. */
 
 struct mbuf_table {
-       uint16_t len;
-       struct rte_mbuf *m_table[MAX_PKT_BURST];
+       uint32_t len;
+       uint32_t head;
+       uint32_t tail;
+       struct rte_mbuf *m_table[0];
 };
 
 struct lcore_rx_queue {
@@ -380,11 +384,23 @@ static lookup_struct_t *ipv4_l3fwd_lookup_struct[NB_SOCKETS];
 static lookup6_struct_t *ipv6_l3fwd_lookup_struct[NB_SOCKETS];
 #endif
 
+struct tx_lcore_stat {
+       uint64_t call;
+       uint64_t drop;
+       uint64_t queue;
+       uint64_t send;
+};
+
+#ifdef IPV4_FRAG_TBL_STAT
+#define        TX_LCORE_STAT_UPDATE(s, f, v)   ((s)->f += (v))
+#else
+#define        TX_LCORE_STAT_UPDATE(s, f, v)   do {} while (0)
+#endif /* IPV4_FRAG_TBL_STAT */
+
 struct lcore_conf {
        uint16_t n_rx_queue;
        struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
        uint16_t tx_queue_id[MAX_PORTS];
-       struct mbuf_table tx_mbufs[MAX_PORTS];
        lookup_struct_t * ipv4_lookup_struct;
 #if (APP_LOOKUP_METHOD == APP_LOOKUP_LPM)
        lookup6_struct_t * ipv6_lookup_struct;
@@ -393,54 +409,75 @@ struct lcore_conf {
 #endif
        struct ipv4_frag_tbl *frag_tbl[MAX_RX_QUEUE_PER_LCORE];
        struct rte_mempool *pool[MAX_RX_QUEUE_PER_LCORE];
+       struct ipv4_frag_death_row death_row;
+       struct mbuf_table *tx_mbufs[MAX_PORTS];
+       struct tx_lcore_stat tx_stat;
 } __rte_cache_aligned;
 
 static struct lcore_conf lcore_conf[RTE_MAX_LCORE];
 
-/* Send burst of packets on an output interface */
-static inline int
-send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
+/*
+ * If number of queued packets reached given threahold, then
+ * send burst of packets on an output interface.
+ */
+static inline uint32_t
+send_burst(struct lcore_conf *qconf, uint32_t thresh, uint8_t port)
 {
-       struct rte_mbuf **m_table;
-       int ret;
-       uint16_t queueid;
+       uint32_t fill, len, k, n;
+       struct mbuf_table *txmb;
+
+       txmb = qconf->tx_mbufs[port];
+       len = txmb->len;
+
+       if ((int32_t)(fill = txmb->head - txmb->tail) < 0)
+               fill += len;
+
+       if (fill >= thresh) {
+               n = RTE_MIN(len - txmb->tail, fill);
+                       
+               k = rte_eth_tx_burst(port, qconf->tx_queue_id[port],
+                       txmb->m_table + txmb->tail, (uint16_t)n);
 
-       queueid = qconf->tx_queue_id[port];
-       m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
+               TX_LCORE_STAT_UPDATE(&qconf->tx_stat, call, 1);
+               TX_LCORE_STAT_UPDATE(&qconf->tx_stat, send, k);
 
-       ret = rte_eth_tx_burst(port, queueid, m_table, n);
-       if (unlikely(ret < n)) {
-               do {
-                       rte_pktmbuf_free(m_table[ret]);
-               } while (++ret < n);
+               fill -= k;
+               if ((txmb->tail += k) == len)
+                       txmb->tail = 0;
        }
 
-       return 0;
+       return (fill);
 }
 
 /* Enqueue a single packet, and send burst if queue is filled */
 static inline int
 send_single_packet(struct rte_mbuf *m, uint8_t port)
 {
-       uint32_t lcore_id;
-       uint16_t len;
+       uint32_t fill, lcore_id, len;
        struct lcore_conf *qconf;
+       struct mbuf_table *txmb;
 
        lcore_id = rte_lcore_id();
-
        qconf = &lcore_conf[lcore_id];
-       len = qconf->tx_mbufs[port].len;
-       qconf->tx_mbufs[port].m_table[len] = m;
-       len++;
-
-       /* enough pkts to be sent */
-       if (unlikely(len == MAX_PKT_BURST)) {
-               send_burst(qconf, MAX_PKT_BURST, port);
-               len = 0;
+
+       txmb = qconf->tx_mbufs[port];
+       len = txmb->len;
+
+       fill = send_burst(qconf, MAX_PKT_BURST, port);
+
+       if (fill == len - 1) {
+               TX_LCORE_STAT_UPDATE(&qconf->tx_stat, drop, 1);
+               rte_pktmbuf_free(txmb->m_table[txmb->tail]);
+               if (++txmb->tail == len)
+                       txmb->tail = 0;
        }
+               
+       TX_LCORE_STAT_UPDATE(&qconf->tx_stat, queue, 1);
+       txmb->m_table[txmb->head] = m;
+       if(++txmb->head == len)
+               txmb->head = 0;
 
-       qconf->tx_mbufs[port].len = len;
-       return 0;
+       return (0);
 }
 
 #ifdef DO_RFC_1812_CHECKS
@@ -637,15 +674,17 @@ l3fwd_simple_forward(struct rte_mbuf *m, uint8_t portid, uint32_t queue,
 
                        struct rte_mbuf *mo;
                        struct ipv4_frag_tbl *tbl;
+                       struct ipv4_frag_death_row *dr;
 
                        tbl = qconf->frag_tbl[queue];
+                       dr = &qconf->death_row;
 
                        /* prepare mbuf: setup l2_len/l3_len. */
                        m->pkt.vlan_macip.f.l2_len = sizeof(*eth_hdr);
                        m->pkt.vlan_macip.f.l3_len = sizeof(*ipv4_hdr);
 
                        /* process this fragment. */
-                       if ((mo = ipv4_frag_mbuf(tbl, m, tms, ipv4_hdr,
+                       if ((mo = ipv4_frag_mbuf(tbl, dr, m, tms, ipv4_hdr,
                                        ip_ofs, ip_flag)) == NULL) 
                                /* no packet to send out. */
                                return;
@@ -659,8 +698,10 @@ l3fwd_simple_forward(struct rte_mbuf *m, uint8_t portid, uint32_t queue,
                        }
                }
 
-               dst_port = get_ipv4_dst_port(ipv4_hdr, portid, qconf->ipv4_lookup_struct);
-               if (dst_port >= MAX_PORTS || (enabled_port_mask & 1 << dst_port) == 0)
+               dst_port = get_ipv4_dst_port(ipv4_hdr, portid,
+                       qconf->ipv4_lookup_struct);
+               if (dst_port >= MAX_PORTS ||
+                               (enabled_port_mask & 1 << dst_port) == 0)
                        dst_port = portid;
 
                /* 02:00:00:00:00:xx */
@@ -743,12 +784,8 @@ main_loop(__attribute__((unused)) void *dummy)
                         * portid), but it is not called so often
                         */
                        for (portid = 0; portid < MAX_PORTS; portid++) {
-                               if (qconf->tx_mbufs[portid].len == 0)
-                                       continue;
-                               send_burst(&lcore_conf[lcore_id],
-                                       qconf->tx_mbufs[portid].len,
-                                       portid);
-                               qconf->tx_mbufs[portid].len = 0;
+                               if ((enabled_port_mask & (1 << portid)) != 0)
+                                       send_burst(qconf, 1, portid);
                        }
 
                        prev_tsc = cur_tsc;
@@ -784,6 +821,9 @@ main_loop(__attribute__((unused)) void *dummy)
                                l3fwd_simple_forward(pkts_burst[j], portid,
                                        i, qconf, cur_tsc);
                        }
+
+                       ipv4_frag_free_death_row(&qconf->death_row,
+                               PREFETCH_OFFSET);
                }
        }
 }
@@ -1384,9 +1424,29 @@ check_all_ports_link_status(uint8_t port_num, uint32_t port_mask)
                }
        }
 }
+static void
+setup_port_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
+       uint32_t port)
+{
+       struct mbuf_table *mtb;
+       uint32_t n;
+       size_t sz;
+
+       n = RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST);
+       sz = sizeof (*mtb) + sizeof (mtb->m_table[0]) *  n;
+
+       if ((mtb = rte_zmalloc_socket(__func__, sz, CACHE_LINE_SIZE,
+                       socket)) == NULL)
+               rte_exit(EXIT_FAILURE, "%s() for lcore: %u, port: %u "
+                       "failed to allocate %zu bytes\n",
+                       __func__, lcore, port, sz);
+
+       mtb->len = n;
+       qconf->tx_mbufs[port] = mtb;
+}
 
 static void
-setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
+setup_queue_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
        uint32_t queue)
 {
        uint32_t nb_mbuf;
@@ -1403,10 +1463,16 @@ setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
                        "lcore: %u for queue: %u failed\n",
                        max_flow_num, lcore, queue);
 
-       nb_mbuf = max_flow_num * MAX_FRAG_NUM;
+       /*
+        * At any given moment up to <max_flow_num * (MAX_FRAG_NUM - 1)>
+        * mbufs could be stored int the fragment table.
+        * Plus, each TX queue can hold up to <max_flow_num> packets.
+        */ 
+
+       nb_mbuf = 2 * RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST) * MAX_FRAG_NUM;
        nb_mbuf *= (port_conf.rxmode.max_rx_pkt_len + BUF_SIZE - 1) / BUF_SIZE;
-       nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + MAX_PKT_BURST +
-               RTE_TEST_TX_DESC_DEFAULT;
+       nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + RTE_TEST_TX_DESC_DEFAULT;
+
        nb_mbuf = RTE_MAX(nb_mbuf, (uint32_t)DEF_MBUF_NUM);
                
        rte_snprintf(buf, sizeof(buf), "mbuf_pool_%u_%u", lcore, queue);
@@ -1419,7 +1485,7 @@ setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
 }
 
 static void
-queue_frag_tbl_dump_stat(void)
+queue_dump_stat(void)
 {
        uint32_t i, lcore;
        const struct lcore_conf *qconf;
@@ -1436,6 +1502,14 @@ queue_frag_tbl_dump_stat(void)
                                lcore,  qconf->rx_queue_list[i].port_id,
                                qconf->rx_queue_list[i].queue_id);
                        ipv4_frag_tbl_dump_stat(stdout, qconf->frag_tbl[i]);
+                       fprintf(stdout, "TX bursts:\t%" PRIu64 "\n"
+                               "TX packets _queued:\t%" PRIu64 "\n"
+                               "TX packets dropped:\t%" PRIu64 "\n"
+                               "TX packets send:\t%" PRIu64 "\n",
+                               qconf->tx_stat.call,
+                               qconf->tx_stat.queue,
+                               qconf->tx_stat.drop,
+                               qconf->tx_stat.send);
                }
        }
 }
@@ -1443,7 +1517,7 @@ queue_frag_tbl_dump_stat(void)
 static void
 signal_handler(int signum)
 {
-       queue_frag_tbl_dump_stat();
+       queue_dump_stat();
        if (signum != SIGUSR1)
                rte_exit(0, "received signal: %d, exiting\n", signum);
 }
@@ -1549,6 +1623,7 @@ MAIN(int argc, char **argv)
 
                        qconf = &lcore_conf[lcore_id];
                        qconf->tx_queue_id[portid] = queueid;
+                       setup_port_tbl(qconf, lcore_id, socketid, portid);
                        queueid++;
                }
                printf("\n");
@@ -1573,7 +1648,7 @@ MAIN(int argc, char **argv)
                        printf("rxq=%d,%d,%d ", portid, queueid, socketid);
                        fflush(stdout);
 
-                       setup_queue_frag_tbl(qconf, lcore_id, socketid, queue);
+                       setup_queue_tbl(qconf, lcore_id, socketid, queue);
 
                        ret = rte_eth_rx_queue_setup(portid, queueid, nb_rxd,
                                        socketid, &rx_conf, qconf->pool[queue]);