}
static inline void
-ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp)
+ipv4_frag_tbl_del(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+ struct ipv4_frag_pkt *fp)
{
- ipv4_frag_free(fp);
+ ipv4_frag_free(fp, dr);
IPV4_FRAG_KEY_INVALIDATE(&fp->key);
TAILQ_REMOVE(&tbl->lru, fp, lru);
tbl->use_entries--;
}
static inline void
-ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_pkt *fp,
- uint64_t tms)
+ipv4_frag_tbl_reuse(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+ struct ipv4_frag_pkt *fp, uint64_t tms)
{
- ipv4_frag_free(fp);
+ ipv4_frag_free(fp, dr);
ipv4_frag_reset(fp, tms);
TAILQ_REMOVE(&tbl->lru, fp, lru);
TAILQ_INSERT_TAIL(&tbl->lru, fp, lru);
* If the entry is stale, then free and reuse it.
*/
static inline struct ipv4_frag_pkt *
-ipv4_frag_find(struct ipv4_frag_tbl *tbl, const struct ipv4_frag_key *key,
- uint64_t tms)
+ipv4_frag_find(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+ const struct ipv4_frag_key *key, uint64_t tms)
{
struct ipv4_frag_pkt *pkt, *free, *stale, *lru;
uint64_t max_cycles;
/*timed-out entry, free and invalidate it*/
if (stale != NULL) {
- ipv4_frag_tbl_del(tbl, stale);
+ ipv4_frag_tbl_del(tbl, dr, stale);
free = stale;
/*
tbl->max_entries <= tbl->use_entries) {
lru = TAILQ_FIRST(&tbl->lru);
if (max_cycles + lru->start < tms) {
- ipv4_frag_tbl_del(tbl, lru);
+ ipv4_frag_tbl_del(tbl, dr, lru);
} else {
free = NULL;
IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat,
* and reuse it.
*/
} else if (max_cycles + pkt->start < tms) {
- ipv4_frag_tbl_reuse(tbl, pkt, tms);
+ ipv4_frag_tbl_reuse(tbl, dr, pkt, tms);
}
IPV4_FRAG_TBL_STAT_UPDATE(&tbl->stat, fail_total, (pkt == NULL));
#define IPV4_FRAG_KEY_CMP(k1, k2) \
(((k1)->src_dst ^ (k2)->src_dst) | ((k1)->id ^ (k2)->id))
-
+
/*
* Fragmented packet to reassemble.
struct ipv4_frag frags[MAX_FRAG_NUM];
} __rte_cache_aligned;
+
+struct ipv4_frag_death_row {
+ uint32_t cnt;
+ struct rte_mbuf *row[MAX_PKT_BURST * (MAX_FRAG_NUM + 1)];
+};
+
+#define IPV4_FRAG_MBUF2DR(dr, mb) ((dr)->row[(dr)->cnt++] = (mb))
+
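
The death row defers mbuf release: code deep inside the reassembly path no
longer calls rte_pktmbuf_free() directly, it just parks the victim via
IPV4_FRAG_MBUF2DR() and the whole batch is freed later in one place. A
minimal sketch of a drop site under this scheme (the 'dr' pointer is
assumed to be passed down from the caller):

    /* old: rte_pktmbuf_free(mb); */

    /* new: park the mbuf on the death row; it is actually freed
     * later, in bulk, by ipv4_frag_free_death_row(). */
    IPV4_FRAG_MBUF2DR(dr, mb);

The row is sized MAX_PKT_BURST * (MAX_FRAG_NUM + 1), presumably so that
every packet of an RX burst can contribute itself plus one full table
entry's worth of fragments before the row is drained.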
/* logging macros. */
#ifdef IPV4_FRAG_DEBUG
}
static inline void
-ipv4_frag_free(struct ipv4_frag_pkt *fp)
+ipv4_frag_free(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr)
{
- uint32_t i;
+ uint32_t i, k;
+ k = dr->cnt;
for (i = 0; i != fp->last_idx; i++) {
if (fp->frags[i].mb != NULL) {
- rte_pktmbuf_free(fp->frags[i].mb);
+ dr->row[k++] = fp->frags[i].mb;
fp->frags[i].mb = NULL;
}
}
fp->last_idx = 0;
+ dr->cnt = k;
+}
+
+static inline void
+ipv4_frag_free_death_row(struct ipv4_frag_death_row *dr, uint32_t prefetch)
+{
+ uint32_t i, k, n;
+
+ k = RTE_MIN(prefetch, dr->cnt);
+ n = dr->cnt;
+
+ for (i = 0; i != k; i++)
+ rte_prefetch0(dr->row[i]);
+
+ for (i = 0; i != n - k; i++) {
+ rte_prefetch0(dr->row[i + k]);
+ rte_pktmbuf_free(dr->row[i]);
+ }
+
+ for (; i != n; i++)
+ rte_pktmbuf_free(dr->row[i]);
+
+ dr->cnt = 0;
}
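
ipv4_frag_free_death_row() software-pipelines the release: the first loop
warms the cache for up to 'prefetch' mbufs, the second prefetches element
i + k while freeing element i, and the tail loop frees the remainder, so
the cache miss on each mbuf overlaps with the free of an earlier one.
Typical per-burst usage, mirroring the main-loop change later in this
patch:

    /* after running one RX burst through the reassembly code ... */
    ipv4_frag_free_death_row(&qconf->death_row, PREFETCH_OFFSET);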
/*
}
static inline struct rte_mbuf *
-ipv4_frag_process(struct ipv4_frag_pkt *fp, struct rte_mbuf *mb,
- uint16_t ofs, uint16_t len, uint16_t more_frags)
+ipv4_frag_process(struct ipv4_frag_pkt *fp, struct ipv4_frag_death_row *dr,
+ struct rte_mbuf *mb, uint16_t ofs, uint16_t len, uint16_t more_frags)
{
uint32_t idx;
fp->frags[LAST_FRAG_IDX].len);
/* free all fragments, invalidate the entry. */
- ipv4_frag_free(fp);
+ ipv4_frag_free(fp, dr);
IPV4_FRAG_KEY_INVALIDATE(&fp->key);
- rte_pktmbuf_free(mb);
+ IPV4_FRAG_MBUF2DR(dr, mb);
return (NULL);
}
fp->frags[LAST_FRAG_IDX].len);
/* free associated resources. */
- ipv4_frag_free(fp);
+ ipv4_frag_free(fp, dr);
}
/* we are done with that entry, invalidate it. */
* - not all fragments of the packet are collected yet.
*/
static inline struct rte_mbuf *
-ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct rte_mbuf *mb, uint64_t tms,
- struct ipv4_hdr *ip_hdr, uint16_t ip_ofs, uint16_t ip_flag)
+ipv4_frag_mbuf(struct ipv4_frag_tbl *tbl, struct ipv4_frag_death_row *dr,
+ struct rte_mbuf *mb, uint64_t tms, struct ipv4_hdr *ip_hdr,
+ uint16_t ip_ofs, uint16_t ip_flag)
{
struct ipv4_frag_pkt *fp;
struct ipv4_frag_key key;
tbl->use_entries);
/* try to find/add entry into the fragment's table. */
- if ((fp = ipv4_frag_find(tbl, &key, tms)) == NULL) {
- rte_pktmbuf_free(mb);
+ if ((fp = ipv4_frag_find(tbl, dr, &key, tms)) == NULL) {
+ IPV4_FRAG_MBUF2DR(dr, mb);
return (NULL);
}
/* process the fragmented packet. */
- mb = ipv4_frag_process(fp, mb, ip_ofs, ip_len, ip_flag);
+ mb = ipv4_frag_process(fp, dr, mb, ip_ofs, ip_len, ip_flag);
ipv4_frag_inuse(tbl, fp);
IPV4_FRAG_LOG(DEBUG, "%s:%d:\n"
#include <rte_tcp.h>
#include <rte_udp.h>
#include <rte_string_fns.h>
-
#include "main.h"
-#include "ipv4_rsmbl.h"
#define APP_LOOKUP_EXACT_MATCH 0
#define APP_LOOKUP_LPM 1
#error "APP_LOOKUP_METHOD set to incorrect value"
#endif
+#define MAX_PKT_BURST 32
+
+#include "ipv4_rsmbl.h"
+
#ifndef IPv6_BYTES
#define IPv6_BYTES_FMT "%02x%02x:%02x%02x:%02x%02x:%02x%02x:"\
"%02x%02x:%02x%02x:%02x%02x:%02x%02x"
#define TX_HTHRESH 0 /**< Default values of TX host threshold reg. */
#define TX_WTHRESH 0 /**< Default values of TX write-back threshold reg. */
-#define MAX_PKT_BURST 32
#define BURST_TX_DRAIN_US 100 /* TX drain every ~100us */
#define NB_SOCKETS 8
*/
#define RTE_TEST_RX_DESC_DEFAULT 128
#define RTE_TEST_TX_DESC_DEFAULT 512
+
static uint16_t nb_rxd = RTE_TEST_RX_DESC_DEFAULT;
static uint16_t nb_txd = RTE_TEST_TX_DESC_DEFAULT;
static int numa_on = 1; /**< NUMA is enabled by default. */
struct mbuf_table {
- uint16_t len;
- struct rte_mbuf *m_table[MAX_PKT_BURST];
+ uint32_t len;
+ uint32_t head;
+ uint32_t tail;
+ struct rte_mbuf *m_table[0];
};
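
mbuf_table becomes a run-time-sized circular buffer: 'head' is where the
next packet is enqueued, 'tail' is the next packet to transmit, and
m_table[0] is the usual zero-length trailing-array idiom. The occupancy
test used by send_burst() below could be factored out as follows (a
sketch only; no such helper exists in this patch):

    /* Number of packets currently queued. 'head' may have wrapped
     * past 'tail', hence the signed-underflow check. One slot is
     * kept unused, so the ring holds at most len - 1 packets. */
    static inline uint32_t
    mbuf_table_fill(const struct mbuf_table *txmb)
    {
        uint32_t fill = txmb->head - txmb->tail;

        if ((int32_t)fill < 0)
            fill += txmb->len;
        return fill;
    }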
struct lcore_rx_queue {
static lookup6_struct_t *ipv6_l3fwd_lookup_struct[NB_SOCKETS];
#endif
+struct tx_lcore_stat {
+ uint64_t call;
+ uint64_t drop;
+ uint64_t queue;
+ uint64_t send;
+};
+
+#ifdef IPV4_FRAG_TBL_STAT
+#define TX_LCORE_STAT_UPDATE(s, f, v) ((s)->f += (v))
+#else
+#define TX_LCORE_STAT_UPDATE(s, f, v) do {} while (0)
+#endif /* IPV4_FRAG_TBL_STAT */
+
struct lcore_conf {
uint16_t n_rx_queue;
struct lcore_rx_queue rx_queue_list[MAX_RX_QUEUE_PER_LCORE];
uint16_t tx_queue_id[MAX_PORTS];
- struct mbuf_table tx_mbufs[MAX_PORTS];
lookup_struct_t * ipv4_lookup_struct;
#if (APP_LOOKUP_METHOD == APP_LOOKUP_LPM)
lookup6_struct_t * ipv6_lookup_struct;
#endif
struct ipv4_frag_tbl *frag_tbl[MAX_RX_QUEUE_PER_LCORE];
struct rte_mempool *pool[MAX_RX_QUEUE_PER_LCORE];
+ struct ipv4_frag_death_row death_row;
+ struct mbuf_table *tx_mbufs[MAX_PORTS];
+ struct tx_lcore_stat tx_stat;
} __rte_cache_aligned;
static struct lcore_conf lcore_conf[RTE_MAX_LCORE];
-/* Send burst of packets on an output interface */
-static inline int
-send_burst(struct lcore_conf *qconf, uint16_t n, uint8_t port)
+/*
+ * If the number of queued packets has reached the given threshold,
+ * then send a burst of packets out on the interface.
+ */
+static inline uint32_t
+send_burst(struct lcore_conf *qconf, uint32_t thresh, uint8_t port)
{
- struct rte_mbuf **m_table;
- int ret;
- uint16_t queueid;
+ uint32_t fill, len, k, n;
+ struct mbuf_table *txmb;
+
+ txmb = qconf->tx_mbufs[port];
+ len = txmb->len;
+
+ if ((int32_t)(fill = txmb->head - txmb->tail) < 0)
+ fill += len;
+
+ if (fill >= thresh) {
+ n = RTE_MIN(len - txmb->tail, fill);
+
+ k = rte_eth_tx_burst(port, qconf->tx_queue_id[port],
+ txmb->m_table + txmb->tail, (uint16_t)n);
- queueid = qconf->tx_queue_id[port];
- m_table = (struct rte_mbuf **)qconf->tx_mbufs[port].m_table;
+ TX_LCORE_STAT_UPDATE(&qconf->tx_stat, call, 1);
+ TX_LCORE_STAT_UPDATE(&qconf->tx_stat, send, k);
- ret = rte_eth_tx_burst(port, queueid, m_table, n);
- if (unlikely(ret < n)) {
- do {
- rte_pktmbuf_free(m_table[ret]);
- } while (++ret < n);
+ fill -= k;
+ if ((txmb->tail += k) == len)
+ txmb->tail = 0;
}
- return 0;
+ return (fill);
}
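
send_burst() now doubles as threshold drain and flush: with
thresh == MAX_PKT_BURST it transmits only once a full burst has
accumulated, while thresh == 1 pushes out anything pending, as the
periodic TX-drain path below uses it. It also returns the remaining
fill level, which send_single_packet() uses to detect a still-full ring:

    /* fast path: drain only when a full burst is queued ... */
    fill = send_burst(qconf, MAX_PKT_BURST, port);

    /* ... drain timer: flush every queued packet. */
    send_burst(qconf, 1, portid);

Note the changed failure handling: packets that rte_eth_tx_burst() could
not accept are no longer freed on the spot; they simply stay in the ring
until the next drain.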
/* Enqueue a single packet, and send burst if queue is filled */
static inline int
send_single_packet(struct rte_mbuf *m, uint8_t port)
{
- uint32_t lcore_id;
- uint16_t len;
+ uint32_t fill, lcore_id, len;
struct lcore_conf *qconf;
+ struct mbuf_table *txmb;
lcore_id = rte_lcore_id();
-
qconf = &lcore_conf[lcore_id];
- len = qconf->tx_mbufs[port].len;
- qconf->tx_mbufs[port].m_table[len] = m;
- len++;
-
- /* enough pkts to be sent */
- if (unlikely(len == MAX_PKT_BURST)) {
- send_burst(qconf, MAX_PKT_BURST, port);
- len = 0;
+
+ txmb = qconf->tx_mbufs[port];
+ len = txmb->len;
+
+ fill = send_burst(qconf, MAX_PKT_BURST, port);
+
+ if (fill == len - 1) {
+ TX_LCORE_STAT_UPDATE(&qconf->tx_stat, drop, 1);
+ rte_pktmbuf_free(txmb->m_table[txmb->tail]);
+ if (++txmb->tail == len)
+ txmb->tail = 0;
}
+
+ TX_LCORE_STAT_UPDATE(&qconf->tx_stat, queue, 1);
+ txmb->m_table[txmb->head] = m;
+ if (++txmb->head == len)
+ txmb->head = 0;
- qconf->tx_mbufs[port].len = len;
- return 0;
+ return (0);
}
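
When the ring is still full after the drain attempt (fill == len - 1),
send_single_packet() sacrifices the oldest queued packet rather than the
new one, so a stalled port turns into drops of stale packets instead of
an unbounded queue. The policy in isolation (sketch, reusing the
hypothetical fill helper from above):

    /* make room by dropping the packet at the ring's tail. */
    if (mbuf_table_fill(txmb) == txmb->len - 1) {
        rte_pktmbuf_free(txmb->m_table[txmb->tail]);
        if (++txmb->tail == txmb->len)
            txmb->tail = 0;
    }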
#ifdef DO_RFC_1812_CHECKS
struct rte_mbuf *mo;
struct ipv4_frag_tbl *tbl;
+ struct ipv4_frag_death_row *dr;
tbl = qconf->frag_tbl[queue];
+ dr = &qconf->death_row;
/* prepare mbuf: setup l2_len/l3_len. */
m->pkt.vlan_macip.f.l2_len = sizeof(*eth_hdr);
m->pkt.vlan_macip.f.l3_len = sizeof(*ipv4_hdr);
/* process this fragment. */
- if ((mo = ipv4_frag_mbuf(tbl, m, tms, ipv4_hdr,
+ if ((mo = ipv4_frag_mbuf(tbl, dr, m, tms, ipv4_hdr,
ip_ofs, ip_flag)) == NULL)
/* no packet to send out. */
return;
}
}
- dst_port = get_ipv4_dst_port(ipv4_hdr, portid, qconf->ipv4_lookup_struct);
- if (dst_port >= MAX_PORTS || (enabled_port_mask & 1 << dst_port) == 0)
+ dst_port = get_ipv4_dst_port(ipv4_hdr, portid,
+ qconf->ipv4_lookup_struct);
+ if (dst_port >= MAX_PORTS ||
+ (enabled_port_mask & 1 << dst_port) == 0)
dst_port = portid;
/* 02:00:00:00:00:xx */
* portid), but it is not called so often
*/
for (portid = 0; portid < MAX_PORTS; portid++) {
- if (qconf->tx_mbufs[portid].len == 0)
- continue;
- send_burst(&lcore_conf[lcore_id],
- qconf->tx_mbufs[portid].len,
- portid);
- qconf->tx_mbufs[portid].len = 0;
+ if ((enabled_port_mask & (1 << portid)) != 0)
+ send_burst(qconf, 1, portid);
}
prev_tsc = cur_tsc;
l3fwd_simple_forward(pkts_burst[j], portid,
i, qconf, cur_tsc);
}
+
+ ipv4_frag_free_death_row(&qconf->death_row,
+ PREFETCH_OFFSET);
}
}
}
}
}
}
+static void
+setup_port_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
+ uint32_t port)
+{
+ struct mbuf_table *mtb;
+ uint32_t n;
+ size_t sz;
+
+ n = RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST);
+ sz = sizeof (*mtb) + sizeof (mtb->m_table[0]) * n;
+
+ if ((mtb = rte_zmalloc_socket(__func__, sz, CACHE_LINE_SIZE,
+ socket)) == NULL)
+ rte_exit(EXIT_FAILURE, "%s() for lcore: %u, port: %u "
+ "failed to allocate %zu bytes\n",
+ __func__, lcore, port, sz);
+
+ mtb->len = n;
+ qconf->tx_mbufs[port] = mtb;
+}
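
For a feel of the footprint: assuming a hypothetical max_flow_num of 4096
and 8-byte mbuf pointers, setup_port_tbl() allocates

    n  = RTE_MAX(4096, 2 * MAX_PKT_BURST)    /* = 4096 slots */
    sz = sizeof(struct mbuf_table)
       + 4096 * sizeof(struct rte_mbuf *)    /* ~ 32 KB per port */

from the socket-local heap, so each lcore's TX rings stay NUMA-local.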
static void
-setup_queue_frag_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
+setup_queue_tbl(struct lcore_conf *qconf, uint32_t lcore, int socket,
uint32_t queue)
{
uint32_t nb_mbuf;
"lcore: %u for queue: %u failed\n",
max_flow_num, lcore, queue);
- nb_mbuf = max_flow_num * MAX_FRAG_NUM;
+ /*
+ * At any given moment, up to <max_flow_num * (MAX_FRAG_NUM - 1)>
+ * mbufs can be stored in the fragment table.
+ * Plus, each TX queue can hold up to <max_flow_num> packets.
+ */
+
+ nb_mbuf = 2 * RTE_MAX(max_flow_num, 2UL * MAX_PKT_BURST) * MAX_FRAG_NUM;
nb_mbuf *= (port_conf.rxmode.max_rx_pkt_len + BUF_SIZE - 1) / BUF_SIZE;
- nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + MAX_PKT_BURST +
- RTE_TEST_TX_DESC_DEFAULT;
+ nb_mbuf += RTE_TEST_RX_DESC_DEFAULT + RTE_TEST_TX_DESC_DEFAULT;
+
nb_mbuf = RTE_MAX(nb_mbuf, (uint32_t)DEF_MBUF_NUM);
rte_snprintf(buf, sizeof(buf), "mbuf_pool_%u_%u", lcore, queue);
}
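
To make the mempool arithmetic concrete: assuming, hypothetically,
max_flow_num = 4096, MAX_FRAG_NUM = 4, and an RX packet length that fits
in a single BUF_SIZE buffer, the computation works out to

    nb_mbuf  = 2 * RTE_MAX(4096, 64) * 4;   /* = 32768: table + TX, x2 */
    nb_mbuf *= 1;                           /* one buffer per fragment */
    nb_mbuf += 128 + 512;                   /* RX + TX descriptor rings */
    /* => 33408, raised to DEF_MBUF_NUM if that is larger. */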
static void
-queue_frag_tbl_dump_stat(void)
+queue_dump_stat(void)
{
uint32_t i, lcore;
const struct lcore_conf *qconf;
lcore, qconf->rx_queue_list[i].port_id,
qconf->rx_queue_list[i].queue_id);
ipv4_frag_tbl_dump_stat(stdout, qconf->frag_tbl[i]);
+ fprintf(stdout, "TX bursts:\t%" PRIu64 "\n"
+ "TX packets _queued:\t%" PRIu64 "\n"
+ "TX packets dropped:\t%" PRIu64 "\n"
+ "TX packets send:\t%" PRIu64 "\n",
+ qconf->tx_stat.call,
+ qconf->tx_stat.queue,
+ qconf->tx_stat.drop,
+ qconf->tx_stat.send);
}
}
}
static void
signal_handler(int signum)
{
- queue_frag_tbl_dump_stat();
+ queue_dump_stat();
if (signum != SIGUSR1)
rte_exit(0, "received signal: %d, exiting\n", signum);
}
qconf = &lcore_conf[lcore_id];
qconf->tx_queue_id[portid] = queueid;
+ setup_port_tbl(qconf, lcore_id, socketid, portid);
queueid++;
}
printf("\n");
printf("rxq=%d,%d,%d ", portid, queueid, socketid);
fflush(stdout);
- setup_queue_frag_tbl(qconf, lcore_id, socketid, queue);
+ setup_queue_tbl(qconf, lcore_id, socketid, queue);
ret = rte_eth_rx_queue_setup(portid, queueid, nb_rxd,
socketid, &rx_conf, qconf->pool[queue]);