net/mlx4: fix RxQ errors stat
[dpdk.git] / drivers / event / opdl / opdl_evdev_init.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4
5 #include <inttypes.h>
6 #include <string.h>
7
8 #include <rte_bus_vdev.h>
9 #include <rte_errno.h>
10 #include <rte_cycles.h>
11 #include <rte_memzone.h>
12
13 #include "opdl_evdev.h"
14 #include "opdl_ring.h"
15 #include "opdl_log.h"
16
17
18 static __rte_always_inline uint32_t
19 enqueue_check(struct opdl_port *p,
20                 const struct rte_event ev[],
21                 uint16_t num,
22                 uint16_t num_events)
23 {
24         uint16_t i;
25
26         if (p->opdl->do_validation) {
27
28                 for (i = 0; i < num; i++) {
29                         if (ev[i].queue_id != p->next_external_qid) {
30                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
31                                              "ERROR - port:[%u] - event wants"
32                                              " to enq to q_id[%u],"
33                                              " but should be [%u]",
34                                              opdl_pmd_dev_id(p->opdl),
35                                              p->id,
36                                              ev[i].queue_id,
37                                              p->next_external_qid);
38                                 rte_errno = -EINVAL;
39                                 return 0;
40                         }
41                 }
42
43                 /* Stats */
44                 if (p->p_type == OPDL_PURE_RX_PORT ||
45                                 p->p_type == OPDL_ASYNC_PORT) {
46                         /* Stats */
47                         if (num_events) {
48                                 p->port_stat[claim_pkts_requested] += num;
49                                 p->port_stat[claim_pkts_granted] += num_events;
50                                 p->port_stat[claim_non_empty]++;
51                                 p->start_cycles = rte_rdtsc();
52                         } else {
53                                 p->port_stat[claim_empty]++;
54                                 p->start_cycles = 0;
55                         }
56                 } else {
57                         if (p->start_cycles) {
58                                 uint64_t end_cycles = rte_rdtsc();
59                                 p->port_stat[total_cycles] +=
60                                         end_cycles - p->start_cycles;
61                         }
62                 }
63         } else {
64                 if (num > 0 &&
65                                 ev[0].queue_id != p->next_external_qid) {
66                         rte_errno = -EINVAL;
67                         return 0;
68                 }
69         }
70
71         return num;
72 }
73
74 static __rte_always_inline void
75 update_on_dequeue(struct opdl_port *p,
76                 struct rte_event ev[],
77                 uint16_t num,
78                 uint16_t num_events)
79 {
80         if (p->opdl->do_validation) {
81                 int16_t i;
82                 for (i = 0; i < num; i++)
83                         ev[i].queue_id =
84                                 p->opdl->queue[p->queue_id].external_qid;
85
86                 /* Stats */
87                 if (num_events) {
88                         p->port_stat[claim_pkts_requested] += num;
89                         p->port_stat[claim_pkts_granted] += num_events;
90                         p->port_stat[claim_non_empty]++;
91                         p->start_cycles = rte_rdtsc();
92                 } else {
93                         p->port_stat[claim_empty]++;
94                         p->start_cycles = 0;
95                 }
96         } else {
97                 if (num > 0)
98                         ev[0].queue_id =
99                                 p->opdl->queue[p->queue_id].external_qid;
100         }
101 }
102
103
104 /*
105  * Error RX enqueue:
106  *
107  *
108  */
109
110 static uint16_t
111 opdl_rx_error_enqueue(struct opdl_port *p,
112                 const struct rte_event ev[],
113                 uint16_t num)
114 {
115         RTE_SET_USED(p);
116         RTE_SET_USED(ev);
117         RTE_SET_USED(num);
118
119         rte_errno = -ENOSPC;
120
121         return 0;
122 }
123
124 /*
125  * RX enqueue:
126  *
127  * This function handles enqueue for a single input stage_inst with
128  *      threadsafe disabled or enabled. eg 1 thread using a stage_inst or
129  *      multiple threads sharing a stage_inst
130  */
131
132 static uint16_t
133 opdl_rx_enqueue(struct opdl_port *p,
134                 const struct rte_event ev[],
135                 uint16_t num)
136 {
137         uint16_t enqueued = 0;
138
139         enqueued = opdl_ring_input(opdl_stage_get_opdl_ring(p->enq_stage_inst),
140                                    ev,
141                                    num,
142                                    false);
143         if (!enqueue_check(p, ev, num, enqueued))
144                 return 0;
145
146
147         if (enqueued < num)
148                 rte_errno = -ENOSPC;
149
150         return enqueued;
151 }
152
153 /*
154  * Error TX handler
155  *
156  */
157
158 static uint16_t
159 opdl_tx_error_dequeue(struct opdl_port *p,
160                 struct rte_event ev[],
161                 uint16_t num)
162 {
163         RTE_SET_USED(p);
164         RTE_SET_USED(ev);
165         RTE_SET_USED(num);
166
167         rte_errno = -ENOSPC;
168
169         return 0;
170 }
171
172 /*
173  * TX single threaded claim
174  *
175  * This function handles dequeue for a single worker stage_inst with
176  *      threadsafe disabled. eg 1 thread using an stage_inst
177  */
178
179 static uint16_t
180 opdl_tx_dequeue_single_thread(struct opdl_port *p,
181                         struct rte_event ev[],
182                         uint16_t num)
183 {
184         uint16_t returned;
185
186         struct opdl_ring  *ring;
187
188         ring = opdl_stage_get_opdl_ring(p->deq_stage_inst);
189
190         returned = opdl_ring_copy_to_burst(ring,
191                                            p->deq_stage_inst,
192                                            ev,
193                                            num,
194                                            false);
195
196         update_on_dequeue(p, ev, num, returned);
197
198         return returned;
199 }
200
201 /*
202  * TX multi threaded claim
203  *
204  * This function handles dequeue for multiple worker stage_inst with
205  *      threadsafe disabled. eg multiple stage_inst each with its own instance
206  */
207
208 static uint16_t
209 opdl_tx_dequeue_multi_inst(struct opdl_port *p,
210                         struct rte_event ev[],
211                         uint16_t num)
212 {
213         uint32_t num_events = 0;
214
215         num_events = opdl_stage_claim(p->deq_stage_inst,
216                                     (void *)ev,
217                                     num,
218                                     NULL,
219                                     false,
220                                     false);
221
222         update_on_dequeue(p, ev, num, num_events);
223
224         return opdl_stage_disclaim(p->deq_stage_inst, num_events, false);
225 }
226
227
228 /*
229  * Worker thread claim
230  *
231  */
232
233 static uint16_t
234 opdl_claim(struct opdl_port *p, struct rte_event ev[], uint16_t num)
235 {
236         uint32_t num_events = 0;
237
238         if (unlikely(num > MAX_OPDL_CONS_Q_DEPTH)) {
239                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
240                              "Attempt to dequeue num of events larger than port (%d) max",
241                              opdl_pmd_dev_id(p->opdl),
242                              p->id);
243                 rte_errno = -EINVAL;
244                 return 0;
245         }
246
247
248         num_events = opdl_stage_claim(p->deq_stage_inst,
249                         (void *)ev,
250                         num,
251                         NULL,
252                         false,
253                         p->atomic_claim);
254
255
256         update_on_dequeue(p, ev, num, num_events);
257
258         return num_events;
259 }
260
261 /*
262  * Worker thread disclaim
263  */
264
265 static uint16_t
266 opdl_disclaim(struct opdl_port *p, const struct rte_event ev[], uint16_t num)
267 {
268         uint16_t enqueued = 0;
269
270         uint32_t i = 0;
271
272         for (i = 0; i < num; i++)
273                 opdl_ring_cas_slot(p->enq_stage_inst, &ev[i],
274                                 i, p->atomic_claim);
275
276         enqueued = opdl_stage_disclaim(p->enq_stage_inst,
277                                        num,
278                                        false);
279
280         return enqueue_check(p, ev, num, enqueued);
281 }
282
283 static __rte_always_inline struct opdl_stage *
284 stage_for_port(struct opdl_queue *q, unsigned int i)
285 {
286         if (q->q_pos == OPDL_Q_POS_START || q->q_pos == OPDL_Q_POS_MIDDLE)
287                 return q->ports[i]->enq_stage_inst;
288         else
289                 return q->ports[i]->deq_stage_inst;
290 }
291
292 static int opdl_add_deps(struct opdl_evdev *device,
293                          int q_id,
294                          int deps_q_id)
295 {
296         unsigned int i, j;
297         int status;
298         struct opdl_ring  *ring;
299         struct opdl_queue *queue = &device->queue[q_id];
300         struct opdl_queue *queue_deps = &device->queue[deps_q_id];
301         struct opdl_stage *dep_stages[OPDL_PORTS_MAX];
302
303         /* sanity check that all stages are for same opdl ring */
304         for (i = 0; i < queue->nb_ports; i++) {
305                 struct opdl_ring *r =
306                         opdl_stage_get_opdl_ring(stage_for_port(queue, i));
307                 for (j = 0; j < queue_deps->nb_ports; j++) {
308                         struct opdl_ring *rj =
309                                 opdl_stage_get_opdl_ring(
310                                                 stage_for_port(queue_deps, j));
311                         if (r != rj) {
312                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
313                                              "Stages and dependents"
314                                              " are not for same opdl ring",
315                                              opdl_pmd_dev_id(device));
316                                 uint32_t k;
317                                 for (k = 0; k < device->nb_opdls; k++) {
318                                         opdl_ring_dump(device->opdl[k],
319                                                         stdout);
320                                 }
321                                 return -EINVAL;
322                         }
323                 }
324         }
325
326         /* Gather all stages instance in deps */
327         for (i = 0; i < queue_deps->nb_ports; i++)
328                 dep_stages[i] = stage_for_port(queue_deps, i);
329
330
331         /* Add all deps for each port->stage_inst in this queue */
332         for (i = 0; i < queue->nb_ports; i++) {
333
334                 ring = opdl_stage_get_opdl_ring(stage_for_port(queue, i));
335
336                 status = opdl_stage_deps_add(ring,
337                                 stage_for_port(queue, i),
338                                 queue->ports[i]->num_instance,
339                                 queue->ports[i]->instance_id,
340                                 dep_stages,
341                                 queue_deps->nb_ports);
342                 if (status < 0)
343                         return -EINVAL;
344         }
345
346         return 0;
347 }
348
349 int
350 opdl_add_event_handlers(struct rte_eventdev *dev)
351 {
352         int err = 0;
353
354         struct opdl_evdev *device = opdl_pmd_priv(dev);
355         unsigned int i;
356
357         for (i = 0; i < device->max_port_nb; i++) {
358
359                 struct opdl_port *port = &device->ports[i];
360
361                 if (port->configured) {
362                         if (port->p_type == OPDL_PURE_RX_PORT) {
363                                 port->enq = opdl_rx_enqueue;
364                                 port->deq = opdl_tx_error_dequeue;
365
366                         } else if (port->p_type == OPDL_PURE_TX_PORT) {
367
368                                 port->enq = opdl_rx_error_enqueue;
369
370                                 if (port->num_instance == 1)
371                                         port->deq =
372                                                 opdl_tx_dequeue_single_thread;
373                                 else
374                                         port->deq = opdl_tx_dequeue_multi_inst;
375
376                         } else if (port->p_type == OPDL_REGULAR_PORT) {
377
378                                 port->enq = opdl_disclaim;
379                                 port->deq = opdl_claim;
380
381                         } else if (port->p_type == OPDL_ASYNC_PORT) {
382
383                                 port->enq = opdl_rx_enqueue;
384
385                                 /* Always single instance */
386                                 port->deq = opdl_tx_dequeue_single_thread;
387                         } else {
388                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
389                                              "port:[%u] has invalid port type - ",
390                                              opdl_pmd_dev_id(port->opdl),
391                                              port->id);
392                                 err = -EINVAL;
393                                 break;
394                         }
395                         port->initialized = 1;
396                 }
397         }
398
399         if (!err)
400                 fprintf(stdout, "Success - enqueue/dequeue handler(s) added\n");
401         return err;
402 }
403
404 int
405 build_all_dependencies(struct rte_eventdev *dev)
406 {
407
408         int err = 0;
409         unsigned int i;
410         struct opdl_evdev *device = opdl_pmd_priv(dev);
411
412         uint8_t start_qid = 0;
413
414         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
415                 struct opdl_queue *queue = &device->queue[i];
416                 if (!queue->initialized)
417                         break;
418
419                 if (queue->q_pos == OPDL_Q_POS_START) {
420                         start_qid = i;
421                         continue;
422                 }
423
424                 if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
425                         err = opdl_add_deps(device, i, i-1);
426                         if (err < 0) {
427                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
428                                              "dependency addition for queue:[%u] - FAILED",
429                                              dev->data->dev_id,
430                                              queue->external_qid);
431                                 break;
432                         }
433                 }
434
435                 if (queue->q_pos == OPDL_Q_POS_END) {
436                         /* Add this dependency */
437                         err = opdl_add_deps(device, i, i-1);
438                         if (err < 0) {
439                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
440                                              "dependency addition for queue:[%u] - FAILED",
441                                              dev->data->dev_id,
442                                              queue->external_qid);
443                                 break;
444                         }
445                         /* Add dependency for rx on tx */
446                         err = opdl_add_deps(device, start_qid, i);
447                         if (err < 0) {
448                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
449                                              "dependency addition for queue:[%u] - FAILED",
450                                              dev->data->dev_id,
451                                              queue->external_qid);
452                                 break;
453                         }
454                 }
455         }
456
457         if (!err)
458                 fprintf(stdout, "Success - dependencies built\n");
459
460         return err;
461 }
462 int
463 check_queues_linked(struct rte_eventdev *dev)
464 {
465
466         int err = 0;
467         unsigned int i;
468         struct opdl_evdev *device = opdl_pmd_priv(dev);
469         uint32_t nb_iq = 0;
470
471         for (i = 0; i < RTE_EVENT_MAX_QUEUES_PER_DEV; i++) {
472                 struct opdl_queue *queue = &device->queue[i];
473
474                 if (!queue->initialized)
475                         break;
476
477                 if (queue->external_qid == OPDL_INVALID_QID)
478                         nb_iq++;
479
480                 if (queue->nb_ports == 0) {
481                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
482                                      "queue:[%u] has no associated ports",
483                                      dev->data->dev_id,
484                                      i);
485                         err = -EINVAL;
486                         break;
487                 }
488         }
489         if (!err) {
490                 if ((i - nb_iq) != device->max_queue_nb) {
491                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
492                                      "%u queues counted but should be %u",
493                                      dev->data->dev_id,
494                                      i - nb_iq,
495                                      device->max_queue_nb);
496                         err = -1;
497                 }
498
499         }
500         return err;
501 }
502
503 void
504 destroy_queues_and_rings(struct rte_eventdev *dev)
505 {
506         struct opdl_evdev *device = opdl_pmd_priv(dev);
507         uint32_t i;
508
509         for (i = 0; i < device->nb_opdls; i++) {
510                 if (device->opdl[i])
511                         opdl_ring_free(device->opdl[i]);
512         }
513
514         memset(&device->queue,
515                         0,
516                         sizeof(struct opdl_queue)
517                         * RTE_EVENT_MAX_QUEUES_PER_DEV);
518 }
519
/*
 * Index of the most recently created opdl ring of device d.
 * The argument is parenthesized so that any pointer expression
 * (e.g. "p + 1" or "&dev") expands correctly.
 */
