2 * SPDX-License-Identifier: BSD-3-Clause
3 * Copyright(c) 2010-2014 Intel Corporation
9 #include <rte_bus_vdev.h>
10 #include <rte_errno.h>
11 #include <rte_cycles.h>
12 #include <rte_memzone.h>
14 #include "opdl_evdev.h"
15 #include "opdl_ring.h"
/*
 * Validate a burst of events being enqueued through a port and update the
 * port's claim statistics.
 * When the device was started with validation enabled, every event's
 * queue_id must equal the port's expected next_external_qid; a mismatch is
 * logged as an error.  RX/async ports account requested vs granted events
 * and start/stop a TSC cycle counter around non-empty claims.
 * NOTE(review): source view is truncated (lines elided); the exact return
 * value semantics are not fully visible here — confirm against the full file.
 */
19 static __rte_always_inline uint32_t
20 enqueue_check(struct opdl_port *p,
21 const struct rte_event ev[],
/* Per-event validation: only performed when do_validation was requested. */
27 if (p->opdl->do_validation) {
29 for (i = 0; i < num; i++) {
30 if (ev[i].queue_id != p->next_external_qid) {
31 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
32 "ERROR - port:[%u] - event wants"
33 " to enq to q_id[%u],"
34 " but should be [%u]",
35 opdl_pmd_dev_id(p->opdl),
38 p->next_external_qid);
/* Statistics are only kept for input-side (pure RX / async) ports. */
45 if (p->p_type == OPDL_PURE_RX_PORT ||
46 p->p_type == OPDL_ASYNC_PORT) {
49 p->port_stat[claim_pkts_requested] += num;
50 p->port_stat[claim_pkts_granted] += num_events;
51 p->port_stat[claim_non_empty]++;
/* Timestamp the start of a non-empty claim with the TSC. */
52 p->start_cycles = rte_rdtsc();
54 p->port_stat[claim_empty]++;
/* Accumulate cycles spent since the matching claim started. */
58 if (p->start_cycles) {
59 uint64_t end_cycles = rte_rdtsc();
60 p->port_stat[total_cycles] +=
61 end_cycles - p->start_cycles;
/* Fast path (validation off): presumably only the first event's queue_id
 * is checked — TODO confirm against the unelided source.
 */
66 ev[0].queue_id != p->next_external_qid) {
/*
 * Post-dequeue bookkeeping for a port: stamp dequeued events with the
 * queue's external queue id and update claim statistics / cycle counters.
 * With validation enabled every event in the burst is stamped; otherwise
 * a cheaper path (partially elided from this view) is taken.
 * NOTE(review): lines are elided between the visible statements — the
 * assignment targets for the external_qid expressions are not visible.
 */
75 static __rte_always_inline void
76 update_on_dequeue(struct opdl_port *p,
77 struct rte_event ev[],
81 if (p->opdl->do_validation) {
/* Stamp each dequeued event with this queue's external id. */
83 for (i = 0; i < num; i++)
85 p->opdl->queue[p->queue_id].external_qid;
89 p->port_stat[claim_pkts_requested] += num;
90 p->port_stat[claim_pkts_granted] += num_events;
91 p->port_stat[claim_non_empty]++;
/* Begin timing a non-empty claim. */
92 p->start_cycles = rte_rdtsc();
94 p->port_stat[claim_empty]++;
/* Non-validation path: same external_qid stamping, stats skipped. */
100 p->opdl->queue[p->queue_id].external_qid;
/*
 * Error-path enqueue handler installed on ports that must never enqueue
 * (e.g. pure TX ports).  Body elided from this view; presumably logs an
 * error and/or sets rte_errno — confirm against the full file.
 */
112 opdl_rx_error_enqueue(struct opdl_port *p,
113 const struct rte_event ev[],
128 * This function handles enqueue for a single input stage_inst with
129 * threadsafe disabled or enabled. eg 1 thread using a stage_inst or
130 * multiple threads sharing a stage_inst
/* Push the burst directly into the opdl ring backing this port's enqueue
 * stage, then validate/account the result via enqueue_check().
 */
134 opdl_rx_enqueue(struct opdl_port *p,
135 const struct rte_event ev[],
138 uint16_t enqueued = 0;
140 enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
/* enqueue_check() returning false indicates a validation failure;
 * the handling branch is elided from this view.
 */
144 if (!enqueue_check(p, ev, num, enqueued))
/*
 * Error-path dequeue handler installed on ports that must never dequeue
 * (e.g. pure RX ports).  Body elided from this view; presumably logs an
 * error and/or sets rte_errno — confirm against the full file.
 */
160 opdl_tx_error_dequeue(struct opdl_port *p,
161 struct rte_event ev[],
174 * TX single threaded claim
176 * This function handles dequeue for a single worker stage_inst with
177 * threadsafe disabled. eg 1 thread using an stage_inst
/* Copy a burst out of the ring backing the port's dequeue stage, then
 * stamp/account the events via update_on_dequeue().
 */
181 opdl_tx_dequeue_single_thread(struct opdl_port *p,
182 struct rte_event ev[],
187 struct opdl_ring *ring;
189 ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
191 returned = opdl_ring_copy_to_burst(ring,
197 update_on_dequeue(p, ev, num, returned);
203 * TX multi threaded claim
205 * This function handles dequeue for multiple worker stage_inst with
206 * threadsafe disabled. eg multiple stage_inst each with its own instance
/* Claim events from the dequeue stage, account them, then immediately
 * disclaim (release) the claimed slots; the disclaim result is the number
 * of events handed to the caller.
 */
210 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
211 struct rte_event ev[],
214 uint32_t num_events = 0;
216 num_events = opdl_stage_claim(p->deq_stage_inst,
223 update_on_dequeue(p, ev, num, num_events);
/* block=false: never wait for slots to become available. */
225 return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
230 * Worker thread claim
/* Dequeue handler for regular (middle-of-pipeline) worker ports: claim up
 * to num events from the port's dequeue stage.  Bursts larger than
 * MAX_OPDL_CONS_Q_DEPTH are rejected with an error log (error-return path
 * elided from this view).
 */
235 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
237 uint32_t num_events = 0;
239 if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
240 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
241 "Attempt to dequeue num of events larger than port (%d) max",
242 opdl_pmd_dev_id(p->opdl),
249 num_events = opdl_stage_claim(p->deq_stage_inst,
257 update_on_dequeue(p, ev, num, num_events);
263 * Worker thread disclaim
/* Enqueue handler for regular worker ports: write each (possibly updated)
 * event back into its claimed ring slot, release the claim on the enqueue
 * stage, and validate/account the result via enqueue_check().
 */
267 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
269 uint16_t enqueued = 0;
/* Copy events back into their slots (CAS variant for atomic queues —
 * remaining arguments elided from this view).
 */
273 for (i = 0; i < num; i++)
274 opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
277 enqueued = opdl_stage_disclaim(p->enq_stage_inst,
281 return enqueue_check(p, ev, num, enqueued);
284 static __rte_always_inline struct opdl_stage *
285 stage_for_port(struct opdl_queue *q, unsigned int i)
287 if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
288 return q->ports[i]->enq_stage_inst;
290 return q->ports[i]->deq_stage_inst;
/*
 * Register the stages of queue @deps_q_id as dependencies of the stages of
 * queue @q_id.  First sanity-checks that every stage of both queues lives
 * on the same opdl ring (dumping all rings on mismatch), then gathers the
 * dependent stage instances and adds them, per port, via
 * opdl_stage_deps_add().
 * NOTE(review): return-value handling between the visible statements is
 * elided from this view.
 */
293 static int opdl_add_deps(struct opdl_evdev *device,
299 struct opdl_ring *ring;
300 struct opdl_queue *queue = &device->queue[q_id];
301 struct opdl_queue *queue_deps = &device->queue[deps_q_id];
302 struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
304 /* sanity check that all stages are for same opdl ring */
305 for (i = 0; i < queue->nb_ports; i++) {
306 struct opdl_ring *r =
307 opdl_stage_get_opdl_ring(stage_for_port(queue, i));
308 for (j = 0; j < queue_deps->nb_ports; j++) {
309 struct opdl_ring *rj =
310 opdl_stage_get_opdl_ring(
311 stage_for_port(queue_deps, j));
313 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
314 "Stages and dependents"
315 " are not for same opdl ring",
316 opdl_pmd_dev_id(device));
/* On mismatch, dump every opdl ring to aid debugging. */
318 for (k = 0; k < device->nb_opdls; k++) {
319 opdl_ring_dump(device->opdl[k],
327 /* Gather all stages instance in deps */
328 for (i = 0; i < queue_deps->nb_ports; i++)
329 dep_stages[i] = stage_for_port(queue_deps, i);
332 /* Add all deps for each port->stage_inst in this queue */
333 for (i = 0; i < queue->nb_ports; i++) {
335 ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
337 status = opdl_stage_deps_add(ring,
338 stage_for_port(queue, i),
339 queue->ports[i]->num_instance,
340 queue->ports[i]->instance_id,
342 queue_deps->nb_ports);
/*
 * Install enqueue/dequeue burst handlers on every configured port based on
 * its port type:
 *   PURE_RX  -> real enqueue, error dequeue
 *   PURE_TX  -> error enqueue, single- or multi-instance dequeue
 *   REGULAR  -> claim/disclaim pair
 *   ASYNC    -> real enqueue, single-thread dequeue
 * Unknown types are logged as errors.  Ports are marked initialized once
 * their handlers are set.
 */
351 opdl_add_event_handlers(struct rte_eventdev *dev)
355 struct opdl_evdev *device = opdl_pmd_priv(dev);
358 for (i = 0; i < device->max_port_nb; i++) {
360 struct opdl_port *port = &device->ports[i];
362 if (port->configured) {
363 if (port->p_type == OPDL_PURE_RX_PORT) {
364 port->enq = opdl_rx_enqueue;
365 port->deq = opdl_tx_error_dequeue;
367 } else if (port->p_type == OPDL_PURE_TX_PORT) {
369 port->enq = opdl_rx_error_enqueue;
/* Single-instance TX can use the cheaper non-claiming dequeue. */
371 if (port->num_instance == 1)
373 opdl_tx_dequeue_single_thread;
375 port->deq = opdl_tx_dequeue_multi_inst;
377 } else if (port->p_type == OPDL_REGULAR_PORT) {
379 port->enq = opdl_disclaim;
380 port->deq = opdl_claim;
382 } else if (port->p_type == OPDL_ASYNC_PORT) {
384 port->enq = opdl_rx_enqueue;
386 /* Always single instance */
387 port->deq = opdl_tx_dequeue_single_thread;
389 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
390 "port:[%u] has invalid port type - ",
391 opdl_pmd_dev_id(port->opdl),
396 port->initialized = 1;
/* NOTE(review): raw fprintf(stdout) rather than the driver's PMD_DRV_LOG
 * macro — consider unifying with the rest of the file's logging.
 */
401 fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
/*
 * Walk every initialized queue and wire up its stage dependencies:
 * MIDDLE and END queues depend on their immediate predecessor (i-1), and
 * each END queue additionally closes the loop by making the pipeline's
 * start queue depend on it (rx-on-tx).  start_qid presumably tracks the
 * most recent START queue — the assignment is elided from this view.
 */
406 build_all_dependencies(struct rte_eventdev *dev)
411 struct opdl_evdev *device = opdl_pmd_priv(dev);
413 uint8_t start_qid = 0;
415 for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
416 struct opdl_queue *queue = &device->queue[i];
417 if (!queue->initialized)
420 if (queue->q_pos == OPDL_Q_POS_START) {
425 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
426 err = opdl_add_deps(device, i, i-1);
428 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
429 "dependency addition for queue:[%u] - FAILED",
431 queue->external_qid);
436 if (queue->q_pos == OPDL_Q_POS_END) {
437 /* Add this dependency */
438 err = opdl_add_deps(device, i, i-1);
440 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
441 "dependency addition for queue:[%u] - FAILED",
443 queue->external_qid);
446 /* Add dependency for rx on tx */
447 err = opdl_add_deps(device, start_qid, i);
449 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
450 "dependency addition for queue:[%u] - FAILED",
452 queue->external_qid);
459 fprintf(stdout, "Success - dependencies built\n");
/*
 * Sanity-check the queue topology after linking: every initialized,
 * externally-visible queue must have at least one port, and the number of
 * counted queues (excluding internal ones, tracked via nb_iq) must match
 * the configured max_queue_nb.  Errors are logged; the error-return path
 * is elided from this view.
 */
464 check_queues_linked(struct rte_eventdev *dev)
469 struct opdl_evdev *device = opdl_pmd_priv(dev);
472 for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
473 struct opdl_queue *queue = &device->queue[i];
475 if (!queue->initialized)
/* Internal (dummy) queues have no external id and are not counted. */
478 if (queue->external_qid == OPDL_INVALID_QID)
481 if (queue->nb_ports == 0) {
482 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
483 "queue:[%u] has no associated ports",
/* i - nb_iq = number of externally visible queues seen. */
491 if ((i - nb_iq) != device->max_queue_nb) {
492 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
493 "%u queues counted but should be %u",
496 device->max_queue_nb);
/*
 * Tear down all opdl rings owned by the device and reset the entire queue
 * table to zero (queues hold no dynamically allocated state of their own).
 */
505 destroy_queues_and_rings(struct rte_eventdev *dev)
507 struct opdl_evdev *device = opdl_pmd_priv(dev);
510 for (i = 0; i < device->nb_opdls; i++) {
512 opdl_ring_free(device->opdl[i]);
515 memset(&device->queue,
517 sizeof(struct opdl_queue)
518 * RTE_EVENT_MAX_QUEUES_PER_DEV);
/* Index of the most recently created opdl ring on device @d.
 * The argument and the whole expansion are parenthesized so the macro is
 * safe for any expression argument (e.g. OPDL_ID(&dev) or OPDL_ID(devs + i));
 * the original `d->nb_opdls - 1` form would bind `->` before `&`/`+`.
 */
#define OPDL_ID(d) (((d)->nb_opdls - 1))
/*
 * Initialise the next free slot in device->queue[].  A dummy/internal
 * queue gets ORDERED type and OPDL_INVALID_QID; a real queue copies its
 * type and external id from the queue metadata (q_md[i]) and records the
 * external->internal id mapping.  The queue is bound to the most recently
 * created opdl ring (OPDL_ID) and marked configured.
 * NOTE(review): the branch condition selecting dummy vs. metadata-backed
 * initialisation is elided from this view.
 */
523 static __rte_always_inline void
524 initialise_queue(struct opdl_evdev *device,
528 struct opdl_queue *queue = &device->queue[device->nb_queues];
531 queue->q_type = OPDL_Q_TYPE_ORDERED;
532 queue->external_qid = OPDL_INVALID_QID;
534 queue->q_type = device->q_md[i].type;
535 queue->external_qid = device->q_md[i].ext_id;
536 /* Add ex->in for queues setup */
537 device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
539 queue->opdl_id = OPDL_ID(device);
542 queue->configured = 1;
/*
 * Create the next opdl ring for the device, named
 * "<service_name>_<nb_opdls>".  The ring is sized by the device's event
 * limit, carries rte_event-sized slots, and allows up to
 * max_port_nb * 2 stages.  Failure is logged; the error-return path is
 * elided from this view.
 */
548 static __rte_always_inline int
549 create_opdl(struct opdl_evdev *device)
553 char name[RTE_MEMZONE_NAMESIZE];
555 snprintf(name, RTE_MEMZONE_NAMESIZE,
556 "%s_%u", device->service_name, device->nb_opdls);
558 device->opdl[device->nb_opdls] =
559 opdl_ring_create(name,
560 device->nb_events_limit,
561 sizeof(struct rte_event),
562 device->max_port_nb * 2,
565 if (!device->opdl[device->nb_opdls]) {
566 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
567 "opdl ring %u creation - FAILED",
568 opdl_pmd_dev_id(device),
/*
 * Handle a SINGLE_LINK queue at metadata index @index by starting a new
 * opdl ring: if the *next* metadata entry is not also SINGLE_LINK, create
 * a fresh ring plus its leading dummy queue.  Two consecutive SINGLE_LINK
 * queues are rejected with an error log (return paths elided from this
 * view).
 */
577 static __rte_always_inline int
578 create_link_opdl(struct opdl_evdev *device, uint32_t index)
583 if (device->q_md[index + 1].type !=
584 OPDL_Q_TYPE_SINGLE_LINK) {
586 /* async queue with regular
590 /* create a new opdl ring */
591 err = create_opdl(device);
594 * dummy queue for new opdl
596 initialise_queue(device,
603 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
604 "queue %u, two consecutive"
605 " SINGLE_LINK queues, not allowed",
606 opdl_pmd_dev_id(device),
/*
 * Build the device's opdl rings and internal queue table from the queue
 * metadata (q_md[]).  Requires all ports to be set up first (nb_ports ==
 * max_port_nb).  Creates the initial ring and its dummy queue, then walks
 * the metadata: ORDERED/ATOMIC entries become simple middle/end queues on
 * the current ring, while SINGLE_LINK entries terminate the current ring
 * and hand off to create_link_opdl().  On failure everything is torn down
 * via destroy_queues_and_rings().
 */
615 create_queues_and_rings(struct rte_eventdev *dev)
619 struct opdl_evdev *device = opdl_pmd_priv(dev);
621 device->nb_queues = 0;
623 if (device->nb_ports != device->max_port_nb) {
624 PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
625 " number:%u for this device",
627 device->max_port_nb);
632 /* We will have at least one opdl so create it now */
633 err = create_opdl(device);
638 /* Create 1st "dummy" queue */
639 initialise_queue(device,
644 for (i = 0; i < device->nb_q_md; i++) {
/* Every metadata slot up to nb_q_md must have been configured. */
647 if (!device->q_md[i].setup) {
649 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
650 "queue meta data slot %u"
651 " not setup - FAILING",
656 } else if (device->q_md[i].type !=
657 OPDL_Q_TYPE_SINGLE_LINK) {
/* Last metadata entry -> END queue, otherwise a MIDDLE queue. */
659 if (!device->q_md[i + 1].setup) {
660 /* Create a simple ORDERED/ATOMIC
663 initialise_queue(device,
668 /* Create a simple ORDERED/ATOMIC
669 * queue in the middle
671 initialise_queue(device,
675 } else if (device->q_md[i].type ==
676 OPDL_Q_TYPE_SINGLE_LINK) {
678 /* create last queue for this opdl */
679 initialise_queue(device,
683 err = create_link_opdl(device, i);
/* Error path: roll back every ring/queue created so far. */
693 destroy_queues_and_rings(dev);
/*
 * Create stage instances for every port that is not on queue zero and
 * classify each port by the queue it is linked to:
 *  - MIDDLE (non single-link) queues  -> REGULAR port sharing one stage
 *    for both claim (deq) and disclaim (enq); atomic_claim mirrors the
 *    queue's ATOMIC type.
 *  - END queues                       -> PURE_TX port, dequeue stage only.
 *  - SINGLE_LINK queues               -> ASYNC port with a dequeue stage
 *    on the current ring and an enqueue stage on the *next* ring (found
 *    through the ex->in queue-id map); such queues allow only one port.
 * A second pass fills in per-port num_instance and flips the
 * initialized flags.  Error paths log and (presumably) bail out; those
 * lines are elided from this view.
 */
700 initialise_all_other_ports(struct rte_eventdev *dev)
703 struct opdl_stage *stage_inst = NULL;
705 struct opdl_evdev *device = opdl_pmd_priv(dev);
708 for (i = 0; i < device->nb_ports; i++) {
709 struct opdl_port *port = &device->ports[i];
710 struct opdl_queue *queue = &device->queue[port->queue_id];
/* Queue-zero ports were handled by initialise_queue_zero_ports(). */
712 if (port->queue_id == 0) {
714 } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
716 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
718 /* Regular port with claim/disclaim */
719 stage_inst = opdl_stage_add(
720 device->opdl[queue->opdl_id],
723 port->deq_stage_inst = stage_inst;
724 port->enq_stage_inst = stage_inst;
726 if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
727 port->atomic_claim = true;
729 port->atomic_claim = false;
731 port->p_type = OPDL_REGULAR_PORT;
733 /* Add the port to the queue array of ports */
734 queue->ports[queue->nb_ports] = port;
735 port->instance_id = queue->nb_ports;
737 } else if (queue->q_pos == OPDL_Q_POS_END) {
740 stage_inst = opdl_stage_add(
741 device->opdl[queue->opdl_id],
744 port->deq_stage_inst = stage_inst;
745 port->enq_stage_inst = NULL;
746 port->p_type = OPDL_PURE_TX_PORT;
748 /* Add the port to the queue array of ports */
749 queue->ports[queue->nb_ports] = port;
750 port->instance_id = queue->nb_ports;
/* Ports may not link to a START or invalid position here. */
754 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
755 "port %u:, linked incorrectly"
756 " to a q_pos START/INVALID %u",
757 opdl_pmd_dev_id(port->opdl),
764 } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
766 port->p_type = OPDL_ASYNC_PORT;
769 stage_inst = opdl_stage_add(
770 device->opdl[queue->opdl_id],
772 false); /* First stage */
773 port->deq_stage_inst = stage_inst;
775 /* Add the port to the queue array of ports */
776 queue->ports[queue->nb_ports] = port;
777 port->instance_id = queue->nb_ports;
780 if (queue->nb_ports > 1) {
781 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
782 "queue %u:, setup as SINGLE_LINK"
783 " but has more than one port linked",
784 opdl_pmd_dev_id(port->opdl),
785 queue->external_qid);
790 /* -- single instance rx for next opdl -- */
792 device->q_map_ex_to_in[queue->external_qid] + 1;
793 if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
794 device->queue[next_qid].configured) {
796 /* Remap the queue */
797 queue = &device->queue[next_qid];
799 stage_inst = opdl_stage_add(
800 device->opdl[queue->opdl_id],
803 port->enq_stage_inst = stage_inst;
805 /* Add the port to the queue array of ports */
806 queue->ports[queue->nb_ports] = port;
807 port->instance_id = queue->nb_ports;
809 if (queue->nb_ports > 1) {
810 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
811 "dummy queue %u: for "
813 "SINGLE_LINK but has more "
814 "than one port linked",
815 opdl_pmd_dev_id(port->opdl),
821 /* Set this queue to initialized as it is never
822 * referenced by any ports
824 queue->initialized = 1;
829 /* Now that all ports are initialised we need to
830 * setup the last bit of stage md
833 for (i = 0; i < device->nb_ports; i++) {
834 struct opdl_port *port = &device->ports[i];
835 struct opdl_queue *queue =
836 &device->queue[port->queue_id];
838 if (port->configured &&
839 (port->queue_id != OPDL_INVALID_QID)) {
840 if (queue->nb_ports == 0) {
841 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
842 "queue:[%u] has no ports"
844 opdl_pmd_dev_id(port->opdl),
850 port->num_instance = queue->nb_ports;
851 port->initialized = 1;
852 queue->initialized = 1;
854 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
855 "Port:[%u] not configured invalid"
856 " queue configuration",
857 opdl_pmd_dev_id(port->opdl),
/*
 * Set up all ports bound to queue zero (the pipeline's RX entry point).
 * First pass counts the RX ports (mt_rx, assignment elided from this
 * view) and marks unlinked ports as PURE_RX with an invalid external id;
 * then a single input stage is added to ring 0 — thread-safe only when
 * more than one RX port shares it — and assigned to every queue-zero
 * port in a second pass.
 */
868 initialise_queue_zero_ports(struct rte_eventdev *dev)
872 struct opdl_stage *stage_inst = NULL;
873 struct opdl_queue *queue = NULL;
875 struct opdl_evdev *device = opdl_pmd_priv(dev);
877 /* Assign queue zero and figure out how many Q0 ports we have */
879 for (i = 0; i < device->nb_ports; i++) {
880 struct opdl_port *port = &device->ports[i];
881 if (port->queue_id == OPDL_INVALID_QID) {
883 port->external_qid = OPDL_INVALID_QID;
884 port->p_type = OPDL_PURE_RX_PORT;
889 /* Create the stage */
/* Thread-safe input only needed when multiple RX ports share the stage. */
890 stage_inst = opdl_stage_add(device->opdl[0],
891 (mt_rx > 1 ? true : false),
895 /* Assign the new created input stage to all relevant ports */
896 for (i = 0; i < device->nb_ports; i++) {
897 struct opdl_port *port = &device->ports[i];
898 if (port->queue_id == 0) {
899 queue = &device->queue[port->queue_id];
900 port->enq_stage_inst = stage_inst;
/* RX-only: no dequeue stage on queue-zero ports. */
901 port->deq_stage_inst = NULL;
902 port->configured = 1;
903 port->initialized = 1;
905 queue->ports[queue->nb_ports] = port;
906 port->instance_id = queue->nb_ports;
917 assign_internal_queue_ids(struct rte_eventdev *dev)
920 struct opdl_evdev *device = opdl_pmd_priv(dev);
923 for (i = 0; i < device->nb_ports; i++) {
924 struct opdl_port *port = &device->ports[i];
925 if (port->external_qid != OPDL_INVALID_QID) {
927 device->q_map_ex_to_in[port->external_qid];
929 /* Now do the external_qid of the next queue */
930 struct opdl_queue *queue =
931 &device->queue[port->queue_id];
932 if (queue->q_pos == OPDL_Q_POS_END)
933 port->next_external_qid =
934 device->queue[port->queue_id + 2].external_qid;
936 port->next_external_qid =
937 device->queue[port->queue_id + 1].external_qid;