worker_event_enqueue(const uint8_t dev, const uint8_t port,
struct rte_event *ev)
{
- while (rte_event_enqueue_burst(dev, port, ev, 1) != 1)
+ while (!rte_event_enqueue_burst(dev, port, ev, 1) && !fdata->done)
rte_pause();
}
+/*
+ * Enqueue nb_rx events on (dev, port), retrying partial enqueues until
+ * every event is accepted or the application-wide fdata->done flag asks
+ * the worker to stop.  Returns the number of events actually enqueued,
+ * so callers can pass the count on to worker_cleanup() at exit.
+ */
-static __rte_always_inline void
+static __rte_always_inline uint16_t
worker_event_enqueue_burst(const uint8_t dev, const uint8_t port,
-		struct rte_event *ev, const uint16_t nb_rx)
+		struct rte_event *ev, const uint16_t nb_rx)
{
	uint16_t enq;
	enq = rte_event_enqueue_burst(dev, port, ev, nb_rx);
+	/* Keep retrying the unaccepted tail only while not terminating. */
-	while (enq < nb_rx) {
+	while (enq < nb_rx && !fdata->done)
		enq += rte_event_enqueue_burst(dev, port,
				ev + enq, nb_rx - enq);
-	}
+
+	return enq;
}
static __rte_always_inline void
{
exchange_mac(ev->mbuf);
rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
- while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
+ while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0) &&
+ !fdata->done)
rte_pause();
}
}
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
}
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
const uint8_t dev = data->dev_id;
const uint8_t port = data->port_id;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t nb_tx = 0, nb_rx = 0, i;
while (!fdata->done) {
- uint16_t i;
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (!nb_rx) {
rte_pause();
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
const uint8_t dev = data->dev_id;
const uint8_t port = data->port_id;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
- uint16_t nb_rx = rte_event_dequeue_burst(dev, port, ev,
- BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (!nb_rx) {
rte_pause();
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
fwd++;
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
fwd++;
}
+ if (ev.u64) {
+ ev.op = RTE_EVENT_OP_RELEASE;
+ rte_event_enqueue_burst(dev, port, &ev, 1);
+ }
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
uint8_t port = data->port_id;
uint8_t lst_qid = cdata.num_stages - 1;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
- const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
- ev, BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (nb_rx == 0) {
rte_pause();
}
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
uint8_t port = data->port_id;
uint8_t lst_qid = cdata.num_stages - 1;
size_t fwd = 0, received = 0, tx = 0;
+ uint16_t i, nb_rx = 0, nb_tx = 0;
while (!fdata->done) {
- uint16_t i;
-
- const uint16_t nb_rx = rte_event_dequeue_burst(dev, port,
- ev, BATCH_SIZE, 0);
+ nb_rx = rte_event_dequeue_burst(dev, port, ev, BATCH_SIZE, 0);
if (nb_rx == 0) {
rte_pause();
work();
}
- worker_event_enqueue_burst(dev, port, ev, nb_rx);
- fwd += nb_rx;
+ nb_tx = worker_event_enqueue_burst(dev, port, ev, nb_rx);
+ fwd += nb_tx;
}
+ worker_cleanup(dev, port, ev, nb_tx, nb_rx);
+
if (!cdata.quiet)
printf(" worker %u thread done. RX=%zu FWD=%zu TX=%zu\n",
rte_lcore_id(), received, fwd, tx);
struct rte_event_dev_config config = {
.nb_event_queues = nb_queues,
.nb_event_ports = nb_ports,
+ .nb_single_link_event_port_queues = 0,
.nb_events_limit = 4096,
.nb_event_queue_flows = 1024,
.nb_event_port_dequeue_depth = 128,
.dequeue_depth = cdata.worker_cq_depth,
.enqueue_depth = 64,
.new_event_threshold = 4096,
+ .event_port_cfg = RTE_EVENT_PORT_CFG_HINT_WORKER,
};
struct rte_event_queue_conf wkr_q_conf = {
.schedule_type = cdata.queue_type,
return 0;
}
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ *
+ * Configures one RX and one TX queue, RSS over IP/TCP/UDP (masked to what
+ * the hardware supports), and promiscuous mode.  Returns 0 on success or
+ * a negative errno-style value on failure.  Note: the port is configured
+ * here but not started.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+	struct rte_eth_rxconf rx_conf;
+	static const struct rte_eth_conf port_conf_default = {
+		.rxmode = {
+			.mq_mode = RTE_ETH_MQ_RX_RSS,
+		},
+		.rx_adv_conf = {
+			.rss_conf = {
+				.rss_hf = RTE_ETH_RSS_IP |
+					  RTE_ETH_RSS_TCP |
+					  RTE_ETH_RSS_UDP,
+			}
+		}
+	};
+	const uint16_t rx_rings = 1, tx_rings = 1;
+	const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+	struct rte_eth_conf port_conf = port_conf_default;
+	int retval;
+	uint16_t q;
+	struct rte_eth_dev_info dev_info;
+	struct rte_eth_txconf txconf;
+
+	/* Reject port ids that are not backed by an attached device. */
+	if (!rte_eth_dev_is_valid_port(port))
+		return -1;
+
+	retval = rte_eth_dev_info_get(port, &dev_info);
+	if (retval != 0) {
+		printf("Error during getting device (port %u) info: %s\n",
+				port, strerror(-retval));
+		return retval;
+	}
+
+	/* Enable fast free at port level when the device supports it. */
+	if (dev_info.tx_offload_capa & RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE)
+		port_conf.txmode.offloads |=
+			RTE_ETH_TX_OFFLOAD_MBUF_FAST_FREE;
+	rx_conf = dev_info.default_rxconf;
+	rx_conf.offloads = port_conf.rxmode.offloads;
+
+	/* Drop any requested RSS hash types the hardware cannot do. */
+	port_conf.rx_adv_conf.rss_conf.rss_hf &=
+		dev_info.flow_type_rss_offloads;
+	if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
+			port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
+		printf("Port %u modified RSS hash function based on hardware support,"
+				"requested:%#"PRIx64" configured:%#"PRIx64"\n",
+				port,
+				port_conf_default.rx_adv_conf.rss_conf.rss_hf,
+				port_conf.rx_adv_conf.rss_conf.rss_hf);
+	}
+
+	/* Configure the Ethernet device. */
+	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+	if (retval != 0)
+		return retval;
+
+	/* Allocate and set up 1 RX queue per Ethernet port. */
+	for (q = 0; q < rx_rings; q++) {
+		retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+				rte_eth_dev_socket_id(port), &rx_conf,
+				mbuf_pool);
+		if (retval < 0)
+			return retval;
+	}
+
+	txconf = dev_info.default_txconf;
+	/* NOTE(review): queue offloads come from port_conf_default, not
+	 * port_conf, so FAST_FREE stays a port-level-only offload here —
+	 * presumably intentional (port offloads apply to all queues); confirm.
+	 */
+	txconf.offloads = port_conf_default.txmode.offloads;
+	/* Allocate and set up 1 TX queue per Ethernet port. */
+	for (q = 0; q < tx_rings; q++) {
+		retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+				rte_eth_dev_socket_id(port), &txconf);
+		if (retval < 0)
+			return retval;
+	}
+
+	/* Display the port MAC address. */
+	struct rte_ether_addr addr;
+	retval = rte_eth_macaddr_get(port, &addr);
+	if (retval != 0) {
+		printf("Failed to get MAC address (port %u): %s\n",
+				port, rte_strerror(-retval));
+		return retval;
+	}
+
+	printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+			" %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+			(unsigned int)port, RTE_ETHER_ADDR_BYTES(&addr));
+
+	/* Enable RX in promiscuous mode for the Ethernet device. */
+	retval = rte_eth_promiscuous_enable(port);
+	if (retval != 0)
+		return retval;
+
+	return 0;
+}
+
+/*
+ * Create the shared packet mbuf pool and initialize every probed Ethernet
+ * port with it via port_init().  Pool size defaults to 16384 mbufs per
+ * port unless overridden on the command line (cdata.num_mbuf).  Any
+ * failure is fatal (rte_exit); returns 0 on success.
+ */
+static int
+init_ports(uint16_t num_ports)
+{
+	uint16_t portid;
+
+	if (!cdata.num_mbuf)
+		cdata.num_mbuf = 16384 * num_ports;
+
+	struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+			/* mbufs */ cdata.num_mbuf,
+			/* cache_size */ 512,
+			/* priv_size*/ 0,
+			/* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+			rte_socket_id());
+	/* Fail loudly here rather than letting port_init() trip over a
+	 * NULL pool with a less obvious error later on.
+	 */
+	if (mp == NULL)
+		rte_exit(EXIT_FAILURE, "Cannot create mbuf pool: %s\n",
+				rte_strerror(rte_errno));
+
+	RTE_ETH_FOREACH_DEV(portid)
+		if (port_init(portid, mp) != 0)
+			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
+					portid);
+
+	return 0;
+}
+
static void
init_adapters(uint16_t nb_ports)
{
.dequeue_depth = cdata.worker_cq_depth,
.enqueue_depth = 64,
.new_event_threshold = 4096,
+ .event_port_cfg = RTE_EVENT_PORT_CFG_HINT_PRODUCER,
};
+ init_ports(nb_ports);
if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
adptr_p_conf.new_event_threshold = dev_info.max_num_events;
if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)