#define OPDL_ID(d) ((d)->nb_opdls - 1)
521
522 static __rte_always_inline void
523 initialise_queue(struct opdl_evdev *device,
524                 enum queue_pos pos,
525                 int32_t i)
526 {
527         struct opdl_queue *queue = &device->queue[device->nb_queues];
528
529         if (i == -1) {
530                 queue->q_type = OPDL_Q_TYPE_ORDERED;
531                 queue->external_qid = OPDL_INVALID_QID;
532         } else {
533                 queue->q_type = device->q_md[i].type;
534                 queue->external_qid = device->q_md[i].ext_id;
535                 /* Add ex->in for queues setup */
536                 device->q_map_ex_to_in[queue->external_qid] = device->nb_queues;
537         }
538         queue->opdl_id = OPDL_ID(device);
539         queue->q_pos = pos;
540         queue->nb_ports = 0;
541         queue->configured = 1;
542
543         device->nb_queues++;
544 }
545
546
547 static __rte_always_inline int
548 create_opdl(struct opdl_evdev *device)
549 {
550         int err = 0;
551
552         char name[RTE_MEMZONE_NAMESIZE];
553
554         snprintf(name, RTE_MEMZONE_NAMESIZE,
555                         "%s_%u", device->service_name, device->nb_opdls);
556
557         device->opdl[device->nb_opdls] =
558                 opdl_ring_create(name,
559                                 device->nb_events_limit,
560                                 sizeof(struct rte_event),
561                                 device->max_port_nb * 2,
562                                 device->socket);
563
564         if (!device->opdl[device->nb_opdls]) {
565                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
566                              "opdl ring %u creation - FAILED",
567                              opdl_pmd_dev_id(device),
568                              device->nb_opdls);
569                 err = -EINVAL;
570         } else {
571                 device->nb_opdls++;
572         }
573         return err;
574 }
575
576 static __rte_always_inline int
577 create_link_opdl(struct opdl_evdev *device, uint32_t index)
578 {
579
580         int err = 0;
581
582         if (device->q_md[index + 1].type !=
583                         OPDL_Q_TYPE_SINGLE_LINK) {
584
585                 /* async queue with regular
586                  * queue following it
587                  */
588
589                 /* create a new opdl ring */
590                 err = create_opdl(device);
591                 if (!err) {
592                         /* create an initial
593                          * dummy queue for new opdl
594                          */
595                         initialise_queue(device,
596                                         OPDL_Q_POS_START,
597                                         -1);
598                 } else {
599                         err = -EINVAL;
600                 }
601         } else {
602                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
603                              "queue %u, two consecutive"
604                              " SINGLE_LINK queues, not allowed",
605                              opdl_pmd_dev_id(device),
606                              index);
607                 err = -EINVAL;
608         }
609
610         return err;
611 }
612
613 int
614 create_queues_and_rings(struct rte_eventdev *dev)
615 {
616         int err = 0;
617
618         struct opdl_evdev *device = opdl_pmd_priv(dev);
619
620         device->nb_queues = 0;
621
622         if (device->nb_ports != device->max_port_nb) {
623                 PMD_DRV_LOG(ERR, "Number ports setup:%u NOT EQUAL to max port"
624                                 " number:%u for this device",
625                                 device->nb_ports,
626                                 device->max_port_nb);
627                 err = -1;
628         }
629
630         if (!err) {
631                 /* We will have at least one opdl so create it now */
632                 err = create_opdl(device);
633         }
634
635         if (!err) {
636
637                 /* Create 1st "dummy" queue */
638                 initialise_queue(device,
639                                  OPDL_Q_POS_START,
640                                  -1);
641
642                 uint32_t i;
643                 for (i = 0; i < device->nb_q_md; i++) {
644
645                         /* Check */
646                         if (!device->q_md[i].setup) {
647
648                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
649                                              "queue meta data slot %u"
650                                              " not setup - FAILING",
651                                              dev->data->dev_id,
652                                              i);
653                                 err = -EINVAL;
654                                 break;
655                         } else if (device->q_md[i].type !=
656                                         OPDL_Q_TYPE_SINGLE_LINK) {
657
658                                 if (!device->q_md[i + 1].setup) {
659                                         /* Create a simple ORDERED/ATOMIC
660                                          * queue at the end
661                                          */
662                                         initialise_queue(device,
663                                                         OPDL_Q_POS_END,
664                                                         i);
665
666                                 } else {
667                                         /* Create a simple ORDERED/ATOMIC
668                                          * queue in the middle
669                                          */
670                                         initialise_queue(device,
671                                                         OPDL_Q_POS_MIDDLE,
672                                                         i);
673                                 }
674                         } else if (device->q_md[i].type ==
675                                         OPDL_Q_TYPE_SINGLE_LINK) {
676
677                                 /* create last queue for this opdl */
678                                 initialise_queue(device,
679                                                 OPDL_Q_POS_END,
680                                                 i);
681
682                                 err = create_link_opdl(device, i);
683
684                                 if (err)
685                                         break;
686
687
688                         }
689                 }
690         }
691         if (err)
692                 destroy_queues_and_rings(dev);
693
694         return err;
695 }
696
697
698 int
699 initialise_all_other_ports(struct rte_eventdev *dev)
700 {
701         int err = 0;
702         struct opdl_stage *stage_inst = NULL;
703
704         struct opdl_evdev *device = opdl_pmd_priv(dev);
705
706         uint32_t i;
707         for (i = 0; i < device->nb_ports; i++) {
708                 struct opdl_port *port = &device->ports[i];
709                 struct opdl_queue *queue = &device->queue[port->queue_id];
710
711                 if (port->queue_id == 0) {
712                         continue;
713                 } else if (queue->q_type != OPDL_Q_TYPE_SINGLE_LINK) {
714
715                         if (queue->q_pos == OPDL_Q_POS_MIDDLE) {
716
717                                 /* Regular port with claim/disclaim */
718                                 stage_inst = opdl_stage_add(
719                                         device->opdl[queue->opdl_id],
720                                                 false,
721                                                 false);
722                                 port->deq_stage_inst = stage_inst;
723                                 port->enq_stage_inst = stage_inst;
724
725                                 if (queue->q_type == OPDL_Q_TYPE_ATOMIC)
726                                         port->atomic_claim = true;
727                                 else
728                                         port->atomic_claim = false;
729
730                                 port->p_type =  OPDL_REGULAR_PORT;
731
732                                 /* Add the port to the queue array of ports */
733                                 queue->ports[queue->nb_ports] = port;
734                                 port->instance_id = queue->nb_ports;
735                                 queue->nb_ports++;
736                                 opdl_stage_set_queue_id(stage_inst,
737                                                 port->queue_id);
738
739                         } else if (queue->q_pos == OPDL_Q_POS_END) {
740
741                                 /* tx port  */
742                                 stage_inst = opdl_stage_add(
743                                         device->opdl[queue->opdl_id],
744                                                 false,
745                                                 false);
746                                 port->deq_stage_inst = stage_inst;
747                                 port->enq_stage_inst = NULL;
748                                 port->p_type = OPDL_PURE_TX_PORT;
749
750                                 /* Add the port to the queue array of ports */
751                                 queue->ports[queue->nb_ports] = port;
752                                 port->instance_id = queue->nb_ports;
753                                 queue->nb_ports++;
754                         } else {
755
756                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
757                                              "port %u:, linked incorrectly"
758                                              " to a q_pos START/INVALID %u",
759                                              opdl_pmd_dev_id(port->opdl),
760                                              port->id,
761                                              queue->q_pos);
762                                 err = -EINVAL;
763                                 break;
764                         }
765
766                 } else if (queue->q_type == OPDL_Q_TYPE_SINGLE_LINK) {
767
768                         port->p_type = OPDL_ASYNC_PORT;
769
770                         /* -- tx -- */
771                         stage_inst = opdl_stage_add(
772                                 device->opdl[queue->opdl_id],
773                                         false,
774                                         false); /* First stage */
775                         port->deq_stage_inst = stage_inst;
776
777                         /* Add the port to the queue array of ports */
778                         queue->ports[queue->nb_ports] = port;
779                         port->instance_id = queue->nb_ports;
780                         queue->nb_ports++;
781
782                         if (queue->nb_ports > 1) {
783                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
784                                              "queue %u:, setup as SINGLE_LINK"
785                                              " but has more than one port linked",
786                                              opdl_pmd_dev_id(port->opdl),
787                                              queue->external_qid);
788                                 err = -EINVAL;
789                                 break;
790                         }
791
792                         /* -- single instance rx for next opdl -- */
793                         uint8_t next_qid =
794                                 device->q_map_ex_to_in[queue->external_qid] + 1;
795                         if (next_qid < RTE_EVENT_MAX_QUEUES_PER_DEV &&
796                                         device->queue[next_qid].configured) {
797
798                                 /* Remap the queue */
799                                 queue = &device->queue[next_qid];
800
801                                 stage_inst = opdl_stage_add(
802                                         device->opdl[queue->opdl_id],
803                                                 false,
804                                                 true);
805                                 port->enq_stage_inst = stage_inst;
806
807                                 /* Add the port to the queue array of ports */
808                                 queue->ports[queue->nb_ports] = port;
809                                 port->instance_id = queue->nb_ports;
810                                 queue->nb_ports++;
811                                 if (queue->nb_ports > 1) {
812                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
813                                                 "dummy queue %u: for "
814                                                 "port %u, "
815                                                 "SINGLE_LINK but has more "
816                                                 "than one port linked",
817                                                 opdl_pmd_dev_id(port->opdl),
818                                                 next_qid,
819                                                 port->id);
820                                         err = -EINVAL;
821                                         break;
822                                 }
823                                 /* Set this queue to initialized as it is never
824                                  * referenced by any ports
825                                  */
826                                 queue->initialized = 1;
827                         }
828                 }
829         }
830
831         /* Now that all ports are initialised we need to
832          * setup the last bit of stage md
833          */
834         if (!err) {
835                 for (i = 0; i < device->nb_ports; i++) {
836                         struct opdl_port *port = &device->ports[i];
837                         struct opdl_queue *queue =
838                                 &device->queue[port->queue_id];
839
840                         if (port->configured &&
841                                         (port->queue_id != OPDL_INVALID_QID)) {
842                                 if (queue->nb_ports == 0) {
843                                         PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
844                                                 "queue:[%u] has no ports"
845                                                 " linked to it",
846                                                 opdl_pmd_dev_id(port->opdl),
847                                                 port->id);
848                                         err = -EINVAL;
849                                         break;
850                                 }
851
852                                 port->num_instance = queue->nb_ports;
853                                 port->initialized = 1;
854                                 queue->initialized = 1;
855                         } else {
856                                 PMD_DRV_LOG(ERR, "DEV_ID:[%02d] : "
857                                              "Port:[%u] not configured  invalid"
858                                              " queue configuration",
859                                              opdl_pmd_dev_id(port->opdl),
860                                              port->id);
861                                 err = -EINVAL;
862                                 break;
863                         }
864                 }
865         }
866         return err;
867 }
868
869 int
870 initialise_queue_zero_ports(struct rte_eventdev *dev)
871 {
872         int err = 0;
873         uint8_t mt_rx = 0;
874         struct opdl_stage *stage_inst = NULL;
875         struct opdl_queue *queue = NULL;
876
877         struct opdl_evdev *device = opdl_pmd_priv(dev);
878
879         /* Assign queue zero and figure out how many Q0 ports we have */
880         uint32_t i;
881         for (i = 0; i < device->nb_ports; i++) {
882                 struct opdl_port *port = &device->ports[i];
883                 if (port->queue_id == OPDL_INVALID_QID) {
884                         port->queue_id = 0;
885                         port->external_qid = OPDL_INVALID_QID;
886                         port->p_type = OPDL_PURE_RX_PORT;
887                         mt_rx++;
888                 }
889         }
890
891         /* Create the stage */
892         stage_inst = opdl_stage_add(device->opdl[0],
893                         (mt_rx > 1 ? true : false),
894                         true);
895         if (stage_inst) {
896
897                 /* Assign the new created input stage to all relevant ports */
898                 for (i = 0; i < device->nb_ports; i++) {
899                         struct opdl_port *port = &device->ports[i];
900                         if (port->queue_id == 0) {
901                                 queue = &device->queue[port->queue_id];
902                                 port->enq_stage_inst = stage_inst;
903                                 port->deq_stage_inst = NULL;
904                                 port->configured = 1;
905                                 port->initialized = 1;
906
907                                 queue->ports[queue->nb_ports] = port;
908                                 port->instance_id = queue->nb_ports;
909                                 queue->nb_ports++;
910                         }
911                 }
912         } else {
913                 err = -1;
914         }
915         return err;
916 }
917
918 int
919 assign_internal_queue_ids(struct rte_eventdev *dev)
920 {
921         int err = 0;
922         struct opdl_evdev *device = opdl_pmd_priv(dev);
923         uint32_t i;
924
925         for (i = 0; i < device->nb_ports; i++) {
926                 struct opdl_port *port = &device->ports[i];
927                 if (port->external_qid != OPDL_INVALID_QID) {
928                         port->queue_id =
929                                 device->q_map_ex_to_in[port->external_qid];
930
931                         /* Now do the external_qid of the next queue */
932                         struct opdl_queue *queue =
933                                 &device->queue[port->queue_id];
934                         if (queue->q_pos == OPDL_Q_POS_END)
935                                 port->next_external_qid =
936                                 device->queue[port->queue_id + 2].external_qid;
937                         else
938                                 port->next_external_qid =
939                                 device->queue[port->queue_id + 1].external_qid;
940                 }
941         }
942         return err;
943 }