}
static __rte_always_inline void
-worker_tx_pkt(struct rte_mbuf *mbuf)
+worker_tx_pkt(const uint8_t dev, const uint8_t port, struct rte_event *ev)
{
- exchange_mac(mbuf);
- while (rte_eth_tx_burst(mbuf->port, 0, &mbuf, 1) != 1)
+ exchange_mac(ev->mbuf);
+ rte_event_eth_tx_adapter_txq_set(ev->mbuf, 0);
+ while (!rte_event_eth_tx_adapter_enqueue(dev, port, ev, 1, 0))
rte_pause();
}
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
- continue;
+ } else {
+ work();
+ ev.queue_id++;
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
}
- work();
- ev.queue_id++;
- worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- worker_event_enqueue(dev, port, &ev);
- fwd++;
}
if (!cdata.quiet)
received++;
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
- continue;
+ } else {
+ work();
+ worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
+ worker_event_enqueue(dev, port, &ev);
+ fwd++;
}
- work();
- worker_fwd_event(&ev, RTE_SCHED_TYPE_ATOMIC);
- worker_event_enqueue(dev, port, &ev);
- fwd++;
}
if (!cdata.quiet)
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
rte_prefetch0(ev[i + 1].mbuf);
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
ev[i].op = RTE_EVENT_OP_RELEASE;
tx++;
} else
if (cq_id >= lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
if (cq_id == lst_qid) {
if (ev.sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev.mbuf);
+ worker_tx_pkt(dev, port, &ev);
tx++;
continue;
}
if (cq_id >= lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
if (cq_id == lst_qid) {
if (ev[i].sched_type == RTE_SCHED_TYPE_ATOMIC) {
- worker_tx_pkt(ev[i].mbuf);
+ worker_tx_pkt(dev, port, &ev[i]);
tx++;
ev[i].op = RTE_EVENT_OP_RELEASE;
continue;
}
static int
-setup_eventdev_worker_tx(struct cons_data *cons_data,
- struct worker_data *worker_data)
+setup_eventdev_worker_tx_enq(struct worker_data *worker_data)
{
- RTE_SET_USED(cons_data);
uint8_t i;
const uint8_t atq = cdata.all_type_queues ? 1 : 0;
const uint8_t dev_id = 0;
const uint8_t nb_ports = cdata.num_workers;
uint8_t nb_slots = 0;
- uint8_t nb_queues = rte_eth_dev_count();
+ uint8_t nb_queues = rte_eth_dev_count_avail();
/*
* In case where all type queues are not enabled, use queues equal to
*/
if (!atq) {
nb_queues *= cdata.num_stages;
- nb_queues += rte_eth_dev_count();
+ nb_queues += rte_eth_dev_count_avail();
}
struct rte_event_dev_config config = {
ret = rte_event_dev_info_get(dev_id, &dev_info);
printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
+ if (dev_info.max_num_events < config.nb_events_limit)
+ config.nb_events_limit = dev_info.max_num_events;
if (dev_info.max_event_port_dequeue_depth <
config.nb_event_port_dequeue_depth)
config.nb_event_port_dequeue_depth =
}
printf("\n");
+ if (wkr_p_conf.new_event_threshold > config.nb_events_limit)
+ wkr_p_conf.new_event_threshold = config.nb_events_limit;
if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
}
rte_service_runstate_set(fdata->evdev_service_id, 1);
rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
- if (rte_event_dev_start(dev_id) < 0) {
- printf("Error starting eventdev\n");
- return -1;
- }
+
+ if (rte_event_dev_start(dev_id) < 0)
+ rte_exit(EXIT_FAILURE, "Error starting eventdev");
return dev_id;
}
return 0;
}
+/*
+ * Initializes a given port using global settings and with the RX buffers
+ * coming from the mbuf_pool passed as a parameter.
+ */
+static inline int
+port_init(uint8_t port, struct rte_mempool *mbuf_pool)
+{
+ struct rte_eth_rxconf rx_conf;
+ static const struct rte_eth_conf port_conf_default = {
+ .rxmode = {
+ .mq_mode = ETH_MQ_RX_RSS,
+ .max_rx_pkt_len = RTE_ETHER_MAX_LEN,
+ },
+ .rx_adv_conf = {
+ .rss_conf = {
+ .rss_hf = ETH_RSS_IP |
+ ETH_RSS_TCP |
+ ETH_RSS_UDP,
+ }
+ }
+ };
+ const uint16_t rx_rings = 1, tx_rings = 1;
+ const uint16_t rx_ring_size = 512, tx_ring_size = 512;
+ struct rte_eth_conf port_conf = port_conf_default;
+ int retval;
+ uint16_t q;
+ struct rte_eth_dev_info dev_info;
+ struct rte_eth_txconf txconf;
+
+ if (!rte_eth_dev_is_valid_port(port))
+ return -1;
+
+ retval = rte_eth_dev_info_get(port, &dev_info);
+ if (retval != 0) {
+ printf("Error during getting device (port %u) info: %s\n",
+ port, strerror(-retval));
+ return retval;
+ }
+
+ if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
+ port_conf.txmode.offloads |=
+ DEV_TX_OFFLOAD_MBUF_FAST_FREE;
+ rx_conf = dev_info.default_rxconf;
+ rx_conf.offloads = port_conf.rxmode.offloads;
+
+ port_conf.rx_adv_conf.rss_conf.rss_hf &=
+ dev_info.flow_type_rss_offloads;
+ if (port_conf.rx_adv_conf.rss_conf.rss_hf !=
+ port_conf_default.rx_adv_conf.rss_conf.rss_hf) {
+ printf("Port %u modified RSS hash function based on hardware support,"
+ "requested:%#"PRIx64" configured:%#"PRIx64"\n",
+ port,
+ port_conf_default.rx_adv_conf.rss_conf.rss_hf,
+ port_conf.rx_adv_conf.rss_conf.rss_hf);
+ }
+
+ /* Configure the Ethernet device. */
+ retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
+ if (retval != 0)
+ return retval;
+
+ /* Allocate and set up 1 RX queue per Ethernet port. */
+ for (q = 0; q < rx_rings; q++) {
+ retval = rte_eth_rx_queue_setup(port, q, rx_ring_size,
+ rte_eth_dev_socket_id(port), &rx_conf,
+ mbuf_pool);
+ if (retval < 0)
+ return retval;
+ }
+
+ txconf = dev_info.default_txconf;
+ txconf.offloads = port_conf_default.txmode.offloads;
+ /* Allocate and set up 1 TX queue per Ethernet port. */
+ for (q = 0; q < tx_rings; q++) {
+ retval = rte_eth_tx_queue_setup(port, q, tx_ring_size,
+ rte_eth_dev_socket_id(port), &txconf);
+ if (retval < 0)
+ return retval;
+ }
+
+ /* Display the port MAC address. */
+ struct rte_ether_addr addr;
+ retval = rte_eth_macaddr_get(port, &addr);
+ if (retval != 0) {
+ printf("Failed to get MAC address (port %u): %s\n",
+ port, rte_strerror(-retval));
+ return retval;
+ }
+
+ printf("Port %u MAC: %02" PRIx8 " %02" PRIx8 " %02" PRIx8
+ " %02" PRIx8 " %02" PRIx8 " %02" PRIx8 "\n",
+ (unsigned int)port,
+ addr.addr_bytes[0], addr.addr_bytes[1],
+ addr.addr_bytes[2], addr.addr_bytes[3],
+ addr.addr_bytes[4], addr.addr_bytes[5]);
+
+ /* Enable RX in promiscuous mode for the Ethernet device. */
+ retval = rte_eth_promiscuous_enable(port);
+ if (retval != 0)
+ return retval;
+
+ return 0;
+}
+
+static int
+init_ports(uint16_t num_ports)
+{
+ uint16_t portid;
+
+ if (!cdata.num_mbuf)
+ cdata.num_mbuf = 16384 * num_ports;
+
+ struct rte_mempool *mp = rte_pktmbuf_pool_create("packet_pool",
+ /* mbufs */ cdata.num_mbuf,
+ /* cache_size */ 512,
+ /* priv_size*/ 0,
+ /* data_room_size */ RTE_MBUF_DEFAULT_BUF_SIZE,
+ rte_socket_id());
+
+ RTE_ETH_FOREACH_DEV(portid)
+ if (port_init(portid, mp) != 0)
+ rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "\n",
+ portid);
+
+ return 0;
+}
+
static void
-init_rx_adapter(uint16_t nb_ports)
+init_adapters(uint16_t nb_ports)
{
int i;
int ret;
ret = rte_event_dev_info_get(evdev_id, &dev_info);
adptr_services = rte_zmalloc(NULL, sizeof(struct rx_adptr_services), 0);
- struct rte_event_port_conf rx_p_conf = {
- .dequeue_depth = 8,
- .enqueue_depth = 8,
- .new_event_threshold = 1200,
+ struct rte_event_port_conf adptr_p_conf = {
+ .dequeue_depth = cdata.worker_cq_depth,
+ .enqueue_depth = 64,
+ .new_event_threshold = 4096,
};
- if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
- rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
- if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
- rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
+ init_ports(nb_ports);
+ if (adptr_p_conf.new_event_threshold > dev_info.max_num_events)
+ adptr_p_conf.new_event_threshold = dev_info.max_num_events;
+ if (adptr_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
+ adptr_p_conf.dequeue_depth =
+ dev_info.max_event_port_dequeue_depth;
+ if (adptr_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
+ adptr_p_conf.enqueue_depth =
+ dev_info.max_event_port_enqueue_depth;
-
- struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
- .ev.sched_type = cdata.queue_type,
- };
+ struct rte_event_eth_rx_adapter_queue_conf queue_conf;
+ memset(&queue_conf, 0, sizeof(queue_conf));
+ queue_conf.ev.sched_type = cdata.queue_type;
for (i = 0; i < nb_ports; i++) {
uint32_t cap;
uint32_t service_id;
- ret = rte_event_eth_rx_adapter_create(i, evdev_id, &rx_p_conf);
+ ret = rte_event_eth_rx_adapter_create(i, evdev_id,
+ &adptr_p_conf);
if (ret)
rte_exit(EXIT_FAILURE,
- "failed to create rx adapter[%d]",
- cdata.rx_adapter_id);
+ "failed to create rx adapter[%d]", i);
ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
if (ret)
rte_exit(EXIT_FAILURE,
"Failed to add queues to Rx adapter");
-
/* Producer needs to be scheduled. */
if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT)) {
ret = rte_event_eth_rx_adapter_service_id_get(i,
ret = rte_event_eth_rx_adapter_start(i);
if (ret)
rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
- cdata.rx_adapter_id);
+ i);
+ }
+
+ /* We already know that Tx adapter has INTERNAL port cap*/
+ ret = rte_event_eth_tx_adapter_create(cdata.tx_adapter_id, evdev_id,
+ &adptr_p_conf);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "failed to create tx adapter[%d]",
+ cdata.tx_adapter_id);
+
+ for (i = 0; i < nb_ports; i++) {
+ ret = rte_event_eth_tx_adapter_queue_add(cdata.tx_adapter_id, i,
+ -1);
+ if (ret)
+ rte_exit(EXIT_FAILURE,
+ "Failed to add queues to Tx adapter");
}
+ ret = rte_event_eth_tx_adapter_start(cdata.tx_adapter_id);
+ if (ret)
+ rte_exit(EXIT_FAILURE, "Tx adapter[%d] start failed",
+ cdata.tx_adapter_id);
+
if (adptr_services->nb_rx_adptrs) {
struct rte_service_spec service;
&fdata->rxadptr_service_id);
if (ret)
rte_exit(EXIT_FAILURE,
- "Rx adapter[%d] service register failed",
- cdata.rx_adapter_id);
+ "Rx adapter service register failed");
rte_service_runstate_set(fdata->rxadptr_service_id, 1);
rte_service_component_runstate_set(fdata->rxadptr_service_id,
rte_free(adptr_services);
}
- if (!adptr_services->nb_rx_adptrs && fdata->cap.consumer == NULL &&
- (dev_info.event_dev_cap &
+ if (!adptr_services->nb_rx_adptrs && (dev_info.event_dev_cap &
RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))
fdata->cap.scheduler = NULL;
-
- if (dev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
- memset(fdata->sched_core, 0,
- sizeof(unsigned int) * MAX_NUM_CORE);
}
static void
-worker_tx_opt_check(void)
+worker_tx_enq_opt_check(void)
{
int i;
int ret;
uint32_t cap = 0;
uint8_t rx_needed = 0;
+ uint8_t sched_needed = 0;
struct rte_event_dev_info eventdev_info;
memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
rte_exit(EXIT_FAILURE,
"Event dev doesn't support all type queues\n");
+ sched_needed = !(eventdev_info.event_dev_cap &
+ RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED);
- for (i = 0; i < rte_eth_dev_count(); i++) {
+ RTE_ETH_FOREACH_DEV(i) {
ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
if (ret)
rte_exit(EXIT_FAILURE,
- "failed to get event rx adapter "
- "capabilities");
+ "failed to get event rx adapter capabilities");
rx_needed |=
!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
}
if (cdata.worker_lcore_mask == 0 ||
(rx_needed && cdata.rx_lcore_mask == 0) ||
- (cdata.sched_lcore_mask == 0 &&
- !(eventdev_info.event_dev_cap &
- RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
+ (sched_needed && cdata.sched_lcore_mask == 0)) {
printf("Core part of pipeline was not assigned any cores. "
"This will stall the pipeline, please check core masks "
"(use -h for details on setting core masks):\n"
- "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
- "\n\tworkers: %"PRIu64"\n",
- cdata.rx_lcore_mask, cdata.tx_lcore_mask,
- cdata.sched_lcore_mask,
- cdata.worker_lcore_mask);
+ "\trx: %"PRIu64"\n\tsched: %"PRIu64
+ "\n\tworkers: %"PRIu64"\n", cdata.rx_lcore_mask,
+ cdata.sched_lcore_mask, cdata.worker_lcore_mask);
rte_exit(-1, "Fix core masks\n");
}
+
+ if (!sched_needed)
+ memset(fdata->sched_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+ if (!rx_needed)
+ memset(fdata->rx_core, 0,
+ sizeof(unsigned int) * MAX_NUM_CORE);
+
+ memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
}
static worker_loop
}
void
-set_worker_tx_setup_data(struct setup_data *caps, bool burst)
+set_worker_tx_enq_setup_data(struct setup_data *caps, bool burst)
{
if (cdata.num_stages == 1)
caps->worker = get_worker_single_stage(burst);
else
caps->worker = get_worker_multi_stage(burst);
- memset(fdata->tx_core, 0, sizeof(unsigned int) * MAX_NUM_CORE);
-
- caps->check_opt = worker_tx_opt_check;
- caps->consumer = NULL;
+ caps->check_opt = worker_tx_enq_opt_check;
caps->scheduler = schedule_devices;
- caps->evdev_setup = setup_eventdev_worker_tx;
- caps->adptr_setup = init_rx_adapter;
+ caps->evdev_setup = setup_eventdev_worker_tx_enq;
+ caps->adptr_setup = init_adapters;
}