event/opdl: rework loops to comply with dpdk style
[dpdk.git] / drivers / event / opdl / opdl_evdev_init.c
1 /*-
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright(c) 2010-2014 Intel Corporation
4  */
5
6 #include <inttypes.h>
7 #include <string.h>
8
9 #include <rte_bus_vdev.h>
10 #include <rte_errno.h>
11 #include <rte_cycles.h>
12 #include <rte_memzone.h>
13
14 #include "opdl_evdev.h"
15 #include "opdl_ring.h"
16 #include "opdl_log.h"
17
18
19 static __rte_always_inline uint32_t
20 enqueue_check(struct opdl_port *p,
21                 const struct rte_event ev[],
22                 uint16_t num,
23                 uint16_t num_events)
24 {
25         uint16_t i;
26
27         if (p->opdl->do_validation) {
28
29                 for (i = 0; i < num; i++) {
30                         if (ev[i].queue_id != p->next_external_qid) {
31                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
32                                              "ERROR - port:[%u] - event wants"
33                                              " to enq to q_id[%u],"
34                                              " but should be [%u]",
35                                              opdl_pmd_dev_id(p->opdl),
36                                              p->id,
37                                              ev[i].queue_id,
38                                              p->next_external_qid);
39                                 rte_errno = -EINVAL;
40                                 return 0;
41                         }
42                 }
43
44                 /* Stats */
45                 if (p->p_type == OPDL_PURE_RX_PORT ||
46                                 p->p_type == OPDL_ASYNC_PORT) {
47                         /* Stats */
48                         if (num_events) {
49                                 p->port_stat[claim_pkts_requested] += num;
50                                 p->port_stat[claim_pkts_granted] += num_events;
51                                 p->port_stat[claim_non_empty]++;
52                                 p->start_cycles = rte_rdtsc();
53                         } else {
54                                 p->port_stat[claim_empty]++;
55                                 p->start_cycles = 0;
56                         }
57                 } else {
58                         if (p->start_cycles) {
59                                 uint64_t end_cycles = rte_rdtsc();
60                                 p->port_stat[total_cycles] +=
61                                         end_cycles - p->start_cycles;
62                         }
63                 }
64         } else {
65                 if (num > 0 &&
66                                 ev[0].queue_id != p->next_external_qid) {
67                         rte_errno = -EINVAL;
68                         return 0;
69                 }
70         }
71
72         return num;
73 }
74
75 static __rte_always_inline void
76 update_on_dequeue(struct opdl_port *p,
77                 struct rte_event ev[],
78                 uint16_t num,
79                 uint16_t num_events)
80 {
81         if (p->opdl->do_validation) {
82                 int16_t i;
83                 for (i = 0; i < num; i++)
84                         ev[i].queue_id =
85                                 p->opdl->queue[p->queue_id].external_qid;
86
87                 /* Stats */
88                 if (num_events) {
89                         p->port_stat[claim_pkts_requested] += num;
90                         p->port_stat[claim_pkts_granted] += num_events;
91                         p->port_stat[claim_non_empty]++;
92                         p->start_cycles = rte_rdtsc();
93                 } else {
94                         p->port_stat[claim_empty]++;
95                         p->start_cycles = 0;
96                 }
97         } else {
98                 if (num > 0)
99                         ev[0].queue_id =
100                                 p->opdl->queue[p->queue_id].external_qid;
101         }
102 }
103
104
105 /*
106  * Error RX enqueue:
107  *
108  *
109  */
110
111 static uint16_t
112 opdl_rx_error_enqueue(struct opdl_port *p,
113                 const struct rte_event ev[],
114                 uint16_t num)
115 {
116         RTE_SET_USED(p);
117         RTE_SET_USED(ev);
118         RTE_SET_USED(num);
119
120         rte_errno = -ENOSPC;
121
122         return 0;
123 }
124
125 /*
126  * RX enqueue:
127  *
128  * This function handles enqueue for a single input stage_inst with
129  *      threadsafe disabled or enabled. eg 1 thread using a stage_inst or
130  *      multiple threads sharing a stage_inst
131  */
132
133 static uint16_t
134 opdl_rx_enqueue(struct opdl_port *p,
135                 const struct rte_event ev[],
136                 uint16_t num)
137 {
138         uint16_t enqueued = 0;
139
140         enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
141                                    ev,
142                                    num,
143                                    false);
144         if (!enqueue_check(p, ev, num, enqueued))
145                 return 0;
146
147
148         if (enqueued < num)
149                 rte_errno = -ENOSPC;
150
151         return enqueued;
152 }
153
154 /*
155  * Error TX handler
156  *
157  */
158
159 static uint16_t
160 opdl_tx_error_dequeue(struct opdl_port *p,
161                 struct rte_event ev[],
162                 uint16_t num)
163 {
164         RTE_SET_USED(p);
165         RTE_SET_USED(ev);
166         RTE_SET_USED(num);
167
168         rte_errno = -ENOSPC;
169
170         return 0;
171 }
172
173 /*
174  * TX single threaded claim
175  *
176  * This function handles dequeue for a single worker stage_inst with
177  *      threadsafe disabled. eg 1 thread using an stage_inst
178  */
179
180 static uint16_t
181 opdl_tx_dequeue_single_thread(struct opdl_port *p,
182                         struct rte_event ev[],
183                         uint16_t num)
184 {
185         uint16_t returned;
186
187         struct opdl_ring  *ring;
188
189         ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
190
191         returned = opdl_ring_copy_to_burst(ring,
192                                            p->deq_stage_inst,
193                                            ev,
194                                            num,
195                                            false);
196
197         update_on_dequeue(p, ev, num, returned);
198
199         return returned;
200 }
201
202 /*
203  * TX multi threaded claim
204  *
205  * This function handles dequeue for multiple worker stage_inst with
206  *      threadsafe disabled. eg multiple stage_inst each with its own instance
207  */
208
209 static uint16_t
210 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
211                         struct rte_event ev[],
212                         uint16_t num)
213 {
214         uint32_t num_events = 0;
215
216         num_events = opdl_stage_claim(p->deq_stage_inst,
217                                     (void *)ev,
218                                     num,
219                                     NULL,
220                                     false,
221                                     false);
222
223         update_on_dequeue(p, ev, num, num_events);
224
225         return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
226 }
227
228
229 /*
230  * Worker thread claim
231  *
232  */
233
234 static uint16_t
235 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
236 {
237         uint32_t num_events = 0;
238
239         if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
240                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
241                              "Attempt to dequeue num of events larger than port (%d) max",
242                              opdl_pmd_dev_id(p->opdl),
243                              p->id);
244                 rte_errno = -EINVAL;
245                 return 0;
246         }
247
248
249         num_events = opdl_stage_claim(p->deq_stage_inst,
250                         (void *)ev,
251                         num,
252                         NULL,
253                         false,
254                         p->atomic_claim);
255
256
257         update_on_dequeue(p, ev, num, num_events);
258
259         return num_events;
260 }
261
262 /*
263  * Worker thread disclaim
264  */
265
266 static uint16_t
267 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
268 {
269         uint16_t enqueued = 0;
270
271         uint32_t i = 0;
272
273         for (i = 0; i < num; i++)
274                 opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
275                                 i, p->atomic_claim);
276
277         enqueued = opdl_stage_disclaim(p->enq_stage_inst,
278                                        num,
279                                        false);
280
281         return enqueue_check(p, ev, num, enqueued);
282 }
283
284 static __rte_always_inline struct opdl_stage *
285 stage_for_port(struct opdl_queue *q, unsigned int i)
286 {
287         if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
288                 return q->ports[i]->enq_stage_inst;
289         else
290                 return q->ports[i]->deq_stage_inst;
291 }
292
293 static int opdl_add_deps(struct opdl_evdev *device,
294                          int q_id,
295                          int deps_q_id)
296 {
297         unsigned int i, j;
298         int status;
299         struct opdl_ring  *ring;
300         struct opdl_queue *queue = &device->queue[q_id];
301         struct opdl_queue *queue_deps = &device->queue[deps_q_id];
302         struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
303
304         /* sanity check that all stages are for same opdl ring */
305         for (i = 0; i < queue->nb_ports; i++) {
306                 struct opdl_ring *r =
307                         opdl_stage_get_opdl_ring(stage_for_port(queue, i));
308                 for (j = 0; j < queue_deps->nb_ports; j++) {
309                         struct opdl_ring *rj =
310                                 opdl_stage_get_opdl_ring(
311                                                 stage_for_port(queue_deps, j));
312                         if (r != rj) {
313                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
314                                              "Stages and dependents"
315                                              " are not for same opdl ring",
316                                              opdl_pmd_dev_id(device));
317                                 uint32_t k;
318                                 for (k = 0; k < device->nb_opdls; k++) {
319                                         opdl_ring_dump(device->opdl[k],
320                                                         stdout);
321                                 }
322                                 return -EINVAL;
323                         }
324                 }
325         }
326
327         /* Gather all stages instance in deps */
328         for (i = 0; i < queue_deps->nb_ports; i++)
329                 dep_stages[i] = stage_for_port(queue_deps, i);
330
331
332         /* Add all deps for each port->stage_inst in this queue */
333         for (i = 0; i < queue->nb_ports; i++) {
334
335                 ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
336
337                 status = opdl_stage_deps_add(ring,
338                                 stage_for_port(queue, i),
339                                 queue->ports[i]->num_instance,
340                                 queue->ports[i]->instance_id,
341                                 dep_stages,
342                                 queue_deps->nb_ports);
343                 if (status < 0)
344                         return -EINVAL;
345         }
346
347         return 0;
348 }
349
350 int
351 opdl_add_event_handlers(struct rte_eventdev *dev)
352 {
353         int err = 0;
354
355         struct opdl_evdev *device = opdl_pmd_priv(dev);
356         unsigned int i;
357
358         for (i = 0; i < device->max_port_nb; i++) {
359
360                 struct opdl_port *port = &device->ports[i];
361
362                 if (port->configured) {
363                         if (port->p_type == OPDL_PURE_RX_PORT) {
364                                 port->enq = opdl_rx_enqueue;
365                                 port->deq = opdl_tx_error_dequeue;
366
367                         } else if (port->p_type == OPDL_PURE_TX_PORT) {
368
369                                 port->enq = opdl_rx_error_enqueue;
370
371                                 if (port->num_instance == 1)
372                                         port->deq =
373                                                 opdl_tx_dequeue_single_thread;
374                                 else
375                                         port->deq = opdl_tx_dequeue_multi_inst;
376
377                         } else if (port->p_type == OPDL_REGULAR_PORT) {
378
379                                 port->enq = opdl_disclaim;
380                                 port->deq = opdl_claim;
381
382                         } else if (port->p_type == OPDL_ASYNC_PORT) {
383
384                                 port->enq = opdl_rx_enqueue;
385
386                                 /* Always single instance */
387                                 port->deq = opdl_tx_dequeue_single_thread;
388                         } else {
389                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
390                                              "port:[%u] has invalid port type - ",
391                                              opdl_pmd_dev_id(port->opdl),
392                                              port->id);
393                                 err = -EINVAL;
394                                 break;
395                         }
396                         port->initialized = 1;
397                 }
398         }
399
400         if (!err)
401                 fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
402         return err;
403 }
404
405 int
406 build_all_dependencies(struct rte_eventdev *dev)
407 {
408
409         int err = 0;
410         unsigned int i;
411         struct opdl_evdev *device = opdl_pmd_priv(dev);
412
413         uint8_t start_qid = 0;
414
415         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
416                 struct opdl_queue *queue = &device->queue[i];
417                 if (!queue->initialized)
418                         break;
419
420                 if (queue->q_pos == OPDL_Q_POS_START) {
421                         start_qid = i;
422                         continue;
423                 }
424
425                 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
426                         err = opdl_add_deps(device, i, i-1);
427                         if (err < 0) {
428                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
429                                              "dependency addition for queue:[%u] - FAILED",
430                                              dev->data->dev_id,
431                                              queue->external_qid);
432                                 break;
433                         }
434                 }
435
436                 if (queue->q_pos == OPDL_Q_POS_END) {
437                         /* Add this dependency */
438                         err = opdl_add_deps(device, i, i-1);
439                         if (err < 0) {
440                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
441                                              "dependency addition for queue:[%u] - FAILED",
442                                              dev->data->dev_id,
443                                              queue->external_qid);
444                                 break;
445                         }
446                         /* Add dependency for rx on tx */
447                         err = opdl_add_deps(device, start_qid, i);
448                         if (err < 0) {
449                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
450                                              "dependency addition for queue:[%u] - FAILED",
451                                              dev->data->dev_id,
452                                              queue->external_qid);
453                                 break;
454                         }
455                 }
456         }
457
458         if (!err)
459                 fprintf(stdout, "Success - dependencies built\n");
460
461         return err;
462 }
463 int
464 check_queues_linked(struct rte_eventdev *dev)
465 {
466
467         int err = 0;
468         unsigned int i;
469         struct opdl_evdev *device = opdl_pmd_priv(dev);
470         uint32_t nb_iq = 0;
471
472         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
473                 struct opdl_queue *queue = &device->queue[i];
474
475                 if (!queue->initialized)
476                         break;
477
478                 if (queue->external_qid == OPDL_INVALID_QID)
479                         nb_iq++;
480
481                 if (queue->nb_ports == 0) {
482                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
483                                      "queue:[%u] has no associated ports",
484                                      dev->data->dev_id,
485                                      i);
486                         err = -EINVAL;
487                         break;
488                 }
489         }
490         if (!err) {
491                 if ((i - nb_iq) != device->max_queue_nb) {
492                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
493                                      "%u queues counted but should be %u",
494                                      dev->data->dev_id,
495                                      i - nb_iq,
496                                      device->max_queue_nb);
497                         err = -1;
498                 }
499
500         }
501         return err;
502 }
503
504 void
505 destroy_queues_and_rings(struct rte_eventdev *dev)
506 {
507         struct opdl_evdev *device = opdl_pmd_priv(dev);
508         uint32_t i;
509
510         for (i = 0; i < device->nb_opdls; i++) {
511                 if (device->opdl[i])
512                         opdl_ring_free(device->opdl[i]);
513         }
514
515         memset(&device->queue,
516                         0,
517                         sizeof(struct opdl_queue)
518                         * RTE_EVENT_MAX_QUEUES_PER_DEV);
519 }
520
/*
 * Index of the most recently created opdl ring of device 'd'.
 * The argument is parenthesized so that any pointer expression
 * (e.g. "devs + 1" or "&dev") expands correctly.
 */
