/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <inttypes.h>
#include <string.h>

#include <rte_bus_vdev.h>
#include <rte_errno.h>
#include <rte_cycles.h>
#include <rte_memzone.h>

#include "opdl_evdev.h"
#include "opdl_ring.h"
18 static __rte_always_inline uint32_t
19 enqueue_check(struct opdl_port *p,
20 const struct rte_event ev[],
26 if (p->opdl->do_validation) {
28 for (i = 0; i < num; i++) {
29 if (ev[i].queue_id != p->next_external_qid) {
30 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
31 "ERROR - port:[%u] - event wants"
32 " to enq to q_id[%u],"
33 " but should be [%u]",
34 opdl_pmd_dev_id(p->opdl),
37 p->next_external_qid);
44 if (p->p_type == OPDL_PURE_RX_PORT ||
45 p->p_type == OPDL_ASYNC_PORT) {
48 p->port_stat[claim_pkts_requested] += num;
49 p->port_stat[claim_pkts_granted] += num_events;
50 p->port_stat[claim_non_empty]++;
51 p->start_cycles = rte_rdtsc();
53 p->port_stat[claim_empty]++;
57 if (p->start_cycles) {
58 uint64_t end_cycles = rte_rdtsc();
59 p->port_stat[total_cycles] +=
60 end_cycles - p->start_cycles;
65 ev[0].queue_id != p->next_external_qid) {
74 static __rte_always_inline void
75 update_on_dequeue(struct opdl_port *p,
76 struct rte_event ev[],
80 if (p->opdl->do_validation) {
82 for (i = 0; i < num; i++)
84 p->opdl->queue[p->queue_id].external_qid;
88 p->port_stat[claim_pkts_requested] += num;
89 p->port_stat[claim_pkts_granted] += num_events;
90 p->port_stat[claim_non_empty]++;
91 p->start_cycles = rte_rdtsc();
93 p->port_stat[claim_empty]++;
99 p->opdl->queue[p->queue_id].external_qid;
111 opdl_rx_error_enqueue(struct opdl_port *p,
112 const struct rte_event ev[],
127 * This function handles enqueue for a single input stage_inst with
128 * threadsafe disabled or enabled. eg 1 thread using a stage_inst or
129 * multiple threads sharing a stage_inst
133 opdl_rx_enqueue(struct opdl_port *p,
134 const struct rte_event ev[],
137 uint16_t enqueued = 0;
139 enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
143 if (!enqueue_check(p, ev, num, enqueued))
159 opdl_tx_error_dequeue(struct opdl_port *p,
160 struct rte_event ev[],
173 * TX single threaded claim
175 * This function handles dequeue for a single worker stage_inst with
176 * threadsafe disabled. eg 1 thread using an stage_inst
180 opdl_tx_dequeue_single_thread(struct opdl_port *p,
181 struct rte_event ev[],
186 struct opdl_ring *ring;
188 ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
190 returned = opdl_ring_copy_to_burst(ring,
196 update_on_dequeue(p, ev, num, returned);
202 * TX multi threaded claim
204 * This function handles dequeue for multiple worker stage_inst with
205 * threadsafe disabled. eg multiple stage_inst each with its own instance
209 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
210 struct rte_event ev[],
213 uint32_t num_events = 0;
215 num_events = opdl_stage_claim(p->deq_stage_inst,
222 update_on_dequeue(p, ev, num, num_events);
224 return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
229 * Worker thread claim
234 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
236 uint32_t num_events = 0;
238 if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
239 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
240 "Attempt to dequeue num of events larger than port (%d) max",
241 opdl_pmd_dev_id(p->opdl),
248 num_events = opdl_stage_claim(p->deq_stage_inst,
256 update_on_dequeue(p, ev, num, num_events);
262 * Worker thread disclaim
266 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
268 uint16_t enqueued = 0;
272 for (i = 0; i < num; i++)
273 opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
276 enqueued = opdl_stage_disclaim(p->enq_stage_inst,
280 return enqueue_check(p, ev, num, enqueued);
283 static __rte_always_inline struct opdl_stage *
284 stage_for_port(struct opdl_queue *q, unsigned int i)
286 if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
287 return q->ports[i]->enq_stage_inst;
289 return q->ports[i]->deq_stage_inst;
292 static int opdl_add_deps(struct opdl_evdev *device,
298 struct opdl_ring *ring;
299 struct opdl_queue *queue = &device->queue[q_id];
300 struct opdl_queue *queue_deps = &device->queue[deps_q_id];
301 struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
303 /* sanity check that all stages are for same opdl ring */
304 for (i = 0; i < queue->nb_ports; i++) {
305 struct opdl_ring *r =
306 opdl_stage_get_opdl_ring(stage_for_port(queue, i));
307 for (j = 0; j < queue_deps->nb_ports; j++) {
308 struct opdl_ring *rj =
309 opdl_stage_get_opdl_ring(
310 stage_for_port(queue_deps, j));
312 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
313 "Stages and dependents"
314 " are not for same opdl ring",
315 opdl_pmd_dev_id(device));
317 for (k = 0; k < device->nb_opdls; k++) {
318 opdl_ring_dump(device->opdl[k],
326 /* Gather all stages instance in deps */
327 for (i = 0; i < queue_deps->nb_ports; i++)
328 dep_stages[i] = stage_for_port(queue_deps, i);
331 /* Add all deps for each port->stage_inst in this queue */
332 for (i = 0; i < queue->nb_ports; i++) {
334 ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
336 status = opdl_stage_deps_add(ring,
337 stage_for_port(queue, i),
338 queue->ports[i]->num_instance,
339 queue->ports[i]->instance_id,
341 queue_deps->nb_ports);
350 opdl_add_event_handlers(struct rte_eventdev *dev)
354 struct opdl_evdev *device = opdl_pmd_priv(dev);
357 for (i = 0; i < device->max_port_nb; i++) {
359 struct opdl_port *port = &device->ports[i];
361 if (port->configured) {
362 if (port->p_type == OPDL_PURE_RX_PORT) {
363 port->enq = opdl_rx_enqueue;
364 port->deq = opdl_tx_error_dequeue;
366 } else if (port->p_type == OPDL_PURE_TX_PORT) {
368 port->enq = opdl_rx_error_enqueue;
370 if (port->num_instance == 1)
372 opdl_tx_dequeue_single_thread;
374 port->deq = opdl_tx_dequeue_multi_inst;
376 } else if (port->p_type == OPDL_REGULAR_PORT) {
378 port->enq = opdl_disclaim;
379 port->deq = opdl_claim;
381 } else if (port->p_type == OPDL_ASYNC_PORT) {
383 port->enq = opdl_rx_enqueue;
385 /* Always single instance */
386 port->deq = opdl_tx_dequeue_single_thread;
388 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
389 "port:[%u] has invalid port type - ",
390 opdl_pmd_dev_id(port->opdl),
395 port->initialized = 1;
400 fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
405 build_all_dependencies(struct rte_eventdev *dev)
410 struct opdl_evdev *device = opdl_pmd_priv(dev);
412 uint8_t start_qid = 0;
414 for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
415 struct opdl_queue *queue = &device->queue[i];
416 if (!queue->initialized)
419 if (queue->q_pos == OPDL_Q_POS_START) {
424 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
425 err = opdl_add_deps(device, i, i-1);
427 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
428 "dependency addition for queue:[%u] - FAILED",
430 queue->external_qid);
435 if (queue->q_pos == OPDL_Q_POS_END) {
436 /* Add this dependency */
437 err = opdl_add_deps(device, i, i-1);
439 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
440 "dependency addition for queue:[%u] - FAILED",
442 queue->external_qid);
445 /* Add dependency for rx on tx */
446 err = opdl_add_deps(device, start_qid, i);
448 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
449 "dependency addition for queue:[%u] - FAILED",
451 queue->external_qid);
458 fprintf(stdout, "Success - dependencies built\n");
463 check_queues_linked(struct rte_eventdev *dev)
468 struct opdl_evdev *device = opdl_pmd_priv(dev);
471 for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
472 struct opdl_queue *queue = &device->queue[i];
474 if (!queue->initialized)
477 if (queue->external_qid == OPDL_INVALID_QID)
480 if (queue->nb_ports == 0) {
481 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
482 "queue:[%u] has no associated ports",
490 if ((i - nb_iq) != device->max_queue_nb) {
491 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
492 "%u queues counted but should be %u",
495 device->max_queue_nb);
504 destroy_queues_and_rings(struct rte_eventdev *dev)
506 struct opdl_evdev *device = opdl_pmd_priv(dev);
509 for (i = 0; i < device->nb_opdls; i++) {
511 opdl_ring_free(device->opdl[i]);
514 memset(&device->queue,
516 sizeof(struct opdl_queue)
517 * RTE_EVENT_MAX_QUEUES_PER_DEV);
/* Index of the most recently created opdl ring for device d */
#define OPDL_ID(d)(d->nb_opdls - 1)
522 static __rte_always_inline void
523 initialise_queue(struct opdl_evdev *device,
527 struct opdl_queue *queue = &device->queue[device->nb_queues];
530 queue->q_type = OPDL_Q_TYPE_ORDERED;
531 queue->external_qid = OPDL_INVALID_QID;
533 queue->q_type = device->q_md[i].type;
534 queue->external_qid = device->q_md[i].ext_id;
535 /* Add ex->in for queues setup */
536 device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
538 queue->opdl_id = OPDL_ID(device);
541 queue->configured = 1;
547 static __rte_always_inline int
548 create_opdl(struct opdl_evdev *device)
552 char name[RTE_MEMZONE_NAMESIZE];
554 snprintf(name, RTE_MEMZONE_NAMESIZE,
555 "%s_%u", device->service_name, device->nb_opdls);
557 device->opdl[device->nb_opdls] =
558 opdl_ring_create(name,
559 device->nb_events_limit,
560 sizeof(struct rte_event),
561 device->max_port_nb * 2,
564 if (!device->opdl[device->nb_opdls]) {
565 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
566 "opdl ring %u creation - FAILED",
567 opdl_pmd_dev_id(device),
576 static __rte_always_inline int
577 create_link_opdl(struct opdl_evdev *device, uint32_t index)
582 if (device->q_md[index + 1].type !=
583 OPDL_Q_TYPE_SINGLE_LINK) {
585 /* async queue with regular
589 /* create a new opdl ring */
590 err = create_opdl(device);
593 * dummy queue for new opdl
595 initialise_queue(device,
602 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
603 "queue %u, two consecutive"
604 " SINGLE_LINK queues, not allowed",
605 opdl_pmd_dev_id(device),
614 create_queues_and_rings(struct rte_eventdev *dev)
618 struct opdl_evdev *device = opdl_pmd_priv(dev);
620 device->nb_queues = 0;
622 if (device->nb_ports != device->max_port_nb) {
623 PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
624 " number:%u for this device",
626 device->max_port_nb);
631 /* We will have at least one opdl so create it now */
632 err = create_opdl(device);
637 /* Create 1st "dummy" queue */
638 initialise_queue(device,
643 for (i = 0; i < device->nb_q_md; i++) {
646 if (!device->q_md[i].setup) {
648 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
649 "queue meta data slot %u"
650 " not setup - FAILING",
655 } else if (device->q_md[i].type !=
656 OPDL_Q_TYPE_SINGLE_LINK) {
658 if (!device->q_md[i + 1].setup) {
659 /* Create a simple ORDERED/ATOMIC
662 initialise_queue(device,
667 /* Create a simple ORDERED/ATOMIC
668 * queue in the middle
670 initialise_queue(device,
674 } else if (device->q_md[i].type ==
675 OPDL_Q_TYPE_SINGLE_LINK) {
677 /* create last queue for this opdl */
678 initialise_queue(device,
682 err = create_link_opdl(device, i);
692 destroy_queues_and_rings(dev);
699 initialise_all_other_ports(struct rte_eventdev *dev)
702 struct opdl_stage *stage_inst = NULL;
704 struct opdl_evdev *device = opdl_pmd_priv(dev);
707 for (i = 0; i < device->nb_ports; i++) {
708 struct opdl_port *port = &device->ports[i];
709 struct opdl_queue *queue = &device->queue[port->queue_id];
711 if (port->queue_id == 0) {
713 } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
715 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
717 /* Regular port with claim/disclaim */
718 stage_inst = opdl_stage_add(
719 device->opdl[queue->opdl_id],
722 port->deq_stage_inst = stage_inst;
723 port->enq_stage_inst = stage_inst;
725 if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
726 port->atomic_claim = true;
728 port->atomic_claim = false;
730 port->p_type = OPDL_REGULAR_PORT;
732 /* Add the port to the queue array of ports */
733 queue->ports[queue->nb_ports] = port;
734 port->instance_id = queue->nb_ports;
736 opdl_stage_set_queue_id(stage_inst,
739 } else if (queue->q_pos == OPDL_Q_POS_END) {
742 stage_inst = opdl_stage_add(
743 device->opdl[queue->opdl_id],
746 port->deq_stage_inst = stage_inst;
747 port->enq_stage_inst = NULL;
748 port->p_type = OPDL_PURE_TX_PORT;
750 /* Add the port to the queue array of ports */
751 queue->ports[queue->nb_ports] = port;
752 port->instance_id = queue->nb_ports;
756 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
757 "port %u:, linked incorrectly"
758 " to a q_pos START/INVALID %u",
759 opdl_pmd_dev_id(port->opdl),
766 } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
768 port->p_type = OPDL_ASYNC_PORT;
771 stage_inst = opdl_stage_add(
772 device->opdl[queue->opdl_id],
774 false); /* First stage */
775 port->deq_stage_inst = stage_inst;
777 /* Add the port to the queue array of ports */
778 queue->ports[queue->nb_ports] = port;
779 port->instance_id = queue->nb_ports;
782 if (queue->nb_ports > 1) {
783 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
784 "queue %u:, setup as SINGLE_LINK"
785 " but has more than one port linked",
786 opdl_pmd_dev_id(port->opdl),
787 queue->external_qid);
792 /* -- single instance rx for next opdl -- */
794 device->q_map_ex_to_in[queue->external_qid] + 1;
795 if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
796 device->queue[next_qid].configured) {
798 /* Remap the queue */
799 queue = &device->queue[next_qid];
801 stage_inst = opdl_stage_add(
802 device->opdl[queue->opdl_id],
805 port->enq_stage_inst = stage_inst;
807 /* Add the port to the queue array of ports */
808 queue->ports[queue->nb_ports] = port;
809 port->instance_id = queue->nb_ports;
811 if (queue->nb_ports > 1) {
812 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
813 "dummy queue %u: for "
815 "SINGLE_LINK but has more "
816 "than one port linked",
817 opdl_pmd_dev_id(port->opdl),
823 /* Set this queue to initialized as it is never
824 * referenced by any ports
826 queue->initialized = 1;
831 /* Now that all ports are initialised we need to
832 * setup the last bit of stage md
835 for (i = 0; i < device->nb_ports; i++) {
836 struct opdl_port *port = &device->ports[i];
837 struct opdl_queue *queue =
838 &device->queue[port->queue_id];
840 if (port->configured &&
841 (port->queue_id != OPDL_INVALID_QID)) {
842 if (queue->nb_ports == 0) {
843 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
844 "queue:[%u] has no ports"
846 opdl_pmd_dev_id(port->opdl),
852 port->num_instance = queue->nb_ports;
853 port->initialized = 1;
854 queue->initialized = 1;
856 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
857 "Port:[%u] not configured invalid"
858 " queue configuration",
859 opdl_pmd_dev_id(port->opdl),
870 initialise_queue_zero_ports(struct rte_eventdev *dev)
874 struct opdl_stage *stage_inst = NULL;
875 struct opdl_queue *queue = NULL;
877 struct opdl_evdev *device = opdl_pmd_priv(dev);
879 /* Assign queue zero and figure out how many Q0 ports we have */
881 for (i = 0; i < device->nb_ports; i++) {
882 struct opdl_port *port = &device->ports[i];
883 if (port->queue_id == OPDL_INVALID_QID) {
885 port->external_qid = OPDL_INVALID_QID;
886 port->p_type = OPDL_PURE_RX_PORT;
891 /* Create the stage */
892 stage_inst = opdl_stage_add(device->opdl[0],
893 (mt_rx > 1 ? true : false),
897 /* Assign the new created input stage to all relevant ports */
898 for (i = 0; i < device->nb_ports; i++) {
899 struct opdl_port *port = &device->ports[i];
900 if (port->queue_id == 0) {
901 queue = &device->queue[port->queue_id];
902 port->enq_stage_inst = stage_inst;
903 port->deq_stage_inst = NULL;
904 port->configured = 1;
905 port->initialized = 1;
907 queue->ports[queue->nb_ports] = port;
908 port->instance_id = queue->nb_ports;
919 assign_internal_queue_ids(struct rte_eventdev *dev)
922 struct opdl_evdev *device = opdl_pmd_priv(dev);
925 for (i = 0; i < device->nb_ports; i++) {
926 struct opdl_port *port = &device->ports[i];
927 if (port->external_qid != OPDL_INVALID_QID) {
929 device->q_map_ex_to_in[port->external_qid];
931 /* Now do the external_qid of the next queue */
932 struct opdl_queue *queue =
933 &device->queue[port->queue_id];
934 if (queue->q_pos == OPDL_Q_POS_END)
935 port->next_external_qid =
936 device->queue[port->queue_id + 2].external_qid;
938 port->next_external_qid =
939 device->queue[port->queue_id + 1].external_qid;