1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39
40 /*
41  * Resources exposed to eventdev.
42  */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47         .driver_name = "", /* probe will set */
48         .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49         .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51         .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53         .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55         .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56         .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57         .max_event_priority_levels = DLB_QID_PRIORITIES,
58         .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59         .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60         .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61         .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62         .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63         .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64         .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65                           RTE_EVENT_DEV_CAP_EVENT_QOS |
66                           RTE_EVENT_DEV_CAP_BURST_MODE |
67                           RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68                           RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69                           RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
71
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74
75 uint32_t
76 dlb_get_queue_depth(struct dlb_eventdev *dlb,
77                     struct dlb_eventdev_queue *queue)
78 {
79         /* Dummy implementation for now, so the "xstats" patch compiles */
80         RTE_SET_USED(dlb);
81         RTE_SET_USED(queue);
82
83         return 0;
84 }
85
86 static int
87 dlb_hw_query_resources(struct dlb_eventdev *dlb)
88 {
89         struct dlb_hw_dev *handle = &dlb->qm_instance;
90         struct dlb_hw_resource_info *dlb_info = &handle->info;
91         int ret;
92
93         ret = dlb_iface_get_num_resources(handle,
94                                           &dlb->hw_rsrc_query_results);
95         if (ret) {
96                 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
97                 return ret;
98         }
99
100         /* Complete filling in device resource info returned to evdev app,
101          * overriding any default values.
102          * The capabilities (CAPs) were set at compile time.
103          */
104
105         evdev_dlb_default_info.max_event_queues =
106                 dlb->hw_rsrc_query_results.num_ldb_queues;
107
108         evdev_dlb_default_info.max_event_ports =
109                 dlb->hw_rsrc_query_results.num_ldb_ports;
110
111         evdev_dlb_default_info.max_num_events =
112                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
113
114         /* Save off values used when creating the scheduling domain. */
115
116         handle->info.num_sched_domains =
117                 dlb->hw_rsrc_query_results.num_sched_domains;
118
119         handle->info.hw_rsrc_max.nb_events_limit =
120                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121
122         handle->info.hw_rsrc_max.num_queues =
123                 dlb->hw_rsrc_query_results.num_ldb_queues +
124                 dlb->hw_rsrc_query_results.num_dir_ports;
125
126         handle->info.hw_rsrc_max.num_ldb_queues =
127                 dlb->hw_rsrc_query_results.num_ldb_queues;
128
129         handle->info.hw_rsrc_max.num_ldb_ports =
130                 dlb->hw_rsrc_query_results.num_ldb_ports;
131
132         handle->info.hw_rsrc_max.num_dir_ports =
133                 dlb->hw_rsrc_query_results.num_dir_ports;
134
135         handle->info.hw_rsrc_max.reorder_window_size =
136                 dlb->hw_rsrc_query_results.num_hist_list_entries;
137
138         rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
139
140         return 0;
141 }
142
143 static void
144 dlb_free_qe_mem(struct dlb_port *qm_port)
145 {
146         if (qm_port == NULL)
147                 return;
148
149         rte_free(qm_port->qe4);
150         qm_port->qe4 = NULL;
151
152         rte_free(qm_port->consume_qe);
153         qm_port->consume_qe = NULL;
154 }
155
156 static int
157 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
158 {
159         struct dlb_cq_pop_qe *qe;
160
161         qe = rte_zmalloc(mz_name,
162                         DLB_NUM_QES_PER_CACHE_LINE *
163                                 sizeof(struct dlb_cq_pop_qe),
164                         RTE_CACHE_LINE_SIZE);
165
166         if (qe == NULL) {
167                 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
168                 return -ENOMEM;
169         }
170
171         qm_port->consume_qe = qe;
172
173         qe->qe_valid = 0;
174         qe->qe_frag = 0;
175         qe->qe_comp = 0;
176         qe->cq_token = 1;
177         /* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
178          * and so on.
179          */
180         qe->tokens = 0; /* set at run time */
181         qe->meas_lat = 0;
182         qe->no_dec = 0;
183         /* Completion IDs are disabled */
184         qe->cmp_id = 0;
185
186         return 0;
187 }
188
189 static int
190 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
191 {
192         int ret, sz;
193
194         sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
195
196         qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
197
198         if (qm_port->qe4 == NULL) {
199                 DLB_LOG_ERR("dlb: no qe4 memory\n");
200                 ret = -ENOMEM;
201                 goto error_exit;
202         }
203
204         ret = dlb_init_consume_qe(qm_port, mz_name);
205         if (ret < 0) {
206                 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
207                 goto error_exit;
208         }
209
210         return 0;
211
212 error_exit:
213
214         dlb_free_qe_mem(qm_port);
215
216         return ret;
217 }
218
219 /* Wrapper for string to int conversion. Used in place of atoi(...), which
220  * cannot report conversion errors.
221  */
222 #define DLB_BASE_10 10
223
224 static int
225 dlb_string_to_int(int *result, const char *str)
226 {
227         long ret;
228         char *endstr;
229
230         if (str == NULL || result == NULL)
231                 return -EINVAL;
232
233         errno = 0;
234         ret = strtol(str, &endstr, DLB_BASE_10);
235         if (errno)
236                 return -errno;
237
238         /* long int and int may be different width for some architectures */
239         if (ret < INT_MIN || ret > INT_MAX || endstr == str)
240                 return -EINVAL;
241
242         *result = ret;
243         return 0;
244 }
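/* Illustrative behaviour of the helper above (not part of the driver build):
 *
 *   int v;
 *   dlb_string_to_int(&v, "2048");  // v == 2048, returns 0
 *   dlb_string_to_int(&v, "abc");   // returns -EINVAL (no digits consumed)
 *   dlb_string_to_int(&v, NULL);    // returns -EINVAL
 */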
245
246 static int
247 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
248 {
249         int *socket_id = opaque;
250         int ret;
251
252         ret = dlb_string_to_int(socket_id, value);
253         if (ret < 0)
254                 return ret;
255
256         if (*socket_id >= RTE_MAX_NUMA_NODES)
257                 return -EINVAL;
258
259         return 0;
260 }
261
262 static int
263 set_max_num_events(const char *key __rte_unused,
264                    const char *value,
265                    void *opaque)
266 {
267         int *max_num_events = opaque;
268         int ret;
269
270         if (value == NULL || opaque == NULL) {
271                 DLB_LOG_ERR("NULL pointer\n");
272                 return -EINVAL;
273         }
274
275         ret = dlb_string_to_int(max_num_events, value);
276         if (ret < 0)
277                 return ret;
278
279         if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
280                 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
281                             DLB_MAX_NUM_LDB_CREDITS);
282                 return -EINVAL;
283         }
284
285         return 0;
286 }
287
288 static int
289 set_num_dir_credits(const char *key __rte_unused,
290                     const char *value,
291                     void *opaque)
292 {
293         int *num_dir_credits = opaque;
294         int ret;
295
296         if (value == NULL || opaque == NULL) {
297                 DLB_LOG_ERR("NULL pointer\n");
298                 return -EINVAL;
299         }
300
301         ret = dlb_string_to_int(num_dir_credits, value);
302         if (ret < 0)
303                 return ret;
304
305         if (*num_dir_credits < 0 ||
306             *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
307                 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
308                             DLB_MAX_NUM_DIR_CREDITS);
309                 return -EINVAL;
310         }
311         return 0;
312 }
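/* For illustration only: handlers like the ones above are typically wired up
 * with rte_kvargs_process() during probe. The devargs key name below is an
 * assumption for this sketch, not taken from this file:
 *
 *   int max_num_events = DLB_MAX_NUM_LDB_CREDITS;
 *
 *   ret = rte_kvargs_process(kvlist, "max_num_events",
 *                            set_max_num_events, &max_num_events);
 */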
313
314 /* VDEV-only notes:
315  * This function first unmaps all memory mappings and closes the
316  * domain's file descriptor, which causes the driver to reset the
317  * scheduling domain. Once that completes (when close() returns), we
318  * can safely free the dynamically allocated memory used by the
319  * scheduling domain.
320  *
321  * PF-only notes:
322  * We will maintain a use count and use that to determine when
323  * a reset is required. In PF mode, we never mmap or munmap
324  * device memory, and we own the entire physical PCI device.
325  */
326
327 static void
328 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
329 {
330         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
331         enum dlb_configuration_state config_state;
332         int i, j;
333
334         /* Close and reset the domain */
335         dlb_iface_domain_close(dlb);
336
337         /* Free all dynamically allocated port memory */
338         for (i = 0; i < dlb->num_ports; i++)
339                 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
340
341         /* If reconfiguring, mark the device's queues and ports as "previously
342          * configured." If the user does not reconfigure them, the PMD will
343          * reapply their previous configuration when the device is started.
344          */
345         config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
346
347         for (i = 0; i < dlb->num_ports; i++) {
348                 dlb->ev_ports[i].qm_port.config_state = config_state;
349                 /* Reset setup_done so ports can be reconfigured */
350                 dlb->ev_ports[i].setup_done = false;
351                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
352                         dlb->ev_ports[i].link[j].mapped = false;
353         }
354
355         for (i = 0; i < dlb->num_queues; i++)
356                 dlb->ev_queues[i].qm_queue.config_state = config_state;
357
358         for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
359                 dlb->ev_queues[i].setup_done = false;
360
361         dlb->num_ports = 0;
362         dlb->num_ldb_ports = 0;
363         dlb->num_dir_ports = 0;
364         dlb->num_queues = 0;
365         dlb->num_ldb_queues = 0;
366         dlb->num_dir_queues = 0;
367         dlb->configured = false;
368 }
369
370 static int
371 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
372 {
373         struct dlb_create_ldb_pool_args cfg;
374         struct dlb_cmd_response response;
375         int ret;
376
377         if (handle == NULL)
378                 return -EINVAL;
379
380         if (!handle->cfg.resources.num_ldb_credits) {
381                 handle->cfg.ldb_credit_pool_id = 0;
382                 handle->cfg.num_ldb_credits = 0;
383                 return 0;
384         }
385
386         cfg.response = (uintptr_t)&response;
387         cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
388
389         ret = dlb_iface_ldb_credit_pool_create(handle,
390                                                &cfg);
391         if (ret < 0) {
392                 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
393                             ret, dlb_error_strings[response.status]);
394         }
395
396         handle->cfg.ldb_credit_pool_id = response.id;
397         handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
398
399         return ret;
400 }
401
402 static int
403 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
404 {
405         struct dlb_create_dir_pool_args cfg;
406         struct dlb_cmd_response response;
407         int ret;
408
409         if (handle == NULL)
410                 return -EINVAL;
411
412         if (!handle->cfg.resources.num_dir_credits) {
413                 handle->cfg.dir_credit_pool_id = 0;
414                 handle->cfg.num_dir_credits = 0;
415                 return 0;
416         }
417
418         cfg.response = (uintptr_t)&response;
419         cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
420
421         ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
422         if (ret < 0)
423                 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
424                             ret, dlb_error_strings[response.status]);
425
426         handle->cfg.dir_credit_pool_id = response.id;
427         handle->cfg.num_dir_credits = cfg.num_dir_credits;
428
429         return ret;
430 }
431
432 static int
433 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
434                            struct dlb_eventdev *dlb,
435                            const struct dlb_hw_rsrcs *resources_asked)
436 {
437         int ret = 0;
438         struct dlb_create_sched_domain_args *config_params;
439         struct dlb_cmd_response response;
440
441         if (resources_asked == NULL) {
442                 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
443                 ret = -EINVAL;
444                 goto error_exit;
445         }
446
447         /* Map generic qm resources to dlb resources */
448         config_params = &handle->cfg.resources;
449
450         config_params->response = (uintptr_t)&response;
451
452         /* DIR ports and queues */
453
454         config_params->num_dir_ports =
455                 resources_asked->num_dir_ports;
456
457         config_params->num_dir_credits =
458                 resources_asked->num_dir_credits;
459
460         /* LDB ports and queues */
461
462         config_params->num_ldb_queues =
463                 resources_asked->num_ldb_queues;
464
465         config_params->num_ldb_ports =
466                 resources_asked->num_ldb_ports;
467
468         config_params->num_ldb_credits =
469                 resources_asked->num_ldb_credits;
470
471         config_params->num_atomic_inflights =
472                 dlb->num_atm_inflights_per_queue *
473                 config_params->num_ldb_queues;
474
475         config_params->num_hist_list_entries = config_params->num_ldb_ports *
476                 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
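        /* For illustration (hypothetical numbers): asking for 8 LDB queues
         * with 64 atomic inflights per queue and 4 LDB ports requests
         * 8 * 64 = 512 atomic inflights plus
         * 4 * DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT history list entries.
         */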
477
478         /* dlb limited to 1 credit pool per queue type */
479         config_params->num_ldb_credit_pools = 1;
480         config_params->num_dir_credit_pools = 1;
481
482         DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_credit_pools=%d, dir_credit_pools=%d\n",
483                     config_params->num_ldb_queues,
484                     config_params->num_ldb_ports,
485                     config_params->num_dir_ports,
486                     config_params->num_atomic_inflights,
487                     config_params->num_hist_list_entries,
488                     config_params->num_ldb_credits,
489                     config_params->num_dir_credits,
490                     config_params->num_ldb_credit_pools,
491                     config_params->num_dir_credit_pools);
492
493         /* Configure the QM */
494
495         ret = dlb_iface_sched_domain_create(handle, config_params);
496         if (ret < 0) {
497                 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
498                             handle->device_id,
499                             ret,
500                             dlb_error_strings[response.status]);
501                 goto error_exit;
502         }
503
504         handle->domain_id = response.id;
505         handle->domain_id_valid = 1;
506
507         config_params->response = 0;
508
509         ret = dlb_ldb_credit_pool_create(handle);
510         if (ret < 0) {
511                 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
512                 goto error_exit2;
513         }
514
515         ret = dlb_dir_credit_pool_create(handle);
516         if (ret < 0) {
517                 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
518                 goto error_exit2;
519         }
520
521         handle->cfg.configured = true;
522
523         return 0;
524
525 error_exit2:
526         dlb_iface_domain_close(dlb);
527
528 error_exit:
529         return ret;
530 }
531
532 /* End HW specific */
533 static void
534 dlb_eventdev_info_get(struct rte_eventdev *dev,
535                       struct rte_event_dev_info *dev_info)
536 {
537         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
538         int ret;
539
540         ret = dlb_hw_query_resources(dlb);
541         if (ret) {
542                 const struct rte_eventdev_data *data = dev->data;
543
544                 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
545                             ret, data->dev_id);
546                 /* This function returns void, so fall through and return the
547                  * values set up at probe time.
548                  */
549         }
550
551         /* Add num resources currently owned by this domain.
552          * These would become available if the scheduling domain were reset due
553          * to the application calling eventdev_configure again to *reconfigure* the
554          * domain.
555          */
556         evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
557         evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
558         evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
559
560         /* In DLB A-stepping hardware, applications are limited to 128
561          * configured ports (load-balanced or directed). The reported number of
562          * available ports must reflect this.
563          */
564         if (dlb->revision < DLB_REV_B0) {
565                 int used_ports;
566
567                 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
568                         dlb->hw_rsrc_query_results.num_ldb_ports -
569                         dlb->hw_rsrc_query_results.num_dir_ports;
570
571                 evdev_dlb_default_info.max_event_ports =
572                         RTE_MIN(evdev_dlb_default_info.max_event_ports,
573                                 128 - used_ports);
574         }
575
576         evdev_dlb_default_info.max_event_queues =
577                 RTE_MIN(evdev_dlb_default_info.max_event_queues,
578                         RTE_EVENT_MAX_QUEUES_PER_DEV);
579
580         evdev_dlb_default_info.max_num_events =
581                 RTE_MIN(evdev_dlb_default_info.max_num_events,
582                         dlb->max_num_events_override);
583
584         *dev_info = evdev_dlb_default_info;
585 }
586
587 /* Note: 1 QM instance per QM device, QM instance/device == event device */
588 static int
589 dlb_eventdev_configure(const struct rte_eventdev *dev)
590 {
591         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
592         struct dlb_hw_dev *handle = &dlb->qm_instance;
593         struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
594         const struct rte_eventdev_data *data = dev->data;
595         const struct rte_event_dev_config *config = &data->dev_conf;
596         int ret;
597
598         /* If this eventdev is already configured, we must release the current
599          * scheduling domain before attempting to configure a new one.
600          */
601         if (dlb->configured) {
602                 dlb_hw_reset_sched_domain(dev, true);
603
604                 ret = dlb_hw_query_resources(dlb);
605                 if (ret) {
606                         DLB_LOG_ERR("get resources err=%d, devid=%d\n",
607                                     ret, data->dev_id);
608                         return ret;
609                 }
610         }
611
612         if (config->nb_event_queues > rsrcs->num_queues) {
613                 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
614                             config->nb_event_queues,
615                             rsrcs->num_queues);
616                 return -EINVAL;
617         }
618         if (config->nb_event_ports > (rsrcs->num_ldb_ports
619                         + rsrcs->num_dir_ports)) {
620                 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
621                             config->nb_event_ports,
622                             (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
623                 return -EINVAL;
624         }
625         if (config->nb_events_limit > rsrcs->nb_events_limit) {
626                 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
627                             config->nb_events_limit,
628                             rsrcs->nb_events_limit);
629                 return -EINVAL;
630         }
631
632         if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
633                 dlb->global_dequeue_wait = false;
634         else {
635                 uint32_t timeout32;
636
637                 dlb->global_dequeue_wait = true;
638
639                 timeout32 = config->dequeue_timeout_ns;
640
641                 dlb->global_dequeue_wait_ticks =
642                         timeout32 * (rte_get_timer_hz() / 1E9);
643         }
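        /* For illustration: with a hypothetical 2 GHz timer
         * (rte_get_timer_hz() == 2e9) and dequeue_timeout_ns == 1000, the
         * computation above yields 2000 global dequeue wait ticks.
         */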
644
645         /* Does this platform support umonitor/umwait? */
646         if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
647                 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
648                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
649                         DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE must be 0 or 1.\n",
650                                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
651                         return -EINVAL;
652                 }
653                 dlb->umwait_allowed = true;
654         }
655
656         rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
657         rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
658         /* 1 dir queue per dir port */
659         rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
660
661         /* Scale down nb_events_limit by 4 for directed credits, since there
662          * are 4x as many load-balanced credits.
663          */
664         rsrcs->num_ldb_credits = 0;
665         rsrcs->num_dir_credits = 0;
666
667         if (rsrcs->num_ldb_queues)
668                 rsrcs->num_ldb_credits = config->nb_events_limit;
669         if (rsrcs->num_dir_ports)
670                 rsrcs->num_dir_credits = config->nb_events_limit / 4;
671         if (dlb->num_dir_credits_override != -1)
672                 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
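        /* For illustration: nb_events_limit == 2048 gives 2048 LDB credits and
         * 2048 / 4 == 512 DIR credits, unless dlb->num_dir_credits_override is
         * set (!= -1), in which case the override wins.
         */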
673
674         if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
675                 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
676                 return -ENODEV;
677         }
678
679         dlb->new_event_limit = config->nb_events_limit;
680         __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
681
682         /* Save number of ports/queues for this event dev */
683         dlb->num_ports = config->nb_event_ports;
684         dlb->num_queues = config->nb_event_queues;
685         dlb->num_dir_ports = rsrcs->num_dir_ports;
686         dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
687         dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
688         dlb->num_dir_queues = dlb->num_dir_ports;
689         dlb->num_ldb_credits = rsrcs->num_ldb_credits;
690         dlb->num_dir_credits = rsrcs->num_dir_credits;
691
692         dlb->configured = true;
693
694         return 0;
695 }
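/* Minimal application-side sketch of a configuration that satisfies the checks
 * in dlb_eventdev_configure() above. The numbers are hypothetical and must fit
 * within the limits reported by rte_event_dev_info_get():
 *
 *   struct rte_event_dev_config cfg = {
 *           .nb_event_queues = 4,
 *           .nb_event_ports = 4,
 *           .nb_single_link_event_port_queues = 1,
 *           .nb_events_limit = 2048,
 *           .nb_event_queue_flows = 1024,
 *           .nb_event_port_dequeue_depth = 32,
 *           .nb_event_port_enqueue_depth = 32,
 *           .dequeue_timeout_ns = 0,
 *   };
 *
 *   ret = rte_event_dev_configure(dev_id, &cfg);
 */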
696
697 static int16_t
698 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
699                                uint32_t qm_port_id,
700                                uint16_t qm_qid)
701 {
702         struct dlb_unmap_qid_args cfg;
703         struct dlb_cmd_response response;
704         int32_t ret;
705
706         if (handle == NULL)
707                 return -EINVAL;
708
709         cfg.response = (uintptr_t)&response;
710         cfg.port_id = qm_port_id;
711         cfg.qid = qm_qid;
712
713         ret = dlb_iface_unmap_qid(handle, &cfg);
714         if (ret < 0)
715                 DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
716                             ret, dlb_error_strings[response.status]);
717
718         return ret;
719 }
720
721 static int
722 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
723                            struct dlb_eventdev_port *ev_port,
724                            struct dlb_eventdev_queue *ev_queue)
725 {
726         int ret, i;
727
728         /* Don't unlink until start time. */
729         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
730                 return 0;
731
732         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
733                 if (ev_port->link[i].valid &&
734                     ev_port->link[i].queue_id == ev_queue->id)
735                         break; /* found */
736         }
737
738         /* This is expected with the eventdev API, which blindly attempts
739          * to unmap all queues.
740          */
741         if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
742                 DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
743                             ev_queue->qm_queue.id,
744                             ev_port->qm_port.id);
745                 return 0;
746         }
747
748         ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
749                                              ev_port->qm_port.id,
750                                              ev_queue->qm_queue.id);
751         if (!ret)
752                 ev_port->link[i].mapped = false;
753
754         return ret;
755 }
756
757 static int
758 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
759                          uint8_t queues[], uint16_t nb_unlinks)
760 {
761         struct dlb_eventdev_port *ev_port = event_port;
762         struct dlb_eventdev *dlb;
763         int i;
764
765         RTE_SET_USED(dev);
766
767         if (!ev_port->setup_done) {
768                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
769                             ev_port->id);
770                 rte_errno = -EINVAL;
771                 return 0;
772         }
773
774         if (queues == NULL || nb_unlinks == 0) {
775                 DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
776                 return 0; /* Ignore and return success */
777         }
778
779         if (ev_port->qm_port.is_directed) {
780                 DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
781                             ev_port->id);
782                 rte_errno = 0;
783                 return nb_unlinks; /* as if success */
784         }
785
786         dlb = ev_port->dlb;
787
788         for (i = 0; i < nb_unlinks; i++) {
789                 struct dlb_eventdev_queue *ev_queue;
790                 int ret, j;
791
792                 if (queues[i] >= dlb->num_queues) {
793                         DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
794                         rte_errno = -EINVAL;
795                         return i; /* return index of offending queue */
796                 }
797
798                 ev_queue = &dlb->ev_queues[queues[i]];
799
800                 /* Does a link exist? */
801                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
802                         if (ev_port->link[j].queue_id == queues[i] &&
803                             ev_port->link[j].valid)
804                                 break;
805
806                 if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
807                         continue;
808
809                 ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
810                 if (ret) {
811                         DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
812                                     ret, ev_port->id, queues[i]);
813                         rte_errno = -ENOENT;
814                         return i; /* return index of offending queue */
815                 }
816
817                 ev_port->link[j].valid = false;
818                 ev_port->num_links--;
819                 ev_queue->num_links--;
820         }
821
822         return nb_unlinks;
823 }
824
825 static int
826 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
827                                       void *event_port)
828 {
829         struct dlb_eventdev_port *ev_port = event_port;
830         struct dlb_eventdev *dlb;
831         struct dlb_hw_dev *handle;
832         struct dlb_pending_port_unmaps_args cfg;
833         struct dlb_cmd_response response;
834         int ret;
835
836         RTE_SET_USED(dev);
837
838         if (!ev_port->setup_done) {
839                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
840                             ev_port->id);
841                 rte_errno = -EINVAL;
842                 return 0;
843         }
844
845         cfg.port_id = ev_port->qm_port.id;
846         cfg.response = (uintptr_t)&response;
847         dlb = ev_port->dlb;
848         handle = &dlb->qm_instance;
849         ret = dlb_iface_pending_port_unmaps(handle, &cfg);
850
851         if (ret < 0) {
852                 DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
853                             ret, dlb_error_strings[response.status]);
854                 return ret;
855         }
856
857         return response.id;
858 }
859
860 static void
861 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
862                                    uint8_t port_id,
863                                    struct rte_event_port_conf *port_conf)
864 {
865         RTE_SET_USED(port_id);
866         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
867
868         port_conf->new_event_threshold = dlb->new_event_limit;
869         port_conf->dequeue_depth = 32;
870         port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
871         port_conf->event_port_cfg = 0;
872 }
873
874 static void
875 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
876                                     uint8_t queue_id,
877                                     struct rte_event_queue_conf *queue_conf)
878 {
879         RTE_SET_USED(dev);
880         RTE_SET_USED(queue_id);
881         queue_conf->nb_atomic_flows = 1024;
882         queue_conf->nb_atomic_order_sequences = 32;
883         queue_conf->event_queue_cfg = 0;
884         queue_conf->priority = 0;
885 }
886
887 static int
888 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
889                        struct dlb_eventdev_port *ev_port,
890                        uint32_t dequeue_depth,
891                        uint32_t cq_depth,
892                        uint32_t enqueue_depth,
893                        uint16_t rsvd_tokens,
894                        bool use_rsvd_token_scheme)
895 {
896         struct dlb_hw_dev *handle = &dlb->qm_instance;
897         struct dlb_create_ldb_port_args cfg = {0};
898         struct dlb_cmd_response response = {0};
899         int ret;
900         struct dlb_port *qm_port = NULL;
901         char mz_name[RTE_MEMZONE_NAMESIZE];
902         uint32_t qm_port_id;
903
904         if (handle == NULL)
905                 return -EINVAL;
906
907         if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
908                 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
909                         DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
910                 return -EINVAL;
911         }
912
913         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
914                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
915                             DLB_MIN_ENQUEUE_DEPTH);
916                 return -EINVAL;
917         }
918
919         rte_spinlock_lock(&handle->resource_lock);
920
921         cfg.response = (uintptr_t)&response;
922
923         /* We round up to the next power of 2 if necessary */
924         cfg.cq_depth = rte_align32pow2(cq_depth);
925         cfg.cq_depth_threshold = rsvd_tokens;
926
927         cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
928
929         /* User controls the LDB high watermark via enqueue depth. The DIR high
930          * watermark is equal, unless the directed credit pool is too small.
931          */
932         cfg.ldb_credit_high_watermark = enqueue_depth;
933
934         /* If there are no directed ports, the kernel driver will ignore this
935          * port's directed credit settings. Don't use enqueue_depth if it would
936          * require more directed credits than are available.
937          */
938         cfg.dir_credit_high_watermark =
939                 RTE_MIN(enqueue_depth,
940                         handle->cfg.num_dir_credits / dlb->num_ports);
941
942         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
943         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
944
945         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
946         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
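        /* For illustration: enqueue_depth == 64 gives an LDB high watermark of
         * 64, a quantum of 32, and a low watermark of min(16, 32) == 16. The
         * DIR watermark follows the same scheme, capped by this port's share
         * of the directed credit pool.
         */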
947
948         /* Per QM values */
949
950         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
951         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
952
953         ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
954         if (ret < 0) {
955                 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
956                             ret, dlb_error_strings[response.status]);
957                 goto error_exit;
958         }
959
960         qm_port_id = response.id;
961
962         DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
963                     ev_port->id, qm_port_id);
964
965         qm_port = &ev_port->qm_port;
966         qm_port->ev_port = ev_port; /* back ptr */
967         qm_port->dlb = dlb; /* back ptr */
968
969         /*
970          * Allocate and init local qe struct(s).
971          * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
972          */
973
974         snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
975                  ev_port->id);
976
977         ret = dlb_init_qe_mem(qm_port, mz_name);
978         if (ret < 0) {
979                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
980                 goto error_exit;
981         }
982
983         qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
984         qm_port->id = qm_port_id;
985
986         /* The credit window is one high water mark of QEs */
987         qm_port->ldb_pushcount_at_credit_expiry = 0;
988         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
989         /* The credit window is one high water mark of QEs */
990         qm_port->dir_pushcount_at_credit_expiry = 0;
991         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
992         qm_port->cq_depth = cfg.cq_depth;
993         /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
994          * the effective depth is smaller.
995          */
996         qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
997         qm_port->cq_idx = 0;
998         qm_port->cq_idx_unmasked = 0;
999         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1000                 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1001         else
1002                 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1003
1004         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1005         /* starting value of gen bit - it toggles at wrap time */
1006         qm_port->gen_bit = 1;
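        /* For illustration: cq_depth == 8 gives cq_depth_mask == 31 in sparse
         * poll mode (or 7 otherwise), so gen_bit_shift == popcount(mask) is 5
         * (or 3).
         */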
1007
1008         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1009         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1010         qm_port->int_armed = false;
1011
1012         /* Save off for later use in info and lookup APIs. */
1013         qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1014
1015         qm_port->dequeue_depth = dequeue_depth;
1016
1017         qm_port->owed_tokens = 0;
1018         qm_port->issued_releases = 0;
1019
1020         /* update state */
1021         qm_port->state = PORT_STARTED; /* enabled at create time */
1022         qm_port->config_state = DLB_CONFIGURED;
1023
1024         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1025         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1026
1027         DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1028                     qm_port_id,
1029                     cq_depth,
1030                     qm_port->ldb_credits,
1031                     qm_port->dir_credits);
1032
1033         rte_spinlock_unlock(&handle->resource_lock);
1034
1035         return 0;
1036
1037 error_exit:
1038         if (qm_port) {
1039                 dlb_free_qe_mem(qm_port);
1040                 qm_port->pp_mmio_base = 0;
1041         }
1042
1043         rte_spinlock_unlock(&handle->resource_lock);
1044
1045         DLB_LOG_ERR("dlb: create ldb port failed!\n");
1046
1047         return ret;
1048 }
1049
1050 static int
1051 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1052                        struct dlb_eventdev_port *ev_port,
1053                        uint32_t dequeue_depth,
1054                        uint32_t cq_depth,
1055                        uint32_t enqueue_depth,
1056                        uint16_t rsvd_tokens,
1057                        bool use_rsvd_token_scheme)
1058 {
1059         struct dlb_hw_dev *handle = &dlb->qm_instance;
1060         struct dlb_create_dir_port_args cfg = {0};
1061         struct dlb_cmd_response response = {0};
1062         int ret;
1063         struct dlb_port *qm_port = NULL;
1064         char mz_name[RTE_MEMZONE_NAMESIZE];
1065         uint32_t qm_port_id;
1066
1067         if (dlb == NULL || handle == NULL)
1068                 return -EINVAL;
1069
1070         if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1071                 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1072                             DLB_MIN_DIR_CQ_DEPTH);
1073                 return -EINVAL;
1074         }
1075
1076         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1077                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1078                             DLB_MIN_ENQUEUE_DEPTH);
1079                 return -EINVAL;
1080         }
1081
1082         rte_spinlock_lock(&handle->resource_lock);
1083
1084         /* Directed queues are configured at link time. */
1085         cfg.queue_id = -1;
1086
1087         cfg.response = (uintptr_t)&response;
1088
1089         /* We round up to the next power of 2 if necessary */
1090         cfg.cq_depth = rte_align32pow2(cq_depth);
1091         cfg.cq_depth_threshold = rsvd_tokens;
1092
1093         /* User controls the LDB high watermark via enqueue depth. The DIR high
1094          * watermark is equal, unless the directed credit pool is too small.
1095          */
1096         cfg.ldb_credit_high_watermark = enqueue_depth;
1097
1098         /* Don't use enqueue_depth if it would require more directed credits
1099          * than are available.
1100          */
1101         cfg.dir_credit_high_watermark =
1102                 RTE_MIN(enqueue_depth,
1103                         handle->cfg.num_dir_credits / dlb->num_ports);
1104
1105         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1106         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1107
1108         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1109         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1110
1111         /* Per QM values */
1112
1113         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1114         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1115
1116         ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1117         if (ret < 0) {
1118                 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1119                             ret, dlb_error_strings[response.status]);
1120                 goto error_exit;
1121         }
1122
1123         qm_port_id = response.id;
1124
1125         DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1126                     ev_port->id, qm_port_id);
1127
1128         qm_port = &ev_port->qm_port;
1129         qm_port->ev_port = ev_port; /* back ptr */
1130         qm_port->dlb = dlb;  /* back ptr */
1131
1132         /*
1133          * Init local qe struct(s).
1134          * Note: MOVDIR64 requires the enqueue QE to be aligned
1135          */
1136
1137         snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1138                  ev_port->id);
1139
1140         ret = dlb_init_qe_mem(qm_port, mz_name);
1141
1142         if (ret < 0) {
1143                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1144                 goto error_exit;
1145         }
1146
1147         qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1148         qm_port->id = qm_port_id;
1149
1150         /* The credit window is one high water mark of QEs */
1151         qm_port->ldb_pushcount_at_credit_expiry = 0;
1152         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1153         /* The credit window is one high water mark of QEs */
1154         qm_port->dir_pushcount_at_credit_expiry = 0;
1155         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1156         qm_port->cq_depth = cfg.cq_depth;
1157         qm_port->cq_idx = 0;
1158         qm_port->cq_idx_unmasked = 0;
1159         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1160                 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1161         else
1162                 qm_port->cq_depth_mask = cfg.cq_depth - 1;
1163
1164         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1165         /* starting value of gen bit - it toggles at wrap time */
1166         qm_port->gen_bit = 1;
1167
1168         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1169         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1170         qm_port->int_armed = false;
1171
1172         /* Save off for later use in info and lookup APIs. */
1173         qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1174
1175         qm_port->dequeue_depth = dequeue_depth;
1176
1177         qm_port->owed_tokens = 0;
1178         qm_port->issued_releases = 0;
1179
1180         /* update state */
1181         qm_port->state = PORT_STARTED; /* enabled at create time */
1182         qm_port->config_state = DLB_CONFIGURED;
1183
1184         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1185         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1186
1187         DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1188                     qm_port_id,
1189                     cq_depth,
1190                     cfg.dir_credit_high_watermark,
1191                     cfg.ldb_credit_high_watermark);
1192
1193         rte_spinlock_unlock(&handle->resource_lock);
1194
1195         return 0;
1196
1197 error_exit:
1198         if (qm_port) {
1199                 qm_port->pp_mmio_base = 0;
1200                 dlb_free_qe_mem(qm_port);
1201         }
1202
1203         rte_spinlock_unlock(&handle->resource_lock);
1204
1205         DLB_LOG_ERR("dlb: create dir port failed!\n");
1206
1207         return ret;
1208 }
1209
1210 static int32_t
1211 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1212                         struct dlb_queue *queue,
1213                         const struct rte_event_queue_conf *evq_conf)
1214 {
1215         struct dlb_hw_dev *handle = &dlb->qm_instance;
1216         struct dlb_create_ldb_queue_args cfg;
1217         struct dlb_cmd_response response;
1218         int32_t ret;
1219         uint32_t qm_qid;
1220         int sched_type = -1;
1221
1222         if (evq_conf == NULL)
1223                 return -EINVAL;
1224
1225         if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1226                 if (evq_conf->nb_atomic_order_sequences != 0)
1227                         sched_type = RTE_SCHED_TYPE_ORDERED;
1228                 else
1229                         sched_type = RTE_SCHED_TYPE_PARALLEL;
1230         } else
1231                 sched_type = evq_conf->schedule_type;
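        /* For illustration: a queue created with RTE_EVENT_QUEUE_CFG_ALL_TYPES
         * and nb_atomic_order_sequences == 64 is treated as ORDERED here, while
         * nb_atomic_order_sequences == 0 makes it PARALLEL.
         */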
1232
1233         cfg.response = (uintptr_t)&response;
1234         cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1235         cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1236         cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1237
1238         if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1239                 cfg.num_sequence_numbers = 0;
1240                 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1241         }
1242
1243         ret = dlb_iface_ldb_queue_create(handle, &cfg);
1244         if (ret < 0) {
1245                 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1246                             ret, dlb_error_strings[response.status]);
1247                 return -EINVAL;
1248         }
1249
1250         qm_qid = response.id;
1251
1252         /* Save off queue config for debug, resource lookups, and reconfig */
1253         queue->num_qid_inflights = cfg.num_qid_inflights;
1254         queue->num_atm_inflights = cfg.num_atomic_inflights;
1255
1256         queue->sched_type = sched_type;
1257         queue->config_state = DLB_CONFIGURED;
1258
1259         DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1260                     qm_qid,
1261                     cfg.num_atomic_inflights,
1262                     cfg.num_sequence_numbers,
1263                     cfg.num_qid_inflights);
1264
1265         return qm_qid;
1266 }
1267
1268 static int32_t
1269 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1270 {
1271         struct dlb_hw_dev *handle = &dlb->qm_instance;
1272         struct dlb_get_sn_allocation_args cfg;
1273         struct dlb_cmd_response response;
1274         int ret;
1275
1276         cfg.group = group;
1277         cfg.response = (uintptr_t)&response;
1278
1279         ret = dlb_iface_get_sn_allocation(handle, &cfg);
1280         if (ret < 0) {
1281                 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1282                             ret, dlb_error_strings[response.status]);
1283                 return ret;
1284         }
1285
1286         return response.id;
1287 }
1288
1289 static int
1290 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1291 {
1292         struct dlb_hw_dev *handle = &dlb->qm_instance;
1293         struct dlb_set_sn_allocation_args cfg;
1294         struct dlb_cmd_response response;
1295         int ret;
1296
1297         cfg.num = num;
1298         cfg.group = group;
1299         cfg.response = (uintptr_t)&response;
1300
1301         ret = dlb_iface_set_sn_allocation(handle, &cfg);
1302         if (ret < 0) {
1303                 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1304                             ret, dlb_error_strings[response.status]);
1305                 return ret;
1306         }
1307
1308         return ret;
1309 }
1310
1311 static int32_t
1312 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1313 {
1314         struct dlb_hw_dev *handle = &dlb->qm_instance;
1315         struct dlb_get_sn_occupancy_args cfg;
1316         struct dlb_cmd_response response;
1317         int ret;
1318
1319         cfg.group = group;
1320         cfg.response = (uintptr_t)&response;
1321
1322         ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1323         if (ret < 0) {
1324                 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1325                             ret, dlb_error_strings[response.status]);
1326                 return ret;
1327         }
1328
1329         return response.id;
1330 }
1331
1332 /* Query the current sequence number allocations and, if they conflict with the
1333  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1334  * This is best-effort; if it fails, the PMD will still attempt to configure
1335  * the load-balanced queue, and that attempt will return an error.
1336  */
1337 static void
1338 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1339                           const struct rte_event_queue_conf *queue_conf)
1340 {
1341         int grp_occupancy[DLB_NUM_SN_GROUPS];
1342         int grp_alloc[DLB_NUM_SN_GROUPS];
1343         int i, sequence_numbers;
1344
1345         sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1346
1347         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1348                 int total_slots;
1349
1350                 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1351                 if (grp_alloc[i] < 0)
1352                         return;
1353
1354                 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
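                /* For illustration, assuming (hypothetically) 1024 sequence
                 * numbers per group: a group currently allocated 64 SNs per
                 * queue has 1024 / 64 == 16 slots.
                 */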
1355
1356                 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1357                 if (grp_occupancy[i] < 0)
1358                         return;
1359
1360                 /* DLB has at least one available slot for the requested
1361                  * sequence numbers, so no further configuration is required.
1362                  */
1363                 if (grp_alloc[i] == sequence_numbers &&
1364                     grp_occupancy[i] < total_slots)
1365                         return;
1366         }
1367
1368         /* None of the sequence number groups are configured for the requested
1369          * sequence numbers, so we have to reconfigure one of them. This is
1370          * only possible if a group is not in use.
1371          */
1372         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1373                 if (grp_occupancy[i] == 0)
1374                         break;
1375         }
1376
1377         if (i == DLB_NUM_SN_GROUPS) {
1378                 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1379                        __func__, sequence_numbers);
1380                 return;
1381         }
1382
1383         /* Attempt to configure slot i with the requested number of sequence
1384          * numbers. Ignore the return value -- if this fails, the error will be
1385          * caught during subsequent queue configuration.
1386          */
1387         dlb_set_sn_allocation(dlb, i, sequence_numbers);
1388 }
1389
1390 static int
1391 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1392                              struct dlb_eventdev_queue *ev_queue,
1393                              const struct rte_event_queue_conf *queue_conf)
1394 {
1395         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1396         int32_t qm_qid;
1397
1398         if (queue_conf->nb_atomic_order_sequences)
1399                 dlb_program_sn_allocation(dlb, queue_conf);
1400
1401         qm_qid = dlb_hw_create_ldb_queue(dlb,
1402                                          &ev_queue->qm_queue,
1403                                          queue_conf);
1404         if (qm_qid < 0) {
1405                 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1406
1407                 return qm_qid;
1408         }
1409
1410         dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1411
1412         ev_queue->qm_queue.id = qm_qid;
1413
1414         return 0;
1415 }
1416
1417 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1418 {
1419         int i, num = 0;
1420
1421         for (i = 0; i < dlb->num_queues; i++) {
1422                 if (dlb->ev_queues[i].setup_done &&
1423                     dlb->ev_queues[i].qm_queue.is_directed)
1424                         num++;
1425         }
1426
1427         return num;
1428 }
1429
1430 static void
1431 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1432                         struct dlb_eventdev_queue *ev_queue)
1433 {
1434         struct dlb_eventdev_port *ev_port;
1435         int i, j;
1436
1437         for (i = 0; i < dlb->num_ports; i++) {
1438                 ev_port = &dlb->ev_ports[i];
1439
1440                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1441                         if (!ev_port->link[j].valid ||
1442                             ev_port->link[j].queue_id != ev_queue->id)
1443                                 continue;
1444
1445                         ev_port->link[j].valid = false;
1446                         ev_port->num_links--;
1447                 }
1448         }
1449
1450         ev_queue->num_links = 0;
1451 }
1452
1453 static int
1454 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1455                          uint8_t ev_qid,
1456                          const struct rte_event_queue_conf *queue_conf)
1457 {
1458         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1459         struct dlb_eventdev_queue *ev_queue;
1460         int ret;
1461
1462         if (queue_conf == NULL)
1463                 return -EINVAL;
1464
1465         if (ev_qid >= dlb->num_queues)
1466                 return -EINVAL;
1467
1468         ev_queue = &dlb->ev_queues[ev_qid];
1469
1470         ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1471                 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1472         ev_queue->id = ev_qid;
1473         ev_queue->conf = *queue_conf;
1474
1475         if (!ev_queue->qm_queue.is_directed) {
1476                 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1477         } else {
1478                 /* The directed queue isn't set up until link time, at which
1479                  * point we know its directed port ID. Directed queue setup
1480                  * will only fail if this queue is already set up or there are
1481                  * no directed queues left to configure.
1482                  */
1483                 ret = 0;
1484
1485                 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1486
1487                 if (ev_queue->setup_done ||
1488                     dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1489                         ret = -EINVAL;
1490         }
1491
1492         /* Tear down pre-existing port->queue links */
1493         if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1494                 dlb_queue_link_teardown(dlb, ev_queue);
1495
1496         if (!ret)
1497                 ev_queue->setup_done = true;
1498
1499         return ret;
1500 }
1501
1502 static void
1503 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1504                        struct dlb_eventdev_port *ev_port)
1505 {
1506         struct dlb_eventdev_queue *ev_queue;
1507         int i;
1508
1509         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1510                 if (!ev_port->link[i].valid)
1511                         continue;
1512
1513                 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1514
1515                 ev_port->link[i].valid = false;
1516                 ev_port->num_links--;
1517                 ev_queue->num_links--;
1518         }
1519 }
1520
1521 static int
1522 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1523                         uint8_t ev_port_id,
1524                         const struct rte_event_port_conf *port_conf)
1525 {
1526         struct dlb_eventdev *dlb;
1527         struct dlb_eventdev_port *ev_port;
1528         bool use_rsvd_token_scheme;
1529         uint32_t adj_cq_depth;
1530         uint16_t rsvd_tokens;
1531         int ret;
1532
1533         if (dev == NULL || port_conf == NULL) {
1534                 DLB_LOG_ERR("Null parameter\n");
1535                 return -EINVAL;
1536         }
1537
1538         dlb = dlb_pmd_priv(dev);
1539
1540         if (ev_port_id >= DLB_MAX_NUM_PORTS)
1541                 return -EINVAL;
1542
1543         if (port_conf->dequeue_depth >
1544                 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1545             port_conf->enqueue_depth >
1546                 evdev_dlb_default_info.max_event_port_enqueue_depth)
1547                 return -EINVAL;
1548
1549         ev_port = &dlb->ev_ports[ev_port_id];
1550         /* reject setup if this port has already been configured */
1551         if (ev_port->setup_done) {
1552                 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1553                 return -EINVAL;
1554         }
1555
1556         /* The reserved token interrupt arming scheme requires that one or more
1557          * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1558          * usable by the DLB, so in order to give an *effective* CQ depth equal
1559          * to the user-requested value, we double CQ depth and reserve half of
1560          * its tokens. If the user requests the max CQ depth (256) then we
1561          * cannot double it, so we reserve one token and give an effective
1562          * depth of 255 entries.
1563          */
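/* Worked example (illustrative): a requested dequeue_depth of 128 becomes
 * adj_cq_depth = 256 with 128 reserved tokens (effective depth 128), while
 * a requested depth of 256 stays at 256 with a single reserved token
 * (effective depth 255), matching the note above.
 */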
1564         use_rsvd_token_scheme = true;
1565         rsvd_tokens = 1;
1566         adj_cq_depth = port_conf->dequeue_depth;
1567
1568         if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1569                 rsvd_tokens = adj_cq_depth;
1570                 adj_cq_depth *= 2;
1571         }
1572
1573         ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1574                 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1575
1576         if (!ev_port->qm_port.is_directed) {
1577                 ret = dlb_hw_create_ldb_port(dlb,
1578                                              ev_port,
1579                                              port_conf->dequeue_depth,
1580                                              adj_cq_depth,
1581                                              port_conf->enqueue_depth,
1582                                              rsvd_tokens,
1583                                              use_rsvd_token_scheme);
1584                 if (ret < 0) {
1585                         DLB_LOG_ERR("Failed to create the LDB port, ev_port_id=%d\n",
1586                                     ev_port_id);
1587                         return ret;
1588                 }
1589         } else {
1590                 ret = dlb_hw_create_dir_port(dlb,
1591                                              ev_port,
1592                                              port_conf->dequeue_depth,
1593                                              adj_cq_depth,
1594                                              port_conf->enqueue_depth,
1595                                              rsvd_tokens,
1596                                              use_rsvd_token_scheme);
1597                 if (ret < 0) {
1598                         DLB_LOG_ERR("Failed to create the DIR port\n");
1599                         return ret;
1600                 }
1601         }
1602
1603         /* Save off port config for reconfig */
1604         dlb->ev_ports[ev_port_id].conf = *port_conf;
1605
1606         dlb->ev_ports[ev_port_id].id = ev_port_id;
1607         dlb->ev_ports[ev_port_id].enq_configured = true;
1608         dlb->ev_ports[ev_port_id].setup_done = true;
1609         dlb->ev_ports[ev_port_id].inflight_max =
1610                 port_conf->new_event_threshold;
1611         dlb->ev_ports[ev_port_id].implicit_release =
1612                 !(port_conf->event_port_cfg &
1613                   RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1614         dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1615         dlb->ev_ports[ev_port_id].inflight_credits = 0;
1616         dlb->ev_ports[ev_port_id].credit_update_quanta =
1617                 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1618         dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1619
1620         /* Tear down pre-existing port->queue links */
1621         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1622                 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1623
1624         dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1625
1626         return 0;
1627 }
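/* Illustrative sketch (not part of the driver): the application-side call
 * that lands in dlb_eventdev_port_setup(). The numbers are hypothetical;
 * dequeue_depth/enqueue_depth must not exceed the limits advertised in
 * evdev_dlb_default_info, and new_event_threshold becomes this port's
 * inflight_max against the software credit pool.
 *
 *     struct rte_event_port_conf pconf = {
 *             .new_event_threshold = 2048,
 *             .dequeue_depth = 32,
 *             .enqueue_depth = 32,
 *     };
 *
 *     ret = rte_event_port_setup(dev_id, port_id, &pconf);
 */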
1628
1629 static int
1630 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1631 {
1632         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1633         int ret, i;
1634
1635         /* If an event queue or port was previously configured, but hasn't been
1636          * reconfigured, reapply its original configuration.
1637          */
1638         for (i = 0; i < dlb->num_queues; i++) {
1639                 struct dlb_eventdev_queue *ev_queue;
1640
1641                 ev_queue = &dlb->ev_queues[i];
1642
1643                 if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1644                         continue;
1645
1646                 ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1647                 if (ret < 0) {
1648                         DLB_LOG_ERR("dlb: failed to reconfigure queue %d", i);
1649                         return ret;
1650                 }
1651         }
1652
1653         for (i = 0; i < dlb->num_ports; i++) {
1654                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1655
1656                 if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1657                         continue;
1658
1659                 ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1660                 if (ret < 0) {
1661                         DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d",
1662                                     i);
1663                         return ret;
1664                 }
1665         }
1666
1667         return 0;
1668 }
1669
1670 static int
1671 set_dev_id(const char *key __rte_unused,
1672            const char *value,
1673            void *opaque)
1674 {
1675         int *dev_id = opaque;
1676         int ret;
1677
1678         if (value == NULL || opaque == NULL) {
1679                 DLB_LOG_ERR("NULL pointer\n");
1680                 return -EINVAL;
1681         }
1682
1683         ret = dlb_string_to_int(dev_id, value);
1684         if (ret < 0)
1685                 return ret;
1686
1687         return 0;
1688 }
1689
1690 static int
1691 set_defer_sched(const char *key __rte_unused,
1692                 const char *value,
1693                 void *opaque)
1694 {
1695         int *defer_sched = opaque;
1696
1697         if (value == NULL || opaque == NULL) {
1698                 DLB_LOG_ERR("NULL pointer\n");
1699                 return -EINVAL;
1700         }
1701
1702         if (strncmp(value, "on", 2) != 0) {
1703                 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1704                             value);
1705                 return -EINVAL;
1706         }
1707
1708         *defer_sched = 1;
1709
1710         return 0;
1711 }
1712
1713 static int
1714 set_num_atm_inflights(const char *key __rte_unused,
1715                       const char *value,
1716                       void *opaque)
1717 {
1718         int *num_atm_inflights = opaque;
1719         int ret;
1720
1721         if (value == NULL || opaque == NULL) {
1722                 DLB_LOG_ERR("NULL pointer\n");
1723                 return -EINVAL;
1724         }
1725
1726         ret = dlb_string_to_int(num_atm_inflights, value);
1727         if (ret < 0)
1728                 return ret;
1729
1730         if (*num_atm_inflights < 0 ||
1731             *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1732                 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1733                             DLB_MAX_NUM_ATM_INFLIGHTS);
1734                 return -EINVAL;
1735         }
1736
1737         return 0;
1738 }
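/* The set_* functions above are rte_kvargs handlers invoked by this PMD's
 * devargs parsing code, one per recognized key, with `opaque` pointing at
 * the field to fill in. A hedged sketch of that pattern (the key string and
 * destination variable shown here are illustrative, not the PMD's actual
 * definitions):
 *
 *     struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid_keys);
 *
 *     rte_kvargs_process(kvlist, "atm_inflights",
 *                        set_num_atm_inflights, &num_atm_inflights);
 *     rte_kvargs_free(kvlist);
 */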
1739
1740 static int
1741 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1742                        uint8_t queue_id,
1743                        bool link_exists,
1744                        int index)
1745 {
1746         struct dlb_eventdev *dlb = ev_port->dlb;
1747         struct dlb_eventdev_queue *ev_queue;
1748         bool port_is_dir, queue_is_dir;
1749
1750         if (queue_id >= dlb->num_queues) {
1751                 DLB_LOG_ERR("queue_id %d >= num queues %d\n",
1752                             queue_id, dlb->num_queues);
1753                 rte_errno = -EINVAL;
1754                 return -1;
1755         }
1756
1757         ev_queue = &dlb->ev_queues[queue_id];
1758
1759         if (!ev_queue->setup_done &&
1760             ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1761                 DLB_LOG_ERR("setup not done and not previously configured\n");
1762                 rte_errno = -EINVAL;
1763                 return -1;
1764         }
1765
1766         port_is_dir = ev_port->qm_port.is_directed;
1767         queue_is_dir = ev_queue->qm_queue.is_directed;
1768
1769         if (port_is_dir != queue_is_dir) {
1770                 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1771                             queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1772                             port_is_dir ? "DIR" : "LDB", ev_port->id);
1773
1774                 rte_errno = -EINVAL;
1775                 return -1;
1776         }
1777
1778         /* Check if there is space for the requested link */
1779         if (!link_exists && index == -1) {
1780                 DLB_LOG_ERR("no space for new link\n");
1781                 rte_errno = -ENOSPC;
1782                 return -1;
1783         }
1784
1785         /* Check if the directed port is already linked */
1786         if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1787             !link_exists) {
1788                 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1789                             ev_port->id);
1790                 rte_errno = -EINVAL;
1791                 return -1;
1792         }
1793
1794         /* Check if the directed queue is already linked */
1795         if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1796             !link_exists) {
1797                 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1798                             ev_queue->id);
1799                 rte_errno = -EINVAL;
1800                 return -1;
1801         }
1802
1803         return 0;
1804 }
1805
1806 static int32_t
1807 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1808 {
1809         struct dlb_hw_dev *handle = &dlb->qm_instance;
1810         struct dlb_create_dir_queue_args cfg;
1811         struct dlb_cmd_response response;
1812         int32_t ret;
1813
1814         cfg.response = (uintptr_t)&response;
1815
1816         /* The directed port is always configured before its queue */
1817         cfg.port_id = qm_port_id;
1818
1819         ret = dlb_iface_dir_queue_create(handle, &cfg);
1820         if (ret < 0) {
1821                 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1822                             ret, dlb_error_strings[response.status]);
1823                 return -EINVAL;
1824         }
1825
1826         return response.id;
1827 }
1828
1829 static int
1830 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1831                              struct dlb_eventdev_queue *ev_queue,
1832                              struct dlb_eventdev_port *ev_port)
1833 {
1834         int32_t qm_qid;
1835
1836         qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1837
1838         if (qm_qid < 0) {
1839                 DLB_LOG_ERR("Failed to create the DIR queue\n");
1840                 return qm_qid;
1841         }
1842
1843         dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1844
1845         ev_queue->qm_queue.id = qm_qid;
1846
1847         return 0;
1848 }
1849
1850 static int16_t
1851 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1852                            uint32_t qm_port_id,
1853                            uint16_t qm_qid,
1854                            uint8_t priority)
1855 {
1856         struct dlb_map_qid_args cfg;
1857         struct dlb_cmd_response response;
1858         int32_t ret;
1859
1860         if (handle == NULL)
1861                 return -EINVAL;
1862
1863         /* Build message */
1864         cfg.response = (uintptr_t)&response;
1865         cfg.port_id = qm_port_id;
1866         cfg.qid = qm_qid;
1867         cfg.priority = EV_TO_DLB_PRIO(priority);
1868
1869         ret = dlb_iface_map_qid(handle, &cfg);
1870         if (ret < 0) {
1871                 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1872                             ret, dlb_error_strings[response.status]);
1873                 DLB_LOG_ERR("dlb: device_id=%d grp=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1874                             handle->device_id,
1875                             handle->domain_id, cfg.port_id,
1876                             cfg.qid,
1877                             cfg.priority);
1878         } else {
1879                 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1880                             qm_qid, qm_port_id);
1881         }
1882
1883         return ret;
1884 }
1885
1886 static int
1887 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1888                          struct dlb_eventdev_port *ev_port,
1889                          struct dlb_eventdev_queue *ev_queue,
1890                          uint8_t priority)
1891 {
1892         int first_avail = -1;
1893         int ret, i;
1894
1895         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1896                 if (ev_port->link[i].valid) {
1897                         if (ev_port->link[i].queue_id == ev_queue->id &&
1898                             ev_port->link[i].priority == priority) {
1899                                 if (ev_port->link[i].mapped)
1900                                         return 0; /* already mapped */
1901                                 first_avail = i;
1902                         }
1903                 } else {
1904                         if (first_avail == -1)
1905                                 first_avail = i;
1906                 }
1907         }
1908         if (first_avail == -1) {
1909                 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1910                             ev_port->qm_port.id);
1911                 return -EINVAL;
1912         }
1913
1914         ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1915                                          ev_port->qm_port.id,
1916                                          ev_queue->qm_queue.id,
1917                                          priority);
1918
1919         if (!ret)
1920                 ev_port->link[first_avail].mapped = true;
1921
1922         return ret;
1923 }
1924
1925 static int
1926 dlb_do_port_link(struct rte_eventdev *dev,
1927                  struct dlb_eventdev_queue *ev_queue,
1928                  struct dlb_eventdev_port *ev_port,
1929                  uint8_t prio)
1930 {
1931         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1932         int err;
1933
1934         /* Don't link until start time. */
1935         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1936                 return 0;
1937
1938         if (ev_queue->qm_queue.is_directed)
1939                 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1940         else
1941                 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1942
1943         if (err) {
1944                 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1945                             ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1946                             ev_queue->id, ev_port->id);
1947
1948                 rte_errno = err;
1949                 return -1;
1950         }
1951
1952         return 0;
1953 }
1954
1955 static int
1956 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1957 {
1958         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1959         int i;
1960
1961         /* Perform requested port->queue links */
1962         for (i = 0; i < dlb->num_ports; i++) {
1963                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1964                 int j;
1965
1966                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1967                         struct dlb_eventdev_queue *ev_queue;
1968                         uint8_t prio, queue_id;
1969
1970                         if (!ev_port->link[j].valid)
1971                                 continue;
1972
1973                         prio = ev_port->link[j].priority;
1974                         queue_id = ev_port->link[j].queue_id;
1975
1976                         if (dlb_validate_port_link(ev_port, queue_id, true, j))
1977                                 return -EINVAL;
1978
1979                         ev_queue = &dlb->ev_queues[queue_id];
1980
1981                         if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
1982                                 return -EINVAL;
1983                 }
1984         }
1985
1986         return 0;
1987 }
1988
1989 static int
1990 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
1991                        const uint8_t queues[], const uint8_t priorities[],
1992                        uint16_t nb_links)
1993
1994 {
1995         struct dlb_eventdev_port *ev_port = event_port;
1996         struct dlb_eventdev *dlb;
1997         int i, j;
1998
1999         RTE_SET_USED(dev);
2000
2001         if (ev_port == NULL) {
2002                 DLB_LOG_ERR("dlb: evport not setup\n");
2003                 rte_errno = -EINVAL;
2004                 return 0;
2005         }
2006
2007         if (!ev_port->setup_done &&
2008             ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2009                 DLB_LOG_ERR("dlb: evport not setup\n");
2010                 rte_errno = -EINVAL;
2011                 return 0;
2012         }
2013
2014         /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2015          * queues pointer.
2016          */
2017         if (nb_links == 0) {
2018                 DLB_LOG_DBG("dlb: nb_links is 0\n");
2019                 return 0; /* Ignore and return success */
2020         }
2021
2022         dlb = ev_port->dlb;
2023
2024         DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2025                     nb_links,
2026                     ev_port->qm_port.is_directed ? "DIR" : "LDB",
2027                     ev_port->id);
2028
2029         for (i = 0; i < nb_links; i++) {
2030                 struct dlb_eventdev_queue *ev_queue;
2031                 uint8_t queue_id, prio;
2032                 bool found = false;
2033                 int index = -1;
2034
2035                 queue_id = queues[i];
2036                 prio = priorities[i];
2037
2038                 /* Check if the link already exists. */
2039                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2040                         if (ev_port->link[j].valid) {
2041                                 if (ev_port->link[j].queue_id == queue_id) {
2042                                         found = true;
2043                                         index = j;
2044                                         break;
2045                                 }
2046                         } else {
2047                                 if (index == -1)
2048                                         index = j;
2049                         }
2050
2051                 /* no free link slot available; stop and return the count */
2052                 if (index == -1)
2053                         break;
2054
2055                 /* Check if already linked at the requested priority */
2056                 if (found && ev_port->link[index].priority == prio)
2057                         continue;
2058
2059                 if (dlb_validate_port_link(ev_port, queue_id, found, index))
2060                         break; /* return index of offending queue */
2061
2062                 ev_queue = &dlb->ev_queues[queue_id];
2063
2064                 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2065                         break; /* return index of offending queue */
2066
2067                 ev_queue->num_links++;
2068
2069                 ev_port->link[index].queue_id = queue_id;
2070                 ev_port->link[index].priority = prio;
2071                 ev_port->link[index].valid = true;
2072                 /* Only count the link if it is new (not a priority change) */
2073                 if (!found)
2074                         ev_port->num_links++;
2075         }
2076         return i;
2077 }
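/* Illustrative sketch (not part of the driver): linking queues to a port via
 * the public API, which ends up in dlb_eventdev_port_link(). While the device
 * is stopped the links are only recorded in ev_port->link[]; they are applied
 * to hardware by dlb_eventdev_apply_port_links() at dev_start time.
 *
 *     uint8_t queues[2] = {0, 1};
 *     uint8_t prios[2] = {RTE_EVENT_DEV_PRIORITY_HIGHEST,
 *                         RTE_EVENT_DEV_PRIORITY_NORMAL};
 *
 *     nb = rte_event_port_link(dev_id, port_id, queues, prios, 2);
 *     // nb < 2 indicates a partial link; rte_errno holds the reason
 */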
2078
2079 static int
2080 dlb_eventdev_start(struct rte_eventdev *dev)
2081 {
2082         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2083         struct dlb_hw_dev *handle = &dlb->qm_instance;
2084         struct dlb_start_domain_args cfg;
2085         struct dlb_cmd_response response;
2086         int ret, i;
2087
2088         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2089         if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2090                 DLB_LOG_ERR("bad state %d for dev_start\n",
2091                             (int)dlb->run_state);
2092                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2093                 return -EINVAL;
2094         }
2095         dlb->run_state = DLB_RUN_STATE_STARTING;
2096         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2097
2098         /* If the device was configured more than once, some event ports and/or
2099          * queues may need to be reconfigured.
2100          */
2101         ret = dlb_eventdev_reapply_configuration(dev);
2102         if (ret)
2103                 return ret;
2104
2105         /* The DLB PMD delays port links until the device is started. */
2106         ret = dlb_eventdev_apply_port_links(dev);
2107         if (ret)
2108                 return ret;
2109
2110         cfg.response = (uintptr_t)&response;
2111
2112         for (i = 0; i < dlb->num_ports; i++) {
2113                 if (!dlb->ev_ports[i].setup_done) {
2114                         DLB_LOG_ERR("dlb: port %d not setup", i);
2115                         return -ESTALE;
2116                 }
2117         }
2118
2119         for (i = 0; i < dlb->num_queues; i++) {
2120                 if (dlb->ev_queues[i].num_links == 0) {
2121                         DLB_LOG_ERR("dlb: queue %d is not linked", i);
2122                         return -ENOLINK;
2123                 }
2124         }
2125
2126         ret = dlb_iface_sched_domain_start(handle, &cfg);
2127         if (ret < 0) {
2128                 DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2129                             ret, dlb_error_strings[response.status]);
2130                 return ret;
2131         }
2132
2133         dlb->run_state = DLB_RUN_STATE_STARTED;
2134         DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2135
2136         return 0;
2137 }
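/* Putting the pieces together (illustrative): an application configures the
 * device, sets up its queues and ports, requests links, and only then calls
 * rte_event_dev_start(), which invokes this function. Because the PMD defers
 * directed-queue creation and QID->CQ mappings, most of the hardware-facing
 * work happens here rather than in the earlier setup calls.
 */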
2138
2139 static inline int
2140 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2141                              struct dlb_eventdev_port *ev_port)
2142 {
2143         uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2144                                                 __ATOMIC_SEQ_CST);
2145         const int num = 1;
2146
2147         if (unlikely(ev_port->inflight_max < sw_inflights)) {
2148                 DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2149                 rte_errno = -ENOSPC;
2150                 return 1;
2151         }
2152
2153         if (ev_port->inflight_credits < num) {
2154                 /* Grab another credit quanta unless that would exceed new_event_limit */
2155                 uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2156
2157                 if (sw_inflights + credit_update_quanta >
2158                     dlb->new_event_limit) {
2159                         DLB_INC_STAT(
2160                                 ev_port->stats.traffic.tx_nospc_new_event_limit,
2161                                 1);
2162                         rte_errno = -ENOSPC;
2163                         return 1;
2164                 }
2165
2166                 __atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2167                                    __ATOMIC_SEQ_CST);
2168                 ev_port->inflight_credits += (credit_update_quanta);
2169
2170                 if (ev_port->inflight_credits < num) {
2171                         DLB_INC_STAT(
2172                             ev_port->stats.traffic.tx_nospc_inflight_credits,
2173                             1);
2174                         rte_errno = -ENOSPC;
2175                         return 1;
2176                 }
2177         }
2178
2179         return 0;
2180 }
2181
2182 static inline void
2183 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2184                          struct dlb_eventdev_port *ev_port)
2185 {
2186         uint16_t quanta = ev_port->credit_update_quanta;
2187
2188         if (ev_port->inflight_credits >= quanta * 2) {
2189                 /* Return excess credits to the pool, keeping one quanta locally */
2190                 uint16_t val = ev_port->inflight_credits - quanta;
2191
2192                 __atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2193                 ev_port->inflight_credits -= val;
2194         }
2195 }
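/* Worked example of the software credit scheme above (values illustrative,
 * assuming credit_update_quanta == 32): a port with no local credits that
 * enqueues a NEW event atomically adds 32 to dlb->inflights and caches 32
 * inflight_credits; each NEW consumes one local credit and each RELEASE
 * returns one. Once the port holds at least 2 * 32 local credits,
 * dlb_replenish_sw_credits() hands all but one quanta back to the global
 * count, so credits circulate between ports instead of pooling on one.
 */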
2196
2197 static __rte_always_inline uint16_t
2198 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2199 {
2200         volatile uint16_t *popcount;
2201
2202         if (ldb)
2203                 popcount = port_data->ldb_popcount;
2204         else
2205                 popcount = port_data->dir_popcount;
2206
2207         return *popcount;
2208 }
2209
2210 static inline int
2211 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2212                                  struct process_local_port_data *port_data)
2213 {
2214         if (unlikely(qm_port->cached_ldb_credits == 0)) {
2215                 uint16_t pc;
2216
2217                 pc = dlb_read_pc(port_data, true);
2218
2219                 qm_port->cached_ldb_credits = pc -
2220                         qm_port->ldb_pushcount_at_credit_expiry;
2221                 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2222                         DLB_INC_STAT(
2223                         qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2224                         1);
2225
2226                         DLB_LOG_DBG("ldb credits exhausted\n");
2227                         return 1;
2228                 }
2229                 qm_port->ldb_pushcount_at_credit_expiry +=
2230                         qm_port->cached_ldb_credits;
2231         }
2232
2233         return 0;
2234 }
2235
2236 static inline int
2237 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2238                                  struct process_local_port_data *port_data)
2239 {
2240         if (unlikely(qm_port->cached_dir_credits == 0)) {
2241                 uint16_t pc;
2242
2243                 pc = dlb_read_pc(port_data, false);
2244
2245                 qm_port->cached_dir_credits = pc -
2246                         qm_port->dir_pushcount_at_credit_expiry;
2247
2248                 if (unlikely(qm_port->cached_dir_credits == 0)) {
2249                         DLB_INC_STAT(
2250                         qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2251                         1);
2252
2253                         DLB_LOG_DBG("dir credits exhausted\n");
2254                         return 1;
2255                 }
2256                 qm_port->dir_pushcount_at_credit_expiry +=
2257                         qm_port->cached_dir_credits;
2258         }
2259
2260         return 0;
2261 }
2262
2263 static inline int
2264 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2265                        struct dlb_port *qm_port,
2266                        const struct rte_event ev[],
2267                        struct process_local_port_data *port_data,
2268                        uint8_t *sched_type,
2269                        uint8_t *queue_id)
2270 {
2271         struct dlb_eventdev *dlb = ev_port->dlb;
2272         struct dlb_eventdev_queue *ev_queue;
2273         uint16_t *cached_credits = NULL;
2274         struct dlb_queue *qm_queue;
2275
2276         ev_queue = &dlb->ev_queues[ev->queue_id];
2277         qm_queue = &ev_queue->qm_queue;
2278         *queue_id = qm_queue->id;
2279
2280         /* Ignore sched_type and hardware credits on release events */
2281         if (ev->op == RTE_EVENT_OP_RELEASE)
2282                 goto op_check;
2283
2284         if (!qm_queue->is_directed) {
2285                 /* Load balanced destination queue */
2286
2287                 if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2288                         rte_errno = -ENOSPC;
2289                         return 1;
2290                 }
2291                 cached_credits = &qm_port->cached_ldb_credits;
2292
2293                 switch (ev->sched_type) {
2294                 case RTE_SCHED_TYPE_ORDERED:
2295                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2296                         if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2297                                 DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2298                                             *queue_id);
2299                                 rte_errno = -EINVAL;
2300                                 return 1;
2301                         }
2302                         *sched_type = DLB_SCHED_ORDERED;
2303                         break;
2304                 case RTE_SCHED_TYPE_ATOMIC:
2305                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2306                         *sched_type = DLB_SCHED_ATOMIC;
2307                         break;
2308                 case RTE_SCHED_TYPE_PARALLEL:
2309                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2310                         if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2311                                 *sched_type = DLB_SCHED_ORDERED;
2312                         else
2313                                 *sched_type = DLB_SCHED_UNORDERED;
2314                         break;
2315                 default:
2316                         DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2317                         DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2318                         rte_errno = -EINVAL;
2319                         return 1;
2320                 }
2321         } else {
2322                 /* Directed destination queue */
2323
2324                 if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2325                         rte_errno = -ENOSPC;
2326                         return 1;
2327                 }
2328                 cached_credits = &qm_port->cached_dir_credits;
2329
2330                 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2331
2332                 *sched_type = DLB_SCHED_DIRECTED;
2333         }
2334
2335 op_check:
2336         switch (ev->op) {
2337         case RTE_EVENT_OP_NEW:
2338                 /* Check that a sw credit is available */
2339                 if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2340                         rte_errno = -ENOSPC;
2341                         return 1;
2342                 }
2343                 ev_port->inflight_credits--;
2344                 (*cached_credits)--;
2345                 break;
2346         case RTE_EVENT_OP_FORWARD:
2347                 /* Check for outstanding_releases underflow. If this occurs,
2348                  * the application is not using the EVENT_OPs correctly; for
2349                  * example, forwarding or releasing events that were not
2350                  * dequeued.
2351                  */
2352                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2353                 ev_port->outstanding_releases--;
2354                 qm_port->issued_releases++;
2355                 (*cached_credits)--;
2356                 break;
2357         case RTE_EVENT_OP_RELEASE:
2358                 ev_port->inflight_credits++;
2359                 /* Check for outstanding_releases underflow. If this occurs,
2360                  * the application is not using the EVENT_OPs correctly; for
2361                  * example, forwarding or releasing events that were not
2362                  * dequeued.
2363                  */
2364                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2365                 ev_port->outstanding_releases--;
2366                 qm_port->issued_releases++;
2367                 /* Replenish s/w credits if enough are cached */
2368                 dlb_replenish_sw_credits(dlb, ev_port);
2369                 break;
2370         }
2371
2372         DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2373         DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2374
2375 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2376         if (ev->op != RTE_EVENT_OP_RELEASE) {
2377                 DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2378                 DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2379         }
2380 #endif
2381
2382         return 0;
2383 }
2384
2385 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2386         {
2387                 /* Load-balanced cmd bytes */
2388                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2389                 [RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2390                 [RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2391         },
2392         {
2393                 /* Directed cmd bytes */
2394                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2395                 [RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2396                 [RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2397         },
2398 };
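/* Note on the directed row above: directed traffic is not load balanced or
 * tracked for completion, so a FORWARD on a directed port is issued as a
 * plain NEW enqueue and a RELEASE degenerates to a no-op command byte.
 */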
2399
2400 static inline void
2401 dlb_event_build_hcws(struct dlb_port *qm_port,
2402                      const struct rte_event ev[],
2403                      int num,
2404                      uint8_t *sched_type,
2405                      uint8_t *queue_id)
2406 {
2407         struct dlb_enqueue_qe *qe;
2408         uint16_t sched_word[4];
2409         __m128i sse_qe[2];
2410         int i;
2411
2412         qe = qm_port->qe4;
2413
2414         sse_qe[0] = _mm_setzero_si128();
2415         sse_qe[1] = _mm_setzero_si128();
2416
2417         switch (num) {
2418         case 4:
2419                 /* Construct the metadata portion of two HCWs in one 128b SSE
2420                  * register. HCW metadata is constructed in the SSE registers
2421                  * like so:
2422                  * sse_qe[0][63:0]:   qe[0]'s metadata
2423                  * sse_qe[0][127:64]: qe[1]'s metadata
2424                  * sse_qe[1][63:0]:   qe[2]'s metadata
2425                  * sse_qe[1][127:64]: qe[3]'s metadata
2426                  */
2427
2428                 /* Convert the event operation into a command byte and store it
2429                  * in the metadata:
2430                  * sse_qe[0][63:56]   = cmd_byte_map[is_directed][ev[0].op]
2431                  * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2432                  * sse_qe[1][63:56]   = cmd_byte_map[is_directed][ev[2].op]
2433                  * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2434                  */
2435 #define DLB_QE_CMD_BYTE 7
2436                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2437                                 cmd_byte_map[qm_port->is_directed][ev[0].op],
2438                                 DLB_QE_CMD_BYTE);
2439                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2440                                 cmd_byte_map[qm_port->is_directed][ev[1].op],
2441                                 DLB_QE_CMD_BYTE + 8);
2442                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2443                                 cmd_byte_map[qm_port->is_directed][ev[2].op],
2444                                 DLB_QE_CMD_BYTE);
2445                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2446                                 cmd_byte_map[qm_port->is_directed][ev[3].op],
2447                                 DLB_QE_CMD_BYTE + 8);
2448
2449                 /* Store priority, scheduling type, and queue ID in the sched
2450                  * word array because these values are re-used when the
2451                  * destination is a directed queue.
2452                  */
2453                 sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2454                                 sched_type[0] << 8 |
2455                                 queue_id[0];
2456                 sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2457                                 sched_type[1] << 8 |
2458                                 queue_id[1];
2459                 sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2460                                 sched_type[2] << 8 |
2461                                 queue_id[2];
2462                 sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2463                                 sched_type[3] << 8 |
2464                                 queue_id[3];
2465
2466                 /* Store the event priority, scheduling type, and queue ID in
2467                  * the metadata:
2468                  * sse_qe[0][31:16] = sched_word[0]
2469                  * sse_qe[0][95:80] = sched_word[1]
2470                  * sse_qe[1][31:16] = sched_word[2]
2471                  * sse_qe[1][95:80] = sched_word[3]
2472                  */
2473 #define DLB_QE_QID_SCHED_WORD 1
2474                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2475                                              sched_word[0],
2476                                              DLB_QE_QID_SCHED_WORD);
2477                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2478                                              sched_word[1],
2479                                              DLB_QE_QID_SCHED_WORD + 4);
2480                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2481                                              sched_word[2],
2482                                              DLB_QE_QID_SCHED_WORD);
2483                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2484                                              sched_word[3],
2485                                              DLB_QE_QID_SCHED_WORD + 4);
2486
2487                 /* If the destination is a load-balanced queue, store the lock
2488                  * ID. If it is a directed queue, DLB places this field in
2489                  * bytes 10-11 of the received QE, so we format it accordingly:
2490                  * sse_qe[0][47:32]  = dir queue ? sched_word[0] : flow_id[0]
2491                  * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2492                  * sse_qe[1][47:32]  = dir queue ? sched_word[2] : flow_id[2]
2493                  * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2494                  */
2495 #define DLB_QE_LOCK_ID_WORD 2
2496                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2497                                 (sched_type[0] == DLB_SCHED_DIRECTED) ?
2498                                         sched_word[0] : ev[0].flow_id,
2499                                 DLB_QE_LOCK_ID_WORD);
2500                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2501                                 (sched_type[1] == DLB_SCHED_DIRECTED) ?
2502                                         sched_word[1] : ev[1].flow_id,
2503                                 DLB_QE_LOCK_ID_WORD + 4);
2504                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2505                                 (sched_type[2] == DLB_SCHED_DIRECTED) ?
2506                                         sched_word[2] : ev[2].flow_id,
2507                                 DLB_QE_LOCK_ID_WORD);
2508                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2509                                 (sched_type[3] == DLB_SCHED_DIRECTED) ?
2510                                         sched_word[3] : ev[3].flow_id,
2511                                 DLB_QE_LOCK_ID_WORD + 4);
2512
2513                 /* Store the event type and sub event type in the metadata:
2514                  * sse_qe[0][15:0]  = flow_id[0]
2515                  * sse_qe[0][79:64] = flow_id[1]
2516                  * sse_qe[1][15:0]  = flow_id[2]
2517                  * sse_qe[1][79:64] = flow_id[3]
2518                  */
2519 #define DLB_QE_EV_TYPE_WORD 0
2520                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2521                                              ev[0].sub_event_type << 8 |
2522                                                 ev[0].event_type,
2523                                              DLB_QE_EV_TYPE_WORD);
2524                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2525                                              ev[1].sub_event_type << 8 |
2526                                                 ev[1].event_type,
2527                                              DLB_QE_EV_TYPE_WORD + 4);
2528                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2529                                              ev[2].sub_event_type << 8 |
2530                                                 ev[2].event_type,
2531                                              DLB_QE_EV_TYPE_WORD);
2532                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2533                                              ev[3].sub_event_type << 8 |
2534                                                 ev[3].event_type,
2535                                              DLB_QE_EV_TYPE_WORD + 4);
2536
2537                 /* Store the metadata to memory (use the double-precision
2538                  * _mm_storeh_pd because there is no integer function for
2539                  * storing the upper 64b):
2540                  * qe[0] metadata = sse_qe[0][63:0]
2541                  * qe[1] metadata = sse_qe[0][127:64]
2542                  * qe[2] metadata = sse_qe[1][63:0]
2543                  * qe[3] metadata = sse_qe[1][127:64]
2544                  */
2545                 _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2546                 _mm_storeh_pd((double *)&qe[1].u.opaque_data,
2547                               (__m128d) sse_qe[0]);
2548                 _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2549                 _mm_storeh_pd((double *)&qe[3].u.opaque_data,
2550                               (__m128d) sse_qe[1]);
2551
2552                 qe[0].data = ev[0].u64;
2553                 qe[1].data = ev[1].u64;
2554                 qe[2].data = ev[2].u64;
2555                 qe[3].data = ev[3].u64;
2556
2557                 break;
2558         case 3:
2559         case 2:
2560         case 1:
2561                 for (i = 0; i < num; i++) {
2562                         qe[i].cmd_byte =
2563                                 cmd_byte_map[qm_port->is_directed][ev[i].op];
2564                         qe[i].sched_type = sched_type[i];
2565                         qe[i].data = ev[i].u64;
2566                         qe[i].qid = queue_id[i];
2567                         qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2568                         qe[i].lock_id = ev[i].flow_id;
2569                         if (sched_type[i] == DLB_SCHED_DIRECTED) {
2570                                 struct dlb_msg_info *info =
2571                                         (struct dlb_msg_info *)&qe[i].lock_id;
2572
2573                                 info->qid = queue_id[i];
2574                                 info->sched_type = DLB_SCHED_DIRECTED;
2575                                 info->priority = qe[i].priority;
2576                         }
2577                         qe[i].u.event_type.major = ev[i].event_type;
2578                         qe[i].u.event_type.sub = ev[i].sub_event_type;
2579                 }
2580                 break;
2581         case 0:
2582                 break;
2583         }
2584 }
2585
2586 static __rte_always_inline void
2587 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2588              struct process_local_port_data *port_data)
2589 {
2590         dlb_movdir64b(port_data->pp_addr, qe4);
2591 }
2592
2593 static inline void
2594 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2595                   bool do_sfence,
2596                   struct process_local_port_data *port_data)
2597 {
2598         DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2599
2600         /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2601          * application writes complete before enqueueing the release HCW.
2602          */
2603         if (do_sfence)
2604                 rte_wmb();
2605
2606         dlb_pp_write(qm_port->qe4, port_data);
2607 }
2608
2609 static inline int
2610 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2611 {
2612         struct process_local_port_data *port_data;
2613         struct dlb_cq_pop_qe *qe;
2614
2615         RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2616
2617         if (qm_port->use_rsvd_token_scheme) {
2618                 /* Check if there's a deficit of reserved tokens, and return
2619                  * early if there are no (unreserved) tokens to consume.
2620                  */
2621                 if (num <= qm_port->cq_rsvd_token_deficit) {
2622                         qm_port->cq_rsvd_token_deficit -= num;
2623                         qm_port->owed_tokens = 0;
2624                         return 0;
2625                 }
2626                 num -= qm_port->cq_rsvd_token_deficit;
2627                 qm_port->cq_rsvd_token_deficit = 0;
2628         }
2629
2630         qe = qm_port->consume_qe;
2631
2632         qe->tokens = num - 1;
2633         qe->int_arm = 0;
2634
2635         /* No store fence needed since no pointer is being sent, and CQ token
2636          * pops can be safely reordered with other HCWs.
2637          */
2638         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2639
2640         dlb_movntdq_single(port_data->pp_addr, qe);
2641
2642         DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2643
2644         qm_port->owed_tokens = 0;
2645
2646         return 0;
2647 }
2648
2649 static inline uint16_t
2650 __dlb_event_enqueue_burst(void *event_port,
2651                           const struct rte_event events[],
2652                           uint16_t num)
2653 {
2654         struct dlb_eventdev_port *ev_port = event_port;
2655         struct dlb_port *qm_port = &ev_port->qm_port;
2656         struct process_local_port_data *port_data;
2657         int i;
2658
2659         RTE_ASSERT(ev_port->enq_configured);
2660         RTE_ASSERT(events != NULL);
2661
2662         rte_errno = 0;
2663         i = 0;
2664
2665         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2666
2667         while (i < num) {
2668                 uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2669                 uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2670                 int pop_offs = 0;
2671                 int j = 0;
2672
2673                 memset(qm_port->qe4,
2674                        0,
2675                        DLB_NUM_QES_PER_CACHE_LINE *
2676                        sizeof(struct dlb_enqueue_qe));
2677
2678                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2679                         const struct rte_event *ev = &events[i + j];
2680
2681                         if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2682                                                    port_data, &sched_types[j],
2683                                                    &queue_ids[j]))
2684                                 break;
2685                 }
2686
2687                 if (j == 0)
2688                         break;
2689
2690                 dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2691                                      sched_types, queue_ids);
2692
2693                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2694
2695                 /* Don't include the token pop QE in the enqueue count */
2696                 i += j - pop_offs;
2697
2698                 /* Don't interpret j < DLB_NUM_... as out-of-credits if
2699                  * pop_offs != 0
2700                  */
2701                 if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2702                         break;
2703         }
2704
2705         RTE_ASSERT(!(i == 0 && rte_errno != -ENOSPC));
2706
2707         return i;
2708 }
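/* Illustrative sketch (not part of the driver): a minimal single-event
 * enqueue from the application, which reaches __dlb_event_enqueue_burst()
 * through the enqueue function pointers installed at probe time. Field
 * values are hypothetical.
 *
 *     struct rte_event ev = {
 *             .op = RTE_EVENT_OP_NEW,
 *             .queue_id = 0,
 *             .sched_type = RTE_SCHED_TYPE_ATOMIC,
 *             .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *             .u64 = (uintptr_t)obj,
 *     };
 *
 *     nb = rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
 *     // nb == 0 with rte_errno == -ENOSPC means out of credits; retry later
 */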
2709
2710 static inline uint16_t
2711 dlb_event_enqueue_burst(void *event_port,
2712                         const struct rte_event events[],
2713                         uint16_t num)
2714 {
2715         return __dlb_event_enqueue_burst(event_port, events, num);
2716 }
2717
2718 static inline uint16_t
2719 dlb_event_enqueue(void *event_port,
2720                   const struct rte_event events[])
2721 {
2722         return __dlb_event_enqueue_burst(event_port, events, 1);
2723 }
2724
2725 static uint16_t
2726 dlb_event_enqueue_new_burst(void *event_port,
2727                             const struct rte_event events[],
2728                             uint16_t num)
2729 {
2730         return __dlb_event_enqueue_burst(event_port, events, num);
2731 }
2732
2733 static uint16_t
2734 dlb_event_enqueue_forward_burst(void *event_port,
2735                                 const struct rte_event events[],
2736                                 uint16_t num)
2737 {
2738         return __dlb_event_enqueue_burst(event_port, events, num);
2739 }
2740
2741 static __rte_always_inline int
2742 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2743             uint8_t *offset)
2744 {
2745         uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2746                                    {0x00, 0x01, 0x03, 0x07} };
2747         uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2748         volatile struct dlb_dequeue_qe *cq_addr;
2749         __m128i *qes = (__m128i *)qe;
2750         uint64_t *cache_line_base;
2751         uint8_t gen_bits;
2752
2753         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2754         cq_addr = &cq_addr[qm_port->cq_idx];
2755
2756         cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2757         *offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2758
2759         /* Load the next CQ cache line from memory. Pack these reads as tightly
2760          * as possible to reduce the chance that DLB invalidates the line while
2761          * the CPU is reading it. Read the cache line backwards to ensure that
2762          * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2763          *
2764          * (Valid QEs start at &qe[offset])
2765          */
2766         qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2767         qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2768         qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2769         qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2770
2771         /* Evict the cache line ASAP */
2772         rte_cldemote(cache_line_base);
2773
2774         /* Extract and combine the gen bits */
2775         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2776                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2777                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2778                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2779
2780         /* XOR the combined bits such that a 1 represents a valid QE */
2781         gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2782
2783         /* Mask off gen bits we don't care about */
2784         gen_bits &= and_mask[*offset];
2785
2786         return __builtin_popcount(gen_bits);
2787 }
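/* Worked example for the gen-bit math above (illustrative): assume the CQ
 * index points at the second QE of the cache line (*offset == 1) and
 * qm_port->gen_bit == 1. QE0 was already consumed earlier in this pass and
 * carries gen bit 1, QE1 and QE2 are newly written (gen bit 1), and QE3
 * still carries the previous pass's gen bit 0, so the extracted bits are
 * 0b0111. XOR with xor_mask[1][1] == 0x01 gives 0b0110, AND with
 * and_mask[1] == 0x0E leaves 0b0110, and popcount reports 2 valid QEs
 * starting at &qe[1].
 */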
2788
2789 static inline void
2790 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2791 {
2792         uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2793
2794         qm_port->cq_idx_unmasked = idx;
2795         qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2796         qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
2797 }
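/* For example (illustrative, assuming cq_depth_mask == depth - 1 and
 * gen_bit_shift == log2(depth)): with a CQ depth of 8, cq_idx wraps every
 * 8 QEs while cq_idx_unmasked keeps counting, and the gen bit starts at 1
 * and toggles on every wrap of the unmasked index.
 */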
2798
2799 static inline int
2800 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2801                         struct dlb_port *qm_port,
2802                         struct rte_event *events,
2803                         struct dlb_dequeue_qe *qes,
2804                         int cnt)
2805 {
2806         uint8_t *qid_mappings = qm_port->qid_mappings;
2807         int i, num;
2808
2809         RTE_SET_USED(ev_port);  /* avoids unused variable error */
2810
2811         for (i = 0, num = 0; i < cnt; i++) {
2812                 struct dlb_dequeue_qe *qe = &qes[i];
2813                 int sched_type_map[4] = {
2814                         [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2815                         [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2816                         [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2817                         [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2818                 };
2819
2820                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2821                             (long long)qe->data, qe->qid,
2822                             qe->u.event_type.major,
2823                             qe->u.event_type.sub,
2824                             qe->pp_id, qe->sched_type, qe->qid, qe->error);
2825
2826                 /* Fill in event information.
2827                  * Note that flow_id must be embedded in the data by
2828                  * the app, such as the mbuf RSS hash field if the data
2829                  * buffer is a mbuf.
2830                  */
2831                 if (unlikely(qe->error)) {
2832                         DLB_LOG_ERR("QE error bit ON\n");
2833                         DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2834                         dlb_consume_qe_immediate(qm_port, 1);
2835                         continue; /* Ignore */
2836                 }
2837
2838                 events[num].u64 = qe->data;
2839                 events[num].queue_id = qid_mappings[qe->qid];
2840                 events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2841                 events[num].event_type = qe->u.event_type.major;
2842                 events[num].sub_event_type = qe->u.event_type.sub;
2843                 events[num].sched_type = sched_type_map[qe->sched_type];
2844                 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2845                 num++;
2846         }
2847         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2848
2849         return num;
2850 }
2851
2852 static inline int
2853 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2854                              struct dlb_port *qm_port,
2855                              struct rte_event *events,
2856                              struct dlb_dequeue_qe *qes)
2857 {
2858         int sched_type_map[] = {
2859                 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2860                 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2861                 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2862                 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2863         };
2864         const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2865         uint8_t *qid_mappings = qm_port->qid_mappings;
2866         __m128i sse_evt[2];
2867         int i;
2868
2869         /* In the unlikely case that any of the QE error bits are set, process
2870          * them one at a time.
2871          */
2872         if (unlikely(qes[0].error || qes[1].error ||
2873                      qes[2].error || qes[3].error))
2874                 return dlb_process_dequeue_qes(ev_port, qm_port, events,
2875                                                qes, num_events);
2876
2877         for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
2878                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2879                             (long long)qes[i].data, qes[i].qid,
2880                             qes[i].u.event_type.major,
2881                             qes[i].u.event_type.sub,
2882                             qes[i].pp_id, qes[i].sched_type, qes[i].qid,
2883                             qes[i].error);
2884         }
2885
2886         events[0].u64 = qes[0].data;
2887         events[1].u64 = qes[1].data;
2888         events[2].u64 = qes[2].data;
2889         events[3].u64 = qes[3].data;
2890
2891         /* Construct the metadata portion of two struct rte_events
2892          * in one 128b SSE register. Event metadata is constructed in the SSE
2893          * registers like so:
2894          * sse_evt[0][63:0]:   event[0]'s metadata
2895          * sse_evt[0][127:64]: event[1]'s metadata
2896          * sse_evt[1][63:0]:   event[2]'s metadata
2897          * sse_evt[1][127:64]: event[3]'s metadata
2898          */
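        /* The byte/dword offsets used below follow the rte_event bit-field
         * layout within each 64b metadata word: flow_id [19:0],
         * sub_event_type [27:20], event_type [31:28], op/rsvd/sched_type
         * [39:32] (sched_type in the two most significant bits, hence the
         * shift by 6), queue_id [47:40] (byte 5) and priority [55:48]
         * (byte 6).
         */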
2899         sse_evt[0] = _mm_setzero_si128();
2900         sse_evt[1] = _mm_setzero_si128();
2901
2902         /* Convert the hardware queue ID to an event queue ID and store it in
2903          * the metadata:
2904          * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
2905          * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
2906          * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
2907          * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
2908          */
2909 #define DLB_EVENT_QUEUE_ID_BYTE 5
2910         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2911                                      qid_mappings[qes[0].qid],
2912                                      DLB_EVENT_QUEUE_ID_BYTE);
2913         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2914                                      qid_mappings[qes[1].qid],
2915                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
2916         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2917                                      qid_mappings[qes[2].qid],
2918                                      DLB_EVENT_QUEUE_ID_BYTE);
2919         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2920                                      qid_mappings[qes[3].qid],
2921                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
2922
2923         /* Convert the hardware priority to an event priority and store it in
2924          * the metadata:
2925          * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
2926          * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
2927          * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
2928          * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
2929          */
2930 #define DLB_EVENT_PRIO_BYTE 6
2931         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2932                                      DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
2933                                      DLB_EVENT_PRIO_BYTE);
2934         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2935                                      DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
2936                                      DLB_EVENT_PRIO_BYTE + 8);
2937         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2938                                      DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
2939                                      DLB_EVENT_PRIO_BYTE);
2940         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2941                                      DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
2942                                      DLB_EVENT_PRIO_BYTE + 8);
2943
2944         /* Write the event type and sub event type to the event metadata. Leave
2945          * flow ID unspecified, since the hardware does not maintain it during
2946          * scheduling:
2947          * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
2948          *                      qes[0].u.event_type.sub << 20;
2949          * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
2950          *                      qes[1].u.event_type.sub << 20;
2951          * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
2952          *                      qes[2].u.event_type.sub << 20;
2953          * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
2954          *                      qes[3].u.event_type.sub << 20;
2955          */
2956 #define DLB_EVENT_EV_TYPE_DW 0
2957 #define DLB_EVENT_EV_TYPE_SHIFT 28
2958 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
2959         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
2960                         qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
2961                         qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
2962                         DLB_EVENT_EV_TYPE_DW);
2963         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
2964                         qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
2965                         qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
2966                         DLB_EVENT_EV_TYPE_DW + 2);
2967         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
2968                         qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
2969                         qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
2970                         DLB_EVENT_EV_TYPE_DW);
2971         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
2972                         qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
2973                         qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
2974                         DLB_EVENT_EV_TYPE_DW + 2);
2975
2976         /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
2977          * set:
2978          * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
2979          * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
2980          * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
2981          * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
2982          */
2983 #define DLB_EVENT_SCHED_TYPE_BYTE 4
2984 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
2985         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2986                 sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
2987                 DLB_EVENT_SCHED_TYPE_BYTE);
2988         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
2989                 sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
2990                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
2991         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2992                 sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
2993                 DLB_EVENT_SCHED_TYPE_BYTE);
2994         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
2995                 sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
2996                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
2997
2998         /* Store the metadata to the event (use the double-precision
2999          * _mm_storeh_pd because there is no integer function for storing the
3000          * upper 64b):
3001          * events[0].event = sse_evt[0][63:0]
3002          * events[1].event = sse_evt[0][127:64]
3003          * events[2].event = sse_evt[1][63:0]
3004          * events[3].event = sse_evt[1][127:64]
3005          */
3006         _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3007         _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3008         _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3009         _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
3010
3011         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3012         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3013         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3014         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3015
3016         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3017
3018         return num_events;
3019 }
3020
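/*
 * Wait for the dequeue timeout to elapse, either by monitoring the CQ's gen
 * bit with umonitor/umwait (when allowed) or by a bounded busy-poll. Returns 1
 * when the timeout has expired and the caller should stop polling, 0 when the
 * caller should re-check the CQ.
 */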
3021 static inline int
3022 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3023                  struct dlb_eventdev_port *ev_port,
3024                  struct dlb_port *qm_port,
3025                  uint64_t timeout,
3026                  uint64_t start_ticks)
3027 {
3028         struct process_local_port_data *port_data;
3029         uint64_t elapsed_ticks;
3030
3031         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3032
3033         elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3034
3035         /* Wait/poll time expired */
3036         if (elapsed_ticks >= timeout) {
3037                 /* Interrupts not supported by PF PMD */
3038                 return 1;
3039         } else if (dlb->umwait_allowed) {
3040                 volatile struct dlb_dequeue_qe *cq_base;
3041                 union {
3042                         uint64_t raw_qe[2];
3043                         struct dlb_dequeue_qe qe;
3044                 } qe_mask;
3045                 uint64_t expected_value;
3046                 volatile uint64_t *monitor_addr;
3047
3048                 qe_mask.qe.cq_gen = 1; /* set mask */
3049
3050                 cq_base = port_data->cq_base;
3051                 monitor_addr = (volatile uint64_t *)(volatile void *)
3052                         &cq_base[qm_port->cq_idx];
3053                 monitor_addr++; /* cq_gen bit is in second 64bit location */
3054
3055                 if (qm_port->gen_bit)
3056                         expected_value = qe_mask.raw_qe[1];
3057                 else
3058                         expected_value = 0;
3059
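                /* rte_power_monitor() compares the masked 64b value at
                 * monitor_addr against expected_value; if they still match
                 * (i.e. the QE is not yet valid) it waits until the monitored
                 * cache line is written or the TSC deadline
                 * (start_ticks + timeout) passes.
                 */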
3060                 rte_power_monitor(monitor_addr, expected_value,
3061                                   qe_mask.raw_qe[1], timeout + start_ticks,
3062                                   sizeof(uint64_t));
3063
3064                 DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3065         } else {
3066                 uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3067                 uint64_t curr_ticks = rte_get_timer_cycles();
3068                 uint64_t init_ticks = curr_ticks;
3069
3070                 while ((curr_ticks - start_ticks < timeout) &&
3071                        (curr_ticks - init_ticks < poll_interval))
3072                         curr_ticks = rte_get_timer_cycles();
3073         }
3074
3075         return 0;
3076 }
3077
3078 static inline int16_t
3079 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3080                struct dlb_eventdev_port *ev_port,
3081                struct rte_event *events,
3082                uint16_t max_num,
3083                uint64_t dequeue_timeout_ticks)
3084 {
3085         uint64_t timeout;
3086         uint64_t start_ticks = 0ULL;
3087         struct dlb_port *qm_port;
3088         int num = 0;
3089
3090         qm_port = &ev_port->qm_port;
3091
3092         /* If the device was not configured with a global dequeue wait, use
3093          * the wait value passed to this call; otherwise use the global value
3094          * chosen at eventdev configure time.
3095          */
3096         if (!dlb->global_dequeue_wait)
3097                 timeout = dequeue_timeout_ticks;
3098         else
3099                 timeout = dlb->global_dequeue_wait_ticks;
3100
3101         if (timeout)
3102                 start_ticks = rte_get_timer_cycles();
3103
3104         while (num < max_num) {
3105                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3106                 uint8_t offset;
3107                 int num_avail;
3108
3109                 /* Copy up to 4 QEs from the current cache line into qes */
3110                 num_avail = dlb_recv_qe(qm_port, qes, &offset);
3111
3112                 /* But don't process more than the user requested */
3113                 num_avail = RTE_MIN(num_avail, max_num - num);
3114
3115                 dlb_inc_cq_idx(qm_port, num_avail);
3116
3117                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3118                         num += dlb_process_dequeue_four_qes(ev_port,
3119                                                              qm_port,
3120                                                              &events[num],
3121                                                              &qes[offset]);
3122                 else if (num_avail)
3123                         num += dlb_process_dequeue_qes(ev_port,
3124                                                         qm_port,
3125                                                         &events[num],
3126                                                         &qes[offset],
3127                                                         num_avail);
3128                 else if ((timeout == 0) || (num > 0))
3129                         /* Not waiting in any form, or 1+ events received? */
3130                         break;
3131                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3132                                           timeout, start_ticks))
3133                         break;
3134         }
3135
3136         qm_port->owed_tokens += num;
3137
3138         dlb_consume_qe_immediate(qm_port, num);
3139
3140         ev_port->outstanding_releases += num;
3141
3142         return num;
3143 }
3144
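/*
 * Sparse CQ mode: the hardware writes one QE per cache line, so gather the
 * next four QEs from four consecutive cache lines (a stride of 4 QE slots)
 * and return how many of them have a valid gen bit.
 */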
3145 static __rte_always_inline int
3146 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3147 {
3148         volatile struct dlb_dequeue_qe *cq_addr;
3149         uint8_t xor_mask[2] = {0x0F, 0x00};
3150         const uint8_t and_mask = 0x0F;
3151         __m128i *qes = (__m128i *)qe;
3152         uint8_t gen_bits, gen_bit;
3153         uintptr_t addr[4];
3154         uint16_t idx;
3155
3156         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3157
3158         idx = qm_port->cq_idx;
3159
3160         /* Load the next 4 QEs */
3161         addr[0] = (uintptr_t)&cq_addr[idx];
3162         addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3163         addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3164         addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3165
3166         /* Prefetch the next batch of QEs (every CQ occupies at least 8 cache lines) */
3167         rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3168         rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3169         rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3170         rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3171
3172         /* Correct the xor_mask for wrap-around QEs */
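        /* The gen bit written by hardware flips polarity on each pass over the
         * CQ ring, so any of the four slots that land past the wrap point
         * expect the inverted gen value; flip their xor_mask bits accordingly.
         */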
3173         gen_bit = qm_port->gen_bit;
3174         xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3175         xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3176         xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
3177
3178         /* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3179          * valid, then QEs[0:N-1] are too.
3180          */
3181         qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3182         rte_compiler_barrier();
3183         qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3184         rte_compiler_barrier();
3185         qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3186         rte_compiler_barrier();
3187         qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3188
3189         /* Extract and combine the gen bits */
3190         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3191                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3192                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3193                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3194
3195         /* XOR the combined bits such that a 1 represents a valid QE */
3196         gen_bits ^= xor_mask[gen_bit];
3197
3198         /* Mask off gen bits we don't care about */
3199         gen_bits &= and_mask;
3200
3201         return __builtin_popcount(gen_bits);
3202 }
3203
3204 static inline int16_t
3205 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3206                       struct dlb_eventdev_port *ev_port,
3207                       struct rte_event *events,
3208                       uint16_t max_num,
3209                       uint64_t dequeue_timeout_ticks)
3210 {
3211         uint64_t timeout;
3212         uint64_t start_ticks = 0ULL;
3213         struct dlb_port *qm_port;
3214         int num = 0;
3215
3216         qm_port = &ev_port->qm_port;
3217
3218         /* If the device was not configured with a global dequeue wait, use
3219          * the wait value passed to this call; otherwise use the global value
3220          * chosen at eventdev configure time.
3221          */
3222         if (!dlb->global_dequeue_wait)
3223                 timeout = dequeue_timeout_ticks;
3224         else
3225                 timeout = dlb->global_dequeue_wait_ticks;
3226
3227         if (timeout)
3228                 start_ticks = rte_get_timer_cycles();
3229
3230         while (num < max_num) {
3231                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3232                 int num_avail;
3233
3234                 /* Copy up to 4 QEs from the current cache line into qes */
3235                 num_avail = dlb_recv_qe_sparse(qm_port, qes);
3236
3237                 /* But don't process more than the user requested */
3238                 num_avail = RTE_MIN(num_avail, max_num - num);
3239
3240                 dlb_inc_cq_idx(qm_port, num_avail << 2);
3241
3242                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3243                         num += dlb_process_dequeue_four_qes(ev_port,
3244                                                              qm_port,
3245                                                              &events[num],
3246                                                              &qes[0]);
3247                 else if (num_avail)
3248                         num += dlb_process_dequeue_qes(ev_port,
3249                                                         qm_port,
3250                                                         &events[num],
3251                                                         &qes[0],
3252                                                         num_avail);
3253                 else if ((timeout == 0) || (num > 0))
3254                         /* Not waiting in any form, or 1+ events received? */
3255                         break;
3256                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3257                                           timeout, start_ticks))
3258                         break;
3259         }
3260
3261         qm_port->owed_tokens += num;
3262
3263         dlb_consume_qe_immediate(qm_port, num);
3264
3265         ev_port->outstanding_releases += num;
3266
3267         return num;
3268 }
3269
3270 static int
3271 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3272 {
3273         struct process_local_port_data *port_data;
3274         struct dlb_eventdev_port *ev_port;
3275         struct dlb_port *qm_port;
3276         int i;
3277
3278         if (port_id >= dlb->num_ports) {
3279                 DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3280                             port_id);
3281                 rte_errno = -EINVAL;
3282                 return rte_errno;
3283         }
3284
3285         ev_port = &dlb->ev_ports[port_id];
3286         qm_port = &ev_port->qm_port;
3287         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3288
3289         i = 0;
3290
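        /* Directed ports are not load balanced, so the hardware needs no
         * release QEs from them; only the software credit accounting at
         * sw_credit_update has to run.
         */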
3291         if (qm_port->is_directed) {
3292                 i = n;
3293                 goto sw_credit_update;
3294         }
3295
3296         while (i < n) {
3297                 int pop_offs = 0;
3298                 int j = 0;
3299
3300                 /* Zero-out the command byte of each QE */
3301                 qm_port->qe4[0].cmd_byte = 0;
3302                 qm_port->qe4[1].cmd_byte = 0;
3303                 qm_port->qe4[2].cmd_byte = 0;
3304                 qm_port->qe4[3].cmd_byte = 0;
3305
3306                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3307
3308                         qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3309                         qm_port->issued_releases++;
3310                 }
3311
3312                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3313
3314                 /* Don't include the token pop QE in the release count */
3315                 i += j - pop_offs;
3316         }
3317
3318 sw_credit_update:
3319         /* each release returns one credit */
3320         if (!ev_port->outstanding_releases) {
3321                 DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3322                 rte_errno = -ENOTRECOVERABLE;
3323                 return rte_errno;
3324         }
3325
3326         ev_port->outstanding_releases -= i;
3327         ev_port->inflight_credits += i;
3328
3329         /* Replenish s/w credits if enough releases are performed */
3330         dlb_replenish_sw_credits(dlb, ev_port);
3331         return 0;
3332 }
3333
3334 static uint16_t
3335 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3336                         uint64_t wait)
3337 {
3338         struct dlb_eventdev_port *ev_port = event_port;
3339         struct dlb_eventdev *dlb = ev_port->dlb;
3340         uint16_t cnt;
3341         int ret;
3342
3343         rte_errno = 0;
3344
3345         RTE_ASSERT(ev_port->setup_done);
3346         RTE_ASSERT(ev != NULL);
3347
3348         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3349                 uint16_t out_rels = ev_port->outstanding_releases;
3350
3351                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3352                 if (ret)
3353                         return ret;
3354
3355                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3356         }
3357
3358         cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3359
3360         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3361         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3362         return cnt;
3363 }
3364
3365 static uint16_t
3366 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3367 {
3368         return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3369 }
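/*
 * These dequeue handlers are reached through the public eventdev API; a
 * minimal application-side sketch (the poll loop and process_event() are
 * illustrative only):
 *
 *	struct rte_event ev[32];
 *	uint16_t i, n;
 *
 *	n = rte_event_dequeue_burst(dev_id, port_id, ev, RTE_DIM(ev),
 *				    timeout_ticks);
 *	for (i = 0; i < n; i++)
 *		process_event(&ev[i]);
 */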
3370
3371 static uint16_t
3372 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3373                                uint16_t num, uint64_t wait)
3374 {
3375         struct dlb_eventdev_port *ev_port = event_port;
3376         struct dlb_eventdev *dlb = ev_port->dlb;
3377         uint16_t cnt;
3378         int ret;
3379
3380         rte_errno = 0;
3381
3382         RTE_ASSERT(ev_port->setup_done);
3383         RTE_ASSERT(ev != NULL);
3384
3385         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3386                 uint16_t out_rels = ev_port->outstanding_releases;
3387
3388                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3389                 if (ret)
3390                         return ret;
3391
3392                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3393         }
3394
3395         cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3396
3397         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3398         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3399         return cnt;
3400 }
3401
3402 static uint16_t
3403 dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
3404 {
3405         return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
3406 }
3407
3408 void
3409 dlb_entry_points_init(struct rte_eventdev *dev)
3410 {
3411         struct dlb_eventdev *dlb;
3412
3413         static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
3414                 .dev_infos_get    = dlb_eventdev_info_get,
3415                 .dev_configure    = dlb_eventdev_configure,
3416                 .dev_start        = dlb_eventdev_start,
3417                 .queue_def_conf   = dlb_eventdev_queue_default_conf_get,
3418                 .port_def_conf    = dlb_eventdev_port_default_conf_get,
3419                 .queue_setup      = dlb_eventdev_queue_setup,
3420                 .port_setup       = dlb_eventdev_port_setup,
3421                 .port_link        = dlb_eventdev_port_link,
3422                 .port_unlink      = dlb_eventdev_port_unlink,
3423                 .port_unlinks_in_progress =
3424                                     dlb_eventdev_port_unlinks_in_progress,
3425                 .dump             = dlb_eventdev_dump,
3426                 .xstats_get       = dlb_eventdev_xstats_get,
3427                 .xstats_get_names = dlb_eventdev_xstats_get_names,
3428                 .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
3429                 .xstats_reset       = dlb_eventdev_xstats_reset,
3430         };
3431
3432         /* Expose PMD's eventdev interface */
3433         dev->dev_ops = &dlb_eventdev_entry_ops;
3434
3435         dev->enqueue = dlb_event_enqueue;
3436         dev->enqueue_burst = dlb_event_enqueue_burst;
3437         dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
3438         dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
3439         dev->dequeue = dlb_event_dequeue;
3440         dev->dequeue_burst = dlb_event_dequeue_burst;
3441
3442         dlb = dev->data->dev_private;
3443
3444         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
3445                 dev->dequeue = dlb_event_dequeue_sparse;
3446                 dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
3447         }
3448 }
3449
3450 int
3451 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
3452                            const char *name,
3453                            struct dlb_devargs *dlb_args)
3454 {
3455         struct dlb_eventdev *dlb;
3456         int err;
3457
3458         dlb = dev->data->dev_private;
3459
3460         dlb->event_dev = dev; /* backlink */
3461
3462         evdev_dlb_default_info.driver_name = name;
3463
3464         dlb->max_num_events_override = dlb_args->max_num_events;
3465         dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
3466         dlb->defer_sched = dlb_args->defer_sched;
3467         dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
3468
3469         /* Open the interface.
3470          * For vdev mode, this means open the dlb kernel module.
3471          */
3472         err = dlb_iface_open(&dlb->qm_instance, name);
3473         if (err < 0) {
3474                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3475                             err);
3476                 return err;
3477         }
3478
3479         err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
3480         if (err < 0) {
3481                 DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
3482                             err);
3483                 return err;
3484         }
3485
3486         err = dlb_hw_query_resources(dlb);
3487         if (err) {
3488                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3489                 return err;
3490         }
3491
3492         err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
3493         if (err < 0) {
3494                 DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
3495                 return err;
3496         }
3497
3498         /* Complete xstats runtime initialization */
3499         err = dlb_xstats_init(dlb);
3500         if (err) {
3501                 DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
3502                 return err;
3503         }
3504
3505         rte_spinlock_init(&dlb->qm_instance.resource_lock);
3506
3507         dlb_iface_low_level_io_init(dlb);
3508
3509         dlb_entry_points_init(dev);
3510
3511         return 0;
3512 }
3513
3514 int
3515 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
3516                              const char *name)
3517 {
3518         struct dlb_eventdev *dlb;
3519         int err;
3520
3521         dlb = dev->data->dev_private;
3522
3523         evdev_dlb_default_info.driver_name = name;
3524
3525         err = dlb_iface_open(&dlb->qm_instance, name);
3526         if (err < 0) {
3527                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3528                             err);
3529                 return err;
3530         }
3531
3532         err = dlb_hw_query_resources(dlb);
3533         if (err) {
3534                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3535                 return err;
3536         }
3537
3538         dlb_iface_low_level_io_init(dlb);
3539
3540         dlb_entry_points_init(dev);
3541
3542         return 0;
3543 }
3544
3545 int
3546 dlb_parse_params(const char *params,
3547                  const char *name,
3548                  struct dlb_devargs *dlb_args)
3549 {
3550         int ret = 0;
3551         static const char * const args[] = { NUMA_NODE_ARG,
3552                                              DLB_MAX_NUM_EVENTS,
3553                                              DLB_NUM_DIR_CREDITS,
3554                                              DEV_ID_ARG,
3555                                              DLB_DEFER_SCHED_ARG,
3556                                              DLB_NUM_ATM_INFLIGHTS_ARG,
3557                                              NULL };
3558
3559         if (params && params[0] != '\0') {
3560                 struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
3561
3562                 if (kvlist == NULL) {
3563                         DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
3564                                      name);
3565                 } else {
3566                         ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
3567                                                  set_numa_node,
3568                                                  &dlb_args->socket_id);
3569                         if (ret != 0) {
3570                                 DLB_LOG_ERR("%s: Error parsing numa node parameter",
3571                                             name);
3572                                 rte_kvargs_free(kvlist);
3573                                 return ret;
3574                         }
3575
3576                         ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
3577                                                  set_max_num_events,
3578                                                  &dlb_args->max_num_events);
3579                         if (ret != 0) {
3580                                 DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
3581                                             name);
3582                                 rte_kvargs_free(kvlist);
3583                                 return ret;
3584                         }
3585
3586                         ret = rte_kvargs_process(kvlist,
3587                                         DLB_NUM_DIR_CREDITS,
3588                                         set_num_dir_credits,
3589                                         &dlb_args->num_dir_credits_override);
3590                         if (ret != 0) {
3591                                 DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
3592                                             name);
3593                                 rte_kvargs_free(kvlist);
3594                                 return ret;
3595                         }
3596
3597                         ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
3598                                                  set_dev_id,
3599                                                  &dlb_args->dev_id);
3600                         if (ret != 0) {
3601                                 DLB_LOG_ERR("%s: Error parsing dev_id parameter",
3602                                             name);
3603                                 rte_kvargs_free(kvlist);
3604                                 return ret;
3605                         }
3606
3607                         ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
3608                                                  set_defer_sched,
3609                                                  &dlb_args->defer_sched);
3610                         if (ret != 0) {
3611                                 DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
3612                                             name);
3613                                 rte_kvargs_free(kvlist);
3614                                 return ret;
3615                         }
3616
3617                         ret = rte_kvargs_process(kvlist,
3618                                                  DLB_NUM_ATM_INFLIGHTS_ARG,
3619                                                  set_num_atm_inflights,
3620                                                  &dlb_args->num_atm_inflights);
3621                         if (ret != 0) {
3622                                 DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
3623                                             name);
3624                                 rte_kvargs_free(kvlist);
3625                                 return ret;
3626                         }
3627
3628                         rte_kvargs_free(kvlist);
3629                 }
3630         }
3631         return ret;
3632 }
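/*
 * Illustrative use of the parser (the key strings are assumed to be the
 * lowercase names behind the macros above, e.g. "max_num_events" and
 * "num_dir_credits"; dlb_args is a caller-owned struct dlb_devargs):
 *
 *	dlb_parse_params("max_num_events=4096,num_dir_credits=64",
 *			 "dlb_event", &dlb_args);
 */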
3633 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);