/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2016 Intel Corporation.
 * Copyright 2017 Cavium, Inc.
 */

#include "pipeline_common.h"
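
/*
 * Single-event worker loop: dequeue one event at a time, classify packets
 * arriving on the first stage queue by deriving a flow ID from the RSS
 * hash, then forward the event to the next stage queue.
 */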
static __rte_always_inline int
worker_generic(void *arg)
{
	struct rte_event ev;

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev_id = data->dev_id;
	uint8_t port_id = data->port_id;
	size_t sent = 0, received = 0;
	unsigned int lcore_id = rte_lcore_id();

	while (!fdata->done) {

		if (fdata->cap.scheduler)
			fdata->cap.scheduler(lcore_id);

		if (!fdata->worker_core[lcore_id]) {
			rte_pause();
			continue;
		}

		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
				&ev, 1, 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received++;

		/* The first worker stage does classification */
		if (ev.queue_id == cdata.qid[0])
			ev.flow_id = ev.mbuf->hash.rss
					% cdata.num_fids;

		ev.queue_id = cdata.next_qid[ev.queue_id];
		ev.op = RTE_EVENT_OP_FORWARD;
		ev.sched_type = cdata.queue_type;

		work(ev.mbuf);

		while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
			rte_pause();
		sent++;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu TX=%zu\n",
				rte_lcore_id(), received, sent);

	return 0;
}
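
/*
 * Burst-mode worker loop: same per-event processing as worker_generic(),
 * but dequeues and enqueues up to BATCH_SIZE events at a time to reduce
 * per-event overhead.
 */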
static __rte_always_inline int
worker_generic_burst(void *arg)
{
	struct rte_event events[BATCH_SIZE];

	struct worker_data *data = (struct worker_data *)arg;
	uint8_t dev_id = data->dev_id;
	uint8_t port_id = data->port_id;
	size_t sent = 0, received = 0;
	unsigned int lcore_id = rte_lcore_id();

	while (!fdata->done) {
		uint16_t i;

		if (fdata->cap.scheduler)
			fdata->cap.scheduler(lcore_id);

		if (!fdata->worker_core[lcore_id]) {
			rte_pause();
			continue;
		}

		const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
				events, RTE_DIM(events), 0);

		if (nb_rx == 0) {
			rte_pause();
			continue;
		}
		received += nb_rx;

		for (i = 0; i < nb_rx; i++) {

			/* The first worker stage does classification */
			if (events[i].queue_id == cdata.qid[0])
				events[i].flow_id = events[i].mbuf->hash.rss
						% cdata.num_fids;

			events[i].queue_id = cdata.next_qid[events[i].queue_id];
			events[i].op = RTE_EVENT_OP_FORWARD;
			events[i].sched_type = cdata.queue_type;

			work(events[i].mbuf);
		}
		uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
				events, nb_rx);
		while (nb_tx < nb_rx && !fdata->done)
			nb_tx += rte_event_enqueue_burst(dev_id, port_id,
					events + nb_tx, nb_rx - nb_tx);
		sent += nb_tx;
	}

	if (!cdata.quiet)
		printf(" worker %u thread done. RX=%zu TX=%zu\n",
				rte_lcore_id(), received, sent);

	return 0;
}
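
/*
 * Single-event consumer: drains events from the TX port, buffers each mbuf
 * for transmission on its output ethdev port, and periodically prints
 * throughput statistics.
 */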
static __rte_always_inline int
consumer(void)
{
	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
	struct rte_event packet;

	static uint64_t received;
	static uint64_t last_pkts;
	static uint64_t last_time;
	static uint64_t start_time;
	unsigned int i;

	uint8_t dev_id = cons_data.dev_id;
	uint8_t port_id = cons_data.port_id;

	do {
		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
				&packet, 1, 0);

		if (n == 0) {
			for (i = 0; i < rte_eth_dev_count(); i++)
				rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
			return 0;
		}
		if (start_time == 0)
			last_time = start_time = rte_get_timer_cycles();

		received++;
		uint8_t outport = packet.mbuf->port;

		rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
				packet.mbuf);

		if (cons_data.release)
			rte_event_enqueue_burst(dev_id, port_id,
					&packet, n);

		/* Print out mpps every 1<<22 packets */
		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
			const uint64_t now = rte_get_timer_cycles();
			const uint64_t total_ms = (now - start_time) / freq_khz;
			const uint64_t delta_ms = (now - last_time) / freq_khz;
			uint64_t delta_pkts = received - last_pkts;
165 printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
166 "avg %.3f mpps [current %.3f mpps]\n",
170 received / (total_ms * 1000.0),
171 delta_pkts / (delta_ms * 1000.0));
172 last_pkts = received;

		cdata.num_packets--;
		if (cdata.num_packets <= 0)
			fdata->done = 1;
	/* Stay in this loop if single. */
	} while (!fdata->done && fdata->tx_single);

	return 0;
}
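
/*
 * Burst-mode consumer: same role as consumer(), but handles up to
 * BATCH_SIZE events per dequeue and explicitly releases events when the
 * eventdev does not perform an implicit release.
 */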
static __rte_always_inline int
consumer_burst(void)
{
	const uint64_t freq_khz = rte_get_timer_hz() / 1000;
	struct rte_event packets[BATCH_SIZE];

	static uint64_t received;
	static uint64_t last_pkts;
	static uint64_t last_time;
	static uint64_t start_time;
	unsigned int i, j;

	uint8_t dev_id = cons_data.dev_id;
	uint8_t port_id = cons_data.port_id;
	uint16_t nb_ports = rte_eth_dev_count();

	do {
		uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
				packets, RTE_DIM(packets), 0);

		if (n == 0) {
			for (j = 0; j < nb_ports; j++)
				rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
			return 0;
		}
		if (start_time == 0)
			last_time = start_time = rte_get_timer_cycles();

		received += n;
		for (i = 0; i < n; i++) {
			uint8_t outport = packets[i].mbuf->port;
			rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
					packets[i].mbuf);

			packets[i].op = RTE_EVENT_OP_RELEASE;
		}

		if (cons_data.release) {
			uint16_t nb_tx;

			nb_tx = rte_event_enqueue_burst(dev_id, port_id,
					packets, n);
			while (nb_tx < n)
				nb_tx += rte_event_enqueue_burst(dev_id,
						port_id, packets + nb_tx,
						n - nb_tx);
		}

		/* Print out mpps every 1<<22 packets */
		if (!cdata.quiet && received >= last_pkts + (1<<22)) {
			const uint64_t now = rte_get_timer_cycles();
			const uint64_t total_ms = (now - start_time) / freq_khz;
			const uint64_t delta_ms = (now - last_time) / freq_khz;
			uint64_t delta_pkts = received - last_pkts;
239 printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
240 "avg %.3f mpps [current %.3f mpps]\n",
243 received / (total_ms * 1000.0),
244 delta_pkts / (delta_ms * 1000.0));
245 last_pkts = received;

		cdata.num_packets -= n;
		if (cdata.num_packets <= 0)
			fdata->done = 1;
	/* Stay in this loop if single. */
	} while (!fdata->done && fdata->tx_single);

	return 0;
}
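
/*
 * Configure the event device: one load-balanced queue per worker stage plus
 * a SINGLE_LINK queue feeding the TX/consumer port, one port per worker and
 * one port for the consumer, all linked accordingly.
 */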
static int
setup_eventdev_generic(struct cons_data *cons_data,
		struct worker_data *worker_data)
{
	const uint8_t dev_id = 0;
	/* +1 stage is for the SINGLE_LINK TX stage */
	const uint8_t nb_queues = cdata.num_stages + 1;
	/* +1 port is for the consumer */
	const uint8_t nb_ports = cdata.num_workers + 1;
	struct rte_event_dev_config config = {
			.nb_event_queues = nb_queues,
			.nb_event_ports = nb_ports,
			.nb_events_limit = 4096,
			.nb_event_queue_flows = 1024,
			.nb_event_port_dequeue_depth = 128,
			.nb_event_port_enqueue_depth = 128,
	};
	struct rte_event_port_conf wkr_p_conf = {
			.dequeue_depth = cdata.worker_cq_depth,
			.new_event_threshold = 4096,
	};
	struct rte_event_queue_conf wkr_q_conf = {
			.schedule_type = cdata.queue_type,
			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
			.nb_atomic_flows = 1024,
			.nb_atomic_order_sequences = 1024,
	};
	struct rte_event_port_conf tx_p_conf = {
			.dequeue_depth = 128,
			.enqueue_depth = 128,
			.new_event_threshold = 4096,
	};
	struct rte_event_queue_conf tx_q_conf = {
			.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
			.event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
	};

	struct port_link worker_queues[MAX_NUM_STAGES];
	uint8_t disable_implicit_release;
	struct port_link tx_queue;
	unsigned int i;

	int ret, ndev = rte_event_dev_count();
	if (ndev < 1) {
		printf("%d: No Eventdev Devices Found\n", __LINE__);
		return -1;
	}

	struct rte_event_dev_info dev_info;
	ret = rte_event_dev_info_get(dev_id, &dev_info);
	printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);

	disable_implicit_release = (dev_info.event_dev_cap &
			RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);

	wkr_p_conf.disable_implicit_release = disable_implicit_release;
	tx_p_conf.disable_implicit_release = disable_implicit_release;

	if (dev_info.max_event_port_dequeue_depth <
			config.nb_event_port_dequeue_depth)
		config.nb_event_port_dequeue_depth =
				dev_info.max_event_port_dequeue_depth;
	if (dev_info.max_event_port_enqueue_depth <
			config.nb_event_port_enqueue_depth)
		config.nb_event_port_enqueue_depth =
				dev_info.max_event_port_enqueue_depth;

	ret = rte_event_dev_configure(dev_id, &config);
	if (ret < 0) {
		printf("%d: Error configuring device\n", __LINE__);
		return -1;
	}

	/* Q creation - one load-balanced queue per pipeline stage */
	printf(" Stages:\n");
	for (i = 0; i < cdata.num_stages; i++) {
		if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
			printf("%d: error creating qid %d\n", __LINE__, i);
			return -1;
		}
		cdata.qid[i] = i;
		cdata.next_qid[i] = i+1;
		worker_queues[i].queue_id = i;
		if (cdata.enable_queue_priorities) {
			/* calculate priority stepping for each stage, leaving
			 * headroom of 1 for the SINGLE_LINK TX below
			 */
			const uint32_t prio_delta =
				(RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;

			/* higher priority for queues closer to tx */
			wkr_q_conf.priority =
				RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
		}

		const char *type_str = "Atomic";
		switch (wkr_q_conf.schedule_type) {
		case RTE_SCHED_TYPE_ORDERED:
			type_str = "Ordered";
			break;
		case RTE_SCHED_TYPE_PARALLEL:
			type_str = "Parallel";
			break;
		}
		printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
				wkr_q_conf.priority);
	}

	/* final queue for sending to TX core */
	if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
		printf("%d: error creating qid %d\n", __LINE__, i);
		return -1;
	}
	tx_queue.queue_id = i;
	tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;

	if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* set up one port per worker, linking to all stage queues */
	for (i = 0; i < cdata.num_workers; i++) {
		struct worker_data *w = &worker_data[i];
		w->dev_id = dev_id;
		if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
			printf("Error setting up port %d\n", i);
			return -1;
		}

		uint32_t s;
		for (s = 0; s < cdata.num_stages; s++) {
			if (rte_event_port_link(dev_id, i,
					&worker_queues[s].queue_id,
					&worker_queues[s].priority,
					1) != 1) {
				printf("%d: error creating link for port %d\n",
						__LINE__, i);
				return -1;
			}
		}
		w->port_id = i;
	}

	if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
		tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
	if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
		tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;

	/* port for consumer, linked to TX queue */
	if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
		printf("Error setting up port %d\n", i);
		return -1;
	}
	if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
				&tx_queue.priority, 1) != 1) {
		printf("%d: error creating link for port %d\n",
				__LINE__, i);
		return -1;
	}
	*cons_data = (struct cons_data){.dev_id = dev_id,
			.port_id = i,
			.release = disable_implicit_release };

	ret = rte_event_dev_service_id_get(dev_id,
			&fdata->evdev_service_id);
	if (ret != -ESRCH && ret != 0) {
		printf("Error getting the service ID for sw eventdev\n");
		return -1;
	}
	rte_service_runstate_set(fdata->evdev_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
	if (rte_event_dev_start(dev_id) < 0) {
		printf("Error starting eventdev\n");
		return -1;
	}

	return 0;
}
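
/*
 * Create a single Rx adapter that injects packets from every ethdev port
 * into the first stage queue, and start it as a service.
 */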
static void
init_rx_adapter(uint16_t nb_ports)
{
	int i;
	int ret;
	uint8_t evdev_id = 0;
	struct rte_event_dev_info dev_info;

	ret = rte_event_dev_info_get(evdev_id, &dev_info);

	struct rte_event_port_conf rx_p_conf = {
		.new_event_threshold = 1200,
	};

	if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
		rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
	if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
		rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;

	/* Create one adapter for all the ethernet ports. */
	ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
			&rx_p_conf);
	if (ret)
		rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
				cdata.rx_adapter_id);

	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
		.ev.sched_type = cdata.queue_type,
		.ev.queue_id = cdata.qid[0],
	};

	for (i = 0; i < nb_ports; i++) {
		uint32_t cap = 0;

		ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"failed to get event rx adapter "
					"capabilities\n");

		ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
				-1, &queue_conf);
		if (ret)
			rte_exit(EXIT_FAILURE,
					"Failed to add queues to Rx adapter\n");
	}

	ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
			&fdata->rxadptr_service_id);
	if (ret != -ESRCH && ret != 0) {
		rte_exit(EXIT_FAILURE,
			"Error getting the service ID for the Rx adapter\n");
	}
	rte_service_runstate_set(fdata->rxadptr_service_id, 1);
	rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);

	ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
	if (ret)
		rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
				cdata.rx_adapter_id);
}
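
/*
 * Sanity-check the configured core masks: Rx, TX, worker and (unless the
 * eventdev schedules internally) scheduler cores must all be assigned.
 */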
static void
generic_opt_check(void)
{
	int i;
	int ret;
	uint32_t cap = 0;
	uint8_t rx_needed = 0;
	struct rte_event_dev_info eventdev_info;

	memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
	rte_event_dev_info_get(0, &eventdev_info);

	for (i = 0; i < rte_eth_dev_count(); i++) {
		ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
		if (ret)
			rte_exit(EXIT_FAILURE,
				"failed to get event rx adapter capabilities");
		rx_needed |=
			!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
	}

	if (cdata.worker_lcore_mask == 0 ||
			(rx_needed && cdata.rx_lcore_mask == 0) ||
			cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
				&& !(eventdev_info.event_dev_cap &
					RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
		printf("Some part of the pipeline was not assigned any cores. "
			"This will stall the pipeline, please check core masks "
			"(use -h for details on setting core masks):\n"
			"\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
			"\n\tworkers: %"PRIu64"\n",
			cdata.rx_lcore_mask, cdata.tx_lcore_mask,
			cdata.sched_lcore_mask,
			cdata.worker_lcore_mask);
		rte_exit(-1, "Fix core masks\n");
	}

	if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
		memset(fdata->sched_core, 0,
				sizeof(unsigned int) * MAX_NUM_CORE);
}
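
/*
 * Populate the setup_data callbacks with the generic pipeline
 * implementation, selecting the burst or single-event variants.
 */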
void
set_worker_generic_setup_data(struct setup_data *caps, bool burst)
{
	if (burst) {
		caps->consumer = consumer_burst;
		caps->worker = worker_generic_burst;
	} else {
		caps->consumer = consumer;
		caps->worker = worker_generic;
	}

	caps->adptr_setup = init_rx_adapter;
	caps->scheduler = schedule_devices;
	caps->evdev_setup = setup_eventdev_generic;
	caps->check_opt = generic_opt_check;
}