2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright 2016 Intel Corporation.
4 * Copyright 2017 Cavium, Inc.
7 #include "pipeline_common.h"
/*
 * Worker loop that processes one event (ev) per iteration.
 *
 * arg is cast to struct worker_data, which supplies the event device id and
 * the event port id used for the dequeue and enqueue calls below. The loop
 * runs for as long as fdata->done is zero.
 */
9 static __rte_always_inline int
10 worker_generic(void *arg)
14 struct worker_data *data = (struct worker_data *)arg;
15 uint8_t dev_id = data->dev_id;
16 uint8_t port_id = data->port_id;
/* Event counters, reported in the exit message at the end. */
17 size_t sent = 0, received = 0;
18 unsigned int lcore_id = rte_lcore_id();
20 while (!fdata->done) {
/* Invoke the optional scheduler callback for this lcore. */
22 if (fdata->cap.scheduler)
23 fdata->cap.scheduler(lcore_id);
/*
 * Branch taken when the fdata->worker_core[] entry for this lcore is zero.
 * NOTE(review): the branch body is not visible here; presumably it pauses
 * and retries - confirm.
 */
25 if (!fdata->worker_core[lcore_id]) {
/* Pull work from this worker port. */
30 const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
39 /* The first worker stage does classification */
40 if (ev.queue_id == cdata.qid[0])
/*
 * The flow id is derived from the RSS hash stored in the mbuf.
 * NOTE(review): the rest of this expression is not visible here.
 */
41 ev.flow_id = ev.mbuf->hash.rss
/*
 * Retarget the event at the queue that follows its current one and mark it
 * as a forward operation using the configured schedule type.
 */
44 ev.queue_id = cdata.next_qid[ev.queue_id];
45 ev.op = RTE_EVENT_OP_FORWARD;
46 ev.sched_type = cdata.queue_type;
/* Keep retrying until the event device accepts the single event. */
50 while (rte_event_enqueue_burst(dev_id, port_id, &ev, 1) != 1)
/* Exit message with the receive and transmit counters of this worker. */
56 printf(" worker %u thread done. RX=%zu TX=%zu\n",
57 rte_lcore_id(), received, sent);
/*
 * Burst variant of the worker loop: up to BATCH_SIZE events are dequeued
 * into events[] per iteration, each one is retargeted at its next queue,
 * and the batch is enqueued again.
 *
 * arg is cast to struct worker_data, which supplies the event device id and
 * the event port id. The loop runs for as long as fdata->done is zero.
 */
63 worker_generic_burst(void *arg)
65 struct rte_event events[BATCH_SIZE];
67 struct worker_data *data = (struct worker_data *)arg;
68 uint8_t dev_id = data->dev_id;
69 uint8_t port_id = data->port_id;
/* Event counters, reported in the exit message at the end. */
70 size_t sent = 0, received = 0;
71 unsigned int lcore_id = rte_lcore_id();
73 while (!fdata->done) {
/* Invoke the optional scheduler callback for this lcore. */
76 if (fdata->cap.scheduler)
77 fdata->cap.scheduler(lcore_id);
/*
 * Branch taken when the fdata->worker_core[] entry for this lcore is zero.
 * NOTE(review): the branch body is not visible here.
 */
79 if (!fdata->worker_core[lcore_id]) {
/* Request as many events as events[] can hold; the last argument is 0. */
84 const uint16_t nb_rx = rte_event_dequeue_burst(dev_id, port_id,
85 events, RTE_DIM(events), 0);
/* Rewrite every dequeued event in place before it is sent on. */
93 for (i = 0; i < nb_rx; i++) {
95 /* The first worker stage does classification */
96 if (events[i].queue_id == cdata.qid[0])
/*
 * The flow id is derived from the RSS hash stored in the mbuf.
 * NOTE(review): the rest of this expression is not visible here.
 */
97 events[i].flow_id = events[i].mbuf->hash.rss
100 events[i].queue_id = cdata.next_qid[events[i].queue_id];
101 events[i].op = RTE_EVENT_OP_FORWARD;
102 events[i].sched_type = cdata.queue_type;
/*
 * Enqueue the batch. While fewer than nb_rx events have been accepted and
 * fdata->done is still zero, further enqueue calls add to nb_tx.
 * NOTE(review): the arguments of both enqueue calls are not visible here.
 */
106 uint16_t nb_tx = rte_event_enqueue_burst(dev_id, port_id,
108 while (nb_tx < nb_rx && !fdata->done)
109 nb_tx += rte_event_enqueue_burst(dev_id, port_id,
/* Exit message with the receive and transmit counters of this worker. */
116 printf(" worker %u thread done. RX=%zu TX=%zu\n",
117 rte_lcore_id(), received, sent);
/*
 * Single-event consumer: dequeues from the consumer port, hands the mbuf to
 * the ethernet TX buffer of the port recorded in the mbuf, and periodically
 * prints throughput figures.
 *
 * NOTE(review): the line carrying the function name is not visible here;
 * presumably this is the routine registered as caps->consumer for the
 * non-burst case - confirm.
 */
122 static __rte_always_inline int
/* Timer ticks per millisecond; used to turn cycle deltas into milliseconds. */
125 const uint64_t freq_khz = rte_get_timer_hz() / 1000;
126 struct rte_event packet;
/* Static storage: these statistics keep their values between calls. */
128 static uint64_t received;
129 static uint64_t last_pkts;
130 static uint64_t last_time;
131 static uint64_t start_time;
133 uint8_t dev_id = cons_data.dev_id;
134 uint8_t port_id = cons_data.port_id;
137 uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
/*
 * Flush TX queue 0 of every ethernet device.
 * NOTE(review): the condition guarding this loop is not visible here;
 * presumably it runs when nothing was dequeued - confirm.
 */
141 for (i = 0; i < rte_eth_dev_count(); i++)
142 rte_eth_tx_buffer_flush(i, 0, fdata->tx_buf[i]);
/*
 * Record the reference timestamps for the statistics.
 * NOTE(review): the guard around this assignment is not visible here.
 */
146 last_time = start_time = rte_get_timer_cycles();
/* The packet is buffered for TX on the port number stored in its mbuf. */
149 uint8_t outport = packet.mbuf->port;
151 exchange_mac(packet.mbuf);
152 rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
/* Enqueue back to the event device only when cons_data.release is set. */
155 if (cons_data.release)
156 rte_event_enqueue_burst(dev_id, port_id,
159 /* Print out mpps every 1<<22 packets */
160 if (!cdata.quiet && received >= last_pkts + (1<<22)) {
161 const uint64_t now = rte_get_timer_cycles();
/* Cycle deltas divided by ticks-per-millisecond give milliseconds. */
162 const uint64_t total_ms = (now - start_time) / freq_khz;
163 const uint64_t delta_ms = (now - last_time) / freq_khz;
164 uint64_t delta_pkts = received - last_pkts;
166 printf("# %s RX=%"PRIu64", time %"PRIu64 "ms, "
167 "avg %.3f mpps [current %.3f mpps]\n",
/* packets / (milliseconds * 1000.0) yields millions of packets per second. */
171 received / (total_ms * 1000.0),
172 delta_pkts / (delta_ms * 1000.0));
173 last_pkts = received;
/* NOTE(review): the body of this check is not visible here. */
178 if (cdata.num_packets <= 0)
180 /* Be stuck in this loop if single. */
181 } while (!fdata->done && fdata->tx_single);
/*
 * Burst consumer: dequeues up to BATCH_SIZE events into packets[], hands
 * each mbuf to the ethernet TX buffer of the port recorded in the mbuf, and
 * periodically prints throughput figures.
 *
 * NOTE(review): the line carrying the function name is not visible here;
 * presumably this is the routine registered as caps->consumer for the
 * burst case - confirm.
 */
186 static __rte_always_inline int
/* Timer ticks per millisecond; used to turn cycle deltas into milliseconds. */
189 const uint64_t freq_khz = rte_get_timer_hz() / 1000;
190 struct rte_event packets[BATCH_SIZE];
/* Static storage: these statistics keep their values between calls. */
192 static uint64_t received;
193 static uint64_t last_pkts;
194 static uint64_t last_time;
195 static uint64_t start_time;
197 uint8_t dev_id = cons_data.dev_id;
198 uint8_t port_id = cons_data.port_id;
/* Ethernet device count, read once and reused by the flush loop below. */
199 uint16_t nb_ports = rte_eth_dev_count();
/* Request as many events as packets[] can hold; the last argument is 0. */
202 uint16_t n = rte_event_dequeue_burst(dev_id, port_id,
203 packets, RTE_DIM(packets), 0);
/*
 * Flush TX queue 0 of every ethernet device.
 * NOTE(review): the condition guarding this loop is not visible here;
 * presumably it runs when nothing was dequeued - confirm.
 */
206 for (j = 0; j < nb_ports; j++)
207 rte_eth_tx_buffer_flush(j, 0, fdata->tx_buf[j]);
/*
 * Record the reference timestamps for the statistics.
 * NOTE(review): the guard around this assignment is not visible here.
 */
211 last_time = start_time = rte_get_timer_cycles();
214 for (i = 0; i < n; i++) {
/* Each packet is buffered for TX on the port number stored in its mbuf. */
215 uint8_t outport = packets[i].mbuf->port;
217 exchange_mac(packets[i].mbuf);
218 rte_eth_tx_buffer(outport, 0, fdata->tx_buf[outport],
/* Mark the event as a release operation for the enqueue further down. */
221 packets[i].op = RTE_EVENT_OP_RELEASE;
/*
 * Only when cons_data.release is set are the events enqueued back to the
 * event device. The second call continues from packets + nb_tx, that is,
 * from the first event not yet counted in nb_tx.
 * NOTE(review): the loop condition around the second call is not visible.
 */
224 if (cons_data.release) {
227 nb_tx = rte_event_enqueue_burst(dev_id, port_id,
230 nb_tx += rte_event_enqueue_burst(dev_id,
231 port_id, packets + nb_tx,
235 /* Print out mpps every 1<<22 packets */
236 if (!cdata.quiet && received >= last_pkts + (1<<22)) {
237 const uint64_t now = rte_get_timer_cycles();
/* Cycle deltas divided by ticks-per-millisecond give milliseconds. */
238 const uint64_t total_ms = (now - start_time) / freq_khz;
239 const uint64_t delta_ms = (now - last_time) / freq_khz;
240 uint64_t delta_pkts = received - last_pkts;
242 printf("# consumer RX=%"PRIu64", time %"PRIu64 "ms, "
243 "avg %.3f mpps [current %.3f mpps]\n",
/* packets / (milliseconds * 1000.0) yields millions of packets per second. */
246 received / (total_ms * 1000.0),
247 delta_pkts / (delta_ms * 1000.0));
248 last_pkts = received;
/*
 * Subtract this batch from the remaining packet budget.
 * NOTE(review): the body of the check that follows is not visible here.
 */
252 cdata.num_packets -= n;
253 if (cdata.num_packets <= 0)
255 /* Be stuck in this loop if single. */
256 } while (!fdata->done && fdata->tx_single);
/*
 * Configure and start event device 0 for the generic pipeline.
 *
 * Visible steps, in order:
 *  - query the device and clamp the requested port depths to its limits;
 *  - configure cdata.num_stages + 1 queues and cdata.num_workers + 1 ports;
 *  - set up one queue per stage using wkr_q_conf, then one SINGLE_LINK
 *    queue using tx_q_conf;
 *  - set up one port per worker and link it to every stage queue;
 *  - set up one more port for the consumer and link it to the TX queue;
 *  - fill *cons_data, fetch the event device service id, start the device.
 *
 * cons_data:   written with the device id and the release flag (the other
 *              fields of the initializer are not visible here).
 * worker_data: array indexed by worker number; how each entry is filled is
 *              not visible here.
 *
 * Failures are reported with printf. NOTE(review): the return statements
 * are not visible here; presumably a negative value on error - confirm.
 */
262 setup_eventdev_generic(struct cons_data *cons_data,
263 struct worker_data *worker_data)
265 const uint8_t dev_id = 0;
266 /* +1 stages is for a SINGLE_LINK TX stage */
267 const uint8_t nb_queues = cdata.num_stages + 1;
268 /* + 1 is one port for consumer */
269 const uint8_t nb_ports = cdata.num_workers + 1;
/* Device-wide request; the two port depths may be lowered further down. */
270 struct rte_event_dev_config config = {
271 .nb_event_queues = nb_queues,
272 .nb_event_ports = nb_ports,
273 .nb_events_limit = 4096,
274 .nb_event_queue_flows = 1024,
275 .nb_event_port_dequeue_depth = 128,
276 .nb_event_port_enqueue_depth = 128,
/* Configuration shared by every worker port. */
278 struct rte_event_port_conf wkr_p_conf = {
279 .dequeue_depth = cdata.worker_cq_depth,
281 .new_event_threshold = 4096,
/* Configuration used for the per-stage queues. */
283 struct rte_event_queue_conf wkr_q_conf = {
284 .schedule_type = cdata.queue_type,
285 .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
286 .nb_atomic_flows = 1024,
287 .nb_atomic_order_sequences = 1024,
/* Configuration of the consumer (TX) port. */
289 struct rte_event_port_conf tx_p_conf = {
290 .dequeue_depth = 128,
291 .enqueue_depth = 128,
292 .new_event_threshold = 4096,
/* Configuration of the final queue that feeds the consumer port. */
294 struct rte_event_queue_conf tx_q_conf = {
295 .priority = RTE_EVENT_DEV_PRIORITY_HIGHEST,
296 .event_queue_cfg = RTE_EVENT_QUEUE_CFG_SINGLE_LINK,
299 struct port_link worker_queues[MAX_NUM_STAGES];
300 uint8_t disable_implicit_release;
301 struct port_link tx_queue;
304 int ret, ndev = rte_event_dev_count();
306 printf("%d: No Eventdev Devices Found\n", __LINE__);
310 struct rte_event_dev_info dev_info;
/*
 * NOTE(review): no check of ret is visible between this call and the use
 * of dev_info on the next line - confirm the result is validated.
 */
311 ret = rte_event_dev_info_get(dev_id, &dev_info);
312 printf("\tEventdev %d: %s\n", dev_id, dev_info.driver_name);
/*
 * Non-zero when the device reports the IMPLICIT_RELEASE_DISABLE capability.
 * The value is copied into both port configurations and later into
 * cons_data->release.
 */
314 disable_implicit_release = (dev_info.event_dev_cap &
315 RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE);
317 wkr_p_conf.disable_implicit_release = disable_implicit_release;
318 tx_p_conf.disable_implicit_release = disable_implicit_release;
/* Lower the requested port depths to the maxima reported by the device. */
320 if (dev_info.max_event_port_dequeue_depth <
321 config.nb_event_port_dequeue_depth)
322 config.nb_event_port_dequeue_depth =
323 dev_info.max_event_port_dequeue_depth;
324 if (dev_info.max_event_port_enqueue_depth <
325 config.nb_event_port_enqueue_depth)
326 config.nb_event_port_enqueue_depth =
327 dev_info.max_event_port_enqueue_depth;
329 ret = rte_event_dev_configure(dev_id, &config);
331 printf("%d: Error configuring device\n", __LINE__);
335 /* Queue creation - one load balanced queue per pipeline stage */
336 printf(" Stages:\n");
337 for (i = 0; i < cdata.num_stages; i++) {
338 if (rte_event_queue_setup(dev_id, i, &wkr_q_conf) < 0) {
339 printf("%d: error creating qid %d\n", __LINE__, i);
/* Events leaving stage i are sent to queue i + 1. */
343 cdata.next_qid[i] = i+1;
344 worker_queues[i].queue_id = i;
/*
 * NOTE(review): the priority computed inside this branch is stored in
 * wkr_q_conf after rte_event_queue_setup() for stage i has already been
 * called above, so as far as visible here it takes effect for the next
 * queue setup while being printed for this stage - confirm the intent.
 */
345 if (cdata.enable_queue_priorities) {
346 /* calculate priority stepping for each stage, leaving
347 * headroom of 1 for the SINGLE_LINK TX below
349 const uint32_t prio_delta =
350 (RTE_EVENT_DEV_PRIORITY_LOWEST-1) / nb_queues;
352 /* higher priority for queues closer to tx */
353 wkr_q_conf.priority =
354 RTE_EVENT_DEV_PRIORITY_LOWEST - prio_delta * i;
/* Human readable schedule type for the log line; Atomic is the fallback. */
357 const char *type_str = "Atomic";
358 switch (wkr_q_conf.schedule_type) {
359 case RTE_SCHED_TYPE_ORDERED:
360 type_str = "Ordered";
362 case RTE_SCHED_TYPE_PARALLEL:
363 type_str = "Parallel";
366 printf("\tStage %d, Type %s\tPriority = %d\n", i, type_str,
367 wkr_q_conf.priority);
371 /* final queue for sending to TX core */
372 if (rte_event_queue_setup(dev_id, i, &tx_q_conf) < 0) {
373 printf("%d: error creating qid %d\n", __LINE__, i);
/* i still holds the value left by the stage loop above. */
376 tx_queue.queue_id = i;
377 tx_queue.priority = RTE_EVENT_DEV_PRIORITY_HIGHEST;
/* Worker port depths must not exceed what was configured on the device. */
379 if (wkr_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
380 wkr_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
381 if (wkr_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
382 wkr_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
384 /* set up one port per worker, linking to all stage queues */
385 for (i = 0; i < cdata.num_workers; i++) {
386 struct worker_data *w = &worker_data[i];
388 if (rte_event_port_setup(dev_id, i, &wkr_p_conf) < 0) {
389 printf("Error setting up port %d\n", i);
/*
 * Link worker port i to every stage queue, one queue per call.
 * NOTE(review): worker_queues[s].priority is passed here but no visible
 * line assigns it - confirm it is initialised before this point.
 */
394 for (s = 0; s < cdata.num_stages; s++) {
395 if (rte_event_port_link(dev_id, i,
396 &worker_queues[s].queue_id,
397 &worker_queues[s].priority,
399 printf("%d: error creating link for port %d\n",
/* Consumer port depths must not exceed what was configured on the device. */
407 if (tx_p_conf.dequeue_depth > config.nb_event_port_dequeue_depth)
408 tx_p_conf.dequeue_depth = config.nb_event_port_dequeue_depth;
409 if (tx_p_conf.enqueue_depth > config.nb_event_port_enqueue_depth)
410 tx_p_conf.enqueue_depth = config.nb_event_port_enqueue_depth;
412 /* port for consumer, linked to TX queue */
413 if (rte_event_port_setup(dev_id, i, &tx_p_conf) < 0) {
414 printf("Error setting up port %d\n", i);
/* Exactly one link is requested, so any result other than 1 is an error. */
417 if (rte_event_port_link(dev_id, i, &tx_queue.queue_id,
418 &tx_queue.priority, 1) != 1) {
419 printf("%d: error creating link for port %d\n",
/* Publish the consumer settings to the caller. */
423 *cons_data = (struct cons_data){.dev_id = dev_id,
425 .release = disable_implicit_release };
/*
 * A result of -ESRCH is tolerated alongside 0; any other value is reported
 * as an error.
 */
427 ret = rte_event_dev_service_id_get(dev_id,
428 &fdata->evdev_service_id);
429 if (ret != -ESRCH && ret != 0) {
430 printf("Error getting the service ID for sw eventdev\n");
/*
 * Set the service runstate to 1 and the mapped check to 0 for the event
 * device service id obtained above.
 */
433 rte_service_runstate_set(fdata->evdev_service_id, 1);
434 rte_service_set_runstate_mapped_check(fdata->evdev_service_id, 0);
435 if (rte_event_dev_start(dev_id) < 0) {
436 printf("Error starting eventdev\n");
/*
 * Create, populate and start the ethernet Rx adapter identified by
 * cdata.rx_adapter_id on event device 0.
 *
 * nb_ports: number of ethernet ports; ports 0 .. nb_ports - 1 are added to
 *           the adapter.
 *
 * Every visible failure path ends in rte_exit(EXIT_FAILURE, ...).
 */
444 init_rx_adapter(uint16_t nb_ports)
448 uint8_t evdev_id = 0;
449 struct rte_event_dev_info dev_info;
/*
 * NOTE(review): no check of ret is visible before dev_info is used for the
 * depth clamping below - confirm the result is validated.
 */
451 ret = rte_event_dev_info_get(evdev_id, &dev_info);
453 struct rte_event_port_conf rx_p_conf = {
456 .new_event_threshold = 1200,
/* Lower the adapter port depths to the maxima reported by the device. */
459 if (rx_p_conf.dequeue_depth > dev_info.max_event_port_dequeue_depth)
460 rx_p_conf.dequeue_depth = dev_info.max_event_port_dequeue_depth;
461 if (rx_p_conf.enqueue_depth > dev_info.max_event_port_enqueue_depth)
462 rx_p_conf.enqueue_depth = dev_info.max_event_port_enqueue_depth;
464 /* Create one adapter for all the ethernet ports. */
465 ret = rte_event_eth_rx_adapter_create(cdata.rx_adapter_id, evdev_id,
468 rte_exit(EXIT_FAILURE, "failed to create rx adapter[%d]",
469 cdata.rx_adapter_id);
/*
 * Event template for the adapter queues: zeroed, then given the configured
 * schedule type and the first stage queue (cdata.qid[0]) as target.
 */
471 struct rte_event_eth_rx_adapter_queue_conf queue_conf;
472 memset(&queue_conf, 0, sizeof(queue_conf));
473 queue_conf.ev.sched_type = cdata.queue_type;
474 queue_conf.ev.queue_id = cdata.qid[0];
/* For each ethernet port: read the adapter capabilities, then add queues. */
476 for (i = 0; i < nb_ports; i++) {
479 ret = rte_event_eth_rx_adapter_caps_get(evdev_id, i, &cap);
481 rte_exit(EXIT_FAILURE,
482 "failed to get event rx adapter "
485 ret = rte_event_eth_rx_adapter_queue_add(cdata.rx_adapter_id, i,
488 rte_exit(EXIT_FAILURE,
489 "Failed to add queues to Rx adapter");
/*
 * A result of -ESRCH is tolerated alongside 0; any other value is fatal.
 * NOTE(review): the message below names the eventdev although the call
 * queries the Rx adapter service id.
 */
492 ret = rte_event_eth_rx_adapter_service_id_get(cdata.rx_adapter_id,
493 &fdata->rxadptr_service_id);
494 if (ret != -ESRCH && ret != 0) {
495 rte_exit(EXIT_FAILURE,
496 "Error getting the service ID for sw eventdev\n");
/*
 * Set the service runstate to 1 and the mapped check to 0 for the adapter
 * service id obtained above.
 */
498 rte_service_runstate_set(fdata->rxadptr_service_id, 1);
499 rte_service_set_runstate_mapped_check(fdata->rxadptr_service_id, 0);
501 ret = rte_event_eth_rx_adapter_start(cdata.rx_adapter_id);
503 rte_exit(EXIT_FAILURE, "Rx adapter[%d] start failed",
504 cdata.rx_adapter_id);
/*
 * Validate the command line options against the capabilities of event
 * device 0 and of the ethernet ports. Invalid combinations terminate the
 * program through rte_exit().
 */
508 generic_opt_check(void)
513 uint8_t rx_needed = 0;
514 struct rte_event_dev_info eventdev_info;
/* Zeroed first; the result of the info query itself is not checked. */
516 memset(&eventdev_info, 0, sizeof(struct rte_event_dev_info));
517 rte_event_dev_info_get(0, &eventdev_info);
/* Requesting all-type queues needs the QUEUE_ALL_TYPES device capability. */
519 if (cdata.all_type_queues && !(eventdev_info.event_dev_cap &
520 RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES))
521 rte_exit(EXIT_FAILURE,
522 "Event dev doesn't support all type queues\n");
/* Inspect the Rx adapter capabilities of every ethernet device. */
524 for (i = 0; i < rte_eth_dev_count(); i++) {
525 ret = rte_event_eth_rx_adapter_caps_get(0, i, &cap);
527 rte_exit(EXIT_FAILURE,
528 "failed to get event rx adapter capabilities");
/*
 * Non-zero when the port lacks the INTERNAL_PORT capability.
 * NOTE(review): the left-hand side of this statement is not visible here;
 * presumably the value is accumulated into rx_needed - confirm.
 */
530 !(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT);
/*
 * Abort when any required role has an empty core mask: workers, rx (only
 * when rx_needed is set), tx, or the scheduler when the device does not
 * report the DISTRIBUTED_SCHED capability.
 */
533 if (cdata.worker_lcore_mask == 0 ||
534 (rx_needed && cdata.rx_lcore_mask == 0) ||
535 cdata.tx_lcore_mask == 0 || (cdata.sched_lcore_mask == 0
536 && !(eventdev_info.event_dev_cap &
537 RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED))) {
538 printf("Core part of pipeline was not assigned any cores. "
539 "This will stall the pipeline, please check core masks "
540 "(use -h for details on setting core masks):\n"
541 "\trx: %"PRIu64"\n\ttx: %"PRIu64"\n\tsched: %"PRIu64
542 "\n\tworkers: %"PRIu64"\n",
543 cdata.rx_lcore_mask, cdata.tx_lcore_mask,
544 cdata.sched_lcore_mask,
545 cdata.worker_lcore_mask);
546 rte_exit(-1, "Fix core masks\n");
/* With the DISTRIBUTED_SCHED capability, clear all of fdata->sched_core. */
549 if (eventdev_info.event_dev_cap & RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED)
550 memset(fdata->sched_core, 0,
551 sizeof(unsigned int) * MAX_NUM_CORE);
/*
 * Fill caps with the callbacks of the generic pipeline.
 *
 * caps:  setup_data structure to populate.
 * burst: NOTE(review): the lines that test this flag are not visible here;
 *        presumably it selects between the burst and the single-event
 *        consumer/worker pair assigned below - confirm.
 */
555 set_worker_generic_setup_data(struct setup_data *caps, bool burst)
/* Burst pair. */
558 caps->consumer = consumer_burst;
559 caps->worker = worker_generic_burst;
/* Single-event pair. */
561 caps->consumer = consumer;
562 caps->worker = worker_generic;
/* These four callbacks are assigned by the lines below in either case. */
565 caps->adptr_setup = init_rx_adapter;
566 caps->scheduler = schedule_devices;
567 caps->evdev_setup = setup_eventdev_generic;
568 caps->check_opt = generic_opt_check;