[dpdk.git] / drivers / event / dlb / dlb.c (blob at commit "eal: change power intrinsics API")
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39
40 /*
41  * Resources exposed to eventdev.
42  */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47         .driver_name = "", /* probe will set */
48         .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49         .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51         .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53         .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55         .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56         .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57         .max_event_priority_levels = DLB_QID_PRIORITIES,
58         .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59         .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60         .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61         .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62         .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63         .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64         .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65                           RTE_EVENT_DEV_CAP_EVENT_QOS |
66                           RTE_EVENT_DEV_CAP_BURST_MODE |
67                           RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68                           RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69                           RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
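/*
 * These defaults reach applications through rte_event_dev_info_get(); probe
 * and dlb_hw_query_resources() below adjust several of the fields first.
 * A minimal application-side sketch, with device id 0 assumed:
 *
 *   struct rte_event_dev_info info;
 *
 *   if (rte_event_dev_info_get(0, &info) == 0)
 *       printf("%s: ports=%u queues=%u events=%d\n", info.driver_name,
 *              info.max_event_ports, info.max_event_queues,
 *              info.max_num_events);
 */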
71
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74
75 static inline uint16_t
76 dlb_event_enqueue_delayed(void *event_port,
77                           const struct rte_event events[]);
78
79 static inline uint16_t
80 dlb_event_enqueue_burst_delayed(void *event_port,
81                                 const struct rte_event events[],
82                                 uint16_t num);
83
84 static inline uint16_t
85 dlb_event_enqueue_new_burst_delayed(void *event_port,
86                                     const struct rte_event events[],
87                                     uint16_t num);
88
89 static inline uint16_t
90 dlb_event_enqueue_forward_burst_delayed(void *event_port,
91                                         const struct rte_event events[],
92                                         uint16_t num);
93
94 static int
95 dlb_hw_query_resources(struct dlb_eventdev *dlb)
96 {
97         struct dlb_hw_dev *handle = &dlb->qm_instance;
98         struct dlb_hw_resource_info *dlb_info = &handle->info;
99         int ret;
100
101         ret = dlb_iface_get_num_resources(handle,
102                                           &dlb->hw_rsrc_query_results);
103         if (ret) {
104                 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
105                 return ret;
106         }
107
108         /* Complete filling in device resource info returned to evdev app,
109          * overriding any default values.
110          * The capabilities (CAPs) were set at compile time.
111          */
112
113         evdev_dlb_default_info.max_event_queues =
114                 dlb->hw_rsrc_query_results.num_ldb_queues;
115
116         evdev_dlb_default_info.max_event_ports =
117                 dlb->hw_rsrc_query_results.num_ldb_ports;
118
119         evdev_dlb_default_info.max_num_events =
120                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121
122         /* Save off values used when creating the scheduling domain. */
123
124         handle->info.num_sched_domains =
125                 dlb->hw_rsrc_query_results.num_sched_domains;
126
127         handle->info.hw_rsrc_max.nb_events_limit =
128                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
129
130         handle->info.hw_rsrc_max.num_queues =
131                 dlb->hw_rsrc_query_results.num_ldb_queues +
132                 dlb->hw_rsrc_query_results.num_dir_ports;
133
134         handle->info.hw_rsrc_max.num_ldb_queues =
135                 dlb->hw_rsrc_query_results.num_ldb_queues;
136
137         handle->info.hw_rsrc_max.num_ldb_ports =
138                 dlb->hw_rsrc_query_results.num_ldb_ports;
139
140         handle->info.hw_rsrc_max.num_dir_ports =
141                 dlb->hw_rsrc_query_results.num_dir_ports;
142
143         handle->info.hw_rsrc_max.reorder_window_size =
144                 dlb->hw_rsrc_query_results.num_hist_list_entries;
145
146         rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
147
148         return 0;
149 }
150
151 static void
152 dlb_free_qe_mem(struct dlb_port *qm_port)
153 {
154         if (qm_port == NULL)
155                 return;
156
157         rte_free(qm_port->qe4);
158         qm_port->qe4 = NULL;
159
160         rte_free(qm_port->consume_qe);
161         qm_port->consume_qe = NULL;
162
163         rte_memzone_free(dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz);
164         dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz = NULL;
165 }
166
167 static int
168 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
169 {
170         struct dlb_cq_pop_qe *qe;
171
172         qe = rte_zmalloc(mz_name,
173                         DLB_NUM_QES_PER_CACHE_LINE *
174                                 sizeof(struct dlb_cq_pop_qe),
175                         RTE_CACHE_LINE_SIZE);
176
177         if (qe == NULL) {
178                 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
179                 return -ENOMEM;
180         }
181
182         qm_port->consume_qe = qe;
183
184         qe->qe_valid = 0;
185         qe->qe_frag = 0;
186         qe->qe_comp = 0;
187         qe->cq_token = 1;
188         /* The tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
189          * and so on.
190          */
191         qe->tokens = 0; /* set at run time */
192         qe->meas_lat = 0;
193         qe->no_dec = 0;
194         /* Completion IDs are disabled */
195         qe->cmp_id = 0;
196
197         return 0;
198 }
199
200 static int
201 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
202 {
203         int ret, sz;
204
205         sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
206
207         qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
208
209         if (qm_port->qe4 == NULL) {
210                 DLB_LOG_ERR("dlb: no qe4 memory\n");
211                 ret = -ENOMEM;
212                 goto error_exit;
213         }
214
215         ret = dlb_init_consume_qe(qm_port, mz_name);
216         if (ret < 0) {
217                 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
218                 goto error_exit;
219         }
220
221         return 0;
222
223 error_exit:
224
225         dlb_free_qe_mem(qm_port);
226
227         return ret;
228 }
229
230 /* Wrapper for string-to-int conversion. Used in place of atoi(), which
231  * performs no error checking.
232  */
233 #define DLB_BASE_10 10
234
235 static int
236 dlb_string_to_int(int *result, const char *str)
237 {
238         long ret;
239         char *endstr;
240
241         if (str == NULL || result == NULL)
242                 return -EINVAL;
243
244         errno = 0;
245         ret = strtol(str, &endstr, DLB_BASE_10);
246         if (errno)
247                 return -errno;
248
249         /* long and int may have different widths on some architectures */
250         if (ret < INT_MIN || ret > INT_MAX || endstr == str)
251                 return -EINVAL;
252
253         *result = ret;
254         return 0;
255 }
256
257 static int
258 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
259 {
260         int *socket_id = opaque;
261         int ret;
262
263         ret = dlb_string_to_int(socket_id, value);
264         if (ret < 0)
265                 return ret;
266
267         if (*socket_id > RTE_MAX_NUMA_NODES)
268                 return -EINVAL;
269
270         return 0;
271 }
272
273 static int
274 set_max_num_events(const char *key __rte_unused,
275                    const char *value,
276                    void *opaque)
277 {
278         int *max_num_events = opaque;
279         int ret;
280
281         if (value == NULL || opaque == NULL) {
282                 DLB_LOG_ERR("NULL pointer\n");
283                 return -EINVAL;
284         }
285
286         ret = dlb_string_to_int(max_num_events, value);
287         if (ret < 0)
288                 return ret;
289
290         if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
291                 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
292                             DLB_MAX_NUM_LDB_CREDITS);
293                 return -EINVAL;
294         }
295
296         return 0;
297 }
298
299 static int
300 set_num_dir_credits(const char *key __rte_unused,
301                     const char *value,
302                     void *opaque)
303 {
304         int *num_dir_credits = opaque;
305         int ret;
306
307         if (value == NULL || opaque == NULL) {
308                 DLB_LOG_ERR("NULL pointer\n");
309                 return -EINVAL;
310         }
311
312         ret = dlb_string_to_int(num_dir_credits, value);
313         if (ret < 0)
314                 return ret;
315
316         if (*num_dir_credits < 0 ||
317             *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
318                 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
319                             DLB_MAX_NUM_DIR_CREDITS);
320                 return -EINVAL;
321         }
322         return 0;
323 }
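/*
 * The handlers above are intended to be driven through rte_kvargs at probe
 * time. A rough sketch of that wiring, where params is the devargs parameter
 * string and the key string "max_num_events" is illustrative (the real key
 * macros live in dlb_priv.h):
 *
 *   static const char * const keys[] = { "max_num_events", NULL };
 *   struct rte_kvargs *kvlist = rte_kvargs_parse(params, keys);
 *   int max_num_events = DLB_MAX_NUM_LDB_CREDITS;
 *
 *   if (kvlist != NULL) {
 *       rte_kvargs_process(kvlist, "max_num_events",
 *                          set_max_num_events, &max_num_events);
 *       rte_kvargs_free(kvlist);
 *   }
 */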
324
325 /* VDEV-only notes:
326  * This function first unmaps all memory mappings and closes the
327  * domain's file descriptor, which causes the driver to reset the
328  * scheduling domain. Once that completes (when close() returns), we
329  * can safely free the dynamically allocated memory used by the
330  * scheduling domain.
331  *
332  * PF-only notes:
333  * We maintain a use count and use it to determine when a reset is
334  * required. In PF mode we never mmap or munmap device memory, and
335  * we own the entire physical PCI device.
336  */
337
338 static void
339 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
340 {
341         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
342         enum dlb_configuration_state config_state;
343         int i, j;
344
345         /* Close and reset the domain */
346         dlb_iface_domain_close(dlb);
347
348         /* Free all dynamically allocated port memory */
349         for (i = 0; i < dlb->num_ports; i++)
350                 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
351
352         /* If reconfiguring, mark the device's queues and ports as "previously
353          * configured." If the user does not reconfigure them, the PMD will
354          * reapply their previous configuration when the device is started.
355          */
356         config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
357
358         for (i = 0; i < dlb->num_ports; i++) {
359                 dlb->ev_ports[i].qm_port.config_state = config_state;
360                 /* Reset setup_done so ports can be reconfigured */
361                 dlb->ev_ports[i].setup_done = false;
362                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
363                         dlb->ev_ports[i].link[j].mapped = false;
364         }
365
366         for (i = 0; i < dlb->num_queues; i++)
367                 dlb->ev_queues[i].qm_queue.config_state = config_state;
368
369         for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
370                 dlb->ev_queues[i].setup_done = false;
371
372         dlb->num_ports = 0;
373         dlb->num_ldb_ports = 0;
374         dlb->num_dir_ports = 0;
375         dlb->num_queues = 0;
376         dlb->num_ldb_queues = 0;
377         dlb->num_dir_queues = 0;
378         dlb->configured = false;
379 }
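/*
 * The reconfiguration path above is reached when an already-configured
 * device is configured again. Roughly, from the application's side (device
 * id 0 assumed, new_cfg being the updated rte_event_dev_config):
 *
 *   rte_event_dev_stop(0);
 *   rte_event_dev_configure(0, &new_cfg);   // resets and rebuilds the domain
 *   // re-run queue/port setup as needed (or rely on the reapplied previous
 *   // configuration), then:
 *   rte_event_dev_start(0);
 */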
380
381 static int
382 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
383 {
384         struct dlb_create_ldb_pool_args cfg;
385         struct dlb_cmd_response response;
386         int ret;
387
388         if (handle == NULL)
389                 return -EINVAL;
390
391         if (!handle->cfg.resources.num_ldb_credits) {
392                 handle->cfg.ldb_credit_pool_id = 0;
393                 handle->cfg.num_ldb_credits = 0;
394                 return 0;
395         }
396
397         cfg.response = (uintptr_t)&response;
398         cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
399
400         ret = dlb_iface_ldb_credit_pool_create(handle,
401                                                &cfg);
402         if (ret < 0) {
403                 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
404                             ret, dlb_error_strings[response.status]);
405         }
406
407         handle->cfg.ldb_credit_pool_id = response.id;
408         handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
409
410         return ret;
411 }
412
413 static int
414 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
415 {
416         struct dlb_create_dir_pool_args cfg;
417         struct dlb_cmd_response response;
418         int ret;
419
420         if (handle == NULL)
421                 return -EINVAL;
422
423         if (!handle->cfg.resources.num_dir_credits) {
424                 handle->cfg.dir_credit_pool_id = 0;
425                 handle->cfg.num_dir_credits = 0;
426                 return 0;
427         }
428
429         cfg.response = (uintptr_t)&response;
430         cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
431
432         ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
433         if (ret < 0)
434                 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
435                             ret, dlb_error_strings[response.status]);
436
437         handle->cfg.dir_credit_pool_id = response.id;
438         handle->cfg.num_dir_credits = cfg.num_dir_credits;
439
440         return ret;
441 }
442
443 static int
444 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
445                            struct dlb_eventdev *dlb,
446                            const struct dlb_hw_rsrcs *resources_asked)
447 {
448         int ret = 0;
449         struct dlb_create_sched_domain_args *config_params;
450         struct dlb_cmd_response response;
451
452         if (resources_asked == NULL) {
453                 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
454                 ret = -EINVAL;
455                 goto error_exit;
456         }
457
458         /* Map generic qm resources to dlb resources */
459         config_params = &handle->cfg.resources;
460
461         config_params->response = (uintptr_t)&response;
462
463         /* DIR ports and queues */
464
465         config_params->num_dir_ports =
466                 resources_asked->num_dir_ports;
467
468         config_params->num_dir_credits =
469                 resources_asked->num_dir_credits;
470
471         /* LDB ports and queues */
472
473         config_params->num_ldb_queues =
474                 resources_asked->num_ldb_queues;
475
476         config_params->num_ldb_ports =
477                 resources_asked->num_ldb_ports;
478
479         config_params->num_ldb_credits =
480                 resources_asked->num_ldb_credits;
481
482         config_params->num_atomic_inflights =
483                 dlb->num_atm_inflights_per_queue *
484                 config_params->num_ldb_queues;
485
486         config_params->num_hist_list_entries = config_params->num_ldb_ports *
487                 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
488
489         /* dlb limited to 1 credit pool per queue type */
490         config_params->num_ldb_credit_pools = 1;
491         config_params->num_dir_credit_pools = 1;
492
493         DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_cred_pools=%d, dir-credit_pools=%d\n",
494                     config_params->num_ldb_queues,
495                     config_params->num_ldb_ports,
496                     config_params->num_dir_ports,
497                     config_params->num_atomic_inflights,
498                     config_params->num_hist_list_entries,
499                     config_params->num_ldb_credits,
500                     config_params->num_dir_credits,
501                     config_params->num_ldb_credit_pools,
502                     config_params->num_dir_credit_pools);
503
504         /* Configure the QM */
505
506         ret = dlb_iface_sched_domain_create(handle, config_params);
507         if (ret < 0) {
508                 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
509                             handle->device_id,
510                             ret,
511                             dlb_error_strings[response.status]);
512                 goto error_exit;
513         }
514
515         handle->domain_id = response.id;
516         handle->domain_id_valid = 1;
517
518         config_params->response = 0;
519
520         ret = dlb_ldb_credit_pool_create(handle);
521         if (ret < 0) {
522                 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
523                 goto error_exit2;
524         }
525
526         ret = dlb_dir_credit_pool_create(handle);
527         if (ret < 0) {
528                 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
529                 goto error_exit2;
530         }
531
532         handle->cfg.configured = true;
533
534         return 0;
535
536 error_exit2:
537         dlb_iface_domain_close(dlb);
538
539 error_exit:
540         return ret;
541 }
542
543 /* End HW specific */
544 static void
545 dlb_eventdev_info_get(struct rte_eventdev *dev,
546                       struct rte_event_dev_info *dev_info)
547 {
548         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
549         int ret;
550
551         ret = dlb_hw_query_resources(dlb);
552         if (ret) {
553                 const struct rte_eventdev_data *data = dev->data;
554
555                 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
556                             ret, data->dev_id);
557                 /* fn is void, so fall through and return values set up in
558                  * probe
559                  */
560         }
561
562         /* Add num resources currently owned by this domain.
563          * These would become available if the scheduling domain were reset due
564          * to the application recalling eventdev_configure to *reconfigure* the
565          * domain.
566          */
567         evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
568         evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
569         evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
570
571         /* In DLB A-stepping hardware, applications are limited to 128
572          * configured ports (load-balanced or directed). The reported number of
573          * available ports must reflect this.
574          */
575         if (dlb->revision < DLB_REV_B0) {
576                 int used_ports;
577
578                 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
579                         dlb->hw_rsrc_query_results.num_ldb_ports -
580                         dlb->hw_rsrc_query_results.num_dir_ports;
581
582                 evdev_dlb_default_info.max_event_ports =
583                         RTE_MIN(evdev_dlb_default_info.max_event_ports,
584                                 128 - used_ports);
585         }
586
587         evdev_dlb_default_info.max_event_queues =
588                 RTE_MIN(evdev_dlb_default_info.max_event_queues,
589                         RTE_EVENT_MAX_QUEUES_PER_DEV);
590
591         evdev_dlb_default_info.max_num_events =
592                 RTE_MIN(evdev_dlb_default_info.max_num_events,
593                         dlb->max_num_events_override);
594
595         *dev_info = evdev_dlb_default_info;
596 }
597
598 /* Note: 1 QM instance per QM device, QM instance/device == event device */
599 static int
600 dlb_eventdev_configure(const struct rte_eventdev *dev)
601 {
602         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
603         struct dlb_hw_dev *handle = &dlb->qm_instance;
604         struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
605         const struct rte_eventdev_data *data = dev->data;
606         const struct rte_event_dev_config *config = &data->dev_conf;
607         int ret;
608
609         /* If this eventdev is already configured, we must release the current
610          * scheduling domain before attempting to configure a new one.
611          */
612         if (dlb->configured) {
613                 dlb_hw_reset_sched_domain(dev, true);
614
615                 ret = dlb_hw_query_resources(dlb);
616                 if (ret) {
617                         DLB_LOG_ERR("get resources err=%d, devid=%d\n",
618                                     ret, data->dev_id);
619                         return ret;
620                 }
621         }
622
623         if (config->nb_event_queues > rsrcs->num_queues) {
624                 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
625                             config->nb_event_queues,
626                             rsrcs->num_queues);
627                 return -EINVAL;
628         }
629         if (config->nb_event_ports > (rsrcs->num_ldb_ports
630                         + rsrcs->num_dir_ports)) {
631                 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
632                             config->nb_event_ports,
633                             (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
634                 return -EINVAL;
635         }
636         if (config->nb_events_limit > rsrcs->nb_events_limit) {
637                 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
638                             config->nb_events_limit,
639                             rsrcs->nb_events_limit);
640                 return -EINVAL;
641         }
642
643         if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
644                 dlb->global_dequeue_wait = false;
645         else {
646                 uint32_t timeout32;
647
648                 dlb->global_dequeue_wait = true;
649
650                 timeout32 = config->dequeue_timeout_ns;
651
652                 dlb->global_dequeue_wait_ticks =
653                         timeout32 * (rte_get_timer_hz() / 1E9);
654         }
655
656         /* Does this platform support umonitor/umwait? */
657         if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
658                 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
659                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
660                         DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE must be 0 or 1.\n",
661                                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
662                         return -EINVAL;
663                 }
664                 dlb->umwait_allowed = true;
665         }
666
667         rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
668         rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
669         /* 1 dir queue per dir port */
670         rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
671
672         /* Scale down nb_events_limit by 4 for directed credits, since there
673          * are 4x as many load-balanced credits.
674          */
675         rsrcs->num_ldb_credits = 0;
676         rsrcs->num_dir_credits = 0;
677
678         if (rsrcs->num_ldb_queues)
679                 rsrcs->num_ldb_credits = config->nb_events_limit;
680         if (rsrcs->num_dir_ports)
681                 rsrcs->num_dir_credits = config->nb_events_limit / 4;
682         if (dlb->num_dir_credits_override != -1)
683                 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
684
685         if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
686                 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
687                 return -ENODEV;
688         }
689
690         dlb->new_event_limit = config->nb_events_limit;
691         __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
692
693         /* Save number of ports/queues for this event dev */
694         dlb->num_ports = config->nb_event_ports;
695         dlb->num_queues = config->nb_event_queues;
696         dlb->num_dir_ports = rsrcs->num_dir_ports;
697         dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
698         dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
699         dlb->num_dir_queues = dlb->num_dir_ports;
700         dlb->num_ldb_credits = rsrcs->num_ldb_credits;
701         dlb->num_dir_credits = rsrcs->num_dir_credits;
702
703         dlb->configured = true;
704
705         return 0;
706 }
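/*
 * A minimal application-side configuration that exercises the checks above,
 * with device id 0 assumed and limits taken from rte_event_dev_info_get():
 *
 *   struct rte_event_dev_info info;
 *   struct rte_event_dev_config cfg = {0};
 *
 *   rte_event_dev_info_get(0, &info);
 *   cfg.nb_event_queues = 2;
 *   cfg.nb_event_ports = 2;
 *   cfg.nb_single_link_event_port_queues = 0;
 *   cfg.nb_events_limit = info.max_num_events;
 *   cfg.nb_event_queue_flows = info.max_event_queue_flows;
 *   cfg.nb_event_port_dequeue_depth = info.max_event_port_dequeue_depth;
 *   cfg.nb_event_port_enqueue_depth = info.max_event_port_enqueue_depth;
 *   cfg.dequeue_timeout_ns = info.min_dequeue_timeout_ns;
 *
 *   if (rte_event_dev_configure(0, &cfg) < 0)
 *       rte_exit(EXIT_FAILURE, "eventdev configure failed\n");
 */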
707
708 static int16_t
709 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
710                                uint32_t qm_port_id,
711                                uint16_t qm_qid)
712 {
713         struct dlb_unmap_qid_args cfg;
714         struct dlb_cmd_response response;
715         int32_t ret;
716
717         if (handle == NULL)
718                 return -EINVAL;
719
720         cfg.response = (uintptr_t)&response;
721         cfg.port_id = qm_port_id;
722         cfg.qid = qm_qid;
723
724         ret = dlb_iface_unmap_qid(handle, &cfg);
725         if (ret < 0)
726                 DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
727                             ret, dlb_error_strings[response.status]);
728
729         return ret;
730 }
731
732 static int
733 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
734                            struct dlb_eventdev_port *ev_port,
735                            struct dlb_eventdev_queue *ev_queue)
736 {
737         int ret, i;
738
739         /* Don't unlink until start time. */
740         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
741                 return 0;
742
743         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
744                 if (ev_port->link[i].valid &&
745                     ev_port->link[i].queue_id == ev_queue->id)
746                         break; /* found */
747         }
748
749         /* This is expected: the eventdev API blindly attempts to unmap
750          * all queues, including ones that were never linked to this port.
751          */
752         if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
753                 DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
754                             ev_queue->qm_queue.id,
755                             ev_port->qm_port.id);
756                 return 0;
757         }
758
759         ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
760                                              ev_port->qm_port.id,
761                                              ev_queue->qm_queue.id);
762         if (!ret)
763                 ev_port->link[i].mapped = false;
764
765         return ret;
766 }
767
768 static int
769 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
770                          uint8_t queues[], uint16_t nb_unlinks)
771 {
772         struct dlb_eventdev_port *ev_port = event_port;
773         struct dlb_eventdev *dlb;
774         int i;
775
776         RTE_SET_USED(dev);
777
778         if (!ev_port->setup_done) {
779                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
780                             ev_port->id);
781                 rte_errno = -EINVAL;
782                 return 0;
783         }
784
785         if (queues == NULL || nb_unlinks == 0) {
786                 DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
787                 return 0; /* Ignore and return success */
788         }
789
790         if (ev_port->qm_port.is_directed) {
791                 DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
792                             ev_port->id);
793                 rte_errno = 0;
794                 return nb_unlinks; /* as if success */
795         }
796
797         dlb = ev_port->dlb;
798
799         for (i = 0; i < nb_unlinks; i++) {
800                 struct dlb_eventdev_queue *ev_queue;
801                 int ret, j;
802
803                 if (queues[i] >= dlb->num_queues) {
804                         DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
805                         rte_errno = -EINVAL;
806                         return i; /* return index of offending queue */
807                 }
808
809                 ev_queue = &dlb->ev_queues[queues[i]];
810
811                 /* Does a link exist? */
812                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
813                         if (ev_port->link[j].queue_id == queues[i] &&
814                             ev_port->link[j].valid)
815                                 break;
816
817                 if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
818                         continue;
819
820                 ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
821                 if (ret) {
822                         DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
823                                     ret, ev_port->id, queues[i]);
824                         rte_errno = -ENOENT;
825                         return i; /* return index of offending queue */
826                 }
827
828                 ev_port->link[j].valid = false;
829                 ev_port->num_links--;
830                 ev_queue->num_links--;
831         }
832
833         return nb_unlinks;
834 }
835
836 static int
837 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
838                                       void *event_port)
839 {
840         struct dlb_eventdev_port *ev_port = event_port;
841         struct dlb_eventdev *dlb;
842         struct dlb_hw_dev *handle;
843         struct dlb_pending_port_unmaps_args cfg;
844         struct dlb_cmd_response response;
845         int ret;
846
847         RTE_SET_USED(dev);
848
849         if (!ev_port->setup_done) {
850                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
851                             ev_port->id);
852                 rte_errno = -EINVAL;
853                 return 0;
854         }
855
856         cfg.port_id = ev_port->qm_port.id;
857         cfg.response = (uintptr_t)&response;
858         dlb = ev_port->dlb;
859         handle = &dlb->qm_instance;
860         ret = dlb_iface_pending_port_unmaps(handle, &cfg);
861
862         if (ret < 0) {
863                 DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
864                             ret, dlb_error_strings[response.status]);
865                 return ret;
866         }
867
868         return response.id;
869 }
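/*
 * Application-side view of the two entry points above (device id 0 and
 * event port id 0 assumed): unlink the queue, then poll until the hardware
 * has completed the unmaps.
 *
 *   uint8_t queues[] = { 0 };
 *
 *   if (rte_event_port_unlink(0, 0, queues, 1) != 1)
 *       rte_panic("unlink failed: %d\n", rte_errno);
 *
 *   while (rte_event_port_unlinks_in_progress(0, 0) > 0)
 *       rte_pause();
 */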
870
871 static void
872 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
873                                    uint8_t port_id,
874                                    struct rte_event_port_conf *port_conf)
875 {
876         RTE_SET_USED(port_id);
877         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
878
879         port_conf->new_event_threshold = dlb->new_event_limit;
880         port_conf->dequeue_depth = 32;
881         port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
882         port_conf->event_port_cfg = 0;
883 }
884
885 static void
886 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
887                                     uint8_t queue_id,
888                                     struct rte_event_queue_conf *queue_conf)
889 {
890         RTE_SET_USED(dev);
891         RTE_SET_USED(queue_id);
892         queue_conf->nb_atomic_flows = 1024;
893         queue_conf->nb_atomic_order_sequences = 32;
894         queue_conf->event_queue_cfg = 0;
895         queue_conf->priority = 0;
896 }
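/*
 * These defaults are consumed through the generic getters; a common pattern
 * (device id 0 and port/queue id 0 assumed) is to fetch them and override
 * only what differs before setup:
 *
 *   struct rte_event_queue_conf qconf;
 *   struct rte_event_port_conf pconf;
 *
 *   rte_event_queue_default_conf_get(0, 0, &qconf);
 *   rte_event_port_default_conf_get(0, 0, &pconf);
 *   qconf.schedule_type = RTE_SCHED_TYPE_ATOMIC;
 *   rte_event_queue_setup(0, 0, &qconf);
 *   rte_event_port_setup(0, 0, &pconf);
 */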
897
898 static int
899 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
900                        struct dlb_eventdev_port *ev_port,
901                        uint32_t dequeue_depth,
902                        uint32_t cq_depth,
903                        uint32_t enqueue_depth,
904                        uint16_t rsvd_tokens,
905                        bool use_rsvd_token_scheme)
906 {
907         struct dlb_hw_dev *handle = &dlb->qm_instance;
908         struct dlb_create_ldb_port_args cfg = {0};
909         struct dlb_cmd_response response = {0};
910         int ret;
911         struct dlb_port *qm_port = NULL;
912         char mz_name[RTE_MEMZONE_NAMESIZE];
913         uint32_t qm_port_id;
914
915         if (handle == NULL)
916                 return -EINVAL;
917
918         if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
919                 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
920                         DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
921                 return -EINVAL;
922         }
923
924         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
925                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
926                             DLB_MIN_ENQUEUE_DEPTH);
927                 return -EINVAL;
928         }
929
930         rte_spinlock_lock(&handle->resource_lock);
931
932         cfg.response = (uintptr_t)&response;
933
934         /* We round up to the next power of 2 if necessary */
935         cfg.cq_depth = rte_align32pow2(cq_depth);
936         cfg.cq_depth_threshold = rsvd_tokens;
937
938         cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
939
940         /* User controls the LDB high watermark via enqueue depth. The DIR high
941          * watermark is equal, unless the directed credit pool is too small.
942          */
943         cfg.ldb_credit_high_watermark = enqueue_depth;
944
945         /* If there are no directed ports, the kernel driver will ignore this
946          * port's directed credit settings. Don't use enqueue_depth if it would
947          * require more directed credits than are available.
948          */
949         cfg.dir_credit_high_watermark =
950                 RTE_MIN(enqueue_depth,
951                         handle->cfg.num_dir_credits / dlb->num_ports);
952
953         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
954         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
955
956         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
957         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
958
959         /* Per QM values */
960
961         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
962         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
963
964         ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
965         if (ret < 0) {
966                 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
967                             ret, dlb_error_strings[response.status]);
968                 goto error_exit;
969         }
970
971         qm_port_id = response.id;
972
973         DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
974                     ev_port->id, qm_port_id);
975
976         qm_port = &ev_port->qm_port;
977         qm_port->ev_port = ev_port; /* back ptr */
978         qm_port->dlb = dlb; /* back ptr */
979
980         /*
981          * Allocate and init local qe struct(s).
982          * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
983          */
984
985         snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
986                  ev_port->id);
987
988         ret = dlb_init_qe_mem(qm_port, mz_name);
989         if (ret < 0) {
990                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
991                 goto error_exit;
992         }
993
994         qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
995         qm_port->id = qm_port_id;
996
997         /* The credit window is one high water mark of QEs */
998         qm_port->ldb_pushcount_at_credit_expiry = 0;
999         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1000         /* The credit window is one high water mark of QEs */
1001         qm_port->dir_pushcount_at_credit_expiry = 0;
1002         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1003         /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1004          * the effective depth is smaller.
1005          */
1006         qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1007         qm_port->cq_idx = 0;
1008         qm_port->cq_idx_unmasked = 0;
1009         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1010                 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1011         else
1012                 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1013
1014         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1015         /* starting value of gen bit - it toggles at wrap time */
1016         qm_port->gen_bit = 1;
1017
1018         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1019         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1020         qm_port->int_armed = false;
1021
1022         /* Save off for later use in info and lookup APIs. */
1023         qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1024
1025         qm_port->dequeue_depth = dequeue_depth;
1026
1027         /* When using the reserved token scheme, token_pop_thresh is
1028          * initially 2 * dequeue_depth. Once the tokens are reserved,
1029          * the enqueue code re-assigns it to dequeue_depth.
1030          */
1031         qm_port->token_pop_thresh = cq_depth;
1032
1033         /* When the deferred scheduling vdev arg is selected, use deferred pop
1034          * for all single-entry CQs.
1035          */
1036         if (cfg.cq_depth == 1 || (cfg.cq_depth == 2 && use_rsvd_token_scheme)) {
1037                 if (dlb->defer_sched)
1038                         qm_port->token_pop_mode = DEFERRED_POP;
1039         }
1040
1041         /* The default enqueue functions do not include delayed-pop support for
1042          * performance reasons.
1043          */
1044         if (qm_port->token_pop_mode == DELAYED_POP) {
1045                 dlb->event_dev->enqueue = dlb_event_enqueue_delayed;
1046                 dlb->event_dev->enqueue_burst =
1047                         dlb_event_enqueue_burst_delayed;
1048                 dlb->event_dev->enqueue_new_burst =
1049                         dlb_event_enqueue_new_burst_delayed;
1050                 dlb->event_dev->enqueue_forward_burst =
1051                         dlb_event_enqueue_forward_burst_delayed;
1052         }
1053
1054         qm_port->owed_tokens = 0;
1055         qm_port->issued_releases = 0;
1056
1057         /* update state */
1058         qm_port->state = PORT_STARTED; /* enabled at create time */
1059         qm_port->config_state = DLB_CONFIGURED;
1060
1061         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1062         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1063
1064         DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1065                     qm_port_id,
1066                     cq_depth,
1067                     qm_port->ldb_credits,
1068                     qm_port->dir_credits);
1069
1070         rte_spinlock_unlock(&handle->resource_lock);
1071
1072         return 0;
1073
1074 error_exit:
1075         if (qm_port) {
1076                 dlb_free_qe_mem(qm_port);
1077                 qm_port->pp_mmio_base = 0;
1078         }
1079
1080         rte_spinlock_unlock(&handle->resource_lock);
1081
1082         DLB_LOG_ERR("dlb: create ldb port failed!\n");
1083
1084         return ret;
1085 }
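/*
 * Worked example of the CQ indexing set up above: with cq_depth = 8 in
 * sparse poll mode, cq_depth_mask = (8 * 4) - 1 = 0x1f and gen_bit_shift =
 * popcount(0x1f) = 5, so the gen bit toggles every 32 increments of
 * cq_idx_unmasked; in dense mode the mask is 0x7 and it toggles every 8.
 */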
1086
1087 static int
1088 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1089                        struct dlb_eventdev_port *ev_port,
1090                        uint32_t dequeue_depth,
1091                        uint32_t cq_depth,
1092                        uint32_t enqueue_depth,
1093                        uint16_t rsvd_tokens,
1094                        bool use_rsvd_token_scheme)
1095 {
1096         struct dlb_hw_dev *handle = &dlb->qm_instance;
1097         struct dlb_create_dir_port_args cfg = {0};
1098         struct dlb_cmd_response response = {0};
1099         int ret;
1100         struct dlb_port *qm_port = NULL;
1101         char mz_name[RTE_MEMZONE_NAMESIZE];
1102         uint32_t qm_port_id;
1103
1104         if (dlb == NULL || handle == NULL)
1105                 return -EINVAL;
1106
1107         if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1108                 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1109                             DLB_MIN_DIR_CQ_DEPTH);
1110                 return -EINVAL;
1111         }
1112
1113         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1114                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1115                             DLB_MIN_ENQUEUE_DEPTH);
1116                 return -EINVAL;
1117         }
1118
1119         rte_spinlock_lock(&handle->resource_lock);
1120
1121         /* Directed queues are configured at link time. */
1122         cfg.queue_id = -1;
1123
1124         cfg.response = (uintptr_t)&response;
1125
1126         /* We round up to the next power of 2 if necessary */
1127         cfg.cq_depth = rte_align32pow2(cq_depth);
1128         cfg.cq_depth_threshold = rsvd_tokens;
1129
1130         /* User controls the LDB high watermark via enqueue depth. The DIR high
1131          * watermark is equal, unless the directed credit pool is too small.
1132          */
1133         cfg.ldb_credit_high_watermark = enqueue_depth;
1134
1135         /* Don't use enqueue_depth if it would require more directed credits
1136          * than are available.
1137          */
1138         cfg.dir_credit_high_watermark =
1139                 RTE_MIN(enqueue_depth,
1140                         handle->cfg.num_dir_credits / dlb->num_ports);
1141
1142         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1143         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1144
1145         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1146         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1147
1148         /* Per QM values */
1149
1150         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1151         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1152
1153         ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1154         if (ret < 0) {
1155                 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1156                             ret, dlb_error_strings[response.status]);
1157                 goto error_exit;
1158         }
1159
1160         qm_port_id = response.id;
1161
1162         DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1163                     ev_port->id, qm_port_id);
1164
1165         qm_port = &ev_port->qm_port;
1166         qm_port->ev_port = ev_port; /* back ptr */
1167         qm_port->dlb = dlb;  /* back ptr */
1168
1169         /*
1170          * Init local qe struct(s).
1171          * Note: MOVDIR64 requires the enqueue QE to be aligned
1172          */
1173
1174         snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1175                  ev_port->id);
1176
1177         ret = dlb_init_qe_mem(qm_port, mz_name);
1178
1179         if (ret < 0) {
1180                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1181                 goto error_exit;
1182         }
1183
1184         qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1185         qm_port->id = qm_port_id;
1186
1187         /* The credit window is one high water mark of QEs */
1188         qm_port->ldb_pushcount_at_credit_expiry = 0;
1189         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1190         /* The credit window is one high water mark of QEs */
1191         qm_port->dir_pushcount_at_credit_expiry = 0;
1192         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1193         qm_port->cq_depth = cfg.cq_depth;
1194         qm_port->cq_idx = 0;
1195         qm_port->cq_idx_unmasked = 0;
1196         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1197                 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1198         else
1199                 qm_port->cq_depth_mask = cfg.cq_depth - 1;
1200
1201         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1202         /* starting value of gen bit - it toggles at wrap time */
1203         qm_port->gen_bit = 1;
1204
1205         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1206         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1207         qm_port->int_armed = false;
1208
1209         /* Save off for later use in info and lookup APIs. */
1210         qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1211
1212         qm_port->dequeue_depth = dequeue_depth;
1213
1214         /* Directed ports are auto-pop, by default. */
1215         qm_port->token_pop_mode = AUTO_POP;
1216         qm_port->owed_tokens = 0;
1217         qm_port->issued_releases = 0;
1218
1219         /* update state */
1220         qm_port->state = PORT_STARTED; /* enabled at create time */
1221         qm_port->config_state = DLB_CONFIGURED;
1222
1223         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1224         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1225
1226         DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1227                     qm_port_id,
1228                     cq_depth,
1229                     cfg.dir_credit_high_watermark,
1230                     cfg.ldb_credit_high_watermark);
1231
1232         rte_spinlock_unlock(&handle->resource_lock);
1233
1234         return 0;
1235
1236 error_exit:
1237         if (qm_port) {
1238                 qm_port->pp_mmio_base = 0;
1239                 dlb_free_qe_mem(qm_port);
1240         }
1241
1242         rte_spinlock_unlock(&handle->resource_lock);
1243
1244         DLB_LOG_ERR("dlb: create dir port failed!\n");
1245
1246         return ret;
1247 }
1248
1249 static int32_t
1250 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1251                         struct dlb_queue *queue,
1252                         const struct rte_event_queue_conf *evq_conf)
1253 {
1254         struct dlb_hw_dev *handle = &dlb->qm_instance;
1255         struct dlb_create_ldb_queue_args cfg;
1256         struct dlb_cmd_response response;
1257         int32_t ret;
1258         uint32_t qm_qid;
1259         int sched_type = -1;
1260
1261         if (evq_conf == NULL)
1262                 return -EINVAL;
1263
1264         if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1265                 if (evq_conf->nb_atomic_order_sequences != 0)
1266                         sched_type = RTE_SCHED_TYPE_ORDERED;
1267                 else
1268                         sched_type = RTE_SCHED_TYPE_PARALLEL;
1269         } else
1270                 sched_type = evq_conf->schedule_type;
1271
1272         cfg.response = (uintptr_t)&response;
1273         cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1274         cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1275         cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1276
1277         if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1278                 cfg.num_sequence_numbers = 0;
1279                 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1280         }
1281
1282         ret = dlb_iface_ldb_queue_create(handle, &cfg);
1283         if (ret < 0) {
1284                 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1285                             ret, dlb_error_strings[response.status]);
1286                 return -EINVAL;
1287         }
1288
1289         qm_qid = response.id;
1290
1291         /* Save off queue config for debug, resource lookups, and reconfig */
1292         queue->num_qid_inflights = cfg.num_qid_inflights;
1293         queue->num_atm_inflights = cfg.num_atomic_inflights;
1294
1295         queue->sched_type = sched_type;
1296         queue->config_state = DLB_CONFIGURED;
1297
1298         DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1299                     qm_qid,
1300                     cfg.num_atomic_inflights,
1301                     cfg.num_sequence_numbers,
1302                     cfg.num_qid_inflights);
1303
1304         return qm_qid;
1305 }
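/*
 * Sketch of the scheduling-type selection handled above (values are
 * illustrative):
 *
 *   struct rte_event_queue_conf qc = {
 *       .event_queue_cfg = RTE_EVENT_QUEUE_CFG_ALL_TYPES,
 *       .nb_atomic_order_sequences = 32,  // non-zero -> RTE_SCHED_TYPE_ORDERED
 *   };
 *   // With ALL_TYPES and nb_atomic_order_sequences == 0 the queue is treated
 *   // as RTE_SCHED_TYPE_PARALLEL; otherwise qc.schedule_type is used as-is.
 */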
1306
1307 static int32_t
1308 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1309 {
1310         struct dlb_hw_dev *handle = &dlb->qm_instance;
1311         struct dlb_get_sn_allocation_args cfg;
1312         struct dlb_cmd_response response;
1313         int ret;
1314
1315         cfg.group = group;
1316         cfg.response = (uintptr_t)&response;
1317
1318         ret = dlb_iface_get_sn_allocation(handle, &cfg);
1319         if (ret < 0) {
1320                 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1321                             ret, dlb_error_strings[response.status]);
1322                 return ret;
1323         }
1324
1325         return response.id;
1326 }
1327
1328 static int
1329 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1330 {
1331         struct dlb_hw_dev *handle = &dlb->qm_instance;
1332         struct dlb_set_sn_allocation_args cfg;
1333         struct dlb_cmd_response response;
1334         int ret;
1335
1336         cfg.num = num;
1337         cfg.group = group;
1338         cfg.response = (uintptr_t)&response;
1339
1340         ret = dlb_iface_set_sn_allocation(handle, &cfg);
1341         if (ret < 0) {
1342                 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1343                             ret, dlb_error_strings[response.status]);
1344                 return ret;
1345         }
1346
1347         return ret;
1348 }
1349
1350 static int32_t
1351 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1352 {
1353         struct dlb_hw_dev *handle = &dlb->qm_instance;
1354         struct dlb_get_sn_occupancy_args cfg;
1355         struct dlb_cmd_response response;
1356         int ret;
1357
1358         cfg.group = group;
1359         cfg.response = (uintptr_t)&response;
1360
1361         ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1362         if (ret < 0) {
1363                 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1364                             ret, dlb_error_strings[response.status]);
1365                 return ret;
1366         }
1367
1368         return response.id;
1369 }
1370
1371 /* Query the current sequence number allocations and, if they conflict with the
1372  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1373  * This is best-effort; if it fails, the PMD still attempts to configure the
1374  * load-balanced queue, and any error is reported from that path.
1375  */
1376 static void
1377 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1378                           const struct rte_event_queue_conf *queue_conf)
1379 {
1380         int grp_occupancy[DLB_NUM_SN_GROUPS];
1381         int grp_alloc[DLB_NUM_SN_GROUPS];
1382         int i, sequence_numbers;
1383
1384         sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1385
1386         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1387                 int total_slots;
1388
1389                 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1390                 if (grp_alloc[i] < 0)
1391                         return;
1392
1393                 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
1394
1395                 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1396                 if (grp_occupancy[i] < 0)
1397                         return;
1398
1399                 /* DLB has at least one available slot for the requested
1400                  * sequence numbers, so no further configuration required.
1401                  */
1402                 if (grp_alloc[i] == sequence_numbers &&
1403                     grp_occupancy[i] < total_slots)
1404                         return;
1405         }
1406
1407         /* None of the sequence number groups are configured for the requested
1408          * sequence numbers, so we have to reconfigure one of them. This is
1409          * only possible if a group is not in use.
1410          */
1411         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1412                 if (grp_occupancy[i] == 0)
1413                         break;
1414         }
1415
1416         if (i == DLB_NUM_SN_GROUPS) {
1417                 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1418                        __func__, sequence_numbers);
1419                 return;
1420         }
1421
1422         /* Attempt to configure slot i with the requested number of sequence
1423          * numbers. Ignore the return value -- if this fails, the error will be
1424          * caught during subsequent queue configuration.
1425          */
1426         dlb_set_sn_allocation(dlb, i, sequence_numbers);
1427 }
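/*
 * Worked example of the search above: if group i is programmed for
 * grp_alloc[i] = 64 sequence numbers, it provides DLB_MAX_LDB_SN_ALLOC / 64
 * slots. A queue asking for 64 SNs can share it only while grp_occupancy[i]
 * is below that slot count; otherwise an idle group (occupancy 0) is
 * re-programmed via dlb_set_sn_allocation().
 */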
1428
1429 static int
1430 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1431                              struct dlb_eventdev_queue *ev_queue,
1432                              const struct rte_event_queue_conf *queue_conf)
1433 {
1434         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1435         int32_t qm_qid;
1436
1437         if (queue_conf->nb_atomic_order_sequences)
1438                 dlb_program_sn_allocation(dlb, queue_conf);
1439
1440         qm_qid = dlb_hw_create_ldb_queue(dlb,
1441                                          &ev_queue->qm_queue,
1442                                          queue_conf);
1443         if (qm_qid < 0) {
1444                 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1445
1446                 return qm_qid;
1447         }
1448
1449         dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1450
1451         ev_queue->qm_queue.id = qm_qid;
1452
1453         return 0;
1454 }
1455
1456 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1457 {
1458         int i, num = 0;
1459
1460         for (i = 0; i < dlb->num_queues; i++) {
1461                 if (dlb->ev_queues[i].setup_done &&
1462                     dlb->ev_queues[i].qm_queue.is_directed)
1463                         num++;
1464         }
1465
1466         return num;
1467 }
1468
1469 static void
1470 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1471                         struct dlb_eventdev_queue *ev_queue)
1472 {
1473         struct dlb_eventdev_port *ev_port;
1474         int i, j;
1475
1476         for (i = 0; i < dlb->num_ports; i++) {
1477                 ev_port = &dlb->ev_ports[i];
1478
1479                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1480                         if (!ev_port->link[j].valid ||
1481                             ev_port->link[j].queue_id != ev_queue->id)
1482                                 continue;
1483
1484                         ev_port->link[j].valid = false;
1485                         ev_port->num_links--;
1486                 }
1487         }
1488
1489         ev_queue->num_links = 0;
1490 }
1491
1492 static int
1493 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1494                          uint8_t ev_qid,
1495                          const struct rte_event_queue_conf *queue_conf)
1496 {
1497         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1498         struct dlb_eventdev_queue *ev_queue;
1499         int ret;
1500
1501         if (queue_conf == NULL)
1502                 return -EINVAL;
1503
1504         if (ev_qid >= dlb->num_queues)
1505                 return -EINVAL;
1506
1507         ev_queue = &dlb->ev_queues[ev_qid];
1508
1509         ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1510                 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1511         ev_queue->id = ev_qid;
1512         ev_queue->conf = *queue_conf;
1513
1514         if (!ev_queue->qm_queue.is_directed) {
1515                 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1516         } else {
1517                 /* The directed queue isn't set up until link time, at which
1518                  * point we know its directed port ID. Directed queue setup
1519                  * will only fail if this queue is already set up or there are
1520                  * no directed queues left to configure.
1521                  */
1522                 ret = 0;
1523
1524                 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1525
1526                 if (ev_queue->setup_done ||
1527                     dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1528                         ret = -EINVAL;
1529         }
1530
1531         /* Tear down pre-existing port->queue links */
1532         if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1533                 dlb_queue_link_teardown(dlb, ev_queue);
1534
1535         if (!ret)
1536                 ev_queue->setup_done = true;
1537
1538         return ret;
1539 }
1540
1541 static void
1542 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1543                        struct dlb_eventdev_port *ev_port)
1544 {
1545         struct dlb_eventdev_queue *ev_queue;
1546         int i;
1547
1548         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1549                 if (!ev_port->link[i].valid)
1550                         continue;
1551
1552                 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1553
1554                 ev_port->link[i].valid = false;
1555                 ev_port->num_links--;
1556                 ev_queue->num_links--;
1557         }
1558 }
1559
1560 static int
1561 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1562                         uint8_t ev_port_id,
1563                         const struct rte_event_port_conf *port_conf)
1564 {
1565         struct dlb_eventdev *dlb;
1566         struct dlb_eventdev_port *ev_port;
1567         bool use_rsvd_token_scheme;
1568         uint32_t adj_cq_depth;
1569         uint16_t rsvd_tokens;
1570         int ret;
1571
1572         if (dev == NULL || port_conf == NULL) {
1573                 DLB_LOG_ERR("Null parameter\n");
1574                 return -EINVAL;
1575         }
1576
1577         dlb = dlb_pmd_priv(dev);
1578
1579         if (ev_port_id >= DLB_MAX_NUM_PORTS)
1580                 return -EINVAL;
1581
1582         if (port_conf->dequeue_depth >
1583                 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1584             port_conf->enqueue_depth >
1585                 evdev_dlb_default_info.max_event_port_enqueue_depth)
1586                 return -EINVAL;
1587
1588         ev_port = &dlb->ev_ports[ev_port_id];
1589         /* configured? */
1590         if (ev_port->setup_done) {
1591                 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1592                 return -EINVAL;
1593         }
1594
1595         /* The reserved token interrupt arming scheme requires that one or more
1596          * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1597          * usable by the DLB, so in order to give an *effective* CQ depth equal
1598          * to the user-requested value, we double CQ depth and reserve half of
1599          * its tokens. If the user requests the max CQ depth (256) then we
1600          * cannot double it, so we reserve one token and give an effective
1601          * depth of 255 entries.
1602          */
1603         use_rsvd_token_scheme = true;
1604         rsvd_tokens = 1;
1605         adj_cq_depth = port_conf->dequeue_depth;
1606
1607         if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1608                 rsvd_tokens = adj_cq_depth;
1609                 adj_cq_depth *= 2;
1610         }
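             /* For example, a requested dequeue_depth of 64 yields
              * rsvd_tokens = 64 and adj_cq_depth = 128 (an effective CQ depth
              * of 64), while the maximum depth of 256 is left undoubled with a
              * single reserved token, for an effective depth of 255.
              */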
1611
1612         ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1613                 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1614
1615         if (!ev_port->qm_port.is_directed) {
1616                 ret = dlb_hw_create_ldb_port(dlb,
1617                                              ev_port,
1618                                              port_conf->dequeue_depth,
1619                                              adj_cq_depth,
1620                                              port_conf->enqueue_depth,
1621                                              rsvd_tokens,
1622                                              use_rsvd_token_scheme);
1623                 if (ret < 0) {
1624                         DLB_LOG_ERR("Failed to create the LDB port, ev_port_id=%d\n",
1625                                     ev_port_id);
1626                         return ret;
1627                 }
1628         } else {
1629                 ret = dlb_hw_create_dir_port(dlb,
1630                                              ev_port,
1631                                              port_conf->dequeue_depth,
1632                                              adj_cq_depth,
1633                                              port_conf->enqueue_depth,
1634                                              rsvd_tokens,
1635                                              use_rsvd_token_scheme);
1636                 if (ret < 0) {
1637                         DLB_LOG_ERR("Failed to create the DIR port\n");
1638                         return ret;
1639                 }
1640         }
1641
1642         /* Save off port config for reconfig */
1643         dlb->ev_ports[ev_port_id].conf = *port_conf;
1644
1645         dlb->ev_ports[ev_port_id].id = ev_port_id;
1646         dlb->ev_ports[ev_port_id].enq_configured = true;
1647         dlb->ev_ports[ev_port_id].setup_done = true;
1648         dlb->ev_ports[ev_port_id].inflight_max =
1649                 port_conf->new_event_threshold;
1650         dlb->ev_ports[ev_port_id].implicit_release =
1651                 !(port_conf->event_port_cfg &
1652                   RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1653         dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1654         dlb->ev_ports[ev_port_id].inflight_credits = 0;
1655         dlb->ev_ports[ev_port_id].credit_update_quanta =
1656                 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1657         dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1658
1659         /* Tear down pre-existing port->queue links */
1660         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1661                 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1662
1663         dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1664
1665         return 0;
1666 }
1667
1668 static int
1669 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1670 {
1671         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1672         int ret, i;
1673
1674         /* If an event queue or port was previously configured, but hasn't been
1675          * reconfigured, reapply its original configuration.
1676          */
1677         for (i = 0; i < dlb->num_queues; i++) {
1678                 struct dlb_eventdev_queue *ev_queue;
1679
1680                 ev_queue = &dlb->ev_queues[i];
1681
1682                 if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1683                         continue;
1684
1685                 ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1686                 if (ret < 0) {
1687                         DLB_LOG_ERR("dlb: failed to reconfigure queue %d", i);
1688                         return ret;
1689                 }
1690         }
1691
1692         for (i = 0; i < dlb->num_ports; i++) {
1693                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1694
1695                 if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1696                         continue;
1697
1698                 ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1699                 if (ret < 0) {
1700                         DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d",
1701                                     i);
1702                         return ret;
1703                 }
1704         }
1705
1706         return 0;
1707 }
1708
1709 static int
1710 set_dev_id(const char *key __rte_unused,
1711            const char *value,
1712            void *opaque)
1713 {
1714         int *dev_id = opaque;
1715         int ret;
1716
1717         if (value == NULL || opaque == NULL) {
1718                 DLB_LOG_ERR("NULL pointer\n");
1719                 return -EINVAL;
1720         }
1721
1722         ret = dlb_string_to_int(dev_id, value);
1723         if (ret < 0)
1724                 return ret;
1725
1726         return 0;
1727 }
1728
1729 static int
1730 set_defer_sched(const char *key __rte_unused,
1731                 const char *value,
1732                 void *opaque)
1733 {
1734         int *defer_sched = opaque;
1735
1736         if (value == NULL || opaque == NULL) {
1737                 DLB_LOG_ERR("NULL pointer\n");
1738                 return -EINVAL;
1739         }
1740
1741         if (strcmp(value, "on") != 0) {
1742                 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1743                             value);
1744                 return -EINVAL;
1745         }
1746
1747         *defer_sched = 1;
1748
1749         return 0;
1750 }
1751
1752 static int
1753 set_num_atm_inflights(const char *key __rte_unused,
1754                       const char *value,
1755                       void *opaque)
1756 {
1757         int *num_atm_inflights = opaque;
1758         int ret;
1759
1760         if (value == NULL || opaque == NULL) {
1761                 DLB_LOG_ERR("NULL pointer\n");
1762                 return -EINVAL;
1763         }
1764
1765         ret = dlb_string_to_int(num_atm_inflights, value);
1766         if (ret < 0)
1767                 return ret;
1768
1769         if (*num_atm_inflights < 0 ||
1770             *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1771                 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1772                             DLB_MAX_NUM_ATM_INFLIGHTS);
1773                 return -EINVAL;
1774         }
1775
1776         return 0;
1777 }
1778
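     /* Validate a requested port->queue link before it is recorded or applied:
      * the queue ID must refer to a configured (or previously configured)
      * queue, the port and queue must agree on directed vs. load-balanced,
      * there must be a free link slot if the link does not already exist, and
      * a directed port or queue may carry at most one link.
      */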
1779 static int
1780 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1781                        uint8_t queue_id,
1782                        bool link_exists,
1783                        int index)
1784 {
1785         struct dlb_eventdev *dlb = ev_port->dlb;
1786         struct dlb_eventdev_queue *ev_queue;
1787         bool port_is_dir, queue_is_dir;
1788
1789         if (queue_id >= dlb->num_queues) {
1790                 DLB_LOG_ERR("queue_id %d >= num queues %d\n",
1791                             queue_id, dlb->num_queues);
1792                 rte_errno = -EINVAL;
1793                 return -1;
1794         }
1795
1796         ev_queue = &dlb->ev_queues[queue_id];
1797
1798         if (!ev_queue->setup_done &&
1799             ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1800                 DLB_LOG_ERR("setup not done and not previously configured\n");
1801                 rte_errno = -EINVAL;
1802                 return -1;
1803         }
1804
1805         port_is_dir = ev_port->qm_port.is_directed;
1806         queue_is_dir = ev_queue->qm_queue.is_directed;
1807
1808         if (port_is_dir != queue_is_dir) {
1809                 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1810                             queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1811                             port_is_dir ? "DIR" : "LDB", ev_port->id);
1812
1813                 rte_errno = -EINVAL;
1814                 return -1;
1815         }
1816
1817         /* Check if there is space for the requested link */
1818         if (!link_exists && index == -1) {
1819                 DLB_LOG_ERR("no space for new link\n");
1820                 rte_errno = -ENOSPC;
1821                 return -1;
1822         }
1823
1824         /* Check if the directed port is already linked */
1825         if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1826             !link_exists) {
1827                 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1828                             ev_port->id);
1829                 rte_errno = -EINVAL;
1830                 return -1;
1831         }
1832
1833         /* Check if the directed queue is already linked */
1834         if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1835             !link_exists) {
1836                 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1837                             ev_queue->id);
1838                 rte_errno = -EINVAL;
1839                 return -1;
1840         }
1841
1842         return 0;
1843 }
1844
1845 static int32_t
1846 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1847 {
1848         struct dlb_hw_dev *handle = &dlb->qm_instance;
1849         struct dlb_create_dir_queue_args cfg;
1850         struct dlb_cmd_response response;
1851         int32_t ret;
1852
1853         cfg.response = (uintptr_t)&response;
1854
1855         /* The directed port is always configured before its queue */
1856         cfg.port_id = qm_port_id;
1857
1858         ret = dlb_iface_dir_queue_create(handle, &cfg);
1859         if (ret < 0) {
1860                 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1861                             ret, dlb_error_strings[response.status]);
1862                 return -EINVAL;
1863         }
1864
1865         return response.id;
1866 }
1867
1868 static int
1869 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1870                              struct dlb_eventdev_queue *ev_queue,
1871                              struct dlb_eventdev_port *ev_port)
1872 {
1873         int32_t qm_qid;
1874
1875         qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1876
1877         if (qm_qid < 0) {
1878                 DLB_LOG_ERR("Failed to create the DIR queue\n");
1879                 return qm_qid;
1880         }
1881
1882         dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1883
1884         ev_queue->qm_queue.id = qm_qid;
1885
1886         return 0;
1887 }
1888
1889 static int16_t
1890 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1891                            uint32_t qm_port_id,
1892                            uint16_t qm_qid,
1893                            uint8_t priority)
1894 {
1895         struct dlb_map_qid_args cfg;
1896         struct dlb_cmd_response response;
1897         int32_t ret;
1898
1899         if (handle == NULL)
1900                 return -EINVAL;
1901
1902         /* Build message */
1903         cfg.response = (uintptr_t)&response;
1904         cfg.port_id = qm_port_id;
1905         cfg.qid = qm_qid;
1906         cfg.priority = EV_TO_DLB_PRIO(priority);
1907
1908         ret = dlb_iface_map_qid(handle, &cfg);
1909         if (ret < 0) {
1910                 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1911                             ret, dlb_error_strings[response.status]);
1912                 DLB_LOG_ERR("dlb: device_id=%d domain=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1913                             handle->device_id,
1914                             handle->domain_id, cfg.port_id,
1915                             cfg.qid,
1916                             cfg.priority);
1917         } else {
1918                 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1919                             qm_qid, qm_port_id);
1920         }
1921
1922         return ret;
1923 }
1924
1925 static int
1926 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1927                          struct dlb_eventdev_port *ev_port,
1928                          struct dlb_eventdev_queue *ev_queue,
1929                          uint8_t priority)
1930 {
1931         int first_avail = -1;
1932         int ret, i;
1933
1934         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1935                 if (ev_port->link[i].valid) {
1936                         if (ev_port->link[i].queue_id == ev_queue->id &&
1937                             ev_port->link[i].priority == priority) {
1938                                 if (ev_port->link[i].mapped)
1939                                         return 0; /* already mapped */
1940                                 first_avail = i;
1941                         }
1942                 } else {
1943                         if (first_avail == -1)
1944                                 first_avail = i;
1945                 }
1946         }
1947         if (first_avail == -1) {
1948                 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1949                             ev_port->qm_port.id);
1950                 return -EINVAL;
1951         }
1952
1953         ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1954                                          ev_port->qm_port.id,
1955                                          ev_queue->qm_queue.id,
1956                                          priority);
1957
1958         if (!ret)
1959                 ev_port->link[first_avail].mapped = true;
1960
1961         return ret;
1962 }
1963
1964 static int
1965 dlb_do_port_link(struct rte_eventdev *dev,
1966                  struct dlb_eventdev_queue *ev_queue,
1967                  struct dlb_eventdev_port *ev_port,
1968                  uint8_t prio)
1969 {
1970         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1971         int err;
1972
1973         /* Don't link until start time. */
1974         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1975                 return 0;
1976
1977         if (ev_queue->qm_queue.is_directed)
1978                 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1979         else
1980                 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1981
1982         if (err) {
1983                 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1984                             ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1985                             ev_queue->id, ev_port->id);
1986
1987                 rte_errno = err;
1988                 return -1;
1989         }
1990
1991         return 0;
1992 }
1993
1994 static int
1995 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1996 {
1997         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1998         int i;
1999
2000         /* Perform requested port->queue links */
2001         for (i = 0; i < dlb->num_ports; i++) {
2002                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
2003                 int j;
2004
2005                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2006                         struct dlb_eventdev_queue *ev_queue;
2007                         uint8_t prio, queue_id;
2008
2009                         if (!ev_port->link[j].valid)
2010                                 continue;
2011
2012                         prio = ev_port->link[j].priority;
2013                         queue_id = ev_port->link[j].queue_id;
2014
2015                         if (dlb_validate_port_link(ev_port, queue_id, true, j))
2016                                 return -EINVAL;
2017
2018                         ev_queue = &dlb->ev_queues[queue_id];
2019
2020                         if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2021                                 return -EINVAL;
2022                 }
2023         }
2024
2025         return 0;
2026 }
2027
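     /* Record the requested links in the port's software link table. While the
      * device is stopped, dlb_do_port_link() returns immediately, so the
      * hardware queue->port mapping is deferred until dlb_eventdev_start()
      * replays the links via dlb_eventdev_apply_port_links().
      */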
2028 static int
2029 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2030                        const uint8_t queues[], const uint8_t priorities[],
2031                        uint16_t nb_links)
2033 {
2034         struct dlb_eventdev_port *ev_port = event_port;
2035         struct dlb_eventdev *dlb;
2036         int i, j;
2037
2038         RTE_SET_USED(dev);
2039
2040         if (ev_port == NULL) {
2041                 DLB_LOG_ERR("dlb: event_port is NULL\n");
2042                 rte_errno = -EINVAL;
2043                 return 0;
2044         }
2045
2046         if (!ev_port->setup_done &&
2047             ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2048                 DLB_LOG_ERR("dlb: evport not setup\n");
2049                 rte_errno = -EINVAL;
2050                 return 0;
2051         }
2052
2053         /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2054          * queues pointer.
2055          */
2056         if (nb_links == 0) {
2057                 DLB_LOG_DBG("dlb: nb_links is 0\n");
2058                 return 0; /* Ignore and return success */
2059         }
2060
2061         dlb = ev_port->dlb;
2062
2063         DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2064                     nb_links,
2065                     ev_port->qm_port.is_directed ? "DIR" : "LDB",
2066                     ev_port->id);
2067
2068         for (i = 0; i < nb_links; i++) {
2069                 struct dlb_eventdev_queue *ev_queue;
2070                 uint8_t queue_id, prio;
2071                 bool found = false;
2072                 int index = -1;
2073
2074                 queue_id = queues[i];
2075                 prio = priorities[i];
2076
2077                 /* Check if the link already exists. */
2078                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2079                         if (ev_port->link[j].valid) {
2080                                 if (ev_port->link[j].queue_id == queue_id) {
2081                                         found = true;
2082                                         index = j;
2083                                         break;
2084                                 }
2085                         } else {
2086                                 if (index == -1)
2087                                         index = j;
2088                         }
2089
2090                 /* could not link */
2091                 if (index == -1)
2092                         break;
2093
2094                 /* Check if already linked at the requested priority */
2095                 if (found && ev_port->link[j].priority == prio)
2096                         continue;
2097
2098                 if (dlb_validate_port_link(ev_port, queue_id, found, index))
2099                         break; /* return index of offending queue */
2100
2101                 ev_queue = &dlb->ev_queues[queue_id];
2102
2103                 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2104                         break; /* return index of offending queue */
2105
2106                 ev_queue->num_links++;
2107
2108                 ev_port->link[index].queue_id = queue_id;
2109                 ev_port->link[index].priority = prio;
2110                 ev_port->link[index].valid = true;
2111                 /* Count the link only if it's new; an existing entry means this was a priority change */
2112                 if (!found)
2113                         ev_port->num_links++;
2114         }
2115         return i;
2116 }
2117
2118 static int
2119 dlb_eventdev_start(struct rte_eventdev *dev)
2120 {
2121         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2122         struct dlb_hw_dev *handle = &dlb->qm_instance;
2123         struct dlb_start_domain_args cfg;
2124         struct dlb_cmd_response response;
2125         int ret, i;
2126
2127         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2128         if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2129                 DLB_LOG_ERR("bad state %d for dev_start\n",
2130                             (int)dlb->run_state);
2131                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2132                 return -EINVAL;
2133         }
2134         dlb->run_state = DLB_RUN_STATE_STARTING;
2135         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2136
2137         /* If the device was configured more than once, some event ports and/or
2138          * queues may need to be reconfigured.
2139          */
2140         ret = dlb_eventdev_reapply_configuration(dev);
2141         if (ret)
2142                 return ret;
2143
2144         /* The DLB PMD delays port links until the device is started. */
2145         ret = dlb_eventdev_apply_port_links(dev);
2146         if (ret)
2147                 return ret;
2148
2149         cfg.response = (uintptr_t)&response;
2150
2151         for (i = 0; i < dlb->num_ports; i++) {
2152                 if (!dlb->ev_ports[i].setup_done) {
2153                         DLB_LOG_ERR("dlb: port %d not setup", i);
2154                         return -ESTALE;
2155                 }
2156         }
2157
2158         for (i = 0; i < dlb->num_queues; i++) {
2159                 if (dlb->ev_queues[i].num_links == 0) {
2160                         DLB_LOG_ERR("dlb: queue %d is not linked", i);
2161                         return -ENOLINK;
2162                 }
2163         }
2164
2165         ret = dlb_iface_sched_domain_start(handle, &cfg);
2166         if (ret < 0) {
2167                 DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2168                             ret, dlb_error_strings[response.status]);
2169                 return ret;
2170         }
2171
2172         dlb->run_state = DLB_RUN_STATE_STARTED;
2173         DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2174
2175         return 0;
2176 }
2177
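     /* Software ("new event") credit check. NEW events are admitted only while
      * the device-wide inflight count stays below the port's
      * new_event_threshold and the device's new_event_limit. Credits are
      * acquired from the shared dlb->inflights counter in
      * credit_update_quanta-sized chunks so the atomic add is amortized over
      * many enqueues.
      */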
2178 static inline int
2179 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2180                              struct dlb_eventdev_port *ev_port)
2181 {
2182         uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2183                                                 __ATOMIC_SEQ_CST);
2184         const int num = 1;
2185
2186         if (unlikely(ev_port->inflight_max < sw_inflights)) {
2187                 DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2188                 rte_errno = -ENOSPC;
2189                 return 1;
2190         }
2191
2192         if (ev_port->inflight_credits < num) {
2193                 /* Check if acquiring another credit quantum would exceed the new event limit */
2194                 uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2195
2196                 if (sw_inflights + credit_update_quanta >
2197                     dlb->new_event_limit) {
2198                         DLB_INC_STAT(
2199                                 ev_port->stats.traffic.tx_nospc_new_event_limit,
2200                                 1);
2201                         rte_errno = -ENOSPC;
2202                         return 1;
2203                 }
2204
2205                 __atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2206                                    __ATOMIC_SEQ_CST);
2207                 ev_port->inflight_credits += (credit_update_quanta);
2208
2209                 if (ev_port->inflight_credits < num) {
2210                         DLB_INC_STAT(
2211                             ev_port->stats.traffic.tx_nospc_inflight_credits,
2212                             1);
2213                         rte_errno = -ENOSPC;
2214                         return 1;
2215                 }
2216         }
2217
2218         return 0;
2219 }
2220
2221 static inline void
2222 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2223                          struct dlb_eventdev_port *ev_port)
2224 {
2225         uint16_t quanta = ev_port->credit_update_quanta;
2226
2227         if (ev_port->inflight_credits >= quanta * 2) {
2228                 /* Return credits to the device-wide pool, keeping one quantum for enqueues */
2229                 uint16_t val = ev_port->inflight_credits - quanta;
2230
2231                 __atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2232                 ev_port->inflight_credits -= val;
2233         }
2234 }
2235
2236 static __rte_always_inline uint16_t
2237 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2238 {
2239         volatile uint16_t *popcount;
2240
2241         if (ldb)
2242                 popcount = port_data->ldb_popcount;
2243         else
2244                 popcount = port_data->dir_popcount;
2245
2246         return *popcount;
2247 }
2248
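     /* Hardware load-balanced credit check. When the locally cached credit
      * count reaches zero, re-read the device-maintained pop count; the delta
      * since the last credit expiry is the number of LDB credits newly
      * available to this port. Returns nonzero (and bumps a stat) if no
      * credits could be obtained.
      */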
2249 static inline int
2250 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2251                                  struct process_local_port_data *port_data)
2252 {
2253         if (unlikely(qm_port->cached_ldb_credits == 0)) {
2254                 uint16_t pc;
2255
2256                 pc = dlb_read_pc(port_data, true);
2257
2258                 qm_port->cached_ldb_credits = pc -
2259                         qm_port->ldb_pushcount_at_credit_expiry;
2260                 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2261                         DLB_INC_STAT(
2262                         qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2263                         1);
2264
2265                         DLB_LOG_DBG("ldb credits exhausted\n");
2266                         return 1;
2267                 }
2268                 qm_port->ldb_pushcount_at_credit_expiry +=
2269                         qm_port->cached_ldb_credits;
2270         }
2271
2272         return 0;
2273 }
2274
2275 static inline int
2276 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2277                                  struct process_local_port_data *port_data)
2278 {
2279         if (unlikely(qm_port->cached_dir_credits == 0)) {
2280                 uint16_t pc;
2281
2282                 pc = dlb_read_pc(port_data, false);
2283
2284                 qm_port->cached_dir_credits = pc -
2285                         qm_port->dir_pushcount_at_credit_expiry;
2286
2287                 if (unlikely(qm_port->cached_dir_credits == 0)) {
2288                         DLB_INC_STAT(
2289                         qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2290                         1);
2291
2292                         DLB_LOG_DBG("dir credits exhausted\n");
2293                         return 1;
2294                 }
2295                 qm_port->dir_pushcount_at_credit_expiry +=
2296                         qm_port->cached_dir_credits;
2297         }
2298
2299         return 0;
2300 }
2301
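     /* Per-event enqueue preparation: resolve the destination DLB queue ID and
      * scheduling type, verify the event's sched_type is legal for the queue,
      * check hardware (LDB or DIR) and software credits as required by the op,
      * and update the port's credit and release accounting. Returns nonzero
      * with rte_errno set if the event cannot be enqueued.
      */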
2302 static inline int
2303 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2304                        struct dlb_port *qm_port,
2305                        const struct rte_event ev[],
2306                        struct process_local_port_data *port_data,
2307                        uint8_t *sched_type,
2308                        uint8_t *queue_id)
2309 {
2310         struct dlb_eventdev *dlb = ev_port->dlb;
2311         struct dlb_eventdev_queue *ev_queue;
2312         uint16_t *cached_credits = NULL;
2313         struct dlb_queue *qm_queue;
2314
2315         ev_queue = &dlb->ev_queues[ev->queue_id];
2316         qm_queue = &ev_queue->qm_queue;
2317         *queue_id = qm_queue->id;
2318
2319         /* Ignore sched_type and hardware credits on release events */
2320         if (ev->op == RTE_EVENT_OP_RELEASE)
2321                 goto op_check;
2322
2323         if (!qm_queue->is_directed) {
2324                 /* Load balanced destination queue */
2325
2326                 if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2327                         rte_errno = -ENOSPC;
2328                         return 1;
2329                 }
2330                 cached_credits = &qm_port->cached_ldb_credits;
2331
2332                 switch (ev->sched_type) {
2333                 case RTE_SCHED_TYPE_ORDERED:
2334                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2335                         if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2336                                 DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2337                                             *queue_id);
2338                                 rte_errno = -EINVAL;
2339                                 return 1;
2340                         }
2341                         *sched_type = DLB_SCHED_ORDERED;
2342                         break;
2343                 case RTE_SCHED_TYPE_ATOMIC:
2344                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2345                         *sched_type = DLB_SCHED_ATOMIC;
2346                         break;
2347                 case RTE_SCHED_TYPE_PARALLEL:
2348                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2349                         if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2350                                 *sched_type = DLB_SCHED_ORDERED;
2351                         else
2352                                 *sched_type = DLB_SCHED_UNORDERED;
2353                         break;
2354                 default:
2355                         DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2356                         DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2357                         rte_errno = -EINVAL;
2358                         return 1;
2359                 }
2360         } else {
2361                 /* Directed destination queue */
2362
2363                 if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2364                         rte_errno = -ENOSPC;
2365                         return 1;
2366                 }
2367                 cached_credits = &qm_port->cached_dir_credits;
2368
2369                 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2370
2371                 *sched_type = DLB_SCHED_DIRECTED;
2372         }
2373
2374 op_check:
2375         switch (ev->op) {
2376         case RTE_EVENT_OP_NEW:
2377                 /* Check that a sw credit is available */
2378                 if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2379                         rte_errno = -ENOSPC;
2380                         return 1;
2381                 }
2382                 ev_port->inflight_credits--;
2383                 (*cached_credits)--;
2384                 break;
2385         case RTE_EVENT_OP_FORWARD:
2386                 /* Check for outstanding_releases underflow. If this occurs,
2387                  * the application is not using the EVENT_OPs correctly; for
2388                  * example, forwarding or releasing events that were not
2389                  * dequeued.
2390                  */
2391                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2392                 ev_port->outstanding_releases--;
2393                 qm_port->issued_releases++;
2394                 (*cached_credits)--;
2395                 break;
2396         case RTE_EVENT_OP_RELEASE:
2397                 ev_port->inflight_credits++;
2398                 /* Check for outstanding_releases underflow. If this occurs,
2399                  * the application is not using the EVENT_OPs correctly; for
2400                  * example, forwarding or releasing events that were not
2401                  * dequeued.
2402                  */
2403                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2404                 ev_port->outstanding_releases--;
2405                 qm_port->issued_releases++;
2406                 /* Replenish s/w credits if enough are cached */
2407                 dlb_replenish_sw_credits(dlb, ev_port);
2408                 break;
2409         }
2410
2411         DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2412         DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2413
2414 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2415         if (ev->op != RTE_EVENT_OP_RELEASE) {
2416                 DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2417                 DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2418         }
2419 #endif
2420
2421         return 0;
2422 }
2423
2424 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2425         {
2426                 /* Load-balanced cmd bytes */
2427                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2428                 [RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2429                 [RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2430         },
2431         {
2432                 /* Directed cmd bytes */
2433                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2434                 [RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2435                 [RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2436         },
2437 };
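     /* Note that for directed destinations a FORWARD is issued as a NEW
      * command and a RELEASE as a NOOP, since directed traffic carries no
      * load-balanced scheduler state to complete.
      */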
2438
2439 static inline void
2440 dlb_event_build_hcws(struct dlb_port *qm_port,
2441                      const struct rte_event ev[],
2442                      int num,
2443                      uint8_t *sched_type,
2444                      uint8_t *queue_id)
2445 {
2446         struct dlb_enqueue_qe *qe;
2447         uint16_t sched_word[4];
2448         __m128i sse_qe[2];
2449         int i;
2450
2451         qe = qm_port->qe4;
2452
2453         sse_qe[0] = _mm_setzero_si128();
2454         sse_qe[1] = _mm_setzero_si128();
2455
2456         switch (num) {
2457         case 4:
2458                 /* Construct the metadata portion of two HCWs in one 128b SSE
2459                  * register. HCW metadata is constructed in the SSE registers
2460                  * like so:
2461                  * sse_qe[0][63:0]:   qe[0]'s metadata
2462                  * sse_qe[0][127:64]: qe[1]'s metadata
2463                  * sse_qe[1][63:0]:   qe[2]'s metadata
2464                  * sse_qe[1][127:64]: qe[3]'s metadata
2465                  */
2466
2467                 /* Convert the event operation into a command byte and store it
2468                  * in the metadata:
2469                  * sse_qe[0][63:56]   = cmd_byte_map[is_directed][ev[0].op]
2470                  * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2471                  * sse_qe[1][63:56]   = cmd_byte_map[is_directed][ev[2].op]
2472                  * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2473                  */
2474 #define DLB_QE_CMD_BYTE 7
2475                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2476                                 cmd_byte_map[qm_port->is_directed][ev[0].op],
2477                                 DLB_QE_CMD_BYTE);
2478                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2479                                 cmd_byte_map[qm_port->is_directed][ev[1].op],
2480                                 DLB_QE_CMD_BYTE + 8);
2481                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2482                                 cmd_byte_map[qm_port->is_directed][ev[2].op],
2483                                 DLB_QE_CMD_BYTE);
2484                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2485                                 cmd_byte_map[qm_port->is_directed][ev[3].op],
2486                                 DLB_QE_CMD_BYTE + 8);
2487
2488                 /* Store priority, scheduling type, and queue ID in the sched
2489                  * word array because these values are re-used when the
2490                  * destination is a directed queue.
2491                  */
2492                 sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2493                                 sched_type[0] << 8 |
2494                                 queue_id[0];
2495                 sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2496                                 sched_type[1] << 8 |
2497                                 queue_id[1];
2498                 sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2499                                 sched_type[2] << 8 |
2500                                 queue_id[2];
2501                 sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2502                                 sched_type[3] << 8 |
2503                                 queue_id[3];
2504
2505                 /* Store the event priority, scheduling type, and queue ID in
2506                  * the metadata:
2507                  * sse_qe[0][31:16] = sched_word[0]
2508                  * sse_qe[0][95:80] = sched_word[1]
2509                  * sse_qe[1][31:16] = sched_word[2]
2510                  * sse_qe[1][95:80] = sched_word[3]
2511                  */
2512 #define DLB_QE_QID_SCHED_WORD 1
2513                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2514                                              sched_word[0],
2515                                              DLB_QE_QID_SCHED_WORD);
2516                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2517                                              sched_word[1],
2518                                              DLB_QE_QID_SCHED_WORD + 4);
2519                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2520                                              sched_word[2],
2521                                              DLB_QE_QID_SCHED_WORD);
2522                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2523                                              sched_word[3],
2524                                              DLB_QE_QID_SCHED_WORD + 4);
2525
2526                 /* If the destination is a load-balanced queue, store the lock
2527                  * ID. If it is a directed queue, DLB places this field in
2528                  * bytes 10-11 of the received QE, so we format it accordingly:
2529                  * sse_qe[0][47:32]  = dir queue ? sched_word[0] : flow_id[0]
2530                  * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2531                  * sse_qe[1][47:32]  = dir queue ? sched_word[2] : flow_id[2]
2532                  * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2533                  */
2534 #define DLB_QE_LOCK_ID_WORD 2
2535                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2536                                 (sched_type[0] == DLB_SCHED_DIRECTED) ?
2537                                         sched_word[0] : ev[0].flow_id,
2538                                 DLB_QE_LOCK_ID_WORD);
2539                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2540                                 (sched_type[1] == DLB_SCHED_DIRECTED) ?
2541                                         sched_word[1] : ev[1].flow_id,
2542                                 DLB_QE_LOCK_ID_WORD + 4);
2543                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2544                                 (sched_type[2] == DLB_SCHED_DIRECTED) ?
2545                                         sched_word[2] : ev[2].flow_id,
2546                                 DLB_QE_LOCK_ID_WORD);
2547                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2548                                 (sched_type[3] == DLB_SCHED_DIRECTED) ?
2549                                         sched_word[3] : ev[3].flow_id,
2550                                 DLB_QE_LOCK_ID_WORD + 4);
2551
2552                 /* Store the event type and sub event type in the metadata:
2553                  * sse_qe[0][15:0]  = sub_event_type[0] << 8 | event_type[0]
2554                  * sse_qe[0][79:64] = sub_event_type[1] << 8 | event_type[1]
2555                  * sse_qe[1][15:0]  = sub_event_type[2] << 8 | event_type[2]
2556                  * sse_qe[1][79:64] = sub_event_type[3] << 8 | event_type[3]
2557                  */
2558 #define DLB_QE_EV_TYPE_WORD 0
2559                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2560                                              ev[0].sub_event_type << 8 |
2561                                                 ev[0].event_type,
2562                                              DLB_QE_EV_TYPE_WORD);
2563                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2564                                              ev[1].sub_event_type << 8 |
2565                                                 ev[1].event_type,
2566                                              DLB_QE_EV_TYPE_WORD + 4);
2567                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2568                                              ev[2].sub_event_type << 8 |
2569                                                 ev[2].event_type,
2570                                              DLB_QE_EV_TYPE_WORD);
2571                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2572                                              ev[3].sub_event_type << 8 |
2573                                                 ev[3].event_type,
2574                                              DLB_QE_EV_TYPE_WORD + 4);
2575
2576                 /* Store the metadata to memory (use the double-precision
2577                  * _mm_storeh_pd because there is no integer function for
2578                  * storing the upper 64b):
2579                  * qe[0] metadata = sse_qe[0][63:0]
2580                  * qe[1] metadata = sse_qe[0][127:64]
2581                  * qe[2] metadata = sse_qe[1][63:0]
2582                  * qe[3] metadata = sse_qe[1][127:64]
2583                  */
2584                 _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2585                 _mm_storeh_pd((double *)&qe[1].u.opaque_data,
2586                               (__m128d) sse_qe[0]);
2587                 _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2588                 _mm_storeh_pd((double *)&qe[3].u.opaque_data,
2589                               (__m128d) sse_qe[1]);
2590
2591                 qe[0].data = ev[0].u64;
2592                 qe[1].data = ev[1].u64;
2593                 qe[2].data = ev[2].u64;
2594                 qe[3].data = ev[3].u64;
2595
2596                 break;
2597         case 3:
2598         case 2:
2599         case 1:
2600                 for (i = 0; i < num; i++) {
2601                         qe[i].cmd_byte =
2602                                 cmd_byte_map[qm_port->is_directed][ev[i].op];
2603                         qe[i].sched_type = sched_type[i];
2604                         qe[i].data = ev[i].u64;
2605                         qe[i].qid = queue_id[i];
2606                         qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2607                         qe[i].lock_id = ev[i].flow_id;
2608                         if (sched_type[i] == DLB_SCHED_DIRECTED) {
2609                                 struct dlb_msg_info *info =
2610                                         (struct dlb_msg_info *)&qe[i].lock_id;
2611
2612                                 info->qid = queue_id[i];
2613                                 info->sched_type = DLB_SCHED_DIRECTED;
2614                                 info->priority = qe[i].priority;
2615                         }
2616                         qe[i].u.event_type.major = ev[i].event_type;
2617                         qe[i].u.event_type.sub = ev[i].sub_event_type;
2618                 }
2619                 break;
2620         case 0:
2621                 break;
2622         }
2623 }
2624
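     /* Build a CQ token-pop QE in slot 'idx' of the enqueue cache line,
      * returning all currently owed tokens to hardware. Under the reserved
      * token scheme, owed tokens are first charged against the reserved-token
      * deficit so the reserved CQ entries are never handed back to the device.
      */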
2625 static inline void
2626 dlb_construct_token_pop_qe(struct dlb_port *qm_port, int idx)
2627 {
2628         struct dlb_cq_pop_qe *qe = (void *)qm_port->qe4;
2629         int num = qm_port->owed_tokens;
2630
2631         if (qm_port->use_rsvd_token_scheme) {
2632                 /* Check if there's a deficit of reserved tokens, and return
2633                  * early if there are no (unreserved) tokens to consume.
2634                  */
2635                 if (num <= qm_port->cq_rsvd_token_deficit) {
2636                         qm_port->cq_rsvd_token_deficit -= num;
2637                         qm_port->owed_tokens = 0;
2638                         return;
2639                 }
2640                 num -= qm_port->cq_rsvd_token_deficit;
2641                 qm_port->cq_rsvd_token_deficit = 0;
2642         }
2643
2644         qe[idx].cmd_byte = DLB_POP_CMD_BYTE;
2645         qe[idx].tokens = num - 1;
2646         qm_port->owed_tokens = 0;
2647 }
2648
2649 static __rte_always_inline void
2650 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2651              struct process_local_port_data *port_data)
2652 {
2653         dlb_movdir64b(port_data->pp_addr, qe4);
2654 }
2655
2656 static inline void
2657 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2658                   bool do_sfence,
2659                   struct process_local_port_data *port_data)
2660 {
2661         DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2662
2663         /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2664          * application writes complete before enqueueing the release HCW.
2665          */
2666         if (do_sfence)
2667                 rte_wmb();
2668
2669         dlb_pp_write(qm_port->qe4, port_data);
2670 }
2671
2672 static inline int
2673 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2674 {
2675         struct process_local_port_data *port_data;
2676         struct dlb_cq_pop_qe *qe;
2677
2678         RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2679
2680         if (qm_port->use_rsvd_token_scheme) {
2681                 /* Check if there's a deficit of reserved tokens, and return
2682                  * early if there are no (unreserved) tokens to consume.
2683                  */
2684                 if (num <= qm_port->cq_rsvd_token_deficit) {
2685                         qm_port->cq_rsvd_token_deficit -= num;
2686                         qm_port->owed_tokens = 0;
2687                         return 0;
2688                 }
2689                 num -= qm_port->cq_rsvd_token_deficit;
2690                 qm_port->cq_rsvd_token_deficit = 0;
2691         }
2692
2693         qe = qm_port->consume_qe;
2694
2695         qe->tokens = num - 1;
2696         qe->int_arm = 0;
2697
2698         /* No store fence needed since no pointer is being sent, and CQ token
2699          * pops can be safely reordered with other HCWs.
2700          */
2701         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2702
2703         dlb_movntdq_single(port_data->pp_addr, qe);
2704
2705         DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2706
2707         qm_port->owed_tokens = 0;
2708
2709         return 0;
2710 }
2711
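     /* Common enqueue path: events are staged in groups of up to four QEs (one
      * cache line) and written to the producer port with MOVDIR64B. In delayed
      * token pop mode, a token-pop QE is spliced into the cache line once
      * enough releases have been issued. The loop stops early if credits run
      * out, and the number of events actually enqueued is returned.
      */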
2712 static inline uint16_t
2713 __dlb_event_enqueue_burst(void *event_port,
2714                           const struct rte_event events[],
2715                           uint16_t num,
2716                           bool use_delayed)
2717 {
2718         struct dlb_eventdev_port *ev_port = event_port;
2719         struct dlb_port *qm_port = &ev_port->qm_port;
2720         struct process_local_port_data *port_data;
2721         int i;
2722
2723         RTE_ASSERT(ev_port->enq_configured);
2724         RTE_ASSERT(events != NULL);
2725
2726         rte_errno = 0;
2727         i = 0;
2728
2729         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2730
2731         while (i < num) {
2732                 uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2733                 uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2734                 int pop_offs = 0;
2735                 int j = 0;
2736
2737                 memset(qm_port->qe4,
2738                        0,
2739                        DLB_NUM_QES_PER_CACHE_LINE *
2740                        sizeof(struct dlb_enqueue_qe));
2741
2742                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2743                         const struct rte_event *ev = &events[i + j];
2744                         int16_t thresh = qm_port->token_pop_thresh;
2745
2746                         if (use_delayed &&
2747                             qm_port->token_pop_mode == DELAYED_POP &&
2748                             (ev->op == RTE_EVENT_OP_FORWARD ||
2749                              ev->op == RTE_EVENT_OP_RELEASE) &&
2750                             qm_port->issued_releases >= thresh - 1) {
2751                                 /* Insert the token pop QE and break out. This
2752                                  * may result in a partial HCW, but that is
2753                                  * simpler than supporting arbitrary QE
2754                                  * insertion.
2755                                  */
2756                                 dlb_construct_token_pop_qe(qm_port, j);
2757
2758                                 /* Reset the releases for the next QE batch */
2759                                 qm_port->issued_releases -= thresh;
2760
2761                                 /* When using delayed token pop mode, the
2762                                  * initial token threshold is the full CQ
2763                                  * depth. After the first token pop, we need to
2764                                  * reset it to the dequeue_depth.
2765                                  */
2766                                 qm_port->token_pop_thresh =
2767                                         qm_port->dequeue_depth;
2768
2769                                 pop_offs = 1;
2770                                 j++;
2771                                 break;
2772                         }
2773
2774                         if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2775                                                    port_data, &sched_types[j],
2776                                                    &queue_ids[j]))
2777                                 break;
2778                 }
2779
2780                 if (j == 0)
2781                         break;
2782
2783                 dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2784                                      sched_types, queue_ids);
2785
2786                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2787
2788                 /* Don't include the token pop QE in the enqueue count */
2789                 i += j - pop_offs;
2790
2791                 /* Don't interpret j < DLB_NUM_... as out-of-credits if
2792                  * pop_offs != 0
2793                  */
2794                 if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2795                         break;
2796         }
2797
2798         RTE_ASSERT(!((i == 0 && rte_errno != -ENOSPC)));
2799
2800         return i;
2801 }
2802
2803 static inline uint16_t
2804 dlb_event_enqueue_burst(void *event_port,
2805                         const struct rte_event events[],
2806                         uint16_t num)
2807 {
2808         return __dlb_event_enqueue_burst(event_port, events, num, false);
2809 }
2810
2811 static inline uint16_t
2812 dlb_event_enqueue_burst_delayed(void *event_port,
2813                                 const struct rte_event events[],
2814                                 uint16_t num)
2815 {
2816         return __dlb_event_enqueue_burst(event_port, events, num, true);
2817 }
2818
2819 static inline uint16_t
2820 dlb_event_enqueue(void *event_port,
2821                   const struct rte_event events[])
2822 {
2823         return __dlb_event_enqueue_burst(event_port, events, 1, false);
2824 }
2825
2826 static inline uint16_t
2827 dlb_event_enqueue_delayed(void *event_port,
2828                           const struct rte_event events[])
2829 {
2830         return __dlb_event_enqueue_burst(event_port, events, 1, true);
2831 }
2832
2833 static uint16_t
2834 dlb_event_enqueue_new_burst(void *event_port,
2835                             const struct rte_event events[],
2836                             uint16_t num)
2837 {
2838         return __dlb_event_enqueue_burst(event_port, events, num, false);
2839 }
2840
2841 static uint16_t
2842 dlb_event_enqueue_new_burst_delayed(void *event_port,
2843                                     const struct rte_event events[],
2844                                     uint16_t num)
2845 {
2846         return __dlb_event_enqueue_burst(event_port, events, num, true);
2847 }
2848
2849 static uint16_t
2850 dlb_event_enqueue_forward_burst(void *event_port,
2851                                 const struct rte_event events[],
2852                                 uint16_t num)
2853 {
2854         return __dlb_event_enqueue_burst(event_port, events, num, false);
2855 }
2856
2857 static uint16_t
2858 dlb_event_enqueue_forward_burst_delayed(void *event_port,
2859                                         const struct rte_event events[],
2860                                         uint16_t num)
2861 {
2862         return __dlb_event_enqueue_burst(event_port, events, num, true);
2863 }
2864
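     /* Read the CQ cache line containing the current consumer index and count
      * how many of its QEs are valid. Each QE carries a generation bit that
      * flips on every ring wrap; XORing the collected bits with the expected
      * pattern for the current gen_bit and masking by the offset within the
      * cache line leaves one set bit per valid QE, so the popcount is the
      * number of deliverable QEs.
      */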
2865 static __rte_always_inline int
2866 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2867             uint8_t *offset)
2868 {
2869         uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2870                                    {0x00, 0x01, 0x03, 0x07} };
2871         uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2872         volatile struct dlb_dequeue_qe *cq_addr;
2873         __m128i *qes = (__m128i *)qe;
2874         uint64_t *cache_line_base;
2875         uint8_t gen_bits;
2876
2877         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2878         cq_addr = &cq_addr[qm_port->cq_idx];
2879
2880         cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2881         *offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2882
2883         /* Load the next CQ cache line from memory. Pack these reads as tight
2884          * as possible to reduce the chance that DLB invalidates the line while
2885          * the CPU is reading it. Read the cache line backwards to ensure that
2886          * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2887          *
2888          * (Valid QEs start at &qe[offset])
2889          */
2890         qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2891         qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2892         qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2893         qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2894
2895         /* Evict the cache line ASAP */
2896         rte_cldemote(cache_line_base);
2897
2898         /* Extract and combine the gen bits */
2899         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2900                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2901                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2902                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2903
2904         /* XOR the combined bits such that a 1 represents a valid QE */
2905         gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2906
2907         /* Mask off gen bits we don't care about */
2908         gen_bits &= and_mask[*offset];
2909
2910         return __builtin_popcount(gen_bits);
2911 }
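
/*
 * Illustrative sketch (not part of the driver): the gen-bit math above can be
 * exercised in isolation. Given the four per-QE gen bits packed into the low
 * nibble of gen_bits, XORing with the expected-phase mask and ANDing with the
 * offset mask leave a 1 only for QEs written in the current CQ generation, so
 * the popcount is the number of valid entries. The helper below reuses the
 * same masks as dlb_recv_qe().
 *
 *	static int
 *	count_valid_qes(uint8_t gen_bits, int gen_bit, int offset)
 *	{
 *		static const uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
 *							{0x00, 0x01, 0x03, 0x07} };
 *		static const uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
 *
 *		gen_bits ^= xor_mask[gen_bit][offset];
 *		gen_bits &= and_mask[offset];
 *
 *		return __builtin_popcount(gen_bits);
 *	}
 */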
2912
2913 static inline void
2914 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2915 {
2916         uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2917
2918         qm_port->cq_idx_unmasked = idx;
2919         qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2920         qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
2921 }
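
/*
 * Worked example (assuming a CQ depth of 8, i.e. cq_depth_mask = 7 and
 * gen_bit_shift = 3): advancing the unmasked index from 6 to 10 wraps the
 * masked index and flips the gen bit, which is how dlb_recv_qe() tells
 * freshly written QEs apart from stale ones left over from the previous pass:
 *
 *	idx = 6  -> cq_idx = 6, gen_bit = (~(6 >> 3)) & 1 = 1
 *	idx = 10 -> cq_idx = 2, gen_bit = (~(10 >> 3)) & 1 = 0
 */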
2922
2923 static inline int
2924 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2925                         struct dlb_port *qm_port,
2926                         struct rte_event *events,
2927                         struct dlb_dequeue_qe *qes,
2928                         int cnt)
2929 {
2930         uint8_t *qid_mappings = qm_port->qid_mappings;
2931         int i, num;
2932
2933         RTE_SET_USED(ev_port);  /* avoids unused variable error */
2934
2935         for (i = 0, num = 0; i < cnt; i++) {
2936                 struct dlb_dequeue_qe *qe = &qes[i];
2937                 int sched_type_map[4] = {
2938                         [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2939                         [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2940                         [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2941                         [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2942                 };
2943
2944                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2945                             (long long)qe->data, qe->qid,
2946                             qe->u.event_type.major,
2947                             qe->u.event_type.sub,
2948                             qe->pp_id, qe->sched_type, qe->qid, qe->error);
2949
2950                 /* Fill in the event information.
2951                  * Note that flow_id must be embedded in the data by the
2952                  * application, e.g. in the mbuf RSS hash field if the
2953                  * data buffer is an mbuf.
2954                  */
2955                 if (unlikely(qe->error)) {
2956                         DLB_LOG_ERR("QE error bit ON\n");
2957                         DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2958                         dlb_consume_qe_immediate(qm_port, 1);
2959                         continue; /* Ignore */
2960                 }
2961
2962                 events[num].u64 = qe->data;
2963                 events[num].queue_id = qid_mappings[qe->qid];
2964                 events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2965                 events[num].event_type = qe->u.event_type.major;
2966                 events[num].sub_event_type = qe->u.event_type.sub;
2967                 events[num].sched_type = sched_type_map[qe->sched_type];
2968                 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2969                 num++;
2970         }
2971         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2972
2973         return num;
2974 }
2975
2976 static inline int
2977 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2978                              struct dlb_port *qm_port,
2979                              struct rte_event *events,
2980                              struct dlb_dequeue_qe *qes)
2981 {
2982         int sched_type_map[] = {
2983                 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2984                 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2985                 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2986                 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2987         };
2988         const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2989         uint8_t *qid_mappings = qm_port->qid_mappings;
2990         __m128i sse_evt[2];
2991         int i;
2992
2993         /* In the unlikely case that any of the QE error bits are set, process
2994          * them one at a time.
2995          */
2996         if (unlikely(qes[0].error || qes[1].error ||
2997                      qes[2].error || qes[3].error))
2998                 return dlb_process_dequeue_qes(ev_port, qm_port, events,
2999                                                qes, num_events);
3000
3001         for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
3002                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
3003                             (long long)qes[i].data, qes[i].qid,
3004                             qes[i].u.event_type.major,
3005                             qes[i].u.event_type.sub,
3006                             qes[i].pp_id, qes[i].sched_type, qes[i].qid,
3007                             qes[i].error);
3008         }
3009
3010         events[0].u64 = qes[0].data;
3011         events[1].u64 = qes[1].data;
3012         events[2].u64 = qes[2].data;
3013         events[3].u64 = qes[3].data;
3014
3015         /* Construct the metadata portion of two struct rte_events
3016          * in one 128b SSE register. Event metadata is constructed in the SSE
3017          * registers like so:
3018          * sse_evt[0][63:0]:   event[0]'s metadata
3019          * sse_evt[0][127:64]: event[1]'s metadata
3020          * sse_evt[1][63:0]:   event[2]'s metadata
3021          * sse_evt[1][127:64]: event[3]'s metadata
3022          */
3023         sse_evt[0] = _mm_setzero_si128();
3024         sse_evt[1] = _mm_setzero_si128();
3025
3026         /* Convert the hardware queue ID to an event queue ID and store it in
3027          * the metadata:
3028          * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
3029          * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
3030          * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
3031          * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
3032          */
3033 #define DLB_EVENT_QUEUE_ID_BYTE 5
3034         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3035                                      qid_mappings[qes[0].qid],
3036                                      DLB_EVENT_QUEUE_ID_BYTE);
3037         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3038                                      qid_mappings[qes[1].qid],
3039                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3040         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3041                                      qid_mappings[qes[2].qid],
3042                                      DLB_EVENT_QUEUE_ID_BYTE);
3043         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3044                                      qid_mappings[qes[3].qid],
3045                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3046
3047         /* Convert the hardware priority to an event priority and store it in
3048          * the metadata:
3049          * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
3050          * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
3051          * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
3052          * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
3053          */
3054 #define DLB_EVENT_PRIO_BYTE 6
3055         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3056                                      DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
3057                                      DLB_EVENT_PRIO_BYTE);
3058         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3059                                      DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
3060                                      DLB_EVENT_PRIO_BYTE + 8);
3061         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3062                                      DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
3063                                      DLB_EVENT_PRIO_BYTE);
3064         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3065                                      DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
3066                                      DLB_EVENT_PRIO_BYTE + 8);
3067
3068         /* Write the event type and sub event type to the event metadata. Leave
3069          * flow ID unspecified, since the hardware does not maintain it during
3070          * scheduling:
3071          * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
3072          *                      qes[0].u.event_type.sub << 20;
3073          * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
3074          *                      qes[1].u.event_type.sub << 20;
3075          * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
3076          *                      qes[2].u.event_type.sub << 20;
3077          * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
3078          *                      qes[3].u.event_type.sub << 20;
3079          */
3080 #define DLB_EVENT_EV_TYPE_DW 0
3081 #define DLB_EVENT_EV_TYPE_SHIFT 28
3082 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
3083         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3084                         qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3085                         qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3086                         DLB_EVENT_EV_TYPE_DW);
3087         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3088                         qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3089                         qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3090                         DLB_EVENT_EV_TYPE_DW + 2);
3091         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3092                         qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3093                         qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3094                         DLB_EVENT_EV_TYPE_DW);
3095         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3096                         qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
3097                         qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3098                         DLB_EVENT_EV_TYPE_DW + 2);
3099
3100         /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
3101          * set:
3102          * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
3103          * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
3104          * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
3105          * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
3106          */
3107 #define DLB_EVENT_SCHED_TYPE_BYTE 4
3108 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
3109         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3110                 sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3111                 DLB_EVENT_SCHED_TYPE_BYTE);
3112         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3113                 sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3114                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3115         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3116                 sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3117                 DLB_EVENT_SCHED_TYPE_BYTE);
3118         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3119                 sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3120                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3121
3122         /* Store the metadata to the event (use the double-precision
3123          * _mm_storeh_pd because there is no integer function for storing the
3124          * upper 64b):
3125          * events[0].event = sse_evt[0][63:0]
3126          * events[1].event = sse_evt[0][127:64]
3127          * events[2].event = sse_evt[1][63:0]
3128          * events[3].event = sse_evt[1][127:64]
3129          */
3130         _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3131         _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3132         _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3133         _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
3134
3135         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3136         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3137         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3138         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3139
3140         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3141
3142         return num_events;
3143 }
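
/*
 * For reference, the byte/dword positions used above line up with the
 * rte_event metadata bit-fields (a sketch; rte_eventdev.h is authoritative):
 *
 *	bits 31:0  (dword 0) : event_type (31:28), sub_event_type (27:20),
 *	                       flow_id (19:0, left as zero here)
 *	bits 39:32 (byte 4)  : sched_type in 39:38 via the << 6 shift,
 *	                       op/rsvd left as zero
 *	bits 47:40 (byte 5)  : queue_id
 *	bits 55:48 (byte 6)  : priority
 *	bits 63:56 (byte 7)  : impl_opaque, left as zero
 */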
3144
3145 static inline int
3146 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3147                  struct dlb_eventdev_port *ev_port,
3148                  struct dlb_port *qm_port,
3149                  uint64_t timeout,
3150                  uint64_t start_ticks)
3151 {
3152         struct process_local_port_data *port_data;
3153         uint64_t elapsed_ticks;
3154
3155         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3156
3157         elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3158
3159         /* Wait/poll time expired */
3160         if (elapsed_ticks >= timeout) {
3161                 /* Interrupts not supported by PF PMD */
3162                 return 1;
3163         } else if (dlb->umwait_allowed) {
3164                 struct rte_power_monitor_cond pmc;
3165                 volatile struct dlb_dequeue_qe *cq_base;
3166                 union {
3167                         uint64_t raw_qe[2];
3168                         struct dlb_dequeue_qe qe;
3169                 } qe_mask;
3170                 uint64_t expected_value;
3171                 volatile uint64_t *monitor_addr;
3172
3173                 qe_mask.qe.cq_gen = 1; /* set mask */
3174
3175                 cq_base = port_data->cq_base;
3176                 monitor_addr = (volatile uint64_t *)(volatile void *)
3177                         &cq_base[qm_port->cq_idx];
3178                 monitor_addr++; /* cq_gen bit is in second 64bit location */
3179
3180                 if (qm_port->gen_bit)
3181                         expected_value = qe_mask.raw_qe[1];
3182                 else
3183                         expected_value = 0;
3184
3185                 pmc.addr = monitor_addr;
3186                 pmc.val = expected_value;
3187                 pmc.mask = qe_mask.raw_qe[1];
3188                 pmc.data_sz = sizeof(uint64_t);
3189
3190                 rte_power_monitor(&pmc, timeout + start_ticks);
3191
3192                 DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3193         } else {
3194                 uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3195                 uint64_t curr_ticks = rte_get_timer_cycles();
3196                 uint64_t init_ticks = curr_ticks;
3197
3198                 while ((curr_ticks - start_ticks < timeout) &&
3199                        (curr_ticks - init_ticks < poll_interval))
3200                         curr_ticks = rte_get_timer_cycles();
3201         }
3202
3203         return 0;
3204 }
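
/*
 * Note on the monitor condition above (informational): the monitored word is
 * the second 64-bit half of the next CQ slot, i.e. the half holding cq_gen,
 * and the val/mask pair isolates just that bit:
 *
 *	pmc.addr = (uint64_t *)&cq_base[qm_port->cq_idx] + 1
 *	pmc.mask = the gen-bit field of a QE (qe_mask with cq_gen = 1)
 *	pmc.val  = the masked value a newly written QE produces in the
 *	           current phase (qm_port->gen_bit)
 *
 * so the core is released as soon as the device writes a QE into the slot
 * being watched, bounded by the absolute deadline timeout + start_ticks.
 */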
3205
3206 static inline int16_t
3207 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3208                struct dlb_eventdev_port *ev_port,
3209                struct rte_event *events,
3210                uint16_t max_num,
3211                uint64_t dequeue_timeout_ticks)
3212 {
3213         uint64_t timeout;
3214         uint64_t start_ticks = 0ULL;
3215         struct dlb_port *qm_port;
3216         int num = 0;
3217
3218         qm_port = &ev_port->qm_port;
3219
3220         /* If configured for per-dequeue wait, use the wait value provided
3221          * to this API. Otherwise, use the global value set at eventdev
3222          * configuration time.
3223          */
3224         if (!dlb->global_dequeue_wait)
3225                 timeout = dequeue_timeout_ticks;
3226         else
3227                 timeout = dlb->global_dequeue_wait_ticks;
3228
3229         if (timeout)
3230                 start_ticks = rte_get_timer_cycles();
3231
3232         while (num < max_num) {
3233                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3234                 uint8_t offset;
3235                 int num_avail;
3236
3237                 /* Copy up to 4 QEs from the current cache line into qes */
3238                 num_avail = dlb_recv_qe(qm_port, qes, &offset);
3239
3240                 /* But don't process more than the user requested */
3241                 num_avail = RTE_MIN(num_avail, max_num - num);
3242
3243                 dlb_inc_cq_idx(qm_port, num_avail);
3244
3245                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3246                         num += dlb_process_dequeue_four_qes(ev_port,
3247                                                              qm_port,
3248                                                              &events[num],
3249                                                              &qes[offset]);
3250                 else if (num_avail)
3251                         num += dlb_process_dequeue_qes(ev_port,
3252                                                         qm_port,
3253                                                         &events[num],
3254                                                         &qes[offset],
3255                                                         num_avail);
3256                 else if ((timeout == 0) || (num > 0))
3257                         /* Not waiting in any form, or 1+ events received? */
3258                         break;
3259                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3260                                           timeout, start_ticks))
3261                         break;
3262         }
3263
3264         qm_port->owed_tokens += num;
3265
3266         if (num && qm_port->token_pop_mode == AUTO_POP)
3267                 dlb_consume_qe_immediate(qm_port, num);
3268
3269         ev_port->outstanding_releases += num;
3270
3271         return num;
3272 }
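
/*
 * Illustrative application-side sketch (not part of the PMD; dev_id and
 * port_id are hypothetical): this path is reached through the standard
 * eventdev API, and the wait argument that arrives here as
 * dequeue_timeout_ticks is normally produced by
 * rte_event_dequeue_timeout_ticks().
 *
 *	struct rte_event ev[32];
 *	uint64_t ticks;
 *	uint16_t n;
 *
 *	rte_event_dequeue_timeout_ticks(dev_id, 100000, &ticks);
 *	n = rte_event_dequeue_burst(dev_id, port_id, ev, RTE_DIM(ev), ticks);
 */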
3273
3274 static __rte_always_inline int
3275 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3276 {
3277         volatile struct dlb_dequeue_qe *cq_addr;
3278         uint8_t xor_mask[2] = {0x0F, 0x00};
3279         const uint8_t and_mask = 0x0F;
3280         __m128i *qes = (__m128i *)qe;
3281         uint8_t gen_bits, gen_bit;
3282         uintptr_t addr[4];
3283         uint16_t idx;
3284
3285         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3286
3287         idx = qm_port->cq_idx;
3288
3289         /* Load the next 4 QEs */
3290         addr[0] = (uintptr_t)&cq_addr[idx];
3291         addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3292         addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3293         addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3294
3295         /* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
3296         rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3297         rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3298         rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3299         rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3300
3301         /* Correct the xor_mask for wrap-around QEs */
3302         gen_bit = qm_port->gen_bit;
3303         xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3304         xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3305         xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
3306
3307         /* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3308          * valid, then QEs[0:N-1] are too.
3309          */
3310         qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3311         rte_compiler_barrier();
3312         qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3313         rte_compiler_barrier();
3314         qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3315         rte_compiler_barrier();
3316         qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3317
3318         /* Extract and combine the gen bits */
3319         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3320                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3321                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3322                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3323
3324         /* XOR the combined bits such that a 1 represents a valid QE */
3325         gen_bits ^= xor_mask[gen_bit];
3326
3327         /* Mask off gen bits we don't care about */
3328         gen_bits &= and_mask;
3329
3330         return __builtin_popcount(gen_bits);
3331 }
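
/*
 * Worked example (sparse CQ mode): each valid QE occupies its own cache line,
 * so logically consecutive QEs are four slots (64 bytes) apart. With
 * cq_idx = 0 and cq_depth_mask = 63, the four loads above come from slots
 * 0, 4, 8 and 12, and the caller advances the index by (num_avail << 2)
 * (see dlb_hw_dequeue_sparse()) to keep the same stride.
 */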
3332
3333 static inline int16_t
3334 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3335                       struct dlb_eventdev_port *ev_port,
3336                       struct rte_event *events,
3337                       uint16_t max_num,
3338                       uint64_t dequeue_timeout_ticks)
3339 {
3340         uint64_t timeout;
3341         uint64_t start_ticks = 0ULL;
3342         struct dlb_port *qm_port;
3343         int num = 0;
3344
3345         qm_port = &ev_port->qm_port;
3346
3347         /* If configured for per-dequeue wait, use the wait value provided
3348          * to this API. Otherwise, use the global value set at eventdev
3349          * configuration time.
3350          */
3351         if (!dlb->global_dequeue_wait)
3352                 timeout = dequeue_timeout_ticks;
3353         else
3354                 timeout = dlb->global_dequeue_wait_ticks;
3355
3356         if (timeout)
3357                 start_ticks = rte_get_timer_cycles();
3358
3359         while (num < max_num) {
3360                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3361                 int num_avail;
3362
3363                 /* Copy up to 4 QEs from the current cache line into qes */
3364                 num_avail = dlb_recv_qe_sparse(qm_port, qes);
3365
3366                 /* But don't process more than the user requested */
3367                 num_avail = RTE_MIN(num_avail, max_num - num);
3368
3369                 dlb_inc_cq_idx(qm_port, num_avail << 2);
3370
3371                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3372                         num += dlb_process_dequeue_four_qes(ev_port,
3373                                                              qm_port,
3374                                                              &events[num],
3375                                                              &qes[0]);
3376                 else if (num_avail)
3377                         num += dlb_process_dequeue_qes(ev_port,
3378                                                         qm_port,
3379                                                         &events[num],
3380                                                         &qes[0],
3381                                                         num_avail);
3382                 else if ((timeout == 0) || (num > 0))
3383                         /* Not waiting in any form, or 1+ events received? */
3384                         break;
3385                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3386                                           timeout, start_ticks))
3387                         break;
3388         }
3389
3390         qm_port->owed_tokens += num;
3391
3392         if (num && qm_port->token_pop_mode == AUTO_POP)
3393                 dlb_consume_qe_immediate(qm_port, num);
3394
3395         ev_port->outstanding_releases += num;
3396
3397         return num;
3398 }
3399
3400 static int
3401 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3402 {
3403         struct process_local_port_data *port_data;
3404         struct dlb_eventdev_port *ev_port;
3405         struct dlb_port *qm_port;
3406         int i;
3407
3408         if (port_id >= dlb->num_ports) {
3409                 DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3410                             port_id);
3411                 rte_errno = -EINVAL;
3412                 return rte_errno;
3413         }
3414
3415         ev_port = &dlb->ev_ports[port_id];
3416         qm_port = &ev_port->qm_port;
3417         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3418
3419         i = 0;
3420
3421         if (qm_port->is_directed) {
3422                 i = n;
3423                 goto sw_credit_update;
3424         }
3425
3426         while (i < n) {
3427                 int pop_offs = 0;
3428                 int j = 0;
3429
3430                 /* Zero-out QEs */
3431                 qm_port->qe4[0].cmd_byte = 0;
3432                 qm_port->qe4[1].cmd_byte = 0;
3433                 qm_port->qe4[2].cmd_byte = 0;
3434                 qm_port->qe4[3].cmd_byte = 0;
3435
3436                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3437                         int16_t thresh = qm_port->token_pop_thresh;
3438
3439                         if (qm_port->token_pop_mode == DELAYED_POP &&
3440                             qm_port->issued_releases >= thresh - 1) {
3441                                 /* Insert the token pop QE */
3442                                 dlb_construct_token_pop_qe(qm_port, j);
3443
3444                                 /* Reset the releases for the next QE batch */
3445                                 qm_port->issued_releases -= thresh;
3446
3447                                 /* When using delayed token pop mode, the
3448                                  * initial token threshold is the full CQ
3449                                  * depth. After the first token pop, we need to
3450                                  * reset it to the dequeue_depth.
3451                                  */
3452                                 qm_port->token_pop_thresh =
3453                                         qm_port->dequeue_depth;
3454
3455                                 pop_offs = 1;
3456                                 j++;
3457                                 break;
3458                         }
3459
3460                         qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3461                         qm_port->issued_releases++;
3462                 }
3463
3464                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3465
3466                 /* Don't include the token pop QE in the release count */
3467                 i += j - pop_offs;
3468         }
3469
3470 sw_credit_update:
3471         /* each release returns one credit */
3472         if (!ev_port->outstanding_releases) {
3473                 DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3474                 rte_errno = -ENOTRECOVERABLE;
3475                 return rte_errno;
3476         }
3477
3478         ev_port->outstanding_releases -= i;
3479         ev_port->inflight_credits += i;
3480
3481         /* Replenish s/w credits if enough releases are performed */
3482         dlb_replenish_sw_credits(dlb, ev_port);
3483         return 0;
3484 }
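
/*
 * Illustrative application-side sketch (not part of the PMD; names are
 * hypothetical): with implicit release disabled on the port, every dequeued
 * load-balanced event must eventually be released (or forwarded), which is
 * what drives dlb_event_release() above.
 *
 *	struct rte_event ev;
 *
 *	if (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
 *		process_event(&ev);
 *		ev.op = RTE_EVENT_OP_RELEASE;
 *		rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
 *	}
 */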
3485
3486 static uint16_t
3487 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3488                         uint64_t wait)
3489 {
3490         struct dlb_eventdev_port *ev_port = event_port;
3491         struct dlb_port *qm_port = &ev_port->qm_port;
3492         struct dlb_eventdev *dlb = ev_port->dlb;
3493         uint16_t cnt;
3494         int ret;
3495
3496         rte_errno = 0;
3497
3498         RTE_ASSERT(ev_port->setup_done);
3499         RTE_ASSERT(ev != NULL);
3500
3501         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3502                 uint16_t out_rels = ev_port->outstanding_releases;
3503
3504                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3505                 if (ret)
3506                         return ret;
3507
3508                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3509         }
3510
3511         if (qm_port->token_pop_mode == DEFERRED_POP &&
3512                         qm_port->owed_tokens)
3513                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3514
3515         cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3516
3517         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3518         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3519         return cnt;
3520 }
3521
3522 static uint16_t
3523 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3524 {
3525         return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3526 }
3527
3528 static uint16_t
3529 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3530                                uint16_t num, uint64_t wait)
3531 {
3532         struct dlb_eventdev_port *ev_port = event_port;
3533         struct dlb_port *qm_port = &ev_port->qm_port;
3534         struct dlb_eventdev *dlb = ev_port->dlb;
3535         uint16_t cnt;
3536         int ret;
3537
3538         rte_errno = 0;
3539
3540         RTE_ASSERT(ev_port->setup_done);
3541         RTE_ASSERT(ev != NULL);
3542
3543         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3544                 uint16_t out_rels = ev_port->outstanding_releases;
3545
3546                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3547                 if (ret)
3548                         return ret;
3549
3550                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3551         }
3552
3553         if (qm_port->token_pop_mode == DEFERRED_POP &&
3554             qm_port->owed_tokens)
3555                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3556
3557         cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3558
3559         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3560         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3561         return cnt;
3562 }
3563
3564 static uint16_t
3565 dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
3566 {
3567         return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
3568 }
3569
3570 static uint32_t
3571 dlb_get_ldb_queue_depth(struct dlb_eventdev *dlb,
3572                         struct dlb_eventdev_queue *queue)
3573 {
3574         struct dlb_hw_dev *handle = &dlb->qm_instance;
3575         struct dlb_get_ldb_queue_depth_args cfg;
3576         struct dlb_cmd_response response;
3577         int ret;
3578
3579         cfg.queue_id = queue->qm_queue.id;
3580         cfg.response = (uintptr_t)&response;
3581
3582         ret = dlb_iface_get_ldb_queue_depth(handle, &cfg);
3583         if (ret < 0) {
3584                 DLB_LOG_ERR("dlb: get_ldb_queue_depth ret=%d (driver status: %s)\n",
3585                             ret, dlb_error_strings[response.status]);
3586                 return ret;
3587         }
3588
3589         return response.id;
3590 }
3591
3592 static uint32_t
3593 dlb_get_dir_queue_depth(struct dlb_eventdev *dlb,
3594                         struct dlb_eventdev_queue *queue)
3595 {
3596         struct dlb_hw_dev *handle = &dlb->qm_instance;
3597         struct dlb_get_dir_queue_depth_args cfg;
3598         struct dlb_cmd_response response;
3599         int ret;
3600
3601         cfg.queue_id = queue->qm_queue.id;
3602         cfg.response = (uintptr_t)&response;
3603
3604         ret = dlb_iface_get_dir_queue_depth(handle, &cfg);
3605         if (ret < 0) {
3606                 DLB_LOG_ERR("dlb: get_dir_queue_depth ret=%d (driver status: %s)\n",
3607                             ret, dlb_error_strings[response.status]);
3608                 return ret;
3609         }
3610
3611         return response.id;
3612 }
3613
3614 uint32_t
3615 dlb_get_queue_depth(struct dlb_eventdev *dlb,
3616                     struct dlb_eventdev_queue *queue)
3617 {
3618         if (queue->qm_queue.is_directed)
3619                 return dlb_get_dir_queue_depth(dlb, queue);
3620         else
3621                 return dlb_get_ldb_queue_depth(dlb, queue);
3622 }
3623
3624 static bool
3625 dlb_queue_is_empty(struct dlb_eventdev *dlb,
3626                    struct dlb_eventdev_queue *queue)
3627 {
3628         return dlb_get_queue_depth(dlb, queue) == 0;
3629 }
3630
3631 static bool
3632 dlb_linked_queues_empty(struct dlb_eventdev *dlb)
3633 {
3634         int i;
3635
3636         for (i = 0; i < dlb->num_queues; i++) {
3637                 if (dlb->ev_queues[i].num_links == 0)
3638                         continue;
3639                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3640                         return false;
3641         }
3642
3643         return true;
3644 }
3645
3646 static bool
3647 dlb_queues_empty(struct dlb_eventdev *dlb)
3648 {
3649         int i;
3650
3651         for (i = 0; i < dlb->num_queues; i++) {
3652                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3653                         return false;
3654         }
3655
3656         return true;
3657 }
3658
3659 static void
3660 dlb_flush_port(struct rte_eventdev *dev, int port_id)
3661 {
3662         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3663         eventdev_stop_flush_t flush;
3664         struct rte_event ev;
3665         uint8_t dev_id;
3666         void *arg;
3667         int i;
3668
3669         flush = dev->dev_ops->dev_stop_flush;
3670         dev_id = dev->data->dev_id;
3671         arg = dev->data->dev_stop_flush_arg;
3672
3673         while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
3674                 if (flush)
3675                         flush(dev_id, ev, arg);
3676
3677                 if (dlb->ev_ports[port_id].qm_port.is_directed)
3678                         continue;
3679
3680                 ev.op = RTE_EVENT_OP_RELEASE;
3681
3682                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3683         }
3684
3685         /* Enqueue any additional outstanding releases */
3686         ev.op = RTE_EVENT_OP_RELEASE;
3687
3688         for (i = dlb->ev_ports[port_id].outstanding_releases; i > 0; i--)
3689                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3690 }
3691
3692 static void
3693 dlb_drain(struct rte_eventdev *dev)
3694 {
3695         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3696         struct dlb_eventdev_port *ev_port = NULL;
3697         uint8_t dev_id;
3698         int i;
3699
3700         dev_id = dev->data->dev_id;
3701
3702         while (!dlb_linked_queues_empty(dlb)) {
3703                 /* Flush all the ev_ports, which will drain all their connected
3704                  * queues.
3705                  */
3706                 for (i = 0; i < dlb->num_ports; i++)
3707                         dlb_flush_port(dev, i);
3708         }
3709
3710         /* The queues are empty, but there may be events left in the ports. */
3711         for (i = 0; i < dlb->num_ports; i++)
3712                 dlb_flush_port(dev, i);
3713
3714         /* If the domain's queues are empty, we're done. */
3715         if (dlb_queues_empty(dlb))
3716                 return;
3717
3718         /* Else, there must be at least one unlinked load-balanced queue.
3719          * Select a load-balanced port with which to drain the unlinked
3720          * queue(s).
3721          */
3722         for (i = 0; i < dlb->num_ports; i++) {
3723                 ev_port = &dlb->ev_ports[i];
3724
3725                 if (!ev_port->qm_port.is_directed)
3726                         break;
3727         }
3728
3729         if (i == dlb->num_ports) {
3730                 DLB_LOG_ERR("internal error: no LDB ev_ports\n");
3731                 return;
3732         }
3733
3734         rte_errno = 0;
3735         rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);
3736
3737         if (rte_errno) {
3738                 DLB_LOG_ERR("internal error: failed to unlink ev_port %d\n",
3739                             ev_port->id);
3740                 return;
3741         }
3742
3743         for (i = 0; i < dlb->num_queues; i++) {
3744                 uint8_t qid, prio;
3745                 int ret;
3746
3747                 if (dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3748                         continue;
3749
3750                 qid = i;
3751                 prio = 0;
3752
3753                 /* Link the ev_port to the queue */
3754                 ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
3755                 if (ret != 1) {
3756                         DLB_LOG_ERR("internal error: failed to link ev_port %d to queue %d\n",
3757                                     ev_port->id, qid);
3758                         return;
3759                 }
3760
3761                 /* Flush the queue */
3762                 while (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3763                         dlb_flush_port(dev, ev_port->id);
3764
3765                 /* Drain any extant events in the ev_port. */
3766                 dlb_flush_port(dev, ev_port->id);
3767
3768                 /* Unlink the ev_port from the queue */
3769                 ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
3770                 if (ret != 1) {
3771                         DLB_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d\n",
3772                                     ev_port->id, qid);
3773                         return;
3774                 }
3775         }
3776 }
3777
3778 static void
3779 dlb_eventdev_stop(struct rte_eventdev *dev)
3780 {
3781         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3782
3783         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
3784
3785         if (dlb->run_state == DLB_RUN_STATE_STOPPED) {
3786                 DLB_LOG_DBG("Internal error: already stopped\n");
3787                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3788                 return;
3789         } else if (dlb->run_state != DLB_RUN_STATE_STARTED) {
3790                 DLB_LOG_ERR("Internal error: bad state %d for dev_stop\n",
3791                             (int)dlb->run_state);
3792                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3793                 return;
3794         }
3795
3796         dlb->run_state = DLB_RUN_STATE_STOPPING;
3797
3798         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3799
3800         dlb_drain(dev);
3801
3802         dlb->run_state = DLB_RUN_STATE_STOPPED;
3803 }
3804
3805 static int
3806 dlb_eventdev_close(struct rte_eventdev *dev)
3807 {
3808         dlb_hw_reset_sched_domain(dev, false);
3809
3810         return 0;
3811 }
3812
3813 static void
3814 dlb_eventdev_port_release(void *port)
3815 {
3816         struct dlb_eventdev_port *ev_port = port;
3817
3818         if (ev_port) {
3819                 struct dlb_port *qm_port = &ev_port->qm_port;
3820
3821                 if (qm_port->config_state == DLB_CONFIGURED)
3822                         dlb_free_qe_mem(qm_port);
3823         }
3824 }
3825
3826 static void
3827 dlb_eventdev_queue_release(struct rte_eventdev *dev, uint8_t id)
3828 {
3829         RTE_SET_USED(dev);
3830         RTE_SET_USED(id);
3831
3832         /* This function intentionally left blank. */
3833 }
3834
3835 static int
3836 dlb_eventdev_timeout_ticks(struct rte_eventdev *dev, uint64_t ns,
3837                            uint64_t *timeout_ticks)
3838 {
3839         RTE_SET_USED(dev);
3840         uint64_t cycles_per_ns = rte_get_timer_hz() / 1E9;
3841
3842         *timeout_ticks = ns * cycles_per_ns;
3843
3844         return 0;
3845 }
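
/*
 * Worked example: with a 2 GHz timer (rte_get_timer_hz() == 2000000000),
 * cycles_per_ns evaluates to 2, so a requested 1000 ns timeout becomes 2000
 * ticks. Note the truncation inherent in this conversion: a 2.3 GHz timer
 * also yields cycles_per_ns = 2, and a sub-GHz timer would yield 0.
 */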
3846
3847 void
3848 dlb_entry_points_init(struct rte_eventdev *dev)
3849 {
3850         struct dlb_eventdev *dlb;
3851
3852         static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
3853                 .dev_infos_get    = dlb_eventdev_info_get,
3854                 .dev_configure    = dlb_eventdev_configure,
3855                 .dev_start        = dlb_eventdev_start,
3856                 .dev_stop         = dlb_eventdev_stop,
3857                 .dev_close        = dlb_eventdev_close,
3858                 .queue_def_conf   = dlb_eventdev_queue_default_conf_get,
3859                 .port_def_conf    = dlb_eventdev_port_default_conf_get,
3860                 .queue_setup      = dlb_eventdev_queue_setup,
3861                 .queue_release    = dlb_eventdev_queue_release,
3862                 .port_setup       = dlb_eventdev_port_setup,
3863                 .port_release     = dlb_eventdev_port_release,
3864                 .port_link        = dlb_eventdev_port_link,
3865                 .port_unlink      = dlb_eventdev_port_unlink,
3866                 .port_unlinks_in_progress =
3867                                     dlb_eventdev_port_unlinks_in_progress,
3868                 .timeout_ticks    = dlb_eventdev_timeout_ticks,
3869                 .dump             = dlb_eventdev_dump,
3870                 .xstats_get       = dlb_eventdev_xstats_get,
3871                 .xstats_get_names = dlb_eventdev_xstats_get_names,
3872                 .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
3873                 .xstats_reset       = dlb_eventdev_xstats_reset,
3874                 .dev_selftest     = test_dlb_eventdev,
3875         };
3876
3877         /* Expose PMD's eventdev interface */
3878         dev->dev_ops = &dlb_eventdev_entry_ops;
3879
3880         dev->enqueue = dlb_event_enqueue;
3881         dev->enqueue_burst = dlb_event_enqueue_burst;
3882         dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
3883         dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
3884         dev->dequeue = dlb_event_dequeue;
3885         dev->dequeue_burst = dlb_event_dequeue_burst;
3886
3887         dlb = dev->data->dev_private;
3888
3889         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
3890                 dev->dequeue = dlb_event_dequeue_sparse;
3891                 dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
3892         }
3893 }
3894
3895 int
3896 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
3897                            const char *name,
3898                            struct dlb_devargs *dlb_args)
3899 {
3900         struct dlb_eventdev *dlb;
3901         int err, i;
3902
3903         dlb = dev->data->dev_private;
3904
3905         dlb->event_dev = dev; /* backlink */
3906
3907         evdev_dlb_default_info.driver_name = name;
3908
3909         dlb->max_num_events_override = dlb_args->max_num_events;
3910         dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
3911         dlb->defer_sched = dlb_args->defer_sched;
3912         dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
3913
3914         /* Open the interface.
3915          * For vdev mode, this means open the dlb kernel module.
3916          */
3917         err = dlb_iface_open(&dlb->qm_instance, name);
3918         if (err < 0) {
3919                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3920                             err);
3921                 return err;
3922         }
3923
3924         err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
3925         if (err < 0) {
3926                 DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
3927                             err);
3928                 return err;
3929         }
3930
3931         err = dlb_hw_query_resources(dlb);
3932         if (err) {
3933                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3934                 return err;
3935         }
3936
3937         err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
3938         if (err < 0) {
3939                 DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
3940                 return err;
3941         }
3942
3943         /* Complete xstats runtime initialization */
3944         err = dlb_xstats_init(dlb);
3945         if (err) {
3946                 DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
3947                 return err;
3948         }
3949
3950         /* Initialize each port's token pop mode */
3951         for (i = 0; i < DLB_MAX_NUM_PORTS; i++)
3952                 dlb->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;
3953
3954         rte_spinlock_init(&dlb->qm_instance.resource_lock);
3955
3956         dlb_iface_low_level_io_init(dlb);
3957
3958         dlb_entry_points_init(dev);
3959
3960         return 0;
3961 }
3962
3963 int
3964 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
3965                              const char *name)
3966 {
3967         struct dlb_eventdev *dlb;
3968         int err;
3969
3970         dlb = dev->data->dev_private;
3971
3972         evdev_dlb_default_info.driver_name = name;
3973
3974         err = dlb_iface_open(&dlb->qm_instance, name);
3975         if (err < 0) {
3976                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3977                             err);
3978                 return err;
3979         }
3980
3981         err = dlb_hw_query_resources(dlb);
3982         if (err) {
3983                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3984                 return err;
3985         }
3986
3987         dlb_iface_low_level_io_init(dlb);
3988
3989         dlb_entry_points_init(dev);
3990
3991         return 0;
3992 }
3993
3994 int
3995 dlb_parse_params(const char *params,
3996                  const char *name,
3997                  struct dlb_devargs *dlb_args)
3998 {
3999         int ret = 0;
4000         static const char * const args[] = { NUMA_NODE_ARG,
4001                                              DLB_MAX_NUM_EVENTS,
4002                                              DLB_NUM_DIR_CREDITS,
4003                                              DEV_ID_ARG,
4004                                              DLB_DEFER_SCHED_ARG,
4005                                              DLB_NUM_ATM_INFLIGHTS_ARG,
4006                                              NULL };
4007
4008         if (params && params[0] != '\0') {
4009                 struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
4010
4011                 if (kvlist == NULL) {
4012                         DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
4013                                      name);
4014                 } else {
4015                         ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
4016                                                  set_numa_node,
4017                                                  &dlb_args->socket_id);
4018                         if (ret != 0) {
4019                                 DLB_LOG_ERR("%s: Error parsing numa node parameter",
4020                                             name);
4021                                 rte_kvargs_free(kvlist);
4022                                 return ret;
4023                         }
4024
4025                         ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
4026                                                  set_max_num_events,
4027                                                  &dlb_args->max_num_events);
4028                         if (ret != 0) {
4029                                 DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
4030                                             name);
4031                                 rte_kvargs_free(kvlist);
4032                                 return ret;
4033                         }
4034
4035                         ret = rte_kvargs_process(kvlist,
4036                                         DLB_NUM_DIR_CREDITS,
4037                                         set_num_dir_credits,
4038                                         &dlb_args->num_dir_credits_override);
4039                         if (ret != 0) {
4040                                 DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
4041                                             name);
4042                                 rte_kvargs_free(kvlist);
4043                                 return ret;
4044                         }
4045
4046                         ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
4047                                                  set_dev_id,
4048                                                  &dlb_args->dev_id);
4049                         if (ret != 0) {
4050                                 DLB_LOG_ERR("%s: Error parsing dev_id parameter",
4051                                             name);
4052                                 rte_kvargs_free(kvlist);
4053                                 return ret;
4054                         }
4055
4056                         ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
4057                                                  set_defer_sched,
4058                                                  &dlb_args->defer_sched);
4059                         if (ret != 0) {
4060                                 DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
4061                                             name);
4062                                 rte_kvargs_free(kvlist);
4063                                 return ret;
4064                         }
4065
4066                         ret = rte_kvargs_process(kvlist,
4067                                                  DLB_NUM_ATM_INFLIGHTS_ARG,
4068                                                  set_num_atm_inflights,
4069                                                  &dlb_args->num_atm_inflights);
4070                         if (ret != 0) {
4071                                 DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
4072                                             name);
4073                                 rte_kvargs_free(kvlist);
4074                                 return ret;
4075                         }
4076
4077                         rte_kvargs_free(kvlist);
4078                 }
4079         }
4080         return ret;
4081 }
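
/*
 * Illustrative usage (a sketch; the literal key strings are the values of the
 * macros listed above, defined in dlb_priv.h): devargs are supplied through
 * the EAL device string, e.g. assuming the documented max_num_events and
 * num_dir_credits keys for a PF device:
 *
 *	-a <dlb PCI BDF>,max_num_events=4096,num_dir_credits=1024
 *
 * Each recognized key is routed by rte_kvargs_process() to its setter
 * (set_max_num_events(), set_num_dir_credits(), ...), which fills the
 * corresponding dlb_devargs field consumed later by
 * dlb_primary_eventdev_probe().
 */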
4082 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);