#include <rte_debug.h>
#include <rte_prefetch.h>
#include <rte_distributor.h>
+#include <rte_pause.h>
-#define RX_RING_SIZE 256
+#define RX_RING_SIZE 512
#define TX_RING_SIZE 512
#define NUM_MBUFS ((64*1024)-1)
-#define MBUF_CACHE_SIZE 250
-#define BURST_SIZE 32
+#define MBUF_CACHE_SIZE 128
+#define BURST_SIZE 64
#define SCHED_RX_RING_SZ 8192
#define SCHED_TX_RING_SZ 65536
-#define RTE_RING_SZ 1024
+#define BURST_SIZE_TX 32
#define RTE_LOGTYPE_DISTRAPP RTE_LOGTYPE_USER1
const uint8_t nb_ports = rte_eth_dev_count();
const int socket_id = rte_socket_id();
uint8_t port;
+ struct rte_mbuf *bufs[BURST_SIZE*2];
for (port = 0; port < nb_ports; port++) {
/* skip ports that are not enabled */
port = 0;
continue;
}
- struct rte_mbuf *bufs[BURST_SIZE*2];
const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
BURST_SIZE);
if (unlikely(nb_rx == 0)) {
struct rte_ring *tx_ring = p->dist_tx_ring;
uint16_t sent = rte_ring_enqueue_burst(tx_ring,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
#else
uint16_t nb_ret = nb_rx;
/*
/* struct rte_ring *out_ring = p->dist_tx_ring; */
uint16_t sent = rte_ring_enqueue_burst(out_ring,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
#endif
app_stats.rx.enqueued_pkts += sent;
if (unlikely(sent < nb_ret)) {
+ app_stats.rx.enqdrop_pkts += nb_ret - sent;
RTE_LOG_DP(DEBUG, DISTRAPP,
"%s:Packet loss due to full ring\n", __func__);
while (sent < nb_ret)
static inline void
flush_one_port(struct output_buffer *outbuf, uint8_t outp)
{
- unsigned nb_tx = rte_eth_tx_burst(outp, 0, outbuf->mbufs,
- outbuf->count);
- app_stats.tx.tx_pkts += nb_tx;
+ unsigned int nb_tx = rte_eth_tx_burst(outp, 0,
+ outbuf->mbufs, outbuf->count);
+ app_stats.tx.tx_pkts += outbuf->count;
if (unlikely(nb_tx < outbuf->count)) {
- RTE_LOG_DP(DEBUG, DISTRAPP,
- "%s:Packet loss with tx_burst\n", __func__);
+ app_stats.tx.enqdrop_pkts += outbuf->count - nb_tx;
do {
rte_pktmbuf_free(outbuf->mbufs[nb_tx]);
} while (++nb_tx < outbuf->count);
flush_all_ports(struct output_buffer *tx_buffers, uint8_t nb_ports)
{
uint8_t outp;
+
for (outp = 0; outp < nb_ports; outp++) {
/* skip ports that are not enabled */
if ((enabled_port_mask & (1 << outp)) == 0)
printf("\nCore %u acting as distributor core.\n", rte_lcore_id());
while (!quit_signal_dist) {
const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
- (void *)bufs, BURST_SIZE*1);
+ (void *)bufs, BURST_SIZE*1, NULL);
if (nb_rx) {
app_stats.dist.in_pkts += nb_rx;
app_stats.dist.ret_pkts += nb_ret;
uint16_t sent = rte_ring_enqueue_burst(out_r,
- (void *)bufs, nb_ret);
+ (void *)bufs, nb_ret, NULL);
app_stats.dist.sent_pkts += sent;
if (unlikely(sent < nb_ret)) {
app_stats.dist.enqdrop_pkts += nb_ret - sent;
if ((enabled_port_mask & (1 << port)) == 0)
continue;
- struct rte_mbuf *bufs[BURST_SIZE];
+ struct rte_mbuf *bufs[BURST_SIZE_TX];
const uint16_t nb_rx = rte_ring_dequeue_burst(in_r,
- (void *)bufs, BURST_SIZE);
+ (void *)bufs, BURST_SIZE_TX, NULL);
app_stats.tx.dequeue_pkts += nb_rx;
/* if we get no traffic, flush anything we have */
outbuf = &tx_buffers[outp];
outbuf->mbufs[outbuf->count++] = bufs[i];
- if (outbuf->count == BURST_SIZE)
+ if (outbuf->count == BURST_SIZE_TX)
flush_one_port(outbuf, outp);
}
}
}
+ printf("\nCore %u exiting tx task.\n", rte_lcore_id());
return 0;
}
for (i = 0; i < 8; i++)
buf[i] = NULL;
+ app_stats.worker_pkts[p->worker_id] = 1;
+
printf("\nCore %u acting as worker core.\n", rte_lcore_id());
while (!quit_signal_work) {
num = rte_distributor_get_pkt(d, id, buf, buf, num);
rte_pause();
buf[i]->port ^= xor_val;
}
+
+ app_stats.worker_pkts[p->worker_id] += num;
+ if (num > 0)
+ app_stats.worker_bursts[p->worker_id][num-1]++;
}
return 0;
}
if (ret < 0)
rte_exit(EXIT_FAILURE, "Invalid distributor parameters\n");
- if (rte_lcore_count() < 4)
+ if (rte_lcore_count() < 5)
rte_exit(EXIT_FAILURE, "Error, This application needs at "
- "least 4 logical cores to run:\n"
+ "least 5 logical cores to run:\n"
+ "1 lcore for stats (can be core 0)\n"
"1 lcore for packet RX\n"
"1 lcore for distribution\n"
"1 lcore for packet TX\n"
}
d = rte_distributor_create("PKT_DIST", rte_socket_id(),
- rte_lcore_count() - 3,
+ rte_lcore_count() - 4,
RTE_DIST_ALG_BURST);
if (d == NULL)
rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
/* tx core */
rte_eal_remote_launch((lcore_function_t *)lcore_tx,
dist_tx_ring, lcore_id);
+ } else if (worker_id == rte_lcore_count() - 2) {
+ printf("Starting rx on worker_id %d, lcore_id %d\n",
+ worker_id, lcore_id);
+ /* rx core */
+ struct lcore_params *p =
+ rte_malloc(NULL, sizeof(*p), 0);
+ if (!p)
+ rte_panic("malloc failure\n");
+ *p = (struct lcore_params){worker_id, d, rx_dist_ring,
+ dist_tx_ring, mbuf_pool};
+ rte_eal_remote_launch((lcore_function_t *)lcore_rx,
+ p, lcore_id);
} else {
+ printf("Starting worker on worker_id %d, lcore_id %d\n",
+ worker_id, lcore_id);
struct lcore_params *p =
rte_malloc(NULL, sizeof(*p), 0);
if (!p)
}
worker_id++;
}
- /* call lcore_main on master core only */
- struct lcore_params p = { 0, d, rx_dist_ring, dist_tx_ring, mbuf_pool};
-
- if (lcore_rx(&p) != 0)
- return -1;
freq = rte_get_timer_hz();
t = rte_rdtsc() + freq;