#define OPDL_ID(d) ((d)->nb_opdls - 1)
522
523 static __rte_always_inline void
524 initialise_queue(struct opdl_evdev *device,
525                 enum queue_pos pos,
526                 int32_t i)
527 {
528         struct opdl_queue *queue = &device->queue[device->nb_queues];
529
530         if (i == -1) {
531                 queue->q_type = OPDL_Q_TYPE_ORDERED;
532                 queue->external_qid = OPDL_INVALID_QID;
533         } else {
534                 queue->q_type = device->q_md[i].type;
535                 queue->external_qid = device->q_md[i].ext_id;
536                 /* Add ex->in for queues setup */
537                 device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
538         }
539         queue->opdl_id = OPDL_ID(device);
540         queue->q_pos = pos;
541         queue->nb_ports = 0;
542         queue->configured = 1;
543
544         device->nb_queues++;
545 }
546
547
548 static __rte_always_inline int
549 create_opdl(struct opdl_evdev *device)
550 {
551         int err = 0;
552
553         char name[RTE_MEMZONE_NAMESIZE];
554
555         snprintf(name, RTE_MEMZONE_NAMESIZE,
556                         "%s_%u", device->service_name, device->nb_opdls);
557
558         device->opdl[device->nb_opdls] =
559                 opdl_ring_create(name,
560                                 device->nb_events_limit,
561                                 sizeof(struct rte_event),
562                                 device->max_port_nb * 2,
563                                 device->socket);
564
565         if (!device->opdl[device->nb_opdls]) {
566                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
567                              "opdl ring %u creation - FAILED",
568                              opdl_pmd_dev_id(device),
569                              device->nb_opdls);
570                 err = -EINVAL;
571         } else {
572                 device->nb_opdls++;
573         }
574         return err;
575 }
576
577 static __rte_always_inline int
578 create_link_opdl(struct opdl_evdev *device, uint32_t index)
579 {
580
581         int err = 0;
582
583         if (device->q_md[index + 1].type !=
584                         OPDL_Q_TYPE_SINGLE_LINK) {
585
586                 /* async queue with regular
587                  * queue following it
588                  */
589
590                 /* create a new opdl ring */
591                 err = create_opdl(device);
592                 if (!err) {
593                         /* create an initial
594                          * dummy queue for new opdl
595                          */
596                         initialise_queue(device,
597                                         OPDL_Q_POS_START,
598                                         -1);
599                 } else {
600                         err = -EINVAL;
601                 }
602         } else {
603                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
604                              "queue %u, two consecutive"
605                              " SINGLE_LINK queues, not allowed",
606                              opdl_pmd_dev_id(device),
607                              index);
608                 err = -EINVAL;
609         }
610
611         return err;
612 }
613
614 int
615 create_queues_and_rings(struct rte_eventdev *dev)
616 {
617         int err = 0;
618
619         struct opdl_evdev *device = opdl_pmd_priv(dev);
620
621         device->nb_queues = 0;
622
623         if (device->nb_ports != device->max_port_nb) {
624                 PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
625                                 " number:%u for this device",
626                                 device->nb_ports,
627                                 device->max_port_nb);
628                 err = -1;
629         }
630
631         if (!err) {
632                 /* We will have at least one opdl so create it now */
633                 err = create_opdl(device);
634         }
635
636         if (!err) {
637
638                 /* Create 1st "dummy" queue */
639                 initialise_queue(device,
640                                  OPDL_Q_POS_START,
641                                  -1);
642
643                 uint32_t i;
644                 for (i = 0; i < device->nb_q_md; i++) {
645
646                         /* Check */
647                         if (!device->q_md[i].setup) {
648
649                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
650                                              "queue meta data slot %u"
651                                              " not setup - FAILING",
652                                              dev->data->dev_id,
653                                              i);
654                                 err = -EINVAL;
655                                 break;
656                         } else if (device->q_md[i].type !=
657                                         OPDL_Q_TYPE_SINGLE_LINK) {
658
659                                 if (!device->q_md[i + 1].setup) {
660                                         /* Create a simple ORDERED/ATOMIC
661                                          * queue at the end
662                                          */
663                                         initialise_queue(device,
664                                                         OPDL_Q_POS_END,
665                                                         i);
666
667                                 } else {
668                                         /* Create a simple ORDERED/ATOMIC
669                                          * queue in the middle
670                                          */
671                                         initialise_queue(device,
672                                                         OPDL_Q_POS_MIDDLE,
673                                                         i);
674                                 }
675                         } else if (device->q_md[i].type ==
676                                         OPDL_Q_TYPE_SINGLE_LINK) {
677
678                                 /* create last queue for this opdl */
679                                 initialise_queue(device,
680                                                 OPDL_Q_POS_END,
681                                                 i);
682
683                                 err = create_link_opdl(device, i);
684
685                                 if (err)
686                                         break;
687
688
689                         }
690                 }
691         }
692         if (err)
693                 destroy_queues_and_rings(dev);
694
695         return err;
696 }
697
698
699 int
700 initialise_all_other_ports(struct rte_eventdev *dev)
701 {
702         int err = 0;
703         struct opdl_stage *stage_inst = NULL;
704
705         struct opdl_evdev *device = opdl_pmd_priv(dev);
706
707         uint32_t i;
708         for (i = 0; i < device->nb_ports; i++) {
709                 struct opdl_port *port = &device->ports[i];
710                 struct opdl_queue *queue = &device->queue[port->queue_id];
711
712                 if (port->queue_id == 0) {
713                         continue;
714                 } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
715
716                         if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
717
718                                 /* Regular port with claim/disclaim */
719                                 stage_inst = opdl_stage_add(
720                                         device->opdl[queue->opdl_id],
721                                                 false,
722                                                 false);
723                                 port->deq_stage_inst = stage_inst;
724                                 port->enq_stage_inst = stage_inst;
725
726                                 if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
727                                         port->atomic_claim = true;
728                                 else
729                                         port->atomic_claim = false;
730
731                                 port->p_type =  OPDL_REGULAR_PORT;
732
733                                 /* Add the port to the queue array of ports */
734                                 queue->ports[queue->nb_ports] = port;
735                                 port->instance_id = queue->nb_ports;
736                                 queue->nb_ports++;
737                         } else if (queue->q_pos == OPDL_Q_POS_END) {
738
739                                 /* tx port  */
740                                 stage_inst = opdl_stage_add(
741                                         device->opdl[queue->opdl_id],
742                                                 false,
743                                                 false);
744                                 port->deq_stage_inst = stage_inst;
745                                 port->enq_stage_inst = NULL;
746                                 port->p_type = OPDL_PURE_TX_PORT;
747
748                                 /* Add the port to the queue array of ports */
749                                 queue->ports[queue->nb_ports] = port;
750                                 port->instance_id = queue->nb_ports;
751                                 queue->nb_ports++;
752                         } else {
753
754                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
755                                              "port %u:, linked incorrectly"
756                                              " to a q_pos START/INVALID %u",
757                                              opdl_pmd_dev_id(port->opdl),
758                                              port->id,
759                                              queue->q_pos);
760                                 err = -EINVAL;
761                                 break;
762                         }
763
764                 } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
765
766                         port->p_type = OPDL_ASYNC_PORT;
767
768                         /* -- tx -- */
769                         stage_inst = opdl_stage_add(
770                                 device->opdl[queue->opdl_id],
771                                         false,
772                                         false); /* First stage */
773                         port->deq_stage_inst = stage_inst;
774
775                         /* Add the port to the queue array of ports */
776                         queue->ports[queue->nb_ports] = port;
777                         port->instance_id = queue->nb_ports;
778                         queue->nb_ports++;
779
780                         if (queue->nb_ports > 1) {
781                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
782                                              "queue %u:, setup as SINGLE_LINK"
783                                              " but has more than one port linked",
784                                              opdl_pmd_dev_id(port->opdl),
785                                              queue->external_qid);
786                                 err = -EINVAL;
787                                 break;
788                         }
789
790                         /* -- single instance rx for next opdl -- */
791                         uint8_t next_qid =
792                                 device->q_map_ex_to_in[queue->external_qid] + 1;
793                         if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
794                                         device->queue[next_qid].configured) {
795
796                                 /* Remap the queue */
797                                 queue = &device->queue[next_qid];
798
799                                 stage_inst = opdl_stage_add(
800                                         device->opdl[queue->opdl_id],
801                                                 false,
802                                                 true);
803                                 port->enq_stage_inst = stage_inst;
804
805                                 /* Add the port to the queue array of ports */
806                                 queue->ports[queue->nb_ports] = port;
807                                 port->instance_id = queue->nb_ports;
808                                 queue->nb_ports++;
809                                 if (queue->nb_ports > 1) {
810                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
811                                                 "dummy queue %u: for "
812                                                 "port %u, "
813                                                 "SINGLE_LINK but has more "
814                                                 "than one port linked",
815                                                 opdl_pmd_dev_id(port->opdl),
816                                                 next_qid,
817                                                 port->id);
818                                         err = -EINVAL;
819                                         break;
820                                 }
821                                 /* Set this queue to initialized as it is never
822                                  * referenced by any ports
823                                  */
824                                 queue->initialized = 1;
825                         }
826                 }
827         }
828
829         /* Now that all ports are initialised we need to
830          * setup the last bit of stage md
831          */
832         if (!err) {
833                 for (i = 0; i < device->nb_ports; i++) {
834                         struct opdl_port *port = &device->ports[i];
835                         struct opdl_queue *queue =
836                                 &device->queue[port->queue_id];
837
838                         if (port->configured &&
839                                         (port->queue_id != OPDL_INVALID_QID)) {
840                                 if (queue->nb_ports == 0) {
841                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
842                                                 "queue:[%u] has no ports"
843                                                 " linked to it",
844                                                 opdl_pmd_dev_id(port->opdl),
845                                                 port->id);
846                                         err = -EINVAL;
847                                         break;
848                                 }
849
850                                 port->num_instance = queue->nb_ports;
851                                 port->initialized = 1;
852                                 queue->initialized = 1;
853                         } else {
854                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
855                                              "Port:[%u] not configured  invalid"
856                                              " queue configuration",
857                                              opdl_pmd_dev_id(port->opdl),
858                                              port->id);
859                                 err = -EINVAL;
860                                 break;
861                         }
862                 }
863         }
864         return err;
865 }
866
867 int
868 initialise_queue_zero_ports(struct rte_eventdev *dev)
869 {
870         int err = 0;
871         uint8_t mt_rx = 0;
872         struct opdl_stage *stage_inst = NULL;
873         struct opdl_queue *queue = NULL;
874
875         struct opdl_evdev *device = opdl_pmd_priv(dev);
876
877         /* Assign queue zero and figure out how many Q0 ports we have */
878         uint32_t i;
879         for (i = 0; i < device->nb_ports; i++) {
880                 struct opdl_port *port = &device->ports[i];
881                 if (port->queue_id == OPDL_INVALID_QID) {
882                         port->queue_id = 0;
883                         port->external_qid = OPDL_INVALID_QID;
884                         port->p_type = OPDL_PURE_RX_PORT;
885                         mt_rx++;
886                 }
887         }
888
889         /* Create the stage */
890         stage_inst = opdl_stage_add(device->opdl[0],
891                         (mt_rx > 1 ? true : false),
892                         true);
893         if (stage_inst) {
894
895                 /* Assign the new created input stage to all relevant ports */
896                 for (i = 0; i < device->nb_ports; i++) {
897                         struct opdl_port *port = &device->ports[i];
898                         if (port->queue_id == 0) {
899                                 queue = &device->queue[port->queue_id];
900                                 port->enq_stage_inst = stage_inst;
901                                 port->deq_stage_inst = NULL;
902                                 port->configured = 1;
903                                 port->initialized = 1;
904
905                                 queue->ports[queue->nb_ports] = port;
906                                 port->instance_id = queue->nb_ports;
907                                 queue->nb_ports++;
908                         }
909                 }
910         } else {
911                 err = -1;
912         }
913         return err;
914 }
915
916 int
917 assign_internal_queue_ids(struct rte_eventdev *dev)
918 {
919         int err = 0;
920         struct opdl_evdev *device = opdl_pmd_priv(dev);
921         uint32_t i;
922
923         for (i = 0; i < device->nb_ports; i++) {
924                 struct opdl_port *port = &device->ports[i];
925                 if (port->external_qid != OPDL_INVALID_QID) {
926                         port->queue_id =
927                                 device->q_map_ex_to_in[port->external_qid];
928
929                         /* Now do the external_qid of the next queue */
930                         struct opdl_queue *queue =
931                                 &device->queue[port->queue_id];
932                         if (queue->q_pos == OPDL_Q_POS_END)
933                                 port->next_external_qid =
934                                 device->queue[port->queue_id + 2].external_qid;
935                         else
936                                 port->next_external_qid =
937                                 device->queue[port->queue_id + 1].external_qid;
938                 }
939         }
940         return err;
941 }