event/dlb: add port link
[dpdk.git] / drivers / event / dlb / dlb.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_prefetch.h>
29 #include <rte_ring.h>
30 #include <rte_string_fns.h>
31
32 #include <rte_eventdev.h>
33 #include <rte_eventdev_pmd.h>
34
35 #include "dlb_priv.h"
36 #include "dlb_iface.h"
37 #include "dlb_inline_fns.h"
38
39 /*
40  * Resources exposed to eventdev.
41  */
42 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
43 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
44 #endif
45 static struct rte_event_dev_info evdev_dlb_default_info = {
46         .driver_name = "", /* probe will set */
47         .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
48         .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
49 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
50         .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
51 #else
52         .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
53 #endif
54         .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
55         .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
56         .max_event_priority_levels = DLB_QID_PRIORITIES,
57         .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
58         .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
59         .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
60         .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
61         .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
62         .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
63         .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
64                           RTE_EVENT_DEV_CAP_EVENT_QOS |
65                           RTE_EVENT_DEV_CAP_BURST_MODE |
66                           RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
67                           RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
68                           RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
69 };
70
71 struct process_local_port_data
72 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
73
74 uint32_t
75 dlb_get_queue_depth(struct dlb_eventdev *dlb,
76                     struct dlb_eventdev_queue *queue)
77 {
78         /* Dummy for now, so the "xstats" patch compiles */
79         RTE_SET_USED(dlb);
80         RTE_SET_USED(queue);
81
82         return 0;
83 }
84
85 static int
86 dlb_hw_query_resources(struct dlb_eventdev *dlb)
87 {
88         struct dlb_hw_dev *handle = &dlb->qm_instance;
89         struct dlb_hw_resource_info *dlb_info = &handle->info;
90         int ret;
91
92         ret = dlb_iface_get_num_resources(handle,
93                                           &dlb->hw_rsrc_query_results);
94         if (ret) {
95                 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
96                 return ret;
97         }
98
99         /* Complete filling in device resource info returned to evdev app,
100          * overriding any default values.
101          * The capabilities (CAPs) were set at compile time.
102          */
103
104         evdev_dlb_default_info.max_event_queues =
105                 dlb->hw_rsrc_query_results.num_ldb_queues;
106
107         evdev_dlb_default_info.max_event_ports =
108                 dlb->hw_rsrc_query_results.num_ldb_ports;
109
110         evdev_dlb_default_info.max_num_events =
111                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
112
113         /* Save off values used when creating the scheduling domain. */
114
115         handle->info.num_sched_domains =
116                 dlb->hw_rsrc_query_results.num_sched_domains;
117
118         handle->info.hw_rsrc_max.nb_events_limit =
119                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
120
121         handle->info.hw_rsrc_max.num_queues =
122                 dlb->hw_rsrc_query_results.num_ldb_queues +
123                 dlb->hw_rsrc_query_results.num_dir_ports;
124
125         handle->info.hw_rsrc_max.num_ldb_queues =
126                 dlb->hw_rsrc_query_results.num_ldb_queues;
127
128         handle->info.hw_rsrc_max.num_ldb_ports =
129                 dlb->hw_rsrc_query_results.num_ldb_ports;
130
131         handle->info.hw_rsrc_max.num_dir_ports =
132                 dlb->hw_rsrc_query_results.num_dir_ports;
133
134         handle->info.hw_rsrc_max.reorder_window_size =
135                 dlb->hw_rsrc_query_results.num_hist_list_entries;
136
137         rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
138
139         return 0;
140 }
141
142 static void
143 dlb_free_qe_mem(struct dlb_port *qm_port)
144 {
145         if (qm_port == NULL)
146                 return;
147
148         rte_free(qm_port->qe4);
149         qm_port->qe4 = NULL;
150
151         rte_free(qm_port->consume_qe);
152         qm_port->consume_qe = NULL;
153 }
154
155 static int
156 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
157 {
158         struct dlb_cq_pop_qe *qe;
159
160         qe = rte_zmalloc(mz_name,
161                         DLB_NUM_QES_PER_CACHE_LINE *
162                                 sizeof(struct dlb_cq_pop_qe),
163                         RTE_CACHE_LINE_SIZE);
164
165         if (qe == NULL) {
166                 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
167                 return -ENOMEM;
168         }
169
170         qm_port->consume_qe = qe;
171
172         qe->qe_valid = 0;
173         qe->qe_frag = 0;
174         qe->qe_comp = 0;
175         qe->cq_token = 1;
176         /* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
177          * and so on.
178          */
179         qe->tokens = 0; /* set at run time */
180         qe->meas_lat = 0;
181         qe->no_dec = 0;
182         /* Completion IDs are disabled */
183         qe->cmp_id = 0;
184
185         return 0;
186 }
187
188 static int
189 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
190 {
191         int ret, sz;
192
193         sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
194
195         qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
196
197         if (qm_port->qe4 == NULL) {
198                 DLB_LOG_ERR("dlb: no qe4 memory\n");
199                 ret = -ENOMEM;
200                 goto error_exit;
201         }
202
203         ret = dlb_init_consume_qe(qm_port, mz_name);
204         if (ret < 0) {
205                 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
206                 goto error_exit;
207         }
208
209         return 0;
210
211 error_exit:
212
213         dlb_free_qe_mem(qm_port);
214
215         return ret;
216 }
217
218 /* Wrapper for string-to-int conversion, used in place of atoi(), which is
219  * unsafe (it reports no conversion errors).
220  */
221 #define DLB_BASE_10 10
222
223 static int
224 dlb_string_to_int(int *result, const char *str)
225 {
226         long ret;
227         char *endstr;
228
229         if (str == NULL || result == NULL)
230                 return -EINVAL;
231
232         errno = 0;
233         ret = strtol(str, &endstr, DLB_BASE_10);
234         if (errno)
235                 return -errno;
236
237         /* long and int may have different widths on some architectures */
238         if (ret < INT_MIN || ret > INT_MAX || endstr == str)
239                 return -EINVAL;
240
241         *result = ret;
242         return 0;
243 }
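/* Editor's illustrative note (not part of the driver): unlike atoi(), the
 * helper above reports conversion failures instead of silently returning 0.
 *
 *   int v;
 *
 *   dlb_string_to_int(&v, "2048");   -> 0, v == 2048
 *   dlb_string_to_int(&v, "abc");    -> -EINVAL (no digits parsed)
 *   dlb_string_to_int(&v, NULL);     -> -EINVAL
 */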
244
245 static int
246 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
247 {
248         int *socket_id = opaque;
249         int ret;
250
251         ret = dlb_string_to_int(socket_id, value);
252         if (ret < 0)
253                 return ret;
254
255         if (*socket_id > RTE_MAX_NUMA_NODES)
256                 return -EINVAL;
257
258         return 0;
259 }
260
261 static int
262 set_max_num_events(const char *key __rte_unused,
263                    const char *value,
264                    void *opaque)
265 {
266         int *max_num_events = opaque;
267         int ret;
268
269         if (value == NULL || opaque == NULL) {
270                 DLB_LOG_ERR("NULL pointer\n");
271                 return -EINVAL;
272         }
273
274         ret = dlb_string_to_int(max_num_events, value);
275         if (ret < 0)
276                 return ret;
277
278         if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
279                 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
280                             DLB_MAX_NUM_LDB_CREDITS);
281                 return -EINVAL;
282         }
283
284         return 0;
285 }
286
287 static int
288 set_num_dir_credits(const char *key __rte_unused,
289                     const char *value,
290                     void *opaque)
291 {
292         int *num_dir_credits = opaque;
293         int ret;
294
295         if (value == NULL || opaque == NULL) {
296                 DLB_LOG_ERR("NULL pointer\n");
297                 return -EINVAL;
298         }
299
300         ret = dlb_string_to_int(num_dir_credits, value);
301         if (ret < 0)
302                 return ret;
303
304         if (*num_dir_credits < 0 ||
305             *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
306                 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
307                             DLB_MAX_NUM_DIR_CREDITS);
308                 return -EINVAL;
309         }
310         return 0;
311 }
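/* Editor's illustrative sketch (not part of the driver): devargs callbacks
 * such as the two above are normally attached to key strings with the
 * rte_kvargs API; the key names and local variables below are assumptions
 * for the example, and the real wiring lives in this driver's probe path.
 *
 *   static const char * const keys[] = { "max_num_events",
 *                                        "num_dir_credits", NULL };
 *   struct rte_kvargs *kvlist = rte_kvargs_parse(params, keys);
 *
 *   if (kvlist != NULL) {
 *           rte_kvargs_process(kvlist, "max_num_events",
 *                              set_max_num_events, &max_num_events);
 *           rte_kvargs_process(kvlist, "num_dir_credits",
 *                              set_num_dir_credits, &num_dir_credits);
 *           rte_kvargs_free(kvlist);
 *   }
 */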
312
313 /* VDEV-only notes:
314  * This function first unmaps all memory mappings and closes the
315  * domain's file descriptor, which causes the driver to reset the
316  * scheduling domain. Once that completes (when close() returns), we
317  * can safely free the dynamically allocated memory used by the
318  * scheduling domain.
319  *
320  * PF-only notes:
321  * We will maintain a use count and use it to determine when a reset is
322  * required. In PF mode we never mmap or munmap device memory, and we own
323  * the entire physical PCI device.
324  */
325
326 static void
327 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
328 {
329         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
330         enum dlb_configuration_state config_state;
331         int i, j;
332
333         /* Close and reset the domain */
334         dlb_iface_domain_close(dlb);
335
336         /* Free all dynamically allocated port memory */
337         for (i = 0; i < dlb->num_ports; i++)
338                 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
339
340         /* If reconfiguring, mark the device's queues and ports as "previously
341          * configured." If the user does not reconfigure them, the PMD will
342          * reapply their previous configuration when the device is started.
343          */
344         config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
345
346         for (i = 0; i < dlb->num_ports; i++) {
347                 dlb->ev_ports[i].qm_port.config_state = config_state;
348                 /* Reset setup_done so ports can be reconfigured */
349                 dlb->ev_ports[i].setup_done = false;
350                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
351                         dlb->ev_ports[i].link[j].mapped = false;
352         }
353
354         for (i = 0; i < dlb->num_queues; i++)
355                 dlb->ev_queues[i].qm_queue.config_state = config_state;
356
357         for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
358                 dlb->ev_queues[i].setup_done = false;
359
360         dlb->num_ports = 0;
361         dlb->num_ldb_ports = 0;
362         dlb->num_dir_ports = 0;
363         dlb->num_queues = 0;
364         dlb->num_ldb_queues = 0;
365         dlb->num_dir_queues = 0;
366         dlb->configured = false;
367 }
368
369 static int
370 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
371 {
372         struct dlb_create_ldb_pool_args cfg;
373         struct dlb_cmd_response response;
374         int ret;
375
376         if (handle == NULL)
377                 return -EINVAL;
378
379         if (!handle->cfg.resources.num_ldb_credits) {
380                 handle->cfg.ldb_credit_pool_id = 0;
381                 handle->cfg.num_ldb_credits = 0;
382                 return 0;
383         }
384
385         cfg.response = (uintptr_t)&response;
386         cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
387
388         ret = dlb_iface_ldb_credit_pool_create(handle,
389                                                &cfg);
390         if (ret < 0) {
391                 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
392                             ret, dlb_error_strings[response.status]);
393         }
394
395         handle->cfg.ldb_credit_pool_id = response.id;
396         handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
397
398         return ret;
399 }
400
401 static int
402 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
403 {
404         struct dlb_create_dir_pool_args cfg;
405         struct dlb_cmd_response response;
406         int ret;
407
408         if (handle == NULL)
409                 return -EINVAL;
410
411         if (!handle->cfg.resources.num_dir_credits) {
412                 handle->cfg.dir_credit_pool_id = 0;
413                 handle->cfg.num_dir_credits = 0;
414                 return 0;
415         }
416
417         cfg.response = (uintptr_t)&response;
418         cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
419
420         ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
421         if (ret < 0)
422                 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
423                             ret, dlb_error_strings[response.status]);
424
425         handle->cfg.dir_credit_pool_id = response.id;
426         handle->cfg.num_dir_credits = cfg.num_dir_credits;
427
428         return ret;
429 }
430
431 static int
432 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
433                            struct dlb_eventdev *dlb,
434                            const struct dlb_hw_rsrcs *resources_asked)
435 {
436         int ret = 0;
437         struct dlb_create_sched_domain_args *config_params;
438         struct dlb_cmd_response response;
439
440         if (resources_asked == NULL) {
441                 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
442                 ret = -EINVAL;
443                 goto error_exit;
444         }
445
446         /* Map generic qm resources to dlb resources */
447         config_params = &handle->cfg.resources;
448
449         config_params->response = (uintptr_t)&response;
450
451         /* DIR ports and queues */
452
453         config_params->num_dir_ports =
454                 resources_asked->num_dir_ports;
455
456         config_params->num_dir_credits =
457                 resources_asked->num_dir_credits;
458
459         /* LDB ports and queues */
460
461         config_params->num_ldb_queues =
462                 resources_asked->num_ldb_queues;
463
464         config_params->num_ldb_ports =
465                 resources_asked->num_ldb_ports;
466
467         config_params->num_ldb_credits =
468                 resources_asked->num_ldb_credits;
469
470         config_params->num_atomic_inflights =
471                 dlb->num_atm_inflights_per_queue *
472                 config_params->num_ldb_queues;
473
474         config_params->num_hist_list_entries = config_params->num_ldb_ports *
475                 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
476
477         /* dlb limited to 1 credit pool per queue type */
478         config_params->num_ldb_credit_pools = 1;
479         config_params->num_dir_credit_pools = 1;
480
481         DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_credit_pools=%d, dir_credit_pools=%d\n",
482                     config_params->num_ldb_queues,
483                     config_params->num_ldb_ports,
484                     config_params->num_dir_ports,
485                     config_params->num_atomic_inflights,
486                     config_params->num_hist_list_entries,
487                     config_params->num_ldb_credits,
488                     config_params->num_dir_credits,
489                     config_params->num_ldb_credit_pools,
490                     config_params->num_dir_credit_pools);
491
492         /* Configure the QM */
493
494         ret = dlb_iface_sched_domain_create(handle, config_params);
495         if (ret < 0) {
496                 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
497                             handle->device_id,
498                             ret,
499                             dlb_error_strings[response.status]);
500                 goto error_exit;
501         }
502
503         handle->domain_id = response.id;
504         handle->domain_id_valid = 1;
505
506         config_params->response = 0;
507
508         ret = dlb_ldb_credit_pool_create(handle);
509         if (ret < 0) {
510                 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
511                 goto error_exit2;
512         }
513
514         ret = dlb_dir_credit_pool_create(handle);
515         if (ret < 0) {
516                 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
517                 goto error_exit2;
518         }
519
520         handle->cfg.configured = true;
521
522         return 0;
523
524 error_exit2:
525         dlb_iface_domain_close(dlb);
526
527 error_exit:
528         return ret;
529 }
530
531 /* End HW specific */
532 static void
533 dlb_eventdev_info_get(struct rte_eventdev *dev,
534                       struct rte_event_dev_info *dev_info)
535 {
536         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
537         int ret;
538
539         ret = dlb_hw_query_resources(dlb);
540         if (ret) {
541                 const struct rte_eventdev_data *data = dev->data;
542
543                 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
544                             ret, data->dev_id);
545                 /* fn is void, so fall through and return values set up in
546                  * probe
547                  */
548         }
549
550         /* Add num resources currently owned by this domain.
551          * These would become available if the scheduling domain were reset due
552          * to the application recalling eventdev_configure to *reconfigure* the
553          * domain.
554          */
555         evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
556         evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
557         evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
558
559         /* In DLB A-stepping hardware, applications are limited to 128
560          * configured ports (load-balanced or directed). The reported number of
561          * available ports must reflect this.
562          */
563         if (dlb->revision < DLB_REV_B0) {
564                 int used_ports;
565
566                 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
567                         dlb->hw_rsrc_query_results.num_ldb_ports -
568                         dlb->hw_rsrc_query_results.num_dir_ports;
569
570                 evdev_dlb_default_info.max_event_ports =
571                         RTE_MIN(evdev_dlb_default_info.max_event_ports,
572                                 128 - used_ports);
573         }
574
575         evdev_dlb_default_info.max_event_queues =
576                 RTE_MIN(evdev_dlb_default_info.max_event_queues,
577                         RTE_EVENT_MAX_QUEUES_PER_DEV);
578
579         evdev_dlb_default_info.max_num_events =
580                 RTE_MIN(evdev_dlb_default_info.max_num_events,
581                         dlb->max_num_events_override);
582
583         *dev_info = evdev_dlb_default_info;
584 }
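/* Editor's illustrative sketch (not part of the driver): an application
 * reads the limits assembled above through the public eventdev API. dev_id
 * is assumed to identify a probed DLB eventdev.
 *
 *   struct rte_event_dev_info info;
 *
 *   if (rte_event_dev_info_get(dev_id, &info) == 0)
 *           printf("queues=%u ports=%u max inflight events=%d\n",
 *                  info.max_event_queues, info.max_event_ports,
 *                  info.max_num_events);
 */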
585
586 /* Note: 1 QM instance per QM device, QM instance/device == event device */
587 static int
588 dlb_eventdev_configure(const struct rte_eventdev *dev)
589 {
590         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
591         struct dlb_hw_dev *handle = &dlb->qm_instance;
592         struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
593         const struct rte_eventdev_data *data = dev->data;
594         const struct rte_event_dev_config *config = &data->dev_conf;
595         int ret;
596
597         /* If this eventdev is already configured, we must release the current
598          * scheduling domain before attempting to configure a new one.
599          */
600         if (dlb->configured) {
601                 dlb_hw_reset_sched_domain(dev, true);
602
603                 ret = dlb_hw_query_resources(dlb);
604                 if (ret) {
605                         DLB_LOG_ERR("get resources err=%d, devid=%d\n",
606                                     ret, data->dev_id);
607                         return ret;
608                 }
609         }
610
611         if (config->nb_event_queues > rsrcs->num_queues) {
612                 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
613                             config->nb_event_queues,
614                             rsrcs->num_queues);
615                 return -EINVAL;
616         }
617         if (config->nb_event_ports > (rsrcs->num_ldb_ports
618                         + rsrcs->num_dir_ports)) {
619                 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
620                             config->nb_event_ports,
621                             (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
622                 return -EINVAL;
623         }
624         if (config->nb_events_limit > rsrcs->nb_events_limit) {
625                 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
626                             config->nb_events_limit,
627                             rsrcs->nb_events_limit);
628                 return -EINVAL;
629         }
630
631         if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
632                 dlb->global_dequeue_wait = false;
633         else {
634                 uint32_t timeout32;
635
636                 dlb->global_dequeue_wait = true;
637
638                 timeout32 = config->dequeue_timeout_ns;
639
640                 dlb->global_dequeue_wait_ticks =
641                         timeout32 * (rte_get_timer_hz() / 1E9);
642         }
643
644         /* Does this platform support umonitor/umwait? */
645         if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
646                 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
647                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
648                         DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE must be 0 or 1.\n",
649                                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
650                         return -EINVAL;
651                 }
652                 dlb->umwait_allowed = true;
653         }
654
655         rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
656         rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
657         /* 1 dir queue per dir port */
658         rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
659
660         /* Scale down nb_events_limit by 4 for directed credits, since there
661          * are 4x as many load-balanced credits.
662          */
663         rsrcs->num_ldb_credits = 0;
664         rsrcs->num_dir_credits = 0;
665
666         if (rsrcs->num_ldb_queues)
667                 rsrcs->num_ldb_credits = config->nb_events_limit;
668         if (rsrcs->num_dir_ports)
669                 rsrcs->num_dir_credits = config->nb_events_limit / 4;
670         if (dlb->num_dir_credits_override != -1)
671                 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
672
673         if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
674                 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
675                 return -ENODEV;
676         }
677
678         dlb->new_event_limit = config->nb_events_limit;
679         __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
680
681         /* Save number of ports/queues for this event dev */
682         dlb->num_ports = config->nb_event_ports;
683         dlb->num_queues = config->nb_event_queues;
684         dlb->num_dir_ports = rsrcs->num_dir_ports;
685         dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
686         dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
687         dlb->num_dir_queues = dlb->num_dir_ports;
688         dlb->num_ldb_credits = rsrcs->num_ldb_credits;
689         dlb->num_dir_credits = rsrcs->num_dir_credits;
690
691         dlb->configured = true;
692
693         return 0;
694 }
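/* Editor's illustrative sketch (not part of the driver): a minimal
 * application-side configuration that exercises the checks above. The counts
 * are arbitrary examples, bounded by the limits from rte_event_dev_info_get().
 *
 *   struct rte_event_dev_config conf = {
 *           .nb_event_queues = 2,
 *           .nb_event_ports = 2,
 *           .nb_events_limit = 4096,
 *           .nb_event_queue_flows = 1024,
 *           .nb_event_port_dequeue_depth = 32,
 *           .nb_event_port_enqueue_depth = 32,
 *           .nb_single_link_event_port_queues = 0,
 *   };
 *
 *   if (rte_event_dev_configure(dev_id, &conf) < 0)
 *           rte_panic("eventdev configure failed\n");
 */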
695
696 static void
697 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
698                                    uint8_t port_id,
699                                    struct rte_event_port_conf *port_conf)
700 {
701         RTE_SET_USED(port_id);
702         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
703
704         port_conf->new_event_threshold = dlb->new_event_limit;
705         port_conf->dequeue_depth = 32;
706         port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
707         port_conf->event_port_cfg = 0;
708 }
709
710 static void
711 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
712                                     uint8_t queue_id,
713                                     struct rte_event_queue_conf *queue_conf)
714 {
715         RTE_SET_USED(dev);
716         RTE_SET_USED(queue_id);
717         queue_conf->nb_atomic_flows = 1024;
718         queue_conf->nb_atomic_order_sequences = 32;
719         queue_conf->event_queue_cfg = 0;
720         queue_conf->priority = 0;
721 }
722
723 static int
724 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
725                        struct dlb_eventdev_port *ev_port,
726                        uint32_t dequeue_depth,
727                        uint32_t cq_depth,
728                        uint32_t enqueue_depth,
729                        uint16_t rsvd_tokens,
730                        bool use_rsvd_token_scheme)
731 {
732         struct dlb_hw_dev *handle = &dlb->qm_instance;
733         struct dlb_create_ldb_port_args cfg = {0};
734         struct dlb_cmd_response response = {0};
735         int ret;
736         struct dlb_port *qm_port = NULL;
737         char mz_name[RTE_MEMZONE_NAMESIZE];
738         uint32_t qm_port_id;
739
740         if (handle == NULL)
741                 return -EINVAL;
742
743         if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
744                 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
745                         DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
746                 return -EINVAL;
747         }
748
749         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
750                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
751                             DLB_MIN_ENQUEUE_DEPTH);
752                 return -EINVAL;
753         }
754
755         rte_spinlock_lock(&handle->resource_lock);
756
757         cfg.response = (uintptr_t)&response;
758
759         /* We round up to the next power of 2 if necessary */
760         cfg.cq_depth = rte_align32pow2(cq_depth);
761         cfg.cq_depth_threshold = rsvd_tokens;
762
763         cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
764
765         /* User controls the LDB high watermark via enqueue depth. The DIR high
766          * watermark is equal, unless the directed credit pool is too small.
767          */
768         cfg.ldb_credit_high_watermark = enqueue_depth;
769
770         /* If there are no directed ports, the kernel driver will ignore this
771          * port's directed credit settings. Don't use enqueue_depth if it would
772          * require more directed credits than are available.
773          */
774         cfg.dir_credit_high_watermark =
775                 RTE_MIN(enqueue_depth,
776                         handle->cfg.num_dir_credits / dlb->num_ports);
777
778         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
779         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
780
781         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
782         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
783
784         /* Per QM values */
785
786         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
787         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
788
789         ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
790         if (ret < 0) {
791                 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
792                             ret, dlb_error_strings[response.status]);
793                 goto error_exit;
794         }
795
796         qm_port_id = response.id;
797
798         DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
799                     ev_port->id, qm_port_id);
800
801         qm_port = &ev_port->qm_port;
802         qm_port->ev_port = ev_port; /* back ptr */
803         qm_port->dlb = dlb; /* back ptr */
804
805         /*
806          * Allocate and init local qe struct(s).
807          * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
808          */
809
810         snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
811                  ev_port->id);
812
813         ret = dlb_init_qe_mem(qm_port, mz_name);
814         if (ret < 0) {
815                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
816                 goto error_exit;
817         }
818
819         qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
820         qm_port->id = qm_port_id;
821
822         /* The credit window is one high water mark of QEs */
823         qm_port->ldb_pushcount_at_credit_expiry = 0;
824         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
825         /* The credit window is one high water mark of QEs */
826         qm_port->dir_pushcount_at_credit_expiry = 0;
827         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
829         /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
830          * the effective depth is smaller.
831          */
832         qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
833         qm_port->cq_idx = 0;
834         qm_port->cq_idx_unmasked = 0;
835         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
836                 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
837         else
838                 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
839
840         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
841         /* starting value of gen bit - it toggles at wrap time */
842         qm_port->gen_bit = 1;
843
844         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
845         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
846         qm_port->int_armed = false;
847
848         /* Save off for later use in info and lookup APIs. */
849         qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
850
851         qm_port->dequeue_depth = dequeue_depth;
852
853         qm_port->owed_tokens = 0;
854         qm_port->issued_releases = 0;
855
856         /* update state */
857         qm_port->state = PORT_STARTED; /* enabled at create time */
858         qm_port->config_state = DLB_CONFIGURED;
859
860         qm_port->dir_credits = cfg.dir_credit_high_watermark;
861         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
862
863         DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
864                     qm_port_id,
865                     cq_depth,
866                     qm_port->ldb_credits,
867                     qm_port->dir_credits);
868
869         rte_spinlock_unlock(&handle->resource_lock);
870
871         return 0;
872
873 error_exit:
874         if (qm_port) {
875                 dlb_free_qe_mem(qm_port);
876                 qm_port->pp_mmio_base = 0;
877         }
878
879         rte_spinlock_unlock(&handle->resource_lock);
880
881         DLB_LOG_ERR("dlb: create ldb port failed!\n");
882
883         return ret;
884 }
885
886 static int
887 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
888                        struct dlb_eventdev_port *ev_port,
889                        uint32_t dequeue_depth,
890                        uint32_t cq_depth,
891                        uint32_t enqueue_depth,
892                        uint16_t rsvd_tokens,
893                        bool use_rsvd_token_scheme)
894 {
895         struct dlb_hw_dev *handle = &dlb->qm_instance;
896         struct dlb_create_dir_port_args cfg = {0};
897         struct dlb_cmd_response response = {0};
898         int ret;
899         struct dlb_port *qm_port = NULL;
900         char mz_name[RTE_MEMZONE_NAMESIZE];
901         uint32_t qm_port_id;
902
903         if (dlb == NULL || handle == NULL)
904                 return -EINVAL;
905
906         if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
907                 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
908                             DLB_MIN_DIR_CQ_DEPTH);
909                 return -EINVAL;
910         }
911
912         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
913                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
914                             DLB_MIN_ENQUEUE_DEPTH);
915                 return -EINVAL;
916         }
917
918         rte_spinlock_lock(&handle->resource_lock);
919
920         /* Directed queues are configured at link time. */
921         cfg.queue_id = -1;
922
923         cfg.response = (uintptr_t)&response;
924
925         /* We round up to the next power of 2 if necessary */
926         cfg.cq_depth = rte_align32pow2(cq_depth);
927         cfg.cq_depth_threshold = rsvd_tokens;
928
929         /* User controls the LDB high watermark via enqueue depth. The DIR high
930          * watermark is equal, unless the directed credit pool is too small.
931          */
932         cfg.ldb_credit_high_watermark = enqueue_depth;
933
934         /* Don't use enqueue_depth if it would require more directed credits
935          * than are available.
936          */
937         cfg.dir_credit_high_watermark =
938                 RTE_MIN(enqueue_depth,
939                         handle->cfg.num_dir_credits / dlb->num_ports);
940
941         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
942         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
943
944         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
945         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
946
947         /* Per QM values */
948
949         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
950         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
951
952         ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
953         if (ret < 0) {
954                 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
955                             ret, dlb_error_strings[response.status]);
956                 goto error_exit;
957         }
958
959         qm_port_id = response.id;
960
961         DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
962                     ev_port->id, qm_port_id);
963
964         qm_port = &ev_port->qm_port;
965         qm_port->ev_port = ev_port; /* back ptr */
966         qm_port->dlb = dlb;  /* back ptr */
967
968         /*
969          * Init local qe struct(s).
970          * Note: MOVDIR64 requires the enqueue QE to be aligned
971          */
972
973         snprintf(mz_name, sizeof(mz_name), "dir_port%d",
974                  ev_port->id);
975
976         ret = dlb_init_qe_mem(qm_port, mz_name);
977
978         if (ret < 0) {
979                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
980                 goto error_exit;
981         }
982
983         qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
984         qm_port->id = qm_port_id;
985
986         /* The credit window is one high water mark of QEs */
987         qm_port->ldb_pushcount_at_credit_expiry = 0;
988         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
989         /* The credit window is one high water mark of QEs */
990         qm_port->dir_pushcount_at_credit_expiry = 0;
991         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
992         qm_port->cq_depth = cfg.cq_depth;
993         qm_port->cq_idx = 0;
994         qm_port->cq_idx_unmasked = 0;
995         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
996                 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
997         else
998                 qm_port->cq_depth_mask = cfg.cq_depth - 1;
999
1000         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1001         /* starting value of gen bit - it toggles at wrap time */
1002         qm_port->gen_bit = 1;
1003
1004         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1005         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1006         qm_port->int_armed = false;
1007
1008         /* Save off for later use in info and lookup APIs. */
1009         qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1010
1011         qm_port->dequeue_depth = dequeue_depth;
1012
1013         qm_port->owed_tokens = 0;
1014         qm_port->issued_releases = 0;
1015
1016         /* update state */
1017         qm_port->state = PORT_STARTED; /* enabled at create time */
1018         qm_port->config_state = DLB_CONFIGURED;
1019
1020         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1021         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1022
1023         DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1024                     qm_port_id,
1025                     cq_depth,
1026                     cfg.dir_credit_high_watermark,
1027                     cfg.ldb_credit_high_watermark);
1028
1029         rte_spinlock_unlock(&handle->resource_lock);
1030
1031         return 0;
1032
1033 error_exit:
1034         if (qm_port) {
1035                 qm_port->pp_mmio_base = 0;
1036                 dlb_free_qe_mem(qm_port);
1037         }
1038
1039         rte_spinlock_unlock(&handle->resource_lock);
1040
1041         DLB_LOG_ERR("dlb: create dir port failed!\n");
1042
1043         return ret;
1044 }
1045
1046 static int32_t
1047 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1048                         struct dlb_queue *queue,
1049                         const struct rte_event_queue_conf *evq_conf)
1050 {
1051         struct dlb_hw_dev *handle = &dlb->qm_instance;
1052         struct dlb_create_ldb_queue_args cfg;
1053         struct dlb_cmd_response response;
1054         int32_t ret;
1055         uint32_t qm_qid;
1056         int sched_type = -1;
1057
1058         if (evq_conf == NULL)
1059                 return -EINVAL;
1060
1061         if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1062                 if (evq_conf->nb_atomic_order_sequences != 0)
1063                         sched_type = RTE_SCHED_TYPE_ORDERED;
1064                 else
1065                         sched_type = RTE_SCHED_TYPE_PARALLEL;
1066         } else
1067                 sched_type = evq_conf->schedule_type;
1068
1069         cfg.response = (uintptr_t)&response;
1070         cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1071         cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1072         cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1073
1074         if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1075                 cfg.num_sequence_numbers = 0;
1076                 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1077         }
1078
1079         ret = dlb_iface_ldb_queue_create(handle, &cfg);
1080         if (ret < 0) {
1081                 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1082                             ret, dlb_error_strings[response.status]);
1083                 return -EINVAL;
1084         }
1085
1086         qm_qid = response.id;
1087
1088         /* Save off queue config for debug, resource lookups, and reconfig */
1089         queue->num_qid_inflights = cfg.num_qid_inflights;
1090         queue->num_atm_inflights = cfg.num_atomic_inflights;
1091
1092         queue->sched_type = sched_type;
1093         queue->config_state = DLB_CONFIGURED;
1094
1095         DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1096                     qm_qid,
1097                     cfg.num_atomic_inflights,
1098                     cfg.num_sequence_numbers,
1099                     cfg.num_qid_inflights);
1100
1101         return qm_qid;
1102 }
1103
1104 static int32_t
1105 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1106 {
1107         struct dlb_hw_dev *handle = &dlb->qm_instance;
1108         struct dlb_get_sn_allocation_args cfg;
1109         struct dlb_cmd_response response;
1110         int ret;
1111
1112         cfg.group = group;
1113         cfg.response = (uintptr_t)&response;
1114
1115         ret = dlb_iface_get_sn_allocation(handle, &cfg);
1116         if (ret < 0) {
1117                 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1118                             ret, dlb_error_strings[response.status]);
1119                 return ret;
1120         }
1121
1122         return response.id;
1123 }
1124
1125 static int
1126 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1127 {
1128         struct dlb_hw_dev *handle = &dlb->qm_instance;
1129         struct dlb_set_sn_allocation_args cfg;
1130         struct dlb_cmd_response response;
1131         int ret;
1132
1133         cfg.num = num;
1134         cfg.group = group;
1135         cfg.response = (uintptr_t)&response;
1136
1137         ret = dlb_iface_set_sn_allocation(handle, &cfg);
1138         if (ret < 0) {
1139                 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1140                             ret, dlb_error_strings[response.status]);
1141                 return ret;
1142         }
1143
1144         return ret;
1145 }
1146
1147 static int32_t
1148 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1149 {
1150         struct dlb_hw_dev *handle = &dlb->qm_instance;
1151         struct dlb_get_sn_occupancy_args cfg;
1152         struct dlb_cmd_response response;
1153         int ret;
1154
1155         cfg.group = group;
1156         cfg.response = (uintptr_t)&response;
1157
1158         ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1159         if (ret < 0) {
1160                 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1161                             ret, dlb_error_strings[response.status]);
1162                 return ret;
1163         }
1164
1165         return response.id;
1166 }
1167
1168 /* Query the current sequence number allocations and, if they conflict with the
1169  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1170  * This is best-effort; if it fails, the PMD still attempts to configure the
1171  * load-balanced queue, and that configuration will fail and return an error.
1172  */
1173 static void
1174 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1175                           const struct rte_event_queue_conf *queue_conf)
1176 {
1177         int grp_occupancy[DLB_NUM_SN_GROUPS];
1178         int grp_alloc[DLB_NUM_SN_GROUPS];
1179         int i, sequence_numbers;
1180
1181         sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1182
1183         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1184                 int total_slots;
1185
1186                 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1187                 if (grp_alloc[i] < 0)
1188                         return;
1189
1190                 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
1191
1192                 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1193                 if (grp_occupancy[i] < 0)
1194                         return;
1195
1196                 /* DLB has at least one available slot for the requested
1197                  * sequence numbers, so no further configuration required.
1198                  */
1199                 if (grp_alloc[i] == sequence_numbers &&
1200                     grp_occupancy[i] < total_slots)
1201                         return;
1202         }
1203
1204         /* None of the sequence number groups are configured for the requested
1205          * sequence numbers, so we have to reconfigure one of them. This is
1206          * only possible if a group is not in use.
1207          */
1208         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1209                 if (grp_occupancy[i] == 0)
1210                         break;
1211         }
1212
1213         if (i == DLB_NUM_SN_GROUPS) {
1214                 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1215                        __func__, sequence_numbers);
1216                 return;
1217         }
1218
1219         /* Attempt to configure slot i with the requested number of sequence
1220          * numbers. Ignore the return value -- if this fails, the error will be
1221          * caught during subsequent queue configuration.
1222          */
1223         dlb_set_sn_allocation(dlb, i, sequence_numbers);
1224 }
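/* Editor's worked example for the function above: if one group is already
 * programmed with grp_alloc[i] equal to the requested
 * nb_atomic_order_sequences and still has a free slot, nothing changes.
 * Otherwise the first group with zero occupancy (if any) is reprogrammed via
 * dlb_set_sn_allocation(); if no such group exists the function simply
 * returns, and the later queue-create call reports the failure.
 */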
1225
1226 static int
1227 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1228                              struct dlb_eventdev_queue *ev_queue,
1229                              const struct rte_event_queue_conf *queue_conf)
1230 {
1231         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1232         int32_t qm_qid;
1233
1234         if (queue_conf->nb_atomic_order_sequences)
1235                 dlb_program_sn_allocation(dlb, queue_conf);
1236
1237         qm_qid = dlb_hw_create_ldb_queue(dlb,
1238                                          &ev_queue->qm_queue,
1239                                          queue_conf);
1240         if (qm_qid < 0) {
1241                 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1242
1243                 return qm_qid;
1244         }
1245
1246         dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1247
1248         ev_queue->qm_queue.id = qm_qid;
1249
1250         return 0;
1251 }
1252
1253 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1254 {
1255         int i, num = 0;
1256
1257         for (i = 0; i < dlb->num_queues; i++) {
1258                 if (dlb->ev_queues[i].setup_done &&
1259                     dlb->ev_queues[i].qm_queue.is_directed)
1260                         num++;
1261         }
1262
1263         return num;
1264 }
1265
1266 static void
1267 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1268                         struct dlb_eventdev_queue *ev_queue)
1269 {
1270         struct dlb_eventdev_port *ev_port;
1271         int i, j;
1272
1273         for (i = 0; i < dlb->num_ports; i++) {
1274                 ev_port = &dlb->ev_ports[i];
1275
1276                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1277                         if (!ev_port->link[j].valid ||
1278                             ev_port->link[j].queue_id != ev_queue->id)
1279                                 continue;
1280
1281                         ev_port->link[j].valid = false;
1282                         ev_port->num_links--;
1283                 }
1284         }
1285
1286         ev_queue->num_links = 0;
1287 }
1288
1289 static int
1290 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1291                          uint8_t ev_qid,
1292                          const struct rte_event_queue_conf *queue_conf)
1293 {
1294         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1295         struct dlb_eventdev_queue *ev_queue;
1296         int ret;
1297
1298         if (queue_conf == NULL)
1299                 return -EINVAL;
1300
1301         if (ev_qid >= dlb->num_queues)
1302                 return -EINVAL;
1303
1304         ev_queue = &dlb->ev_queues[ev_qid];
1305
1306         ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1307                 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1308         ev_queue->id = ev_qid;
1309         ev_queue->conf = *queue_conf;
1310
1311         if (!ev_queue->qm_queue.is_directed) {
1312                 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1313         } else {
1314                 /* The directed queue isn't setup until link time, at which
1315                  * point we know its directed port ID. Directed queue setup
1316                  * will only fail if this queue is already setup or there are
1317                  * no directed queues left to configure.
1318                  */
1319                 ret = 0;
1320
1321                 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1322
1323                 if (ev_queue->setup_done ||
1324                     dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1325                         ret = -EINVAL;
1326         }
1327
1328         /* Tear down pre-existing port->queue links */
1329         if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1330                 dlb_queue_link_teardown(dlb, ev_queue);
1331
1332         if (!ret)
1333                 ev_queue->setup_done = true;
1334
1335         return ret;
1336 }
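/* Editor's illustrative sketch (not part of the driver): application-side
 * queue setup that reaches the function above through the eventdev layer;
 * the values are examples only.
 *
 *   struct rte_event_queue_conf qconf = {
 *           .schedule_type = RTE_SCHED_TYPE_ATOMIC,
 *           .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *           .nb_atomic_flows = 1024,
 *           .nb_atomic_order_sequences = 32,
 *   };
 *
 *   if (rte_event_queue_setup(dev_id, 0, &qconf) < 0)
 *           rte_panic("queue setup failed\n");
 */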
1337
1338 static void
1339 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1340                        struct dlb_eventdev_port *ev_port)
1341 {
1342         struct dlb_eventdev_queue *ev_queue;
1343         int i;
1344
1345         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1346                 if (!ev_port->link[i].valid)
1347                         continue;
1348
1349                 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1350
1351                 ev_port->link[i].valid = false;
1352                 ev_port->num_links--;
1353                 ev_queue->num_links--;
1354         }
1355 }
1356
1357 static int
1358 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1359                         uint8_t ev_port_id,
1360                         const struct rte_event_port_conf *port_conf)
1361 {
1362         struct dlb_eventdev *dlb;
1363         struct dlb_eventdev_port *ev_port;
1364         bool use_rsvd_token_scheme;
1365         uint32_t adj_cq_depth;
1366         uint16_t rsvd_tokens;
1367         int ret;
1368
1369         if (dev == NULL || port_conf == NULL) {
1370                 DLB_LOG_ERR("Null parameter\n");
1371                 return -EINVAL;
1372         }
1373
1374         dlb = dlb_pmd_priv(dev);
1375
1376         if (ev_port_id >= DLB_MAX_NUM_PORTS)
1377                 return -EINVAL;
1378
1379         if (port_conf->dequeue_depth >
1380                 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1381             port_conf->enqueue_depth >
1382                 evdev_dlb_default_info.max_event_port_enqueue_depth)
1383                 return -EINVAL;
1384
1385         ev_port = &dlb->ev_ports[ev_port_id];
1386         /* configured? */
1387         if (ev_port->setup_done) {
1388                 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1389                 return -EINVAL;
1390         }
1391
1392         /* The reserved token interrupt arming scheme requires that one or more
1393          * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1394          * usable by the DLB, so in order to give an *effective* CQ depth equal
1395          * to the user-requested value, we double CQ depth and reserve half of
1396          * its tokens. If the user requests the max CQ depth (256) then we
1397          * cannot double it, so we reserve one token and give an effective
1398          * depth of 255 entries.
1399          */
1400         use_rsvd_token_scheme = true;
1401         rsvd_tokens = 1;
1402         adj_cq_depth = port_conf->dequeue_depth;
1403
1404         if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1405                 rsvd_tokens = adj_cq_depth;
1406                 adj_cq_depth *= 2;
1407         }
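        /* Editor's worked example: a requested dequeue_depth of 32 gives
         * rsvd_tokens = 32 and adj_cq_depth = 64, i.e. half of the doubled CQ
         * is withheld; a request of 256 keeps rsvd_tokens = 1 and
         * adj_cq_depth = 256, for an effective depth of 255.
         */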
1408
1409         ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1410                 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1411
1412         if (!ev_port->qm_port.is_directed) {
1413                 ret = dlb_hw_create_ldb_port(dlb,
1414                                              ev_port,
1415                                              port_conf->dequeue_depth,
1416                                              adj_cq_depth,
1417                                              port_conf->enqueue_depth,
1418                                              rsvd_tokens,
1419                                              use_rsvd_token_scheme);
1420                 if (ret < 0) {
1421                         DLB_LOG_ERR("Failed to create the lB port ve portId=%d\n",
1422                                     ev_port_id);
1423                         return ret;
1424                 }
1425         } else {
1426                 ret = dlb_hw_create_dir_port(dlb,
1427                                              ev_port,
1428                                              port_conf->dequeue_depth,
1429                                              adj_cq_depth,
1430                                              port_conf->enqueue_depth,
1431                                              rsvd_tokens,
1432                                              use_rsvd_token_scheme);
1433                 if (ret < 0) {
1434                         DLB_LOG_ERR("Failed to create the DIR port\n");
1435                         return ret;
1436                 }
1437         }
1438
1439         /* Save off port config for reconfig */
1440         dlb->ev_ports[ev_port_id].conf = *port_conf;
1441
1442         dlb->ev_ports[ev_port_id].id = ev_port_id;
1443         dlb->ev_ports[ev_port_id].enq_configured = true;
1444         dlb->ev_ports[ev_port_id].setup_done = true;
1445         dlb->ev_ports[ev_port_id].inflight_max =
1446                 port_conf->new_event_threshold;
1447         dlb->ev_ports[ev_port_id].implicit_release =
1448                 !(port_conf->event_port_cfg &
1449                   RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1450         dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1451         dlb->ev_ports[ev_port_id].inflight_credits = 0;
1452         dlb->ev_ports[ev_port_id].credit_update_quanta =
1453                 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1454         dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1455
1456         /* Tear down pre-existing port->queue links */
1457         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1458                 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1459
1460         dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1461
1462         return 0;
1463 }
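/* Editor's illustrative sketch (not part of the driver): application-side
 * port setup; the depths stay within the max_event_port_*_depth limits that
 * info_get reports, and new_event_threshold reuses the device default.
 *
 *   struct rte_event_port_conf pconf;
 *
 *   if (rte_event_port_default_conf_get(dev_id, 0, &pconf) == 0) {
 *           pconf.dequeue_depth = 16;
 *           pconf.enqueue_depth = 16;
 *
 *           if (rte_event_port_setup(dev_id, 0, &pconf) < 0)
 *                   rte_panic("port setup failed\n");
 *   }
 */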
1464
1465 static int
1466 set_dev_id(const char *key __rte_unused,
1467            const char *value,
1468            void *opaque)
1469 {
1470         int *dev_id = opaque;
1471         int ret;
1472
1473         if (value == NULL || opaque == NULL) {
1474                 DLB_LOG_ERR("NULL pointer\n");
1475                 return -EINVAL;
1476         }
1477
1478         ret = dlb_string_to_int(dev_id, value);
1479         if (ret < 0)
1480                 return ret;
1481
1482         return 0;
1483 }
1484
1485 static int
1486 set_defer_sched(const char *key __rte_unused,
1487                 const char *value,
1488                 void *opaque)
1489 {
1490         int *defer_sched = opaque;
1491
1492         if (value == NULL || opaque == NULL) {
1493                 DLB_LOG_ERR("NULL pointer\n");
1494                 return -EINVAL;
1495         }
1496
1497         if (strcmp(value, "on") != 0) {
1498                 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1499                             value);
1500                 return -EINVAL;
1501         }
1502
1503         *defer_sched = 1;
1504
1505         return 0;
1506 }
1507
1508 static int
1509 set_num_atm_inflights(const char *key __rte_unused,
1510                       const char *value,
1511                       void *opaque)
1512 {
1513         int *num_atm_inflights = opaque;
1514         int ret;
1515
1516         if (value == NULL || opaque == NULL) {
1517                 DLB_LOG_ERR("NULL pointer\n");
1518                 return -EINVAL;
1519         }
1520
1521         ret = dlb_string_to_int(num_atm_inflights, value);
1522         if (ret < 0)
1523                 return ret;
1524
1525         if (*num_atm_inflights < 0 ||
1526             *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1527                 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1528                             DLB_MAX_NUM_ATM_INFLIGHTS);
1529                 return -EINVAL;
1530         }
1531
1532         return 0;
1533 }
1534
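/* Validate a single requested port->queue link before it is programmed:
 * the queue id must refer to a configured (or previously configured) queue,
 * load-balanced ports may only link to load-balanced queues (and directed
 * ports to directed queues), a free link slot must exist unless the link is
 * already present, and directed ports/queues support at most one link.
 * Returns 0 on success; otherwise sets rte_errno and returns -1.
 */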
1535 static int
1536 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1537                        uint8_t queue_id,
1538                        bool link_exists,
1539                        int index)
1540 {
1541         struct dlb_eventdev *dlb = ev_port->dlb;
1542         struct dlb_eventdev_queue *ev_queue;
1543         bool port_is_dir, queue_is_dir;
1544
1545         if (queue_id >= dlb->num_queues) {
1546                 DLB_LOG_ERR("queue_id %d >= num queues %d\n",
1547                             queue_id, dlb->num_queues);
1548                 rte_errno = -EINVAL;
1549                 return -1;
1550         }
1551
1552         ev_queue = &dlb->ev_queues[queue_id];
1553
1554         if (!ev_queue->setup_done &&
1555             ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1556                 DLB_LOG_ERR("setup not done and not previously configured\n");
1557                 rte_errno = -EINVAL;
1558                 return -1;
1559         }
1560
1561         port_is_dir = ev_port->qm_port.is_directed;
1562         queue_is_dir = ev_queue->qm_queue.is_directed;
1563
1564         if (port_is_dir != queue_is_dir) {
1565                 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1566                             queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1567                             port_is_dir ? "DIR" : "LDB", ev_port->id);
1568
1569                 rte_errno = -EINVAL;
1570                 return -1;
1571         }
1572
1573         /* Check if there is space for the requested link */
1574         if (!link_exists && index == -1) {
1575                 DLB_LOG_ERR("no space for new link\n");
1576                 rte_errno = -ENOSPC;
1577                 return -1;
1578         }
1579
1580         /* Check if the directed port is already linked */
1581         if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1582             !link_exists) {
1583                 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1584                             ev_port->id);
1585                 rte_errno = -EINVAL;
1586                 return -1;
1587         }
1588
1589         /* Check if the directed queue is already linked */
1590         if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1591             !link_exists) {
1592                 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1593                             ev_queue->id);
1594                 rte_errno = -EINVAL;
1595                 return -1;
1596         }
1597
1598         return 0;
1599 }
1600
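/* Request that the hardware map a load-balanced queue (QID) to a port's CQ
 * at the given priority. The eventdev priority is converted to the device's
 * priority range via EV_TO_DLB_PRIO before the mapping is issued.
 */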
1601 static int16_t
1602 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1603                            uint32_t qm_port_id,
1604                            uint16_t qm_qid,
1605                            uint8_t priority)
1606 {
1607         struct dlb_map_qid_args cfg;
1608         struct dlb_cmd_response response;
1609         int32_t ret;
1610
1611         if (handle == NULL)
1612                 return -EINVAL;
1613
1614         /* Build message */
1615         cfg.response = (uintptr_t)&response;
1616         cfg.port_id = qm_port_id;
1617         cfg.qid = qm_qid;
1618         cfg.priority = EV_TO_DLB_PRIO(priority);
1619
1620         ret = dlb_iface_map_qid(handle, &cfg);
1621         if (ret < 0) {
1622                 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1623                             ret, dlb_error_strings[response.status]);
1624                 DLB_LOG_ERR("dlb: device_id=%d grp=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1625                             handle->device_id,
1626                             handle->domain_id, cfg.port_id,
1627                             cfg.qid,
1628                             cfg.priority);
1629         } else {
1630                 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1631                             qm_qid, qm_port_id);
1632         }
1633
1634         return ret;
1635 }
1636
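/* Link a load-balanced queue to a load-balanced port. The port's link[]
 * table is scanned for an existing entry for this queue/priority pair or,
 * failing that, the first free CQ slot; if the entry is already mapped
 * nothing is done, otherwise the QID->port mapping is programmed in
 * hardware and the slot is marked mapped on success.
 */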
1637 static int
1638 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1639                          struct dlb_eventdev_port *ev_port,
1640                          struct dlb_eventdev_queue *ev_queue,
1641                          uint8_t priority)
1642 {
1643         int first_avail = -1;
1644         int ret, i;
1645
1646         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1647                 if (ev_port->link[i].valid) {
1648                         if (ev_port->link[i].queue_id == ev_queue->id &&
1649                             ev_port->link[i].priority == priority) {
1650                                 if (ev_port->link[i].mapped)
1651                                         return 0; /* already mapped */
1652                                 first_avail = i;
1653                         }
1654                 } else {
1655                         if (first_avail == -1)
1656                                 first_avail = i;
1657                 }
1658         }
1659         if (first_avail == -1) {
1660                 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1661                             ev_port->qm_port.id);
1662                 return -EINVAL;
1663         }
1664
1665         ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1666                                          ev_port->qm_port.id,
1667                                          ev_queue->qm_queue.id,
1668                                          priority);
1669
1670         if (!ret)
1671                 ev_port->link[first_avail].mapped = true;
1672
1673         return ret;
1674 }
1675
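/* Create a directed queue in hardware. Directed queues are paired 1:1 with
 * a directed port, and the port is always configured first, so the port's
 * hardware id identifies the pair. Returns the hardware queue id on
 * success, or -EINVAL on failure.
 */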
1676 static int32_t
1677 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1678 {
1679         struct dlb_hw_dev *handle = &dlb->qm_instance;
1680         struct dlb_create_dir_queue_args cfg;
1681         struct dlb_cmd_response response;
1682         int32_t ret;
1683
1684         cfg.response = (uintptr_t)&response;
1685
1686         /* The directed port is always configured before its queue */
1687         cfg.port_id = qm_port_id;
1688
1689         ret = dlb_iface_dir_queue_create(handle, &cfg);
1690         if (ret < 0) {
1691                 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1692                             ret, dlb_error_strings[response.status]);
1693                 return -EINVAL;
1694         }
1695
1696         return response.id;
1697 }
1698
1699 static int
1700 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1701                              struct dlb_eventdev_queue *ev_queue,
1702                              struct dlb_eventdev_port *ev_port)
1703 {
1704         int32_t qm_qid;
1705
1706         qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1707
1708         if (qm_qid < 0) {
1709                 DLB_LOG_ERR("Failed to create the DIR queue\n");
1710                 return qm_qid;
1711         }
1712
1713         dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1714
1715         ev_queue->qm_queue.id = qm_qid;
1716
1717         return 0;
1718 }
1719
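/* Program one port->queue link in hardware. While the device is stopped
 * this is a no-op; the caller still records the link in its link[] table.
 * Directed queues are created here, at link time, against their paired
 * port; load-balanced queues are joined via a QID->CQ mapping.
 */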
1720 static int
1721 dlb_do_port_link(struct rte_eventdev *dev,
1722                  struct dlb_eventdev_queue *ev_queue,
1723                  struct dlb_eventdev_port *ev_port,
1724                  uint8_t prio)
1725 {
1726         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1727         int err;
1728
1729         /* Don't link until start time. */
1730         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1731                 return 0;
1732
1733         if (ev_queue->qm_queue.is_directed)
1734                 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1735         else
1736                 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1737
1738         if (err) {
1739                 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1740                             ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1741                             ev_queue->id, ev_port->id);
1742
1743                 rte_errno = err;
1744                 return -1;
1745         }
1746
1747         return 0;
1748 }
1749
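/* Eventdev port_link entry point. Establishes (or re-prioritizes) up to
 * nb_links port->queue links and returns the number of link requests
 * processed. On the first failure the loop stops, so the return value is
 * the index of the offending queue and rte_errno reports the cause.
 */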
1750 static int
1751 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
1752                        const uint8_t queues[], const uint8_t priorities[],
1753                        uint16_t nb_links)
1754
1755 {
1756         struct dlb_eventdev_port *ev_port = event_port;
1757         struct dlb_eventdev *dlb;
1758         int i, j;
1759
1760         RTE_SET_USED(dev);
1761
1762         if (ev_port == NULL) {
1763                 DLB_LOG_ERR("dlb: evport not setup\n");
1764                 rte_errno = -EINVAL;
1765                 return 0;
1766         }
1767
1768         if (!ev_port->setup_done &&
1769             ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
1770                 DLB_LOG_ERR("dlb: evport not setup\n");
1771                 rte_errno = -EINVAL;
1772                 return 0;
1773         }
1774
1775         /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
1776          * queues pointer.
1777          */
1778         if (nb_links == 0) {
1779                 DLB_LOG_DBG("dlb: nb_links is 0\n");
1780                 return 0; /* Ignore and return success */
1781         }
1782
1783         dlb = ev_port->dlb;
1784
1785         DLB_LOG_DBG("Linking %u queues to %s port %d\n",
1786                     nb_links,
1787                     ev_port->qm_port.is_directed ? "DIR" : "LDB",
1788                     ev_port->id);
1789
1790         for (i = 0; i < nb_links; i++) {
1791                 struct dlb_eventdev_queue *ev_queue;
1792                 uint8_t queue_id, prio;
1793                 bool found = false;
1794                 int index = -1;
1795
1796                 queue_id = queues[i];
1797                 prio = priorities[i];
1798
1799                 /* Check if the link already exists. */
1800                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
1801                         if (ev_port->link[j].valid) {
1802                                 if (ev_port->link[j].queue_id == queue_id) {
1803                                         found = true;
1804                                         index = j;
1805                                         break;
1806                                 }
1807                         } else {
1808                                 if (index == -1)
1809                                         index = j;
1810                         }
1811
1812                 /* could not link */
1813                 if (index == -1)
1814                         break;
1815
1816                 /* Check if already linked at the requested priority */
1817                 if (found && ev_port->link[index].priority == prio)
1818                         continue;
1819
1820                 if (dlb_validate_port_link(ev_port, queue_id, found, index))
1821                         break; /* return index of offending queue */
1822
1823                 ev_queue = &dlb->ev_queues[queue_id];
1824
1825                 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
1826                         break; /* return index of offending queue */
1827
1828                 ev_queue->num_links++;
1829
1830                 ev_port->link[index].queue_id = queue_id;
1831                 ev_port->link[index].priority = prio;
1832                 ev_port->link[index].valid = true;
1833                 /* Count only new links; an existing entry means a priority change */
1834                 if (!found)
1835                         ev_port->num_links++;
1836         }
1837         return i;
1838 }
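
/* For reference, applications reach the port_link entry point above through
 * the public rte_event_port_link() API. A minimal, hypothetical call (queue
 * ids and priorities below are illustrative only) could look like:
 *
 *	uint8_t queues[] = { 0, 1 };
 *	uint8_t priorities[] = {
 *		RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		RTE_EVENT_DEV_PRIORITY_HIGHEST,
 *	};
 *	int n = rte_event_port_link(dev_id, port_id, queues, priorities,
 *				    RTE_DIM(queues));
 *
 * A return value smaller than the number requested indicates a partial
 * link; rte_errno then holds the failure cause. If the application passes
 * NULL arrays, the eventdev layer links all configured queues at normal
 * priority, so this PMD never sees a NULL queues pointer (as noted in the
 * function above).
 */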
1839
1840 void
1841 dlb_entry_points_init(struct rte_eventdev *dev)
1842 {
1843         static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
1844                 .dev_infos_get    = dlb_eventdev_info_get,
1845                 .dev_configure    = dlb_eventdev_configure,
1846                 .queue_def_conf   = dlb_eventdev_queue_default_conf_get,
1847                 .port_def_conf    = dlb_eventdev_port_default_conf_get,
1848                 .queue_setup      = dlb_eventdev_queue_setup,
1849                 .port_setup       = dlb_eventdev_port_setup,
1850                 .port_link        = dlb_eventdev_port_link,
1851                 .dump             = dlb_eventdev_dump,
1852                 .xstats_get       = dlb_eventdev_xstats_get,
1853                 .xstats_get_names = dlb_eventdev_xstats_get_names,
1854                 .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
1855                 .xstats_reset       = dlb_eventdev_xstats_reset,
1856         };
1857
1858         /* Expose PMD's eventdev interface */
1859         dev->dev_ops = &dlb_eventdev_entry_ops;
1860 }
1861
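/* Primary-process probe: record the devargs overrides, open the hardware
 * interface, query the device revision and available resources, select the
 * CQ poll mode, initialize xstats, and install the eventdev entry points.
 */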
1862 int
1863 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
1864                            const char *name,
1865                            struct dlb_devargs *dlb_args)
1866 {
1867         struct dlb_eventdev *dlb;
1868         int err;
1869
1870         dlb = dev->data->dev_private;
1871
1872         dlb->event_dev = dev; /* backlink */
1873
1874         evdev_dlb_default_info.driver_name = name;
1875
1876         dlb->max_num_events_override = dlb_args->max_num_events;
1877         dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
1878         dlb->defer_sched = dlb_args->defer_sched;
1879         dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
1880
1881         /* Open the interface.
1882          * For vdev mode, this means open the dlb kernel module.
1883          */
1884         err = dlb_iface_open(&dlb->qm_instance, name);
1885         if (err < 0) {
1886                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
1887                             err);
1888                 return err;
1889         }
1890
1891         err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
1892         if (err < 0) {
1893                 DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
1894                             err);
1895                 return err;
1896         }
1897
1898         err = dlb_hw_query_resources(dlb);
1899         if (err) {
1900                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
1901                 return err;
1902         }
1903
1904         err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
1905         if (err < 0) {
1906                 DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
1907                 return err;
1908         }
1909
1910         /* Complete xstats runtime initialization */
1911         err = dlb_xstats_init(dlb);
1912         if (err) {
1913                 DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
1914                 return err;
1915         }
1916
1917         rte_spinlock_init(&dlb->qm_instance.resource_lock);
1918
1919         dlb_iface_low_level_io_init(dlb);
1920
1921         dlb_entry_points_init(dev);
1922
1923         return 0;
1924 }
1925
1926 int
1927 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
1928                              const char *name)
1929 {
1930         struct dlb_eventdev *dlb;
1931         int err;
1932
1933         dlb = dev->data->dev_private;
1934
1935         evdev_dlb_default_info.driver_name = name;
1936
1937         err = dlb_iface_open(&dlb->qm_instance, name);
1938         if (err < 0) {
1939                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
1940                             err);
1941                 return err;
1942         }
1943
1944         err = dlb_hw_query_resources(dlb);
1945         if (err) {
1946                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
1947                 return err;
1948         }
1949
1950         dlb_iface_low_level_io_init(dlb);
1951
1952         dlb_entry_points_init(dev);
1953
1954         return 0;
1955 }
1956
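/* Accepted devargs keys are the macros listed in args[] below, whose string
 * values are defined in dlb_priv.h. As an illustrative sketch only --
 * assuming key strings such as "max_num_events", "defer_sched" and
 * "atm_inflights" -- a vdev command line might look like:
 *
 *	--vdev=<dlb vdev name>,max_num_events=4096,defer_sched=on,atm_inflights=2048
 *
 * defer_sched accepts only "on", and atm_inflights must lie between 0 and
 * DLB_MAX_NUM_ATM_INFLIGHTS (see the kvargs handlers above).
 */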
1957 int
1958 dlb_parse_params(const char *params,
1959                  const char *name,
1960                  struct dlb_devargs *dlb_args)
1961 {
1962         int ret = 0;
1963         static const char * const args[] = { NUMA_NODE_ARG,
1964                                              DLB_MAX_NUM_EVENTS,
1965                                              DLB_NUM_DIR_CREDITS,
1966                                              DEV_ID_ARG,
1967                                              DLB_DEFER_SCHED_ARG,
1968                                              DLB_NUM_ATM_INFLIGHTS_ARG,
1969                                              NULL };
1970
1971         if (params && params[0] != '\0') {
1972                 struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
1973
1974                 if (kvlist == NULL) {
1975                         DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
1976                                      name);
1977                 } else {
1978                         ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
1979                                                  set_numa_node,
1980                                                  &dlb_args->socket_id);
1981                         if (ret != 0) {
1982                                 DLB_LOG_ERR("%s: Error parsing numa node parameter\n",
1983                                             name);
1984                                 rte_kvargs_free(kvlist);
1985                                 return ret;
1986                         }
1987
1988                         ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
1989                                                  set_max_num_events,
1990                                                  &dlb_args->max_num_events);
1991                         if (ret != 0) {
1992                                 DLB_LOG_ERR("%s: Error parsing max_num_events parameter\n",
1993                                             name);
1994                                 rte_kvargs_free(kvlist);
1995                                 return ret;
1996                         }
1997
1998                         ret = rte_kvargs_process(kvlist,
1999                                         DLB_NUM_DIR_CREDITS,
2000                                         set_num_dir_credits,
2001                                         &dlb_args->num_dir_credits_override);
2002                         if (ret != 0) {
2003                                 DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter\n",
2004                                             name);
2005                                 rte_kvargs_free(kvlist);
2006                                 return ret;
2007                         }
2008
2009                         ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
2010                                                  set_dev_id,
2011                                                  &dlb_args->dev_id);
2012                         if (ret != 0) {
2013                                 DLB_LOG_ERR("%s: Error parsing dev_id parameter\n",
2014                                             name);
2015                                 rte_kvargs_free(kvlist);
2016                                 return ret;
2017                         }
2018
2019                         ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
2020                                                  set_defer_sched,
2021                                                  &dlb_args->defer_sched);
2022                         if (ret != 0) {
2023                                 DLB_LOG_ERR("%s: Error parsing defer_sched parameter\n",
2024                                             name);
2025                                 rte_kvargs_free(kvlist);
2026                                 return ret;
2027                         }
2028
2029                         ret = rte_kvargs_process(kvlist,
2030                                                  DLB_NUM_ATM_INFLIGHTS_ARG,
2031                                                  set_num_atm_inflights,
2032                                                  &dlb_args->num_atm_inflights);
2033                         if (ret != 0) {
2034                                 DLB_LOG_ERR("%s: Error parsing atm_inflights parameter\n",
2035                                             name);
2036                                 rte_kvargs_free(kvlist);
2037                                 return ret;
2038                         }
2039
2040                         rte_kvargs_free(kvlist);
2041                 }
2042         }
2043         return ret;
2044 }
2045 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);