From: Intel Date: Mon, 22 Jul 2013 22:00:00 +0000 (+0200) Subject: examples/ip_reassembly: various updates X-Git-Tag: spdx-start~11164 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=595ea7dc80789e26424a8efe0b0e2a99a0da3230;p=dpdk.git examples/ip_reassembly: various updates - postpone calls to rte_pktmbuf_free() when a mbuf is not used anymore - add some tx statistics Signed-off-by: Intel --- diff --git a/examples/ip_reassembly/ipv4_frag_tbl.h b/examples/ip_reassembly/ipv4_frag_tbl.h index 33b493bafb..e234a0e04c 100644 --- a/examples/ip_reassembly/ipv4_frag_tbl.h +++ b/examples/ip_reassembly/ipv4_frag_tbl.h @@ -203,9 +203,10 @@ ipv4_frag_lookup(struct ipv4_frag_tbl *tbl, } static inline void -ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp) +ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr, + struct ipv4_frag_pkt *fp) { - ipv4_frag_free(fp); + ipv4_frag_free(fp, dr); IPV4_FRAG_KEY_INVALIDATE(&fp->key); TAILQ_REMOVE(&tbl->lru, fp, lru); tbl->use_entries--; @@ -224,10 +225,10 @@ ipv4_frag_tbl_add(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp, } static inline void -ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp, - uint64_t tms) +ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr, + struct ipv4_frag_pkt *fp, uint64_t tms) { - ipv4_frag_free(fp); + ipv4_frag_free(fp, dr); ipv4_frag_reset(fp, tms); TAILQ_REMOVE(&tbl->lru, fp, lru); TAILQ_INSERT_TAIL(&tbl->lru, fp, lru); @@ -240,8 +241,8 @@ ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp, * If the entry is stale, then free and reuse it. */ static inline struct ipv4_frag_pkt * -ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key, - uint64_t tms) +ipv4_frag_find(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr, + const struct ipv4_frag_key *key, uint64_t tms) { struct ipv4_frag_pkt *pkt, *free, *stale, *lru; uint64_t max_cycles; @@ -260,7 +261,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key, /*timed-out entry, free and invalidate it*/ if (stale != NULL) { - ipv4_frag_tbl_del(tbl, stale); + ipv4_frag_tbl_del(tbl, dr, stale); free = stale; /* @@ -272,7 +273,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key, tbl->max_entries <= tbl->use_entries) { lru = TAILQ_FIRST(&tbl->lru); if (max_cycles + lru->start < tms) { - ipv4_frag_tbl_del(tbl, lru); + ipv4_frag_tbl_del(tbl, dr, lru); } else { free = NULL; IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat, @@ -292,7 +293,7 @@ ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key, * and reuse it. */ } else if (max_cycles + pkt->start < tms) { - ipv4_frag_tbl_reuse(tbl, pkt, tms); + ipv4_frag_tbl_reuse(tbl, dr, pkt, tms); } IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat, fail_total, (pkt == NULL)); diff --git a/examples/ip_reassembly/ipv4_rsmbl.h b/examples/ip_reassembly/ipv4_rsmbl.h index bc94030d67..01cb95b07d 100644 --- a/examples/ip_reassembly/ipv4_rsmbl.h +++ b/examples/ip_reassembly/ipv4_rsmbl.h @@ -69,7 +69,7 @@ struct ipv4_frag_key { #define IPV4_FRAG_KEY_CMP(k1, k2) \ (((k1)->src_dst ^ (k2)->src_dst) | ((k1)->id ^ (k2)->id)) - + /* * Fragmented packet to reassemble. @@ -85,6 +85,14 @@ struct ipv4_frag_pkt { struct ipv4_frag frags[MAX_FRAG_NUM]; } __rte_cache_aligned; + +struct ipv4_frag_death_row { + uint32_t cnt; + struct rte_mbuf *row[MAX_PKT_BURST * (MAX_FRAG_NUM + 1)]; +}; + +#define IPV4_FRAG_MBUF2DR(dr, mb) ((dr)->row[(dr)->cnt++] = (mb)) + /* logging macros. */ #ifdef IPV4_FRAG_DEBUG @@ -112,18 +120,42 @@ ipv4_frag_reset(struct ipv4_frag_pkt *fp, uint64_t tms) } static inline void -ipv4_frag_free(struct ipv4_frag_pkt *fp) +ipv4_frag_free(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr) { - uint32_t i; + uint32_t i, k; + k = dr->cnt; for (i = 0; i != fp->last_idx; i++) { if (fp->frags[i].mb != NULL) { - rte_pktmbuf_free(fp->frags[i].mb); + dr->row[k++] = fp->frags[i].mb; fp->frags[i].mb = NULL; } } fp->last_idx = 0; + dr->cnt = k; +} + +static inline void +ipv4_frag_free_death_row(struct ipv4_frag_death_row *dr, uint32_t prefetch) +{ + uint32_t i, k, n; + + k = RTE_MIN(prefetch, dr->cnt); + n = dr->cnt; + + for (i = 0; i != k; i++) + rte_prefetch0(dr->row[i]); + + for (i = 0; i != n - k; i++) { + rte_prefetch0(dr->row[i + k]); + rte_pktmbuf_free(dr->row[i]); + } + + for (; i != n; i++) + rte_pktmbuf_free(dr->row[i]); + + dr->cnt = 0; } /* @@ -214,8 +246,8 @@ ipv4_frag_reassemble(const struct ipv4_frag_pkt *fp) } static inline struct rte_mbuf * -ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb, - uint16_t ofs, uint16_t len, uint16_t more_frags) +ipv4_frag_process(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr, + struct rte_mbuf *mb, uint16_t ofs, uint16_t len, uint16_t more_frags) { uint32_t idx; @@ -259,9 +291,9 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb, fp->frags[LAST_FRAG_IDX].len); /* free all fragments, invalidate the entry. */ - ipv4_frag_free(fp); + ipv4_frag_free(fp, dr); IPV4_FRAG_KEY_INVALIDATE(&fp->key); - rte_pktmbuf_free(mb); + IPV4_FRAG_MBUF2DR(dr, mb); return (NULL); } @@ -300,7 +332,7 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb, fp->frags[LAST_FRAG_IDX].len); /* free associated resources. */ - ipv4_frag_free(fp); + ipv4_frag_free(fp, dr); } /* we are done with that entry, invalidate it. */ @@ -331,8 +363,9 @@ ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb, * - not all fragments of the packet are collected yet. */ static inline struct rte_mbuf * -ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms, - struct ipv4_hdr *ip_hdr, uint16_t ip_ofs, uint16_t ip_flag) +ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr, + struct rte_mbuf *mb, uint64_t tms, struct ipv4_hdr *ip_hdr, + uint16_t ip_ofs, uint16_t ip_flag) { struct ipv4_frag_pkt *fp; struct ipv4_frag_key key; @@ -358,8 +391,8 @@ ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms, tbl->use_entries); /* try to find/add entry into the fragment's table. */ - if ((fp = ipv4_frag_find(tbl, &key, tms)) == NULL) { - rte_pktmbuf_free(mb); + if ((fp = ipv4_frag_find(tbl, dr, &key, tms)) == NULL) { + IPV4_FRAG_MBUF2DR(dr, mb); return (NULL); } @@ -374,7 +407,7 @@ ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms, /* process the fragmented packet. */ - mb = ipv4_frag_process(fp, mb, ip_ofs, ip_len, ip_flag); + mb = ipv4_frag_process(fp, dr, mb, ip_ofs, ip_len, ip_flag); ipv4_frag_inuse(tbl, fp); IPV4_FRAG_LOG(DEBUG, "%s:%d:\n" diff --git a/examples/ip_reassembly/main.c b/examples/ip_reassembly/main.c index e18dfe89c5..291fa8d900 100644 --- a/examples/ip_reassembly/main.c +++ b/examples/ip_reassembly/main.c @@ -74,9 +74,7 @@ #include #include #include - #include "main.h" -#include "ipv4_rsmbl.h" #define APP_LOOKUP_EXACT_MATCH 0 #define APP_LOOKUP_LPM 1 @@ -95,6 +93,10 @@ #error "APP_LOOKUP_METHOD set to incorrect value" #endif +#define MAX_PKT_BURST 32 + +#include "ipv4_rsmbl.h" + #ifndef IPv6_BYTES #define IPv6_BYTES_FMT "%02x%02x:%02x%02x:%02x%02x:%02x%02x:"\ "%02x%02x:%02x%02x:%02x%02x:%02x%02x" @@ -156,7 +158,6 @@ static uint32_t max_flow_ttl = DEF_FLOW_TTL; #define TX_HTHRESH 0 /**< Default values of TX host threshold reg. */ #define TX_WTHRESH 0 /**< Default values of TX write-back threshold reg. */ -#define MAX_PKT_BURST 32 #define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */ #define NB_SOCKETS 8 @@ -169,6 +170,7 @@ static uint32_t max_flow_ttl = DEF_FLOW_TTL; */ #define RTE_TEST_RX_DESC_DEFAULT 128 #define RTE_TEST_TX_DESC_DEFAULT 512 + static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT; static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT; @@ -181,8 +183,10 @@ static int promiscuous_on = 0; /**< Ports set in promiscuous mode off by default static int numa_on = 1; /**< NUMA is enabled by default. */ struct mbuf_table { - uint16_t len; - struct rte_mbuf *m_table[MAX_PKT_BURST]; + uint32_t len; + uint32_t head; + uint32_t tail; + struct rte_mbuf *m_table[0]; }; struct lcore_rx_queue { @@ -380,11 +384,23 @@ static lookup_struct_t *ipv4_l3fwd_lookup_struct[NB_SOCKETS]; static lookup6_struct_t *ipv6_l3fwd_lookup_struct[NB_SOCKETS]; #endif +struct tx_lcore_stat { + uint64_t call; + uint64_t drop; + uint64_t queue; + uint64_t send; +}; + +#ifdef IPV4_FRAG_TBL_STAT +#define TX_LCORE_STAT_UPDATE(s, f, v) ((s)->f += (v)) +#else +#define TX_LCORE_STAT_UPDATE(s, f, v) do {} while (0) +#endif /* IPV4_FRAG_TBL_STAT */ + struct lcore_conf { uint16_t n_rx_queue; struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE]; uint16_t tx_queue_id[MAX_PORTS]; - struct mbuf_table tx_mbufs[MAX_PORTS]; lookup_struct_t * ipv4_lookup_struct; #if (APP_LOOKUP_METHOD == APP_LOOKUP_LPM) lookup6_struct_t * ipv6_lookup_struct; @@ -393,54 +409,75 @@ struct lcore_conf { #endif struct ipv4_frag_tbl *frag_tbl[MAX_RX_QUEUE_PER_LCORE]; struct rte_mempool *pool[MAX_RX_QUEUE_PER_LCORE]; + struct ipv4_frag_death_row death_row; + struct mbuf_table *tx_mbufs[MAX_PORTS]; + struct tx_lcore_stat tx_stat; } __rte_cache_aligned; static struct lcore_conf lcore_conf[RTE_MAX_LCORE]; -/* Send burst of packets on an output interface */ -static inline int -send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port) +/* + * If number of queued packets reached given threahold, then + * send burst of packets on an output interface. + */ +static inline uint32_t +send_burst(struct lcore_conf *qconf, uint32_t thresh, uint8_t port) { - struct rte_mbuf **m_table; - int ret; - uint16_t queueid; + uint32_t fill, len, k, n; + struct mbuf_table *txmb; + + txmb = qconf->tx_mbufs[port]; + len = txmb->len; + + if ((int32_t)(fill = txmb->head - txmb->tail) < 0) + fill += len; + + if (fill >= thresh) { + n = RTE_MIN(len - txmb->tail, fill); + + k = rte_eth_tx_burst(port, qconf->tx_queue_id[port], + txmb->m_table + txmb->tail, (uint16_t)n); - queueid = qconf->tx_queue_id[port]; - m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table; + TX_LCORE_STAT_UPDATE(&qconf->tx_stat, call, 1); + TX_LCORE_STAT_UPDATE(&qconf->tx_stat, send, k); - ret = rte_eth_tx_burst(port, queueid, m_table, n); - if (unlikely(ret < n)) { - do { - rte_pktmbuf_free(m_table[ret]); - } while (++ret < n); + fill -= k; + if ((txmb->tail += k) == len) + txmb->tail = 0; } - return 0; + return (fill); } /* Enqueue a single packet, and send burst if queue is filled */ static inline int send_single_packet(struct rte_mbuf *m, uint8_t port) { - uint32_t lcore_id; - uint16_t len; + uint32_t fill, lcore_id, len; struct lcore_conf *qconf; + struct mbuf_table *txmb; lcore_id = rte_lcore_id(); - qconf = &lcore_conf[lcore_id]; - len = qconf->tx_mbufs[port].len; - qconf->tx_mbufs[port].m_table[len] = m; - len++; - - /* enough pkts to be sent */ - if (unlikely(len == MAX_PKT_BURST)) { - send_burst(qconf, MAX_PKT_BURST, port); - len = 0; + + txmb = qconf->tx_mbufs[port]; + len = txmb->len; + + fill = send_burst(qconf, MAX_PKT_BURST, port); + + if (fill == len - 1) { + TX_LCORE_STAT_UPDATE(&qconf->tx_stat, drop, 1); + rte_pktmbuf_free(txmb->m_table[txmb->tail]); + if (++txmb->tail == len) + txmb->tail = 0; } + + TX_LCORE_STAT_UPDATE(&qconf->tx_stat, queue, 1); + txmb->m_table[txmb->head] = m; + if(++txmb->head == len) + txmb->head = 0; - qconf->tx_mbufs[port].len = len; - return 0; + return (0); } #ifdef DO_RFC_1812_CHECKS @@ -637,15 +674,17 @@ l3fwd_simple_forward(struct rte_mbuf *m, uint8_t portid, uint32_t queue, struct rte_mbuf *mo; struct ipv4_frag_tbl *tbl; + struct ipv4_frag_death_row *dr; tbl = qconf->frag_tbl[queue]; + dr = &qconf->death_row; /* prepare mbuf: setup l2_len/l3_len. */ m->pkt.vlan_macip.f.l2_len = sizeof(*eth_hdr); m->pkt.vlan_macip.f.l3_len = sizeof(*ipv4_hdr); /* process this fragment. */ - if ((mo = ipv4_frag_mbuf(tbl, m, tms, ipv4_hdr, + if ((mo = ipv4_frag_mbuf(tbl, dr, m, tms, ipv4_hdr, ip_ofs, ip_flag)) == NULL) /* no packet to send out. */ return; @@ -659,8 +698,10 @@ l3fwd_simple_forward(struct rte_mbuf *m, uint8_t portid, uint32_t queue, } } - dst_port = get_ipv4_dst_port(ipv4_hdr, portid, qconf->ipv4_lookup_struct); - if (dst_port >= MAX_PORTS || (enabled_port_mask & 1 << dst_port) == 0) + dst_port = get_ipv4_dst_port(ipv4_hdr, portid, + qconf->ipv4_lookup_struct); + if (dst_port >= MAX_PORTS || + (enabled_port_mask & 1 << dst_port) == 0) dst_port = portid; /* 02:00:00:00:00:xx */ @@ -743,12 +784,8 @@ main_loop(__attribute__((unused)) void *dummy) * portid), but it is not called so often */ for (portid = 0; portid < MAX_PORTS; portid++) { - if (qconf->tx_mbufs[portid].len == 0) - continue; - send_burst(&lcore_conf[lcore_id], - qconf->tx_mbufs[portid].len, - portid); - qconf->tx_mbufs[portid].len = 0; + if ((enabled_port_mask & (1 << portid)) != 0) + send_burst(qconf, 1, portid); } prev_tsc = cur_tsc; @@ -784,6 +821,9 @@ main_loop(__attribute__((unused)) void *dummy) l3fwd_simple_forward(pkts_burst[j], portid, i, qconf, cur_tsc); } + + ipv4_frag_free_death_row(&qconf->death_row, + PREFETCH_OFFSET); } } } @@ -1384,9 +1424,29 @@ check_all_ports_link_status(uint8_t port_num, uint32_t port_mask) } } } +static void +setup_port_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket, + uint32_t port) +{ + struct mbuf_table *mtb; + uint32_t n; + size_t sz; + + n = RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST); + sz = sizeof (*mtb) + sizeof (mtb->m_table[0]) * n; + + if ((mtb = rte_zmalloc_socket(__func__, sz, CACHE_LINE_SIZE, + socket)) == NULL) + rte_exit(EXIT_FAILURE, "%s() for lcore: %u, port: %u " + "failed to allocate %zu bytes\n", + __func__, lcore, port, sz); + + mtb->len = n; + qconf->tx_mbufs[port] = mtb; +} static void -setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket, +setup_queue_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket, uint32_t queue) { uint32_t nb_mbuf; @@ -1403,10 +1463,16 @@ setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket, "lcore: %u for queue: %u failed\n", max_flow_num, lcore, queue); - nb_mbuf = max_flow_num * MAX_FRAG_NUM; + /* + * At any given moment up to + * mbufs could be stored int the fragment table. + * Plus, each TX queue can hold up to packets. + */ + + nb_mbuf = 2 * RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST) * MAX_FRAG_NUM; nb_mbuf *= (port_conf.rxmode.max_rx_pkt_len + BUF_SIZE - 1) / BUF_SIZE; - nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + MAX_PKT_BURST + - RTE_TEST_TX_DESC_DEFAULT; + nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + RTE_TEST_TX_DESC_DEFAULT; + nb_mbuf = RTE_MAX(nb_mbuf, (uint32_t)DEF_MBUF_NUM); rte_snprintf(buf, sizeof(buf), "mbuf_pool_%u_%u", lcore, queue); @@ -1419,7 +1485,7 @@ setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket, } static void -queue_frag_tbl_dump_stat(void) +queue_dump_stat(void) { uint32_t i, lcore; const struct lcore_conf *qconf; @@ -1436,6 +1502,14 @@ queue_frag_tbl_dump_stat(void) lcore, qconf->rx_queue_list[i].port_id, qconf->rx_queue_list[i].queue_id); ipv4_frag_tbl_dump_stat(stdout, qconf->frag_tbl[i]); + fprintf(stdout, "TX bursts:\t%" PRIu64 "\n" + "TX packets _queued:\t%" PRIu64 "\n" + "TX packets dropped:\t%" PRIu64 "\n" + "TX packets send:\t%" PRIu64 "\n", + qconf->tx_stat.call, + qconf->tx_stat.queue, + qconf->tx_stat.drop, + qconf->tx_stat.send); } } } @@ -1443,7 +1517,7 @@ queue_frag_tbl_dump_stat(void) static void signal_handler(int signum) { - queue_frag_tbl_dump_stat(); + queue_dump_stat(); if (signum != SIGUSR1) rte_exit(0, "received signal: %d, exiting\n", signum); } @@ -1549,6 +1623,7 @@ MAIN(int argc, char **argv) qconf = &lcore_conf[lcore_id]; qconf->tx_queue_id[portid] = queueid; + setup_port_tbl(qconf, lcore_id, socketid, portid); queueid++; } printf("\n"); @@ -1573,7 +1648,7 @@ MAIN(int argc, char **argv) printf("rxq=%d,%d,%d ", portid, queueid, socketid); fflush(stdout); - setup_queue_frag_tbl(qconf, lcore_id, socketid, queue); + setup_queue_tbl(qconf, lcore_id, socketid, queue); ret = rte_eth_rx_queue_setup(portid, queueid, nb_rxd, socketid, &rx_conf, qconf->pool[queue]);