net: fix RARP generation
[dpdk.git] / drivers / event / opdl / opdl_evdev_init.c
1 /*-
2  * SPDX-License-Identifier: BSD-3-Clause
3  * Copyright(c) 2010-2014 Intel Corporation
4  */
5
6 #include <inttypes.h>
7 #include <string.h>
8
9 #include <rte_bus_vdev.h>
10 #include <rte_errno.h>
11 #include <rte_cycles.h>
12 #include <rte_memzone.h>
13
14 #include "opdl_evdev.h"
15 #include "opdl_ring.h"
16 #include "opdl_log.h"
17
18
19 static __rte_always_inline uint32_t
20 enqueue_check(struct opdl_port *p,
21                 const struct rte_event ev[],
22                 uint16_t num,
23                 uint16_t num_events)
24 {
25         uint16_t i;
26
27         if (p->opdl->do_validation) {
28
29                 for (i = 0; i < num; i++) {
30                         if (ev[i].queue_id != p->next_external_qid) {
31                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
32                                              "ERROR - port:[%u] - event wants"
33                                              " to enq to q_id[%u],"
34                                              " but should be [%u]",
35                                              opdl_pmd_dev_id(p->opdl),
36                                              p->id,
37                                              ev[i].queue_id,
38                                              p->next_external_qid);
39                                 rte_errno = -EINVAL;
40                                 return 0;
41                         }
42                 }
43
44                 /* Stats */
45                 if (p->p_type == OPDL_PURE_RX_PORT ||
46                                 p->p_type == OPDL_ASYNC_PORT) {
47                         /* Stats */
48                         if (num_events) {
49                                 p->port_stat[claim_pkts_requested] += num;
50                                 p->port_stat[claim_pkts_granted] += num_events;
51                                 p->port_stat[claim_non_empty]++;
52                                 p->start_cycles = rte_rdtsc();
53                         } else {
54                                 p->port_stat[claim_empty]++;
55                                 p->start_cycles = 0;
56                         }
57                 } else {
58                         if (p->start_cycles) {
59                                 uint64_t end_cycles = rte_rdtsc();
60                                 p->port_stat[total_cycles] +=
61                                         end_cycles - p->start_cycles;
62                         }
63                 }
64         } else {
65                 if (num > 0 &&
66                                 ev[0].queue_id != p->next_external_qid) {
67                         rte_errno = -EINVAL;
68                         return 0;
69                 }
70         }
71
72         return num;
73 }
74
75 static __rte_always_inline void
76 update_on_dequeue(struct opdl_port *p,
77                 struct rte_event ev[],
78                 uint16_t num,
79                 uint16_t num_events)
80 {
81         if (p->opdl->do_validation) {
82                 int16_t i;
83                 for (i = 0; i < num; i++)
84                         ev[i].queue_id =
85                                 p->opdl->queue[p->queue_id].external_qid;
86
87                 /* Stats */
88                 if (num_events) {
89                         p->port_stat[claim_pkts_requested] += num;
90                         p->port_stat[claim_pkts_granted] += num_events;
91                         p->port_stat[claim_non_empty]++;
92                         p->start_cycles = rte_rdtsc();
93                 } else {
94                         p->port_stat[claim_empty]++;
95                         p->start_cycles = 0;
96                 }
97         } else {
98                 if (num > 0)
99                         ev[0].queue_id =
100                                 p->opdl->queue[p->queue_id].external_qid;
101         }
102 }
103
104
105 /*
106  * Error RX enqueue:
107  *
108  *
109  */
110
111 static uint16_t
112 opdl_rx_error_enqueue(struct opdl_port *p,
113                 const struct rte_event ev[],
114                 uint16_t num)
115 {
116         RTE_SET_USED(p);
117         RTE_SET_USED(ev);
118         RTE_SET_USED(num);
119
120         rte_errno = -ENOSPC;
121
122         return 0;
123 }
124
125 /*
126  * RX enqueue:
127  *
128  * This function handles enqueue for a single input stage_inst with
129  *      threadsafe disabled or enabled. eg 1 thread using a stage_inst or
130  *      multiple threads sharing a stage_inst
131  */
132
133 static uint16_t
134 opdl_rx_enqueue(struct opdl_port *p,
135                 const struct rte_event ev[],
136                 uint16_t num)
137 {
138         uint16_t enqueued = 0;
139
140         enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
141                                    ev,
142                                    num,
143                                    false);
144         if (!enqueue_check(p, ev, num, enqueued))
145                 return 0;
146
147
148         if (enqueued < num)
149                 rte_errno = -ENOSPC;
150
151         return enqueued;
152 }
153
154 /*
155  * Error TX handler
156  *
157  */
158
159 static uint16_t
160 opdl_tx_error_dequeue(struct opdl_port *p,
161                 struct rte_event ev[],
162                 uint16_t num)
163 {
164         RTE_SET_USED(p);
165         RTE_SET_USED(ev);
166         RTE_SET_USED(num);
167
168         rte_errno = -ENOSPC;
169
170         return 0;
171 }
172
173 /*
174  * TX single threaded claim
175  *
176  * This function handles dequeue for a single worker stage_inst with
177  *      threadsafe disabled. eg 1 thread using an stage_inst
178  */
179
180 static uint16_t
181 opdl_tx_dequeue_single_thread(struct opdl_port *p,
182                         struct rte_event ev[],
183                         uint16_t num)
184 {
185         uint16_t returned;
186
187         struct opdl_ring  *ring;
188
189         ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
190
191         returned = opdl_ring_copy_to_burst(ring,
192                                            p->deq_stage_inst,
193                                            ev,
194                                            num,
195                                            false);
196
197         update_on_dequeue(p, ev, num, returned);
198
199         return returned;
200 }
201
202 /*
203  * TX multi threaded claim
204  *
205  * This function handles dequeue for multiple worker stage_inst with
206  *      threadsafe disabled. eg multiple stage_inst each with its own instance
207  */
208
209 static uint16_t
210 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
211                         struct rte_event ev[],
212                         uint16_t num)
213 {
214         uint32_t num_events = 0;
215
216         num_events = opdl_stage_claim(p->deq_stage_inst,
217                                     (void *)ev,
218                                     num,
219                                     NULL,
220                                     false,
221                                     false);
222
223         update_on_dequeue(p, ev, num, num_events);
224
225         return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
226 }
227
228
229 /*
230  * Worker thread claim
231  *
232  */
233
234 static uint16_t
235 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
236 {
237         uint32_t num_events = 0;
238
239         if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
240                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
241                              "Attempt to dequeue num of events larger than port (%d) max",
242                              opdl_pmd_dev_id(p->opdl),
243                              p->id);
244                 rte_errno = -EINVAL;
245                 return 0;
246         }
247
248
249         num_events = opdl_stage_claim(p->deq_stage_inst,
250                         (void *)ev,
251                         num,
252                         NULL,
253                         false,
254                         p->atomic_claim);
255
256
257         update_on_dequeue(p, ev, num, num_events);
258
259         return num_events;
260 }
261
262 /*
263  * Worker thread disclaim
264  */
265
266 static uint16_t
267 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
268 {
269         uint16_t enqueued = 0;
270
271         uint32_t i = 0;
272
273         for (i = 0; i < num; i++)
274                 opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
275                                 i, p->atomic_claim);
276
277         enqueued = opdl_stage_disclaim(p->enq_stage_inst,
278                                        num,
279                                        false);
280
281         return enqueue_check(p, ev, num, enqueued);
282 }
283
284 static __rte_always_inline struct opdl_stage *
285 stage_for_port(struct opdl_queue *q, unsigned int i)
286 {
287         if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
288                 return q->ports[i]->enq_stage_inst;
289         else
290                 return q->ports[i]->deq_stage_inst;
291 }
292
293 static int opdl_add_deps(struct opdl_evdev *device,
294                          int q_id,
295                          int deps_q_id)
296 {
297         unsigned int i, j;
298         int status;
299         struct opdl_ring  *ring;
300         struct opdl_queue *queue = &device->queue[q_id];
301         struct opdl_queue *queue_deps = &device->queue[deps_q_id];
302         struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
303
304         /* sanity check that all stages are for same opdl ring */
305         for (i = 0; i < queue->nb_ports; i++) {
306                 struct opdl_ring *r =
307                         opdl_stage_get_opdl_ring(stage_for_port(queue, i));
308                 for (j = 0; j < queue_deps->nb_ports; j++) {
309                         struct opdl_ring *rj =
310                                 opdl_stage_get_opdl_ring(
311                                                 stage_for_port(queue_deps, j));
312                         if (r != rj) {
313                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
314                                              "Stages and dependents"
315                                              " are not for same opdl ring",
316                                              opdl_pmd_dev_id(device));
317                                 for (uint32_t k = 0;
318                                                 k < device->nb_opdls; k++) {
319                                         opdl_ring_dump(device->opdl[k],
320                                                         stdout);
321                                 }
322                                 return -EINVAL;
323                         }
324                 }
325         }
326
327         /* Gather all stages instance in deps */
328         for (i = 0; i < queue_deps->nb_ports; i++)
329                 dep_stages[i] = stage_for_port(queue_deps, i);
330
331
332         /* Add all deps for each port->stage_inst in this queue */
333         for (i = 0; i < queue->nb_ports; i++) {
334
335                 ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
336
337                 status = opdl_stage_deps_add(ring,
338                                 stage_for_port(queue, i),
339                                 queue->ports[i]->num_instance,
340                                 queue->ports[i]->instance_id,
341                                 dep_stages,
342                                 queue_deps->nb_ports);
343                 if (status < 0)
344                         return -EINVAL;
345         }
346
347         return 0;
348 }
349
350 int
351 opdl_add_event_handlers(struct rte_eventdev *dev)
352 {
353         int err = 0;
354
355         struct opdl_evdev *device = opdl_pmd_priv(dev);
356         unsigned int i;
357
358         for (i = 0; i < device->max_port_nb; i++) {
359
360                 struct opdl_port *port = &device->ports[i];
361
362                 if (port->configured) {
363                         if (port->p_type == OPDL_PURE_RX_PORT) {
364                                 port->enq = opdl_rx_enqueue;
365                                 port->deq = opdl_tx_error_dequeue;
366
367                         } else if (port->p_type == OPDL_PURE_TX_PORT) {
368
369                                 port->enq = opdl_rx_error_enqueue;
370
371                                 if (port->num_instance == 1)
372                                         port->deq =
373                                                 opdl_tx_dequeue_single_thread;
374                                 else
375                                         port->deq = opdl_tx_dequeue_multi_inst;
376
377                         } else if (port->p_type == OPDL_REGULAR_PORT) {
378
379                                 port->enq = opdl_disclaim;
380                                 port->deq = opdl_claim;
381
382                         } else if (port->p_type == OPDL_ASYNC_PORT) {
383
384                                 port->enq = opdl_rx_enqueue;
385
386                                 /* Always single instance */
387                                 port->deq = opdl_tx_dequeue_single_thread;
388                         } else {
389                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
390                                              "port:[%u] has invalid port type - ",
391                                              opdl_pmd_dev_id(port->opdl),
392                                              port->id);
393                                 err = -EINVAL;
394                                 break;
395                         }
396                         port->initialized = 1;
397                 }
398         }
399
400         if (!err)
401                 fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
402         return err;
403 }
404
405 int
406 build_all_dependencies(struct rte_eventdev *dev)
407 {
408
409         int err = 0;
410         unsigned int i;
411         struct opdl_evdev *device = opdl_pmd_priv(dev);
412
413         uint8_t start_qid = 0;
414
415         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
416                 struct opdl_queue *queue = &device->queue[i];
417                 if (!queue->initialized)
418                         break;
419
420                 if (queue->q_pos == OPDL_Q_POS_START) {
421                         start_qid = i;
422                         continue;
423                 }
424
425                 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
426                         err = opdl_add_deps(device, i, i-1);
427                         if (err < 0) {
428                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
429                                              "dependency addition for queue:[%u] - FAILED",
430                                              dev->data->dev_id,
431                                              queue->external_qid);
432                                 break;
433                         }
434                 }
435
436                 if (queue->q_pos == OPDL_Q_POS_END) {
437                         /* Add this dependency */
438                         err = opdl_add_deps(device, i, i-1);
439                         if (err < 0) {
440                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
441                                              "dependency addition for queue:[%u] - FAILED",
442                                              dev->data->dev_id,
443                                              queue->external_qid);
444                                 break;
445                         }
446                         /* Add dependency for rx on tx */
447                         err = opdl_add_deps(device, start_qid, i);
448                         if (err < 0) {
449                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
450                                              "dependency addition for queue:[%u] - FAILED",
451                                              dev->data->dev_id,
452                                              queue->external_qid);
453                                 break;
454                         }
455                 }
456         }
457
458         if (!err)
459                 fprintf(stdout, "Success - dependencies built\n");
460
461         return err;
462 }
463 int
464 check_queues_linked(struct rte_eventdev *dev)
465 {
466
467         int err = 0;
468         unsigned int i;
469         struct opdl_evdev *device = opdl_pmd_priv(dev);
470         uint32_t nb_iq = 0;
471
472         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
473                 struct opdl_queue *queue = &device->queue[i];
474
475                 if (!queue->initialized)
476                         break;
477
478                 if (queue->external_qid == OPDL_INVALID_QID)
479                         nb_iq++;
480
481                 if (queue->nb_ports == 0) {
482                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
483                                      "queue:[%u] has no associated ports",
484                                      dev->data->dev_id,
485                                      i);
486                         err = -EINVAL;
487                         break;
488                 }
489         }
490         if (!err) {
491                 if ((i - nb_iq) != device->max_queue_nb) {
492                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
493                                      "%u queues counted but should be %u",
494                                      dev->data->dev_id,
495                                      i - nb_iq,
496                                      device->max_queue_nb);
497                         err = -1;
498                 }
499
500         }
501         return err;
502 }
503
504 void
505 destroy_queues_and_rings(struct rte_eventdev *dev)
506 {
507         struct opdl_evdev *device = opdl_pmd_priv(dev);
508
509         for (uint32_t i = 0; i < device->nb_opdls; i++) {
510                 if (device->opdl[i])
511                         opdl_ring_free(device->opdl[i]);
512         }
513
514         memset(&device->queue,
515                         0,
516                         sizeof(struct opdl_queue)
517                         * RTE_EVENT_MAX_QUEUES_PER_DEV);
518 }
519
520 #define OPDL_ID(d)(d->nb_opdls - 1)
521
522 static __rte_always_inline void
523 initialise_queue(struct opdl_evdev *device,
524                 enum queue_pos pos,
525                 int32_t i)
526 {
527         struct opdl_queue *queue = &device->queue[device->nb_queues];
528
529         if (i == -1) {
530                 queue->q_type = OPDL_Q_TYPE_ORDERED;
531                 queue->external_qid = OPDL_INVALID_QID;
532         } else {
533                 queue->q_type = device->q_md[i].type;
534                 queue->external_qid = device->q_md[i].ext_id;
535                 /* Add ex->in for queues setup */
536                 device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
537         }
538         queue->opdl_id = OPDL_ID(device);
539         queue->q_pos = pos;
540         queue->nb_ports = 0;
541         queue->configured = 1;
542
543         device->nb_queues++;
544 }
545
546
547 static __rte_always_inline int
548 create_opdl(struct opdl_evdev *device)
549 {
550         int err = 0;
551
552         char name[RTE_MEMZONE_NAMESIZE];
553
554         snprintf(name, RTE_MEMZONE_NAMESIZE,
555                         "%s_%u", device->service_name, device->nb_opdls);
556
557         device->opdl[device->nb_opdls] =
558                 opdl_ring_create(name,
559                                 device->nb_events_limit,
560                                 sizeof(struct rte_event),
561                                 device->max_port_nb * 2,
562                                 device->socket);
563
564         if (!device->opdl[device->nb_opdls]) {
565                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
566                              "opdl ring %u creation - FAILED",
567                              opdl_pmd_dev_id(device),
568                              device->nb_opdls);
569                 err = -EINVAL;
570         } else {
571                 device->nb_opdls++;
572         }
573         return err;
574 }
575
576 static __rte_always_inline int
577 create_link_opdl(struct opdl_evdev *device, uint32_t index)
578 {
579
580         int err = 0;
581
582         if (device->q_md[index + 1].type !=
583                         OPDL_Q_TYPE_SINGLE_LINK) {
584
585                 /* async queue with regular
586                  * queue following it
587                  */
588
589                 /* create a new opdl ring */
590                 err = create_opdl(device);
591                 if (!err) {
592                         /* create an initial
593                          * dummy queue for new opdl
594                          */
595                         initialise_queue(device,
596                                         OPDL_Q_POS_START,
597                                         -1);
598                 } else {
599                         err = -EINVAL;
600                 }
601         } else {
602                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
603                              "queue %u, two consecutive"
604                              " SINGLE_LINK queues, not allowed",
605                              opdl_pmd_dev_id(device),
606                              index);
607                 err = -EINVAL;
608         }
609
610         return err;
611 }
612
613 int
614 create_queues_and_rings(struct rte_eventdev *dev)
615 {
616         int err = 0;
617
618         struct opdl_evdev *device = opdl_pmd_priv(dev);
619
620         device->nb_queues = 0;
621
622         if (device->nb_ports != device->max_port_nb) {
623                 PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
624                                 " number:%u for this device",
625                                 device->nb_ports,
626                                 device->max_port_nb);
627                 err = -1;
628         }
629
630         if (!err) {
631                 /* We will have at least one opdl so create it now */
632                 err = create_opdl(device);
633         }
634
635         if (!err) {
636
637                 /* Create 1st "dummy" queue */
638                 initialise_queue(device,
639                                  OPDL_Q_POS_START,
640                                  -1);
641
642                 for (uint32_t i = 0; i < device->nb_q_md; i++) {
643
644                         /* Check */
645                         if (!device->q_md[i].setup) {
646
647                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
648                                              "queue meta data slot %u"
649                                              " not setup - FAILING",
650                                              dev->data->dev_id,
651                                              i);
652                                 err = -EINVAL;
653                                 break;
654                         } else if (device->q_md[i].type !=
655                                         OPDL_Q_TYPE_SINGLE_LINK) {
656
657                                 if (!device->q_md[i + 1].setup) {
658                                         /* Create a simple ORDERED/ATOMIC
659                                          * queue at the end
660                                          */
661                                         initialise_queue(device,
662                                                         OPDL_Q_POS_END,
663                                                         i);
664
665                                 } else {
666                                         /* Create a simple ORDERED/ATOMIC
667                                          * queue in the middle
668                                          */
669                                         initialise_queue(device,
670                                                         OPDL_Q_POS_MIDDLE,
671                                                         i);
672                                 }
673                         } else if (device->q_md[i].type ==
674                                         OPDL_Q_TYPE_SINGLE_LINK) {
675
676                                 /* create last queue for this opdl */
677                                 initialise_queue(device,
678                                                 OPDL_Q_POS_END,
679                                                 i);
680
681                                 err = create_link_opdl(device, i);
682
683                                 if (err)
684                                         break;
685
686
687                         }
688                 }
689         }
690         if (err)
691                 destroy_queues_and_rings(dev);
692
693         return err;
694 }
695
696
697 int
698 initialise_all_other_ports(struct rte_eventdev *dev)
699 {
700         int err = 0;
701         struct opdl_stage *stage_inst = NULL;
702
703         struct opdl_evdev *device = opdl_pmd_priv(dev);
704
705         for (uint32_t i = 0; i < device->nb_ports; i++) {
706                 struct opdl_port *port = &device->ports[i];
707                 struct opdl_queue *queue = &device->queue[port->queue_id];
708
709                 if (port->queue_id == 0) {
710                         continue;
711                 } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
712
713                         if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
714
715                                 /* Regular port with claim/disclaim */
716                                 stage_inst = opdl_stage_add(
717                                         device->opdl[queue->opdl_id],
718                                                 false,
719                                                 false);
720                                 port->deq_stage_inst = stage_inst;
721                                 port->enq_stage_inst = stage_inst;
722
723                                 if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
724                                         port->atomic_claim = true;
725                                 else
726                                         port->atomic_claim = false;
727
728                                 port->p_type =  OPDL_REGULAR_PORT;
729
730                                 /* Add the port to the queue array of ports */
731                                 queue->ports[queue->nb_ports] = port;
732                                 port->instance_id = queue->nb_ports;
733                                 queue->nb_ports++;
734                         } else if (queue->q_pos == OPDL_Q_POS_END) {
735
736                                 /* tx port  */
737                                 stage_inst = opdl_stage_add(
738                                         device->opdl[queue->opdl_id],
739                                                 false,
740                                                 false);
741                                 port->deq_stage_inst = stage_inst;
742                                 port->enq_stage_inst = NULL;
743                                 port->p_type = OPDL_PURE_TX_PORT;
744
745                                 /* Add the port to the queue array of ports */
746                                 queue->ports[queue->nb_ports] = port;
747                                 port->instance_id = queue->nb_ports;
748                                 queue->nb_ports++;
749                         } else {
750
751                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
752                                              "port %u:, linked incorrectly"
753                                              " to a q_pos START/INVALID %u",
754                                              opdl_pmd_dev_id(port->opdl),
755                                              port->id,
756                                              queue->q_pos);
757                                 err = -EINVAL;
758                                 break;
759                         }
760
761                 } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
762
763                         port->p_type = OPDL_ASYNC_PORT;
764
765                         /* -- tx -- */
766                         stage_inst = opdl_stage_add(
767                                 device->opdl[queue->opdl_id],
768                                         false,
769                                         false); /* First stage */
770                         port->deq_stage_inst = stage_inst;
771
772                         /* Add the port to the queue array of ports */
773                         queue->ports[queue->nb_ports] = port;
774                         port->instance_id = queue->nb_ports;
775                         queue->nb_ports++;
776
777                         if (queue->nb_ports > 1) {
778                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
779                                              "queue %u:, setup as SINGLE_LINK"
780                                              " but has more than one port linked",
781                                              opdl_pmd_dev_id(port->opdl),
782                                              queue->external_qid);
783                                 err = -EINVAL;
784                                 break;
785                         }
786
787                         /* -- single instance rx for next opdl -- */
788                         uint8_t next_qid =
789                                 device->q_map_ex_to_in[queue->external_qid] + 1;
790                         if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
791                                         device->queue[next_qid].configured) {
792
793                                 /* Remap the queue */
794                                 queue = &device->queue[next_qid];
795
796                                 stage_inst = opdl_stage_add(
797                                         device->opdl[queue->opdl_id],
798                                                 false,
799                                                 true);
800                                 port->enq_stage_inst = stage_inst;
801
802                                 /* Add the port to the queue array of ports */
803                                 queue->ports[queue->nb_ports] = port;
804                                 port->instance_id = queue->nb_ports;
805                                 queue->nb_ports++;
806                                 if (queue->nb_ports > 1) {
807                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
808                                                 "dummy queue %u: for "
809                                                 "port %u, "
810                                                 "SINGLE_LINK but has more "
811                                                 "than one port linked",
812                                                 opdl_pmd_dev_id(port->opdl),
813                                                 next_qid,
814                                                 port->id);
815                                         err = -EINVAL;
816                                         break;
817                                 }
818                                 /* Set this queue to initialized as it is never
819                                  * referenced by any ports
820                                  */
821                                 queue->initialized = 1;
822                         }
823                 }
824         }
825
826         /* Now that all ports are initialised we need to
827          * setup the last bit of stage md
828          */
829         if (!err) {
830                 for (uint32_t i = 0; i < device->nb_ports; i++) {
831                         struct opdl_port *port = &device->ports[i];
832                         struct opdl_queue *queue =
833                                 &device->queue[port->queue_id];
834
835                         if (port->configured &&
836                                         (port->queue_id != OPDL_INVALID_QID)) {
837                                 if (queue->nb_ports == 0) {
838                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
839                                                 "queue:[%u] has no ports"
840                                                 " linked to it",
841                                                 opdl_pmd_dev_id(port->opdl),
842                                                 port->id);
843                                         err = -EINVAL;
844                                         break;
845                                 }
846
847                                 port->num_instance = queue->nb_ports;
848                                 port->initialized = 1;
849                                 queue->initialized = 1;
850                         } else {
851                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
852                                              "Port:[%u] not configured  invalid"
853                                              " queue configuration",
854                                              opdl_pmd_dev_id(port->opdl),
855                                              port->id);
856                                 err = -EINVAL;
857                                 break;
858                         }
859                 }
860         }
861         return err;
862 }
863
864 int
865 initialise_queue_zero_ports(struct rte_eventdev *dev)
866 {
867         int err = 0;
868         uint8_t mt_rx = 0;
869         struct opdl_stage *stage_inst = NULL;
870         struct opdl_queue *queue = NULL;
871
872         struct opdl_evdev *device = opdl_pmd_priv(dev);
873
874         /* Assign queue zero and figure out how many Q0 ports we have */
875         for (uint32_t i = 0; i < device->nb_ports; i++) {
876                 struct opdl_port *port = &device->ports[i];
877                 if (port->queue_id == OPDL_INVALID_QID) {
878                         port->queue_id = 0;
879                         port->external_qid = OPDL_INVALID_QID;
880                         port->p_type = OPDL_PURE_RX_PORT;
881                         mt_rx++;
882                 }
883         }
884
885         /* Create the stage */
886         stage_inst = opdl_stage_add(device->opdl[0],
887                         (mt_rx > 1 ? true : false),
888                         true);
889         if (stage_inst) {
890
891                 /* Assign the new created input stage to all relevant ports */
892                 for (uint32_t i = 0; i < device->nb_ports; i++) {
893                         struct opdl_port *port = &device->ports[i];
894                         if (port->queue_id == 0) {
895                                 queue = &device->queue[port->queue_id];
896                                 port->enq_stage_inst = stage_inst;
897                                 port->deq_stage_inst = NULL;
898                                 port->configured = 1;
899                                 port->initialized = 1;
900
901                                 queue->ports[queue->nb_ports] = port;
902                                 port->instance_id = queue->nb_ports;
903                                 queue->nb_ports++;
904                         }
905                 }
906         } else {
907                 err = -1;
908         }
909         return err;
910 }
911
912 int
913 assign_internal_queue_ids(struct rte_eventdev *dev)
914 {
915         int err = 0;
916         struct opdl_evdev *device = opdl_pmd_priv(dev);
917
918         for (uint32_t i = 0; i < device->nb_ports; i++) {
919                 struct opdl_port *port = &device->ports[i];
920                 if (port->external_qid != OPDL_INVALID_QID) {
921                         port->queue_id =
922                                 device->q_map_ex_to_in[port->external_qid];
923
924                         /* Now do the external_qid of the next queue */
925                         struct opdl_queue *queue =
926                                 &device->queue[port->queue_id];
927                         if (queue->q_pos == OPDL_Q_POS_END)
928                                 port->next_external_qid =
929                                 device->queue[port->queue_id + 2].external_qid;
930                         else
931                                 port->next_external_qid =
932                                 device->queue[port->queue_id + 1].external_qid;
933                 }
934         }
935         return err;
936 }