event/dlb: add token pop API
[dpdk.git] drivers/event/dlb/dlb.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39
40 /*
41  * Resources exposed to eventdev.
42  */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47         .driver_name = "", /* probe will set */
48         .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49         .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51         .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53         .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55         .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56         .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57         .max_event_priority_levels = DLB_QID_PRIORITIES,
58         .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59         .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60         .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61         .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62         .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63         .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64         .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65                           RTE_EVENT_DEV_CAP_EVENT_QOS |
66                           RTE_EVENT_DEV_CAP_BURST_MODE |
67                           RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68                           RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69                           RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
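
/* Illustrative application-side sketch (not part of this PMD): the defaults
 * above are what an application sees through the standard eventdev info call,
 * after probe/configure adjust them:
 *
 *     struct rte_event_dev_info info;
 *
 *     if (rte_event_dev_info_get(dev_id, &info) == 0)
 *             printf("max ports %u, max queues %u\n",
 *                    info.max_event_ports, info.max_event_queues);
 */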
71
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74
75 static inline uint16_t
76 dlb_event_enqueue_delayed(void *event_port,
77                           const struct rte_event events[]);
78
79 static inline uint16_t
80 dlb_event_enqueue_burst_delayed(void *event_port,
81                                 const struct rte_event events[],
82                                 uint16_t num);
83
84 static inline uint16_t
85 dlb_event_enqueue_new_burst_delayed(void *event_port,
86                                     const struct rte_event events[],
87                                     uint16_t num);
88
89 static inline uint16_t
90 dlb_event_enqueue_forward_burst_delayed(void *event_port,
91                                         const struct rte_event events[],
92                                         uint16_t num);
93
94 static int
95 dlb_hw_query_resources(struct dlb_eventdev *dlb)
96 {
97         struct dlb_hw_dev *handle = &dlb->qm_instance;
98         struct dlb_hw_resource_info *dlb_info = &handle->info;
99         int ret;
100
101         ret = dlb_iface_get_num_resources(handle,
102                                           &dlb->hw_rsrc_query_results);
103         if (ret) {
104                 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
105                 return ret;
106         }
107
108         /* Complete filling in device resource info returned to evdev app,
109          * overriding any default values.
110          * The capabilities (CAPs) were set at compile time.
111          */
112
113         evdev_dlb_default_info.max_event_queues =
114                 dlb->hw_rsrc_query_results.num_ldb_queues;
115
116         evdev_dlb_default_info.max_event_ports =
117                 dlb->hw_rsrc_query_results.num_ldb_ports;
118
119         evdev_dlb_default_info.max_num_events =
120                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121
122         /* Save off values used when creating the scheduling domain. */
123
124         handle->info.num_sched_domains =
125                 dlb->hw_rsrc_query_results.num_sched_domains;
126
127         handle->info.hw_rsrc_max.nb_events_limit =
128                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
129
130         handle->info.hw_rsrc_max.num_queues =
131                 dlb->hw_rsrc_query_results.num_ldb_queues +
132                 dlb->hw_rsrc_query_results.num_dir_ports;
133
134         handle->info.hw_rsrc_max.num_ldb_queues =
135                 dlb->hw_rsrc_query_results.num_ldb_queues;
136
137         handle->info.hw_rsrc_max.num_ldb_ports =
138                 dlb->hw_rsrc_query_results.num_ldb_ports;
139
140         handle->info.hw_rsrc_max.num_dir_ports =
141                 dlb->hw_rsrc_query_results.num_dir_ports;
142
143         handle->info.hw_rsrc_max.reorder_window_size =
144                 dlb->hw_rsrc_query_results.num_hist_list_entries;
145
146         rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
147
148         return 0;
149 }
150
151 static void
152 dlb_free_qe_mem(struct dlb_port *qm_port)
153 {
154         if (qm_port == NULL)
155                 return;
156
157         rte_free(qm_port->qe4);
158         qm_port->qe4 = NULL;
159
160         rte_free(qm_port->consume_qe);
161         qm_port->consume_qe = NULL;
162 }
163
164 static int
165 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
166 {
167         struct dlb_cq_pop_qe *qe;
168
169         qe = rte_zmalloc(mz_name,
170                         DLB_NUM_QES_PER_CACHE_LINE *
171                                 sizeof(struct dlb_cq_pop_qe),
172                         RTE_CACHE_LINE_SIZE);
173
174         if (qe == NULL) {
175                 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
176                 return -ENOMEM;
177         }
178
179         qm_port->consume_qe = qe;
180
181         qe->qe_valid = 0;
182         qe->qe_frag = 0;
183         qe->qe_comp = 0;
184         qe->cq_token = 1;
185         /* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
186          * and so on.
187          */
188         qe->tokens = 0; /* set at run time */
189         qe->meas_lat = 0;
190         qe->no_dec = 0;
191         /* Completion IDs are disabled */
192         qe->cmp_id = 0;
193
194         return 0;
195 }
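
/* Illustrative sketch (assumed helper name, not defined in this file): at
 * dequeue time the token-pop path fills in the zero-based token count and
 * writes the pop QE to the port's producer-port MMIO window, roughly:
 *
 *     qe->tokens = num_tokens - 1;           // zero-based, see comment above
 *     dlb_movdir64b(port_mmio_addr, qe);     // hypothetical 64B store helper
 *
 * The actual store primitive depends on CPU support (MOVDIR64B vs. regular
 * streaming stores) and lives elsewhere in the driver.
 */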
196
197 static int
198 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
199 {
200         int ret, sz;
201
202         sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
203
204         qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
205
206         if (qm_port->qe4 == NULL) {
207                 DLB_LOG_ERR("dlb: no qe4 memory\n");
208                 ret = -ENOMEM;
209                 goto error_exit;
210         }
211
212         ret = dlb_init_consume_qe(qm_port, mz_name);
213         if (ret < 0) {
214                 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
215                 goto error_exit;
216         }
217
218         return 0;
219
220 error_exit:
221
222         dlb_free_qe_mem(qm_port);
223
224         return ret;
225 }
226
227 /* Wrapper for string-to-int conversion, used in place of atoi(), which
228  * performs no error checking.
229  */
230 #define DLB_BASE_10 10
231
232 static int
233 dlb_string_to_int(int *result, const char *str)
234 {
235         long ret;
236         char *endstr;
237
238         if (str == NULL || result == NULL)
239                 return -EINVAL;
240
241         errno = 0;
242         ret = strtol(str, &endstr, DLB_BASE_10);
243         if (errno)
244                 return -errno;
245
246         /* long int and int may be different width for some architectures */
247         if (ret < INT_MIN || ret > INT_MAX || endstr == str)
248                 return -EINVAL;
249
250         *result = ret;
251         return 0;
252 }
253
254 static int
255 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
256 {
257         int *socket_id = opaque;
258         int ret;
259
260         ret = dlb_string_to_int(socket_id, value);
261         if (ret < 0)
262                 return ret;
263
264         if (*socket_id > RTE_MAX_NUMA_NODES)
265                 return -EINVAL;
266
267         return 0;
268 }
269
270 static int
271 set_max_num_events(const char *key __rte_unused,
272                    const char *value,
273                    void *opaque)
274 {
275         int *max_num_events = opaque;
276         int ret;
277
278         if (value == NULL || opaque == NULL) {
279                 DLB_LOG_ERR("NULL pointer\n");
280                 return -EINVAL;
281         }
282
283         ret = dlb_string_to_int(max_num_events, value);
284         if (ret < 0)
285                 return ret;
286
287         if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
288                 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
289                             DLB_MAX_NUM_LDB_CREDITS);
290                 return -EINVAL;
291         }
292
293         return 0;
294 }
295
296 static int
297 set_num_dir_credits(const char *key __rte_unused,
298                     const char *value,
299                     void *opaque)
300 {
301         int *num_dir_credits = opaque;
302         int ret;
303
304         if (value == NULL || opaque == NULL) {
305                 DLB_LOG_ERR("NULL pointer\n");
306                 return -EINVAL;
307         }
308
309         ret = dlb_string_to_int(num_dir_credits, value);
310         if (ret < 0)
311                 return ret;
312
313         if (*num_dir_credits < 0 ||
314             *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
315                 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
316                             DLB_MAX_NUM_DIR_CREDITS);
317                 return -EINVAL;
318         }
319         return 0;
320 }
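
/* Illustrative sketch (the devargs key names and argument-struct fields are
 * assumptions for this example): these handlers follow the rte_kvargs
 * callback signature and are typically wired up during probe, e.g.:
 *
 *     rte_kvargs_process(kvlist, "max_num_events",
 *                        set_max_num_events, &args->max_num_events);
 *     rte_kvargs_process(kvlist, "num_dir_credits",
 *                        set_num_dir_credits, &args->num_dir_credits);
 */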
321
322 /* VDEV-only notes:
323  * This function first unmaps all memory mappings and closes the
324  * domain's file descriptor, which causes the driver to reset the
325  * scheduling domain. Once that completes (when close() returns), we
326  * can safely free the dynamically allocated memory used by the
327  * scheduling domain.
328  *
329  * PF-only notes:
330  * We will maintain a use count and use that to determine when
331  * a reset is required. In PF mode, we never mmap or munmap
332  * device memory, and we own the entire physical PCI device.
333  */
334
335 static void
336 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
337 {
338         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
339         enum dlb_configuration_state config_state;
340         int i, j;
341
342         /* Close and reset the domain */
343         dlb_iface_domain_close(dlb);
344
345         /* Free all dynamically allocated port memory */
346         for (i = 0; i < dlb->num_ports; i++)
347                 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
348
349         /* If reconfiguring, mark the device's queues and ports as "previously
350          * configured." If the user does not reconfigure them, the PMD will
351          * reapply their previous configuration when the device is started.
352          */
353         config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
354
355         for (i = 0; i < dlb->num_ports; i++) {
356                 dlb->ev_ports[i].qm_port.config_state = config_state;
357                 /* Reset setup_done so ports can be reconfigured */
358                 dlb->ev_ports[i].setup_done = false;
359                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
360                         dlb->ev_ports[i].link[j].mapped = false;
361         }
362
363         for (i = 0; i < dlb->num_queues; i++)
364                 dlb->ev_queues[i].qm_queue.config_state = config_state;
365
366         for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
367                 dlb->ev_queues[i].setup_done = false;
368
369         dlb->num_ports = 0;
370         dlb->num_ldb_ports = 0;
371         dlb->num_dir_ports = 0;
372         dlb->num_queues = 0;
373         dlb->num_ldb_queues = 0;
374         dlb->num_dir_queues = 0;
375         dlb->configured = false;
376 }
377
378 static int
379 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
380 {
381         struct dlb_create_ldb_pool_args cfg;
382         struct dlb_cmd_response response;
383         int ret;
384
385         if (handle == NULL)
386                 return -EINVAL;
387
388         if (!handle->cfg.resources.num_ldb_credits) {
389                 handle->cfg.ldb_credit_pool_id = 0;
390                 handle->cfg.num_ldb_credits = 0;
391                 return 0;
392         }
393
394         cfg.response = (uintptr_t)&response;
395         cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
396
397         ret = dlb_iface_ldb_credit_pool_create(handle,
398                                                &cfg);
399         if (ret < 0) {
400                 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
401                             ret, dlb_error_strings[response.status]);
402         }
403
404         handle->cfg.ldb_credit_pool_id = response.id;
405         handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
406
407         return ret;
408 }
409
410 static int
411 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
412 {
413         struct dlb_create_dir_pool_args cfg;
414         struct dlb_cmd_response response;
415         int ret;
416
417         if (handle == NULL)
418                 return -EINVAL;
419
420         if (!handle->cfg.resources.num_dir_credits) {
421                 handle->cfg.dir_credit_pool_id = 0;
422                 handle->cfg.num_dir_credits = 0;
423                 return 0;
424         }
425
426         cfg.response = (uintptr_t)&response;
427         cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
428
429         ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
430         if (ret < 0)
431                 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
432                             ret, dlb_error_strings[response.status]);
433
434         handle->cfg.dir_credit_pool_id = response.id;
435         handle->cfg.num_dir_credits = cfg.num_dir_credits;
436
437         return ret;
438 }
439
440 static int
441 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
442                            struct dlb_eventdev *dlb,
443                            const struct dlb_hw_rsrcs *resources_asked)
444 {
445         int ret = 0;
446         struct dlb_create_sched_domain_args *config_params;
447         struct dlb_cmd_response response;
448
449         if (resources_asked == NULL) {
450                 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
451                 ret = -EINVAL;
452                 goto error_exit;
453         }
454
455         /* Map generic qm resources to dlb resources */
456         config_params = &handle->cfg.resources;
457
458         config_params->response = (uintptr_t)&response;
459
460         /* DIR ports and queues */
461
462         config_params->num_dir_ports =
463                 resources_asked->num_dir_ports;
464
465         config_params->num_dir_credits =
466                 resources_asked->num_dir_credits;
467
468         /* LDB ports and queues */
469
470         config_params->num_ldb_queues =
471                 resources_asked->num_ldb_queues;
472
473         config_params->num_ldb_ports =
474                 resources_asked->num_ldb_ports;
475
476         config_params->num_ldb_credits =
477                 resources_asked->num_ldb_credits;
478
479         config_params->num_atomic_inflights =
480                 dlb->num_atm_inflights_per_queue *
481                 config_params->num_ldb_queues;
482
483         config_params->num_hist_list_entries = config_params->num_ldb_ports *
484                 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
485
486         /* dlb limited to 1 credit pool per queue type */
487         config_params->num_ldb_credit_pools = 1;
488         config_params->num_dir_credit_pools = 1;
489
490         DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_credit_pools=%d, dir_credit_pools=%d\n",
491                     config_params->num_ldb_queues,
492                     config_params->num_ldb_ports,
493                     config_params->num_dir_ports,
494                     config_params->num_atomic_inflights,
495                     config_params->num_hist_list_entries,
496                     config_params->num_ldb_credits,
497                     config_params->num_dir_credits,
498                     config_params->num_ldb_credit_pools,
499                     config_params->num_dir_credit_pools);
500
501         /* Configure the QM */
502
503         ret = dlb_iface_sched_domain_create(handle, config_params);
504         if (ret < 0) {
505                 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
506                             handle->device_id,
507                             ret,
508                             dlb_error_strings[response.status]);
509                 goto error_exit;
510         }
511
512         handle->domain_id = response.id;
513         handle->domain_id_valid = 1;
514
515         config_params->response = 0;
516
517         ret = dlb_ldb_credit_pool_create(handle);
518         if (ret < 0) {
519                 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
520                 goto error_exit2;
521         }
522
523         ret = dlb_dir_credit_pool_create(handle);
524         if (ret < 0) {
525                 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
526                 goto error_exit2;
527         }
528
529         handle->cfg.configured = true;
530
531         return 0;
532
533 error_exit2:
534         dlb_iface_domain_close(dlb);
535
536 error_exit:
537         return ret;
538 }
539
540 /* End HW specific */
541 static void
542 dlb_eventdev_info_get(struct rte_eventdev *dev,
543                       struct rte_event_dev_info *dev_info)
544 {
545         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
546         int ret;
547
548         ret = dlb_hw_query_resources(dlb);
549         if (ret) {
550                 const struct rte_eventdev_data *data = dev->data;
551
552                 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
553                             ret, data->dev_id);
554                 /* This function returns void, so fall through and return
555                  * the values set up at probe time.
556                  */
557         }
558
559         /* Add num resources currently owned by this domain.
560          * These would become available if the scheduling domain were reset due
561          * to the application recalling eventdev_configure to *reconfigure* the
562          * domain.
563          */
564         evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
565         evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
566         evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
567
568         /* In DLB A-stepping hardware, applications are limited to 128
569          * configured ports (load-balanced or directed). The reported number of
570          * available ports must reflect this.
571          */
572         if (dlb->revision < DLB_REV_B0) {
573                 int used_ports;
574
575                 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
576                         dlb->hw_rsrc_query_results.num_ldb_ports -
577                         dlb->hw_rsrc_query_results.num_dir_ports;
578
579                 evdev_dlb_default_info.max_event_ports =
580                         RTE_MIN(evdev_dlb_default_info.max_event_ports,
581                                 128 - used_ports);
582         }
583
584         evdev_dlb_default_info.max_event_queues =
585                 RTE_MIN(evdev_dlb_default_info.max_event_queues,
586                         RTE_EVENT_MAX_QUEUES_PER_DEV);
587
588         evdev_dlb_default_info.max_num_events =
589                 RTE_MIN(evdev_dlb_default_info.max_num_events,
590                         dlb->max_num_events_override);
591
592         *dev_info = evdev_dlb_default_info;
593 }
594
595 /* Note: 1 QM instance per QM device, QM instance/device == event device */
596 static int
597 dlb_eventdev_configure(const struct rte_eventdev *dev)
598 {
599         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
600         struct dlb_hw_dev *handle = &dlb->qm_instance;
601         struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
602         const struct rte_eventdev_data *data = dev->data;
603         const struct rte_event_dev_config *config = &data->dev_conf;
604         int ret;
605
606         /* If this eventdev is already configured, we must release the current
607          * scheduling domain before attempting to configure a new one.
608          */
609         if (dlb->configured) {
610                 dlb_hw_reset_sched_domain(dev, true);
611
612                 ret = dlb_hw_query_resources(dlb);
613                 if (ret) {
614                         DLB_LOG_ERR("get resources err=%d, devid=%d\n",
615                                     ret, data->dev_id);
616                         return ret;
617                 }
618         }
619
620         if (config->nb_event_queues > rsrcs->num_queues) {
621                 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
622                             config->nb_event_queues,
623                             rsrcs->num_queues);
624                 return -EINVAL;
625         }
626         if (config->nb_event_ports > (rsrcs->num_ldb_ports
627                         + rsrcs->num_dir_ports)) {
628                 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
629                             config->nb_event_ports,
630                             (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
631                 return -EINVAL;
632         }
633         if (config->nb_events_limit > rsrcs->nb_events_limit) {
634                 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
635                             config->nb_events_limit,
636                             rsrcs->nb_events_limit);
637                 return -EINVAL;
638         }
639
640         if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
641                 dlb->global_dequeue_wait = false;
642         else {
643                 uint32_t timeout32;
644
645                 dlb->global_dequeue_wait = true;
646
647                 timeout32 = config->dequeue_timeout_ns;
648
649                 dlb->global_dequeue_wait_ticks =
650                         timeout32 * (rte_get_timer_hz() / 1E9);
651         }
652
653         /* Does this platform support umonitor/umwait? */
654         if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
655                 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
656                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
657                         DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE, must be 0 or 1.\n",
658                                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
659                         return -EINVAL;
660                 }
661                 dlb->umwait_allowed = true;
662         }
663
664         rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
665         rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
666         /* 1 dir queue per dir port */
667         rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
668
669         /* Scale down nb_events_limit by 4 for directed credits, since there
670          * are 4x as many load-balanced credits.
671          */
672         rsrcs->num_ldb_credits = 0;
673         rsrcs->num_dir_credits = 0;
674
675         if (rsrcs->num_ldb_queues)
676                 rsrcs->num_ldb_credits = config->nb_events_limit;
677         if (rsrcs->num_dir_ports)
678                 rsrcs->num_dir_credits = config->nb_events_limit / 4;
679         if (dlb->num_dir_credits_override != -1)
680                 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
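        /* Example (illustrative numbers): with nb_events_limit = 2048 and no
         * num_dir_credits override, this yields 2048 load-balanced credits
         * and 2048 / 4 = 512 directed credits.
         */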
681
682         if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
683                 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
684                 return -ENODEV;
685         }
686
687         dlb->new_event_limit = config->nb_events_limit;
688         __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
689
690         /* Save number of ports/queues for this event dev */
691         dlb->num_ports = config->nb_event_ports;
692         dlb->num_queues = config->nb_event_queues;
693         dlb->num_dir_ports = rsrcs->num_dir_ports;
694         dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
695         dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
696         dlb->num_dir_queues = dlb->num_dir_ports;
697         dlb->num_ldb_credits = rsrcs->num_ldb_credits;
698         dlb->num_dir_credits = rsrcs->num_dir_credits;
699
700         dlb->configured = true;
701
702         return 0;
703 }
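
/* Illustrative application-side sketch (values are placeholders): the limits
 * checked above apply to the configuration passed in through the standard
 * eventdev API:
 *
 *     struct rte_event_dev_config cfg = {
 *             .nb_event_queues = 2,
 *             .nb_event_ports = 2,
 *             .nb_events_limit = 1024,
 *             .nb_event_queue_flows = 1024,
 *             .nb_event_port_dequeue_depth = 32,
 *             .nb_event_port_enqueue_depth = 32,
 *             .nb_single_link_event_port_queues = 1,
 *             .dequeue_timeout_ns = 0,
 *     };
 *
 *     ret = rte_event_dev_configure(dev_id, &cfg);
 */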
704
705 static int16_t
706 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
707                                uint32_t qm_port_id,
708                                uint16_t qm_qid)
709 {
710         struct dlb_unmap_qid_args cfg;
711         struct dlb_cmd_response response;
712         int32_t ret;
713
714         if (handle == NULL)
715                 return -EINVAL;
716
717         cfg.response = (uintptr_t)&response;
718         cfg.port_id = qm_port_id;
719         cfg.qid = qm_qid;
720
721         ret = dlb_iface_unmap_qid(handle, &cfg);
722         if (ret < 0)
723                 DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
724                             ret, dlb_error_strings[response.status]);
725
726         return ret;
727 }
728
729 static int
730 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
731                            struct dlb_eventdev_port *ev_port,
732                            struct dlb_eventdev_queue *ev_queue)
733 {
734         int ret, i;
735
736         /* Don't unlink until start time. */
737         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
738                 return 0;
739
740         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
741                 if (ev_port->link[i].valid &&
742                     ev_port->link[i].queue_id == ev_queue->id)
743                         break; /* found */
744         }
745
746         /* This is expected with the eventdev API, which blindly attempts
747          * to unmap all queues.
748          */
749         if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
750                 DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
751                             ev_queue->qm_queue.id,
752                             ev_port->qm_port.id);
753                 return 0;
754         }
755
756         ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
757                                              ev_port->qm_port.id,
758                                              ev_queue->qm_queue.id);
759         if (!ret)
760                 ev_port->link[i].mapped = false;
761
762         return ret;
763 }
764
765 static int
766 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
767                          uint8_t queues[], uint16_t nb_unlinks)
768 {
769         struct dlb_eventdev_port *ev_port = event_port;
770         struct dlb_eventdev *dlb;
771         int i;
772
773         RTE_SET_USED(dev);
774
775         if (!ev_port->setup_done) {
776                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
777                             ev_port->id);
778                 rte_errno = -EINVAL;
779                 return 0;
780         }
781
782         if (queues == NULL || nb_unlinks == 0) {
783                 DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
784                 return 0; /* Ignore and return success */
785         }
786
787         if (ev_port->qm_port.is_directed) {
788                 DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
789                             ev_port->id);
790                 rte_errno = 0;
791                 return nb_unlinks; /* as if success */
792         }
793
794         dlb = ev_port->dlb;
795
796         for (i = 0; i < nb_unlinks; i++) {
797                 struct dlb_eventdev_queue *ev_queue;
798                 int ret, j;
799
800                 if (queues[i] >= dlb->num_queues) {
801                         DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
802                         rte_errno = -EINVAL;
803                         return i; /* return index of offending queue */
804                 }
805
806                 ev_queue = &dlb->ev_queues[queues[i]];
807
808                 /* Does a link exist? */
809                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
810                         if (ev_port->link[j].queue_id == queues[i] &&
811                             ev_port->link[j].valid)
812                                 break;
813
814                 if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
815                         continue;
816
817                 ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
818                 if (ret) {
819                         DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
820                                     ret, ev_port->id, queues[i]);
821                         rte_errno = -ENOENT;
822                         return i; /* return index of offending queue */
823                 }
824
825                 ev_port->link[j].valid = false;
826                 ev_port->num_links--;
827                 ev_queue->num_links--;
828         }
829
830         return nb_unlinks;
831 }
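
/* Illustrative application-side sketch: unlinking is requested through the
 * standard API, and completion can be polled with the companion call handled
 * just below:
 *
 *     uint8_t q = queue_id;
 *
 *     if (rte_event_port_unlink(dev_id, port_id, &q, 1) == 1)
 *             while (rte_event_port_unlinks_in_progress(dev_id, port_id) > 0)
 *                     rte_pause();
 */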
832
833 static int
834 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
835                                       void *event_port)
836 {
837         struct dlb_eventdev_port *ev_port = event_port;
838         struct dlb_eventdev *dlb;
839         struct dlb_hw_dev *handle;
840         struct dlb_pending_port_unmaps_args cfg;
841         struct dlb_cmd_response response;
842         int ret;
843
844         RTE_SET_USED(dev);
845
846         if (!ev_port->setup_done) {
847                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
848                             ev_port->id);
849                 rte_errno = -EINVAL;
850                 return 0;
851         }
852
853         cfg.port_id = ev_port->qm_port.id;
854         cfg.response = (uintptr_t)&response;
855         dlb = ev_port->dlb;
856         handle = &dlb->qm_instance;
857         ret = dlb_iface_pending_port_unmaps(handle, &cfg);
858
859         if (ret < 0) {
860                 DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
861                             ret, dlb_error_strings[response.status]);
862                 return ret;
863         }
864
865         return response.id;
866 }
867
868 static void
869 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
870                                    uint8_t port_id,
871                                    struct rte_event_port_conf *port_conf)
872 {
873         RTE_SET_USED(port_id);
874         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
875
876         port_conf->new_event_threshold = dlb->new_event_limit;
877         port_conf->dequeue_depth = 32;
878         port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
879         port_conf->event_port_cfg = 0;
880 }
881
882 static void
883 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
884                                     uint8_t queue_id,
885                                     struct rte_event_queue_conf *queue_conf)
886 {
887         RTE_SET_USED(dev);
888         RTE_SET_USED(queue_id);
889         queue_conf->nb_atomic_flows = 1024;
890         queue_conf->nb_atomic_order_sequences = 32;
891         queue_conf->event_queue_cfg = 0;
892         queue_conf->priority = 0;
893 }
894
895 static int
896 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
897                        struct dlb_eventdev_port *ev_port,
898                        uint32_t dequeue_depth,
899                        uint32_t cq_depth,
900                        uint32_t enqueue_depth,
901                        uint16_t rsvd_tokens,
902                        bool use_rsvd_token_scheme)
903 {
904         struct dlb_hw_dev *handle = &dlb->qm_instance;
905         struct dlb_create_ldb_port_args cfg = {0};
906         struct dlb_cmd_response response = {0};
907         int ret;
908         struct dlb_port *qm_port = NULL;
909         char mz_name[RTE_MEMZONE_NAMESIZE];
910         uint32_t qm_port_id;
911
912         if (handle == NULL)
913                 return -EINVAL;
914
915         if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
916                 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
917                         DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
918                 return -EINVAL;
919         }
920
921         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
922                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
923                             DLB_MIN_ENQUEUE_DEPTH);
924                 return -EINVAL;
925         }
926
927         rte_spinlock_lock(&handle->resource_lock);
928
929         cfg.response = (uintptr_t)&response;
930
931         /* We round up to the next power of 2 if necessary */
932         cfg.cq_depth = rte_align32pow2(cq_depth);
933         cfg.cq_depth_threshold = rsvd_tokens;
934
935         cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
936
937         /* User controls the LDB high watermark via enqueue depth. The DIR high
938          * watermark is equal, unless the directed credit pool is too small.
939          */
940         cfg.ldb_credit_high_watermark = enqueue_depth;
941
942         /* If there are no directed ports, the kernel driver will ignore this
943          * port's directed credit settings. Don't use enqueue_depth if it would
944          * require more directed credits than are available.
945          */
946         cfg.dir_credit_high_watermark =
947                 RTE_MIN(enqueue_depth,
948                         handle->cfg.num_dir_credits / dlb->num_ports);
949
950         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
951         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
952
953         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
954         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
955
956         /* Per QM values */
957
958         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
959         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
960
961         ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
962         if (ret < 0) {
963                 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
964                             ret, dlb_error_strings[response.status]);
965                 goto error_exit;
966         }
967
968         qm_port_id = response.id;
969
970         DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
971                     ev_port->id, qm_port_id);
972
973         qm_port = &ev_port->qm_port;
974         qm_port->ev_port = ev_port; /* back ptr */
975         qm_port->dlb = dlb; /* back ptr */
976
977         /*
978          * Allocate and init local qe struct(s).
979          * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
980          */
981
982         snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
983                  ev_port->id);
984
985         ret = dlb_init_qe_mem(qm_port, mz_name);
986         if (ret < 0) {
987                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
988                 goto error_exit;
989         }
990
991         qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
992         qm_port->id = qm_port_id;
993
994         /* The credit window is one high water mark of QEs */
995         qm_port->ldb_pushcount_at_credit_expiry = 0;
996         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
997         /* The credit window is one high water mark of QEs */
998         qm_port->dir_pushcount_at_credit_expiry = 0;
999         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1000         qm_port->cq_depth = cfg.cq_depth;
1001         /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1002          * the effective depth is smaller.
1003          */
1004         qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1005         qm_port->cq_idx = 0;
1006         qm_port->cq_idx_unmasked = 0;
1007         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1008                 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1009         else
1010                 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1011
1012         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1013         /* starting value of gen bit - it toggles at wrap time */
1014         qm_port->gen_bit = 1;
1015
1016         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1017         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1018         qm_port->int_armed = false;
1019
1020         /* Save off for later use in info and lookup APIs. */
1021         qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1022
1023         qm_port->dequeue_depth = dequeue_depth;
1024
1025         /* When using the reserved token scheme, token_pop_thresh is
1026          * initially 2 * dequeue_depth. Once the tokens are reserved,
1027          * the enqueue code re-assigns it to dequeue_depth.
1028          */
1029         qm_port->token_pop_thresh = cq_depth;
1030
1031         /* When the deferred scheduling vdev arg is selected, use deferred pop
1032          * for all single-entry CQs.
1033          */
1034         if (cfg.cq_depth == 1 || (cfg.cq_depth == 2 && use_rsvd_token_scheme)) {
1035                 if (dlb->defer_sched)
1036                         qm_port->token_pop_mode = DEFERRED_POP;
1037         }
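        /* For reference, the token pop modes used in this driver are roughly:
         * AUTO_POP - pop CQ tokens immediately as part of dequeue (default);
         * DELAYED_POP - pop tokens from the enqueue path once the
         * corresponding releases have been issued (see the delayed enqueue
         * functions installed below);
         * DEFERRED_POP - pop tokens at the start of the next dequeue.
         * The enum added with this token pop API patch is authoritative.
         */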
1038
1039         /* The default enqueue functions do not include delayed-pop support for
1040          * performance reasons.
1041          */
1042         if (qm_port->token_pop_mode == DELAYED_POP) {
1043                 dlb->event_dev->enqueue = dlb_event_enqueue_delayed;
1044                 dlb->event_dev->enqueue_burst =
1045                         dlb_event_enqueue_burst_delayed;
1046                 dlb->event_dev->enqueue_new_burst =
1047                         dlb_event_enqueue_new_burst_delayed;
1048                 dlb->event_dev->enqueue_forward_burst =
1049                         dlb_event_enqueue_forward_burst_delayed;
1050         }
1051
1052         qm_port->owed_tokens = 0;
1053         qm_port->issued_releases = 0;
1054
1055         /* update state */
1056         qm_port->state = PORT_STARTED; /* enabled at create time */
1057         qm_port->config_state = DLB_CONFIGURED;
1058
1059         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1060         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1061
1062         DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1063                     qm_port_id,
1064                     cq_depth,
1065                     qm_port->ldb_credits,
1066                     qm_port->dir_credits);
1067
1068         rte_spinlock_unlock(&handle->resource_lock);
1069
1070         return 0;
1071
1072 error_exit:
1073         if (qm_port) {
1074                 dlb_free_qe_mem(qm_port);
1075                 qm_port->pp_mmio_base = 0;
1076         }
1077
1078         rte_spinlock_unlock(&handle->resource_lock);
1079
1080         DLB_LOG_ERR("dlb: create ldb port failed!\n");
1081
1082         return ret;
1083 }
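
/* Illustrative usage of the per-port token pop control added with this patch
 * series (assumed prototype, declared in the PMD-specific header shipped with
 * the series); it is intended to be called before the port is set up:
 *
 *     if (rte_pmd_dlb_set_token_pop_mode(dev_id, port_id, DELAYED_POP) == 0)
 *             ret = rte_event_port_setup(dev_id, port_id, &port_conf);
 */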
1084
1085 static int
1086 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1087                        struct dlb_eventdev_port *ev_port,
1088                        uint32_t dequeue_depth,
1089                        uint32_t cq_depth,
1090                        uint32_t enqueue_depth,
1091                        uint16_t rsvd_tokens,
1092                        bool use_rsvd_token_scheme)
1093 {
1094         struct dlb_hw_dev *handle = &dlb->qm_instance;
1095         struct dlb_create_dir_port_args cfg = {0};
1096         struct dlb_cmd_response response = {0};
1097         int ret;
1098         struct dlb_port *qm_port = NULL;
1099         char mz_name[RTE_MEMZONE_NAMESIZE];
1100         uint32_t qm_port_id;
1101
1102         if (dlb == NULL || handle == NULL)
1103                 return -EINVAL;
1104
1105         if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1106                 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1107                             DLB_MIN_DIR_CQ_DEPTH);
1108                 return -EINVAL;
1109         }
1110
1111         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1112                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1113                             DLB_MIN_ENQUEUE_DEPTH);
1114                 return -EINVAL;
1115         }
1116
1117         rte_spinlock_lock(&handle->resource_lock);
1118
1119         /* Directed queues are configured at link time. */
1120         cfg.queue_id = -1;
1121
1122         cfg.response = (uintptr_t)&response;
1123
1124         /* We round up to the next power of 2 if necessary */
1125         cfg.cq_depth = rte_align32pow2(cq_depth);
1126         cfg.cq_depth_threshold = rsvd_tokens;
1127
1128         /* User controls the LDB high watermark via enqueue depth. The DIR high
1129          * watermark is equal, unless the directed credit pool is too small.
1130          */
1131         cfg.ldb_credit_high_watermark = enqueue_depth;
1132
1133         /* Don't use enqueue_depth if it would require more directed credits
1134          * than are available.
1135          */
1136         cfg.dir_credit_high_watermark =
1137                 RTE_MIN(enqueue_depth,
1138                         handle->cfg.num_dir_credits / dlb->num_ports);
1139
1140         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1141         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1142
1143         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1144         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1145
1146         /* Per QM values */
1147
1148         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1149         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1150
1151         ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1152         if (ret < 0) {
1153                 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1154                             ret, dlb_error_strings[response.status]);
1155                 goto error_exit;
1156         }
1157
1158         qm_port_id = response.id;
1159
1160         DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1161                     ev_port->id, qm_port_id);
1162
1163         qm_port = &ev_port->qm_port;
1164         qm_port->ev_port = ev_port; /* back ptr */
1165         qm_port->dlb = dlb;  /* back ptr */
1166
1167         /*
1168          * Init local qe struct(s).
1169          * Note: MOVDIR64 requires the enqueue QE to be aligned
1170          */
1171
1172         snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1173                  ev_port->id);
1174
1175         ret = dlb_init_qe_mem(qm_port, mz_name);
1176
1177         if (ret < 0) {
1178                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1179                 goto error_exit;
1180         }
1181
1182         qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1183         qm_port->id = qm_port_id;
1184
1185         /* The credit window is one high water mark of QEs */
1186         qm_port->ldb_pushcount_at_credit_expiry = 0;
1187         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1188         /* The credit window is one high water mark of QEs */
1189         qm_port->dir_pushcount_at_credit_expiry = 0;
1190         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1191         qm_port->cq_depth = cfg.cq_depth;
1192         qm_port->cq_idx = 0;
1193         qm_port->cq_idx_unmasked = 0;
1194         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1195                 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1196         else
1197                 qm_port->cq_depth_mask = cfg.cq_depth - 1;
1198
1199         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1200         /* starting value of gen bit - it toggles at wrap time */
1201         qm_port->gen_bit = 1;
1202
1203         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1204         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1205         qm_port->int_armed = false;
1206
1207         /* Save off for later use in info and lookup APIs. */
1208         qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1209
1210         qm_port->dequeue_depth = dequeue_depth;
1211
1212         /* Directed ports use auto-pop mode by default. */
1213         qm_port->token_pop_mode = AUTO_POP;
1214         qm_port->owed_tokens = 0;
1215         qm_port->issued_releases = 0;
1216
1217         /* update state */
1218         qm_port->state = PORT_STARTED; /* enabled at create time */
1219         qm_port->config_state = DLB_CONFIGURED;
1220
1221         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1222         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1223
1224         DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1225                     qm_port_id,
1226                     cq_depth,
1227                     cfg.dir_credit_high_watermark,
1228                     cfg.ldb_credit_high_watermark);
1229
1230         rte_spinlock_unlock(&handle->resource_lock);
1231
1232         return 0;
1233
1234 error_exit:
1235         if (qm_port) {
1236                 qm_port->pp_mmio_base = 0;
1237                 dlb_free_qe_mem(qm_port);
1238         }
1239
1240         rte_spinlock_unlock(&handle->resource_lock);
1241
1242         DLB_LOG_ERR("dlb: create dir port failed!\n");
1243
1244         return ret;
1245 }
1246
1247 static int32_t
1248 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1249                         struct dlb_queue *queue,
1250                         const struct rte_event_queue_conf *evq_conf)
1251 {
1252         struct dlb_hw_dev *handle = &dlb->qm_instance;
1253         struct dlb_create_ldb_queue_args cfg;
1254         struct dlb_cmd_response response;
1255         int32_t ret;
1256         uint32_t qm_qid;
1257         int sched_type = -1;
1258
1259         if (evq_conf == NULL)
1260                 return -EINVAL;
1261
1262         if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1263                 if (evq_conf->nb_atomic_order_sequences != 0)
1264                         sched_type = RTE_SCHED_TYPE_ORDERED;
1265                 else
1266                         sched_type = RTE_SCHED_TYPE_PARALLEL;
1267         } else
1268                 sched_type = evq_conf->schedule_type;
1269
1270         cfg.response = (uintptr_t)&response;
1271         cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1272         cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1273         cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1274
1275         if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1276                 cfg.num_sequence_numbers = 0;
1277                 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1278         }
1279
1280         ret = dlb_iface_ldb_queue_create(handle, &cfg);
1281         if (ret < 0) {
1282                 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1283                             ret, dlb_error_strings[response.status]);
1284                 return -EINVAL;
1285         }
1286
1287         qm_qid = response.id;
1288
1289         /* Save off queue config for debug, resource lookups, and reconfig */
1290         queue->num_qid_inflights = cfg.num_qid_inflights;
1291         queue->num_atm_inflights = cfg.num_atomic_inflights;
1292
1293         queue->sched_type = sched_type;
1294         queue->config_state = DLB_CONFIGURED;
1295
1296         DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1297                     qm_qid,
1298                     cfg.num_atomic_inflights,
1299                     cfg.num_sequence_numbers,
1300                     cfg.num_qid_inflights);
1301
1302         return qm_qid;
1303 }
1304
1305 static int32_t
1306 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1307 {
1308         struct dlb_hw_dev *handle = &dlb->qm_instance;
1309         struct dlb_get_sn_allocation_args cfg;
1310         struct dlb_cmd_response response;
1311         int ret;
1312
1313         cfg.group = group;
1314         cfg.response = (uintptr_t)&response;
1315
1316         ret = dlb_iface_get_sn_allocation(handle, &cfg);
1317         if (ret < 0) {
1318                 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1319                             ret, dlb_error_strings[response.status]);
1320                 return ret;
1321         }
1322
1323         return response.id;
1324 }
1325
1326 static int
1327 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1328 {
1329         struct dlb_hw_dev *handle = &dlb->qm_instance;
1330         struct dlb_set_sn_allocation_args cfg;
1331         struct dlb_cmd_response response;
1332         int ret;
1333
1334         cfg.num = num;
1335         cfg.group = group;
1336         cfg.response = (uintptr_t)&response;
1337
1338         ret = dlb_iface_set_sn_allocation(handle, &cfg);
1339         if (ret < 0) {
1340                 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1341                             ret, dlb_error_strings[response.status]);
1342                 return ret;
1343         }
1344
1345         return ret;
1346 }
1347
1348 static int32_t
1349 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1350 {
1351         struct dlb_hw_dev *handle = &dlb->qm_instance;
1352         struct dlb_get_sn_occupancy_args cfg;
1353         struct dlb_cmd_response response;
1354         int ret;
1355
1356         cfg.group = group;
1357         cfg.response = (uintptr_t)&response;
1358
1359         ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1360         if (ret < 0) {
1361                 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1362                             ret, dlb_error_strings[response.status]);
1363                 return ret;
1364         }
1365
1366         return response.id;
1367 }
1368
1369 /* Query the current sequence number allocations and, if they conflict with the
1370  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1371  * This is best-effort; if it fails, the PMD will attempt to configure the
1372  * load-balanced queue and return an error.
1373  */
1374 static void
1375 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1376                           const struct rte_event_queue_conf *queue_conf)
1377 {
1378         int grp_occupancy[DLB_NUM_SN_GROUPS];
1379         int grp_alloc[DLB_NUM_SN_GROUPS];
1380         int i, sequence_numbers;
1381
1382         sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1383
1384         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1385                 int total_slots;
1386
1387                 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1388                 if (grp_alloc[i] < 0)
1389                         return;
1390
1391                 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
1392
1393                 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1394                 if (grp_occupancy[i] < 0)
1395                         return;
1396
1397                 /* DLB has at least one available slot for the requested
1398                  * sequence numbers, so no further configuration required.
1399                  */
1400                 if (grp_alloc[i] == sequence_numbers &&
1401                     grp_occupancy[i] < total_slots)
1402                         return;
1403         }
1404
1405         /* None of the sequence number groups are configured for the requested
1406          * sequence numbers, so we have to reconfigure one of them. This is
1407          * only possible if a group is not in use.
1408          */
1409         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1410                 if (grp_occupancy[i] == 0)
1411                         break;
1412         }
1413
1414         if (i == DLB_NUM_SN_GROUPS) {
1415                 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1416                        __func__, sequence_numbers);
1417                 return;
1418         }
1419
1420         /* Attempt to configure slot i with the requested number of sequence
1421          * numbers. Ignore the return value -- if this fails, the error will be
1422          * caught during subsequent queue configuration.
1423          */
1424         dlb_set_sn_allocation(dlb, i, sequence_numbers);
1425 }
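
/* Worked example (illustrative numbers): if a group is currently allocated
 * 64 sequence numbers per queue, it provides DLB_MAX_LDB_SN_ALLOC / 64 slots,
 * each of which can back one ordered queue. A queue requesting 64 sequence
 * numbers can reuse that group as long as a slot is free; otherwise an
 * entirely unused group must be reprogrammed via dlb_set_sn_allocation().
 */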
1426
1427 static int
1428 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1429                              struct dlb_eventdev_queue *ev_queue,
1430                              const struct rte_event_queue_conf *queue_conf)
1431 {
1432         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1433         int32_t qm_qid;
1434
1435         if (queue_conf->nb_atomic_order_sequences)
1436                 dlb_program_sn_allocation(dlb, queue_conf);
1437
1438         qm_qid = dlb_hw_create_ldb_queue(dlb,
1439                                          &ev_queue->qm_queue,
1440                                          queue_conf);
1441         if (qm_qid < 0) {
1442                 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1443
1444                 return qm_qid;
1445         }
1446
1447         dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1448
1449         ev_queue->qm_queue.id = qm_qid;
1450
1451         return 0;
1452 }
1453
1454 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1455 {
1456         int i, num = 0;
1457
1458         for (i = 0; i < dlb->num_queues; i++) {
1459                 if (dlb->ev_queues[i].setup_done &&
1460                     dlb->ev_queues[i].qm_queue.is_directed)
1461                         num++;
1462         }
1463
1464         return num;
1465 }
1466
1467 static void
1468 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1469                         struct dlb_eventdev_queue *ev_queue)
1470 {
1471         struct dlb_eventdev_port *ev_port;
1472         int i, j;
1473
1474         for (i = 0; i < dlb->num_ports; i++) {
1475                 ev_port = &dlb->ev_ports[i];
1476
1477                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1478                         if (!ev_port->link[j].valid ||
1479                             ev_port->link[j].queue_id != ev_queue->id)
1480                                 continue;
1481
1482                         ev_port->link[j].valid = false;
1483                         ev_port->num_links--;
1484                 }
1485         }
1486
1487         ev_queue->num_links = 0;
1488 }
1489
1490 static int
1491 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1492                          uint8_t ev_qid,
1493                          const struct rte_event_queue_conf *queue_conf)
1494 {
1495         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1496         struct dlb_eventdev_queue *ev_queue;
1497         int ret;
1498
1499         if (queue_conf == NULL)
1500                 return -EINVAL;
1501
1502         if (ev_qid >= dlb->num_queues)
1503                 return -EINVAL;
1504
1505         ev_queue = &dlb->ev_queues[ev_qid];
1506
1507         ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1508                 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1509         ev_queue->id = ev_qid;
1510         ev_queue->conf = *queue_conf;
1511
1512         if (!ev_queue->qm_queue.is_directed) {
1513                 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1514         } else {
1515                 /* The directed queue isn't set up until link time, at which
1516                  * point we know its directed port ID. Directed queue setup
1517                  * will only fail if this queue is already set up or there are
1518                  * no directed queues left to configure.
1519                  */
1520                 ret = 0;
1521
1522                 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1523
1524                 if (ev_queue->setup_done ||
1525                     dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1526                         ret = -EINVAL;
1527         }
1528
1529         /* Tear down pre-existing port->queue links */
1530         if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1531                 dlb_queue_link_teardown(dlb, ev_queue);
1532
1533         if (!ret)
1534                 ev_queue->setup_done = true;
1535
1536         return ret;
1537 }
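
/* Illustrative application-side sketch (not part of this PMD) of how the
 * queue setup path above is reached through the eventdev API; dev_id and the
 * flow/sequence counts are placeholders. A non-zero nb_atomic_order_sequences
 * is what triggers dlb_program_sn_allocation() above, while setting
 * RTE_EVENT_QUEUE_CFG_SINGLE_LINK instead marks the queue as directed, so its
 * hardware queue is created at link time.
 *
 *     struct rte_event_queue_conf conf = {
 *             .schedule_type = RTE_SCHED_TYPE_ORDERED,
 *             .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *             .nb_atomic_flows = 1024,
 *             .nb_atomic_order_sequences = 64,
 *     };
 *
 *     if (rte_event_queue_setup(dev_id, 0, &conf) < 0)
 *             return -1;
 */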
1538
1539 static void
1540 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1541                        struct dlb_eventdev_port *ev_port)
1542 {
1543         struct dlb_eventdev_queue *ev_queue;
1544         int i;
1545
1546         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1547                 if (!ev_port->link[i].valid)
1548                         continue;
1549
1550                 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1551
1552                 ev_port->link[i].valid = false;
1553                 ev_port->num_links--;
1554                 ev_queue->num_links--;
1555         }
1556 }
1557
1558 static int
1559 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1560                         uint8_t ev_port_id,
1561                         const struct rte_event_port_conf *port_conf)
1562 {
1563         struct dlb_eventdev *dlb;
1564         struct dlb_eventdev_port *ev_port;
1565         bool use_rsvd_token_scheme;
1566         uint32_t adj_cq_depth;
1567         uint16_t rsvd_tokens;
1568         int ret;
1569
1570         if (dev == NULL || port_conf == NULL) {
1571                 DLB_LOG_ERR("Null parameter\n");
1572                 return -EINVAL;
1573         }
1574
1575         dlb = dlb_pmd_priv(dev);
1576
1577         if (ev_port_id >= DLB_MAX_NUM_PORTS)
1578                 return -EINVAL;
1579
1580         if (port_conf->dequeue_depth >
1581                 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1582             port_conf->enqueue_depth >
1583                 evdev_dlb_default_info.max_event_port_enqueue_depth)
1584                 return -EINVAL;
1585
1586         ev_port = &dlb->ev_ports[ev_port_id];
1587         /* configured? */
1588         if (ev_port->setup_done) {
1589                 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1590                 return -EINVAL;
1591         }
1592
1593         /* The reserved token interrupt arming scheme requires that one or more
1594          * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1595          * usable by the DLB, so in order to give an *effective* CQ depth equal
1596          * to the user-requested value, we double CQ depth and reserve half of
1597          * its tokens. If the user requests the max CQ depth (256) then we
1598          * cannot double it, so we reserve one token and give an effective
1599          * depth of 255 entries.
1600          */
1601         use_rsvd_token_scheme = true;
1602         rsvd_tokens = 1;
1603         adj_cq_depth = port_conf->dequeue_depth;
1604
1605         if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1606                 rsvd_tokens = adj_cq_depth;
1607                 adj_cq_depth *= 2;
1608         }
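
        /* Illustrative numbers for the adjustment above (a sketch, not driver
         * state): a requested dequeue_depth of 32 yields rsvd_tokens = 32 and
         * adj_cq_depth = 64, i.e. a 64-entry CQ with half of its tokens
         * reserved and an effective depth of 32. A requested depth of 256
         * cannot be doubled, so rsvd_tokens stays 1 and the effective depth
         * is 255.
         */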
1609
1610         ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1611                 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1612
1613         if (!ev_port->qm_port.is_directed) {
1614                 ret = dlb_hw_create_ldb_port(dlb,
1615                                              ev_port,
1616                                              port_conf->dequeue_depth,
1617                                              adj_cq_depth,
1618                                              port_conf->enqueue_depth,
1619                                              rsvd_tokens,
1620                                              use_rsvd_token_scheme);
1621                 if (ret < 0) {
1622                         DLB_LOG_ERR("Failed to create the LDB port, ev_port_id=%d\n",
1623                                     ev_port_id);
1624                         return ret;
1625                 }
1626         } else {
1627                 ret = dlb_hw_create_dir_port(dlb,
1628                                              ev_port,
1629                                              port_conf->dequeue_depth,
1630                                              adj_cq_depth,
1631                                              port_conf->enqueue_depth,
1632                                              rsvd_tokens,
1633                                              use_rsvd_token_scheme);
1634                 if (ret < 0) {
1635                         DLB_LOG_ERR("Failed to create the DIR port\n");
1636                         return ret;
1637                 }
1638         }
1639
1640         /* Save off port config for reconfig */
1641         dlb->ev_ports[ev_port_id].conf = *port_conf;
1642
1643         dlb->ev_ports[ev_port_id].id = ev_port_id;
1644         dlb->ev_ports[ev_port_id].enq_configured = true;
1645         dlb->ev_ports[ev_port_id].setup_done = true;
1646         dlb->ev_ports[ev_port_id].inflight_max =
1647                 port_conf->new_event_threshold;
1648         dlb->ev_ports[ev_port_id].implicit_release =
1649                 !(port_conf->event_port_cfg &
1650                   RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1651         dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1652         dlb->ev_ports[ev_port_id].inflight_credits = 0;
1653         dlb->ev_ports[ev_port_id].credit_update_quanta =
1654                 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1655         dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1656
1657         /* Tear down pre-existing port->queue links */
1658         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1659                 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1660
1661         dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1662
1663         return 0;
1664 }
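
/* Illustrative application-side sketch (not part of this PMD) of the port
 * setup path above; dev_id is a placeholder and the depths must respect the
 * limits advertised in struct rte_event_dev_info.
 *
 *     struct rte_event_port_conf pconf = {
 *             .new_event_threshold = 1024,
 *             .dequeue_depth = 32,    // doubled internally to a 64-entry CQ
 *             .enqueue_depth = 32,
 *             .event_port_cfg = 0,    // or RTE_EVENT_PORT_CFG_SINGLE_LINK
 *     };
 *
 *     if (rte_event_port_setup(dev_id, 0, &pconf) < 0)
 *             return -1;
 */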
1665
1666 static int
1667 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1668 {
1669         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1670         int ret, i;
1671
1672         /* If an event queue or port was previously configured, but hasn't been
1673          * reconfigured, reapply its original configuration.
1674          */
1675         for (i = 0; i < dlb->num_queues; i++) {
1676                 struct dlb_eventdev_queue *ev_queue;
1677
1678                 ev_queue = &dlb->ev_queues[i];
1679
1680                 if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1681                         continue;
1682
1683                 ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1684                 if (ret < 0) {
1685                         DLB_LOG_ERR("dlb: failed to reconfigure queue %d", i);
1686                         return ret;
1687                 }
1688         }
1689
1690         for (i = 0; i < dlb->num_ports; i++) {
1691                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1692
1693                 if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1694                         continue;
1695
1696                 ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1697                 if (ret < 0) {
1698                         DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d",
1699                                     i);
1700                         return ret;
1701                 }
1702         }
1703
1704         return 0;
1705 }
1706
1707 static int
1708 set_dev_id(const char *key __rte_unused,
1709            const char *value,
1710            void *opaque)
1711 {
1712         int *dev_id = opaque;
1713         int ret;
1714
1715         if (value == NULL || opaque == NULL) {
1716                 DLB_LOG_ERR("NULL pointer\n");
1717                 return -EINVAL;
1718         }
1719
1720         ret = dlb_string_to_int(dev_id, value);
1721         if (ret < 0)
1722                 return ret;
1723
1724         return 0;
1725 }
1726
1727 static int
1728 set_defer_sched(const char *key __rte_unused,
1729                 const char *value,
1730                 void *opaque)
1731 {
1732         int *defer_sched = opaque;
1733
1734         if (value == NULL || opaque == NULL) {
1735                 DLB_LOG_ERR("NULL pointer\n");
1736                 return -EINVAL;
1737         }
1738
1739         if (strncmp(value, "on", 2) != 0) {
1740                 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1741                             value);
1742                 return -EINVAL;
1743         }
1744
1745         *defer_sched = 1;
1746
1747         return 0;
1748 }
1749
1750 static int
1751 set_num_atm_inflights(const char *key __rte_unused,
1752                       const char *value,
1753                       void *opaque)
1754 {
1755         int *num_atm_inflights = opaque;
1756         int ret;
1757
1758         if (value == NULL || opaque == NULL) {
1759                 DLB_LOG_ERR("NULL pointer\n");
1760                 return -EINVAL;
1761         }
1762
1763         ret = dlb_string_to_int(num_atm_inflights, value);
1764         if (ret < 0)
1765                 return ret;
1766
1767         if (*num_atm_inflights < 0 ||
1768             *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1769                 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1770                             DLB_MAX_NUM_ATM_INFLIGHTS);
1771                 return -EINVAL;
1772         }
1773
1774         return 0;
1775 }
1776
1777 static int
1778 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1779                        uint8_t queue_id,
1780                        bool link_exists,
1781                        int index)
1782 {
1783         struct dlb_eventdev *dlb = ev_port->dlb;
1784         struct dlb_eventdev_queue *ev_queue;
1785         bool port_is_dir, queue_is_dir;
1786
1787         if (queue_id > dlb->num_queues) {
1788                 DLB_LOG_ERR("queue_id %d > num queues %d\n",
1789                             queue_id, dlb->num_queues);
1790                 rte_errno = -EINVAL;
1791                 return -1;
1792         }
1793
1794         ev_queue = &dlb->ev_queues[queue_id];
1795
1796         if (!ev_queue->setup_done &&
1797             ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1798                 DLB_LOG_ERR("setup not done and not previously configured\n");
1799                 rte_errno = -EINVAL;
1800                 return -1;
1801         }
1802
1803         port_is_dir = ev_port->qm_port.is_directed;
1804         queue_is_dir = ev_queue->qm_queue.is_directed;
1805
1806         if (port_is_dir != queue_is_dir) {
1807                 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1808                             queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1809                             port_is_dir ? "DIR" : "LDB", ev_port->id);
1810
1811                 rte_errno = -EINVAL;
1812                 return -1;
1813         }
1814
1815         /* Check if there is space for the requested link */
1816         if (!link_exists && index == -1) {
1817                 DLB_LOG_ERR("no space for new link\n");
1818                 rte_errno = -ENOSPC;
1819                 return -1;
1820         }
1821
1822         /* Check if the directed port is already linked */
1823         if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1824             !link_exists) {
1825                 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1826                             ev_port->id);
1827                 rte_errno = -EINVAL;
1828                 return -1;
1829         }
1830
1831         /* Check if the directed queue is already linked */
1832         if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1833             !link_exists) {
1834                 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1835                             ev_queue->id);
1836                 rte_errno = -EINVAL;
1837                 return -1;
1838         }
1839
1840         return 0;
1841 }
1842
1843 static int32_t
1844 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1845 {
1846         struct dlb_hw_dev *handle = &dlb->qm_instance;
1847         struct dlb_create_dir_queue_args cfg;
1848         struct dlb_cmd_response response;
1849         int32_t ret;
1850
1851         cfg.response = (uintptr_t)&response;
1852
1853         /* The directed port is always configured before its queue */
1854         cfg.port_id = qm_port_id;
1855
1856         ret = dlb_iface_dir_queue_create(handle, &cfg);
1857         if (ret < 0) {
1858                 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1859                             ret, dlb_error_strings[response.status]);
1860                 return -EINVAL;
1861         }
1862
1863         return response.id;
1864 }
1865
1866 static int
1867 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1868                              struct dlb_eventdev_queue *ev_queue,
1869                              struct dlb_eventdev_port *ev_port)
1870 {
1871         int32_t qm_qid;
1872
1873         qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1874
1875         if (qm_qid < 0) {
1876                 DLB_LOG_ERR("Failed to create the DIR queue\n");
1877                 return qm_qid;
1878         }
1879
1880         dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1881
1882         ev_queue->qm_queue.id = qm_qid;
1883
1884         return 0;
1885 }
1886
1887 static int16_t
1888 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1889                            uint32_t qm_port_id,
1890                            uint16_t qm_qid,
1891                            uint8_t priority)
1892 {
1893         struct dlb_map_qid_args cfg;
1894         struct dlb_cmd_response response;
1895         int32_t ret;
1896
1897         if (handle == NULL)
1898                 return -EINVAL;
1899
1900         /* Build message */
1901         cfg.response = (uintptr_t)&response;
1902         cfg.port_id = qm_port_id;
1903         cfg.qid = qm_qid;
1904         cfg.priority = EV_TO_DLB_PRIO(priority);
1905
1906         ret = dlb_iface_map_qid(handle, &cfg);
1907         if (ret < 0) {
1908                 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1909                             ret, dlb_error_strings[response.status]);
1910                 DLB_LOG_ERR("dlb: device_id=%d grp=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1911                             handle->device_id,
1912                             handle->domain_id, cfg.port_id,
1913                             cfg.qid,
1914                             cfg.priority);
1915         } else {
1916                 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1917                             qm_qid, qm_port_id);
1918         }
1919
1920         return ret;
1921 }
1922
1923 static int
1924 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1925                          struct dlb_eventdev_port *ev_port,
1926                          struct dlb_eventdev_queue *ev_queue,
1927                          uint8_t priority)
1928 {
1929         int first_avail = -1;
1930         int ret, i;
1931
1932         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1933                 if (ev_port->link[i].valid) {
1934                         if (ev_port->link[i].queue_id == ev_queue->id &&
1935                             ev_port->link[i].priority == priority) {
1936                                 if (ev_port->link[i].mapped)
1937                                         return 0; /* already mapped */
1938                                 first_avail = i;
1939                         }
1940                 } else {
1941                         if (first_avail == -1)
1942                                 first_avail = i;
1943                 }
1944         }
1945         if (first_avail == -1) {
1946                 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1947                             ev_port->qm_port.id);
1948                 return -EINVAL;
1949         }
1950
1951         ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1952                                          ev_port->qm_port.id,
1953                                          ev_queue->qm_queue.id,
1954                                          priority);
1955
1956         if (!ret)
1957                 ev_port->link[first_avail].mapped = true;
1958
1959         return ret;
1960 }
1961
1962 static int
1963 dlb_do_port_link(struct rte_eventdev *dev,
1964                  struct dlb_eventdev_queue *ev_queue,
1965                  struct dlb_eventdev_port *ev_port,
1966                  uint8_t prio)
1967 {
1968         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1969         int err;
1970
1971         /* Don't link until start time. */
1972         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1973                 return 0;
1974
1975         if (ev_queue->qm_queue.is_directed)
1976                 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1977         else
1978                 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1979
1980         if (err) {
1981                 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1982                             ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1983                             ev_queue->id, ev_port->id);
1984
1985                 rte_errno = err;
1986                 return -1;
1987         }
1988
1989         return 0;
1990 }
1991
1992 static int
1993 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1994 {
1995         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1996         int i;
1997
1998         /* Perform requested port->queue links */
1999         for (i = 0; i < dlb->num_ports; i++) {
2000                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
2001                 int j;
2002
2003                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2004                         struct dlb_eventdev_queue *ev_queue;
2005                         uint8_t prio, queue_id;
2006
2007                         if (!ev_port->link[j].valid)
2008                                 continue;
2009
2010                         prio = ev_port->link[j].priority;
2011                         queue_id = ev_port->link[j].queue_id;
2012
2013                         if (dlb_validate_port_link(ev_port, queue_id, true, j))
2014                                 return -EINVAL;
2015
2016                         ev_queue = &dlb->ev_queues[queue_id];
2017
2018                         if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2019                                 return -EINVAL;
2020                 }
2021         }
2022
2023         return 0;
2024 }
2025
2026 static int
2027 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2028                        const uint8_t queues[], const uint8_t priorities[],
2029                        uint16_t nb_links)
2030
2031 {
2032         struct dlb_eventdev_port *ev_port = event_port;
2033         struct dlb_eventdev *dlb;
2034         int i, j;
2035
2036         RTE_SET_USED(dev);
2037
2038         if (ev_port == NULL) {
2039                 DLB_LOG_ERR("dlb: evport not setup\n");
2040                 rte_errno = -EINVAL;
2041                 return 0;
2042         }
2043
2044         if (!ev_port->setup_done &&
2045             ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2046                 DLB_LOG_ERR("dlb: evport not setup\n");
2047                 rte_errno = -EINVAL;
2048                 return 0;
2049         }
2050
2051         /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2052          * queues pointer.
2053          */
2054         if (nb_links == 0) {
2055                 DLB_LOG_DBG("dlb: nb_links is 0\n");
2056                 return 0; /* Ignore and return success */
2057         }
2058
2059         dlb = ev_port->dlb;
2060
2061         DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2062                     nb_links,
2063                     ev_port->qm_port.is_directed ? "DIR" : "LDB",
2064                     ev_port->id);
2065
2066         for (i = 0; i < nb_links; i++) {
2067                 struct dlb_eventdev_queue *ev_queue;
2068                 uint8_t queue_id, prio;
2069                 bool found = false;
2070                 int index = -1;
2071
2072                 queue_id = queues[i];
2073                 prio = priorities[i];
2074
2075                 /* Check if the link already exists. */
2076                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2077                         if (ev_port->link[j].valid) {
2078                                 if (ev_port->link[j].queue_id == queue_id) {
2079                                         found = true;
2080                                         index = j;
2081                                         break;
2082                                 }
2083                         } else {
2084                                 if (index == -1)
2085                                         index = j;
2086                         }
2087
2088                 /* could not link */
2089                 if (index == -1)
2090                         break;
2091
2092                 /* Check if already linked at the requested priority */
2093                 if (found && ev_port->link[j].priority == prio)
2094                         continue;
2095
2096                 if (dlb_validate_port_link(ev_port, queue_id, found, index))
2097                         break; /* return index of offending queue */
2098
2099                 ev_queue = &dlb->ev_queues[queue_id];
2100
2101                 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2102                         break; /* return index of offending queue */
2103
2104                 ev_queue->num_links++;
2105
2106                 ev_port->link[index].queue_id = queue_id;
2107                 ev_port->link[index].priority = prio;
2108                 ev_port->link[index].valid = true;
2109                 /* If the link already existed, this was only a priority change */
2110                 if (!found)
2111                         ev_port->num_links++;
2112         }
2113         return i;
2114 }
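
/* Illustrative application-side sketch (not part of this PMD): linking two
 * load-balanced queues to port 0 at normal priority. dev_id is a placeholder,
 * and the return value is the number of links established, which may be less
 * than nb_links. Note that dlb_do_port_link() above defers the hardware
 * operation until the device is started.
 *
 *     uint8_t queues[] = { 0, 1 };
 *     uint8_t prios[]  = { RTE_EVENT_DEV_PRIORITY_NORMAL,
 *                          RTE_EVENT_DEV_PRIORITY_NORMAL };
 *
 *     int n = rte_event_port_link(dev_id, 0, queues, prios, 2);
 *     if (n != 2)
 *             printf("only %d of 2 links made (rte_errno %d)\n",
 *                    n, rte_errno);
 */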
2115
2116 static int
2117 dlb_eventdev_start(struct rte_eventdev *dev)
2118 {
2119         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2120         struct dlb_hw_dev *handle = &dlb->qm_instance;
2121         struct dlb_start_domain_args cfg;
2122         struct dlb_cmd_response response;
2123         int ret, i;
2124
2125         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2126         if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2127                 DLB_LOG_ERR("bad state %d for dev_start\n",
2128                             (int)dlb->run_state);
2129                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2130                 return -EINVAL;
2131         }
2132         dlb->run_state = DLB_RUN_STATE_STARTING;
2133         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2134
2135         /* If the device was configured more than once, some event ports and/or
2136          * queues may need to be reconfigured.
2137          */
2138         ret = dlb_eventdev_reapply_configuration(dev);
2139         if (ret)
2140                 return ret;
2141
2142         /* The DLB PMD delays port links until the device is started. */
2143         ret = dlb_eventdev_apply_port_links(dev);
2144         if (ret)
2145                 return ret;
2146
2147         cfg.response = (uintptr_t)&response;
2148
2149         for (i = 0; i < dlb->num_ports; i++) {
2150                 if (!dlb->ev_ports[i].setup_done) {
2151                         DLB_LOG_ERR("dlb: port %d not setup", i);
2152                         return -ESTALE;
2153                 }
2154         }
2155
2156         for (i = 0; i < dlb->num_queues; i++) {
2157                 if (dlb->ev_queues[i].num_links == 0) {
2158                         DLB_LOG_ERR("dlb: queue %d is not linked", i);
2159                         return -ENOLINK;
2160                 }
2161         }
2162
2163         ret = dlb_iface_sched_domain_start(handle, &cfg);
2164         if (ret < 0) {
2165                 DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2166                             ret, dlb_error_strings[response.status]);
2167                 return ret;
2168         }
2169
2170         dlb->run_state = DLB_RUN_STATE_STARTED;
2171         DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2172
2173         return 0;
2174 }
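
/* Illustrative start-up ordering implied by the checks above (an
 * application-side sketch, not part of this PMD; dev_id and dev_conf are
 * placeholders): every port must be set up and every queue must carry at
 * least one link before the device can be started.
 *
 *     rte_event_dev_configure(dev_id, &dev_conf);
 *     // ... rte_event_queue_setup() and rte_event_port_setup() calls ...
 *     // ... rte_event_port_link() for each port ...
 *     if (rte_event_dev_start(dev_id) < 0)
 *             return -1;
 */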
2175
2176 static inline int
2177 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2178                              struct dlb_eventdev_port *ev_port)
2179 {
2180         uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2181                                                 __ATOMIC_SEQ_CST);
2182         const int num = 1;
2183
2184         if (unlikely(ev_port->inflight_max < sw_inflights)) {
2185                 DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2186                 rte_errno = -ENOSPC;
2187                 return 1;
2188         }
2189
2190         if (ev_port->inflight_credits < num) {
2191                 /* check if event enqueue brings ev_port over max threshold */
2192                 uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2193
2194                 if (sw_inflights + credit_update_quanta >
2195                     dlb->new_event_limit) {
2196                         DLB_INC_STAT(
2197                                 ev_port->stats.traffic.tx_nospc_new_event_limit,
2198                                 1);
2199                         rte_errno = -ENOSPC;
2200                         return 1;
2201                 }
2202
2203                 __atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2204                                    __ATOMIC_SEQ_CST);
2205                 ev_port->inflight_credits += credit_update_quanta;
2206
2207                 if (ev_port->inflight_credits < num) {
2208                         DLB_INC_STAT(
2209                             ev_port->stats.traffic.tx_nospc_inflight_credits,
2210                             1);
2211                         rte_errno = -ENOSPC;
2212                         return 1;
2213                 }
2214         }
2215
2216         return 0;
2217 }
2218
2219 static inline void
2220 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2221                          struct dlb_eventdev_port *ev_port)
2222 {
2223         uint16_t quanta = ev_port->credit_update_quanta;
2224
2225         if (ev_port->inflight_credits >= quanta * 2) {
2226                 /* Replenish credits, saving one quanta for enqueues */
2227                 uint16_t val = ev_port->inflight_credits - quanta;
2228
2229                 __atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2230                 ev_port->inflight_credits -= val;
2231         }
2232 }
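
/* Worked example of the software credit scheme above (illustrative numbers
 * only): with credit_update_quanta = 32 and no cached credits, the first
 * OP_NEW enqueue atomically adds 32 to dlb->inflights, caches 32 port-local
 * credits and consumes one, leaving 31. Each OP_RELEASE returns one credit
 * to the port; once inflight_credits reaches 2 * quanta (64),
 * dlb_replenish_sw_credits() hands 32 back to the device-wide pool and keeps
 * one quanta cached for future enqueues.
 */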
2233
2234 static __rte_always_inline uint16_t
2235 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2236 {
2237         volatile uint16_t *popcount;
2238
2239         if (ldb)
2240                 popcount = port_data->ldb_popcount;
2241         else
2242                 popcount = port_data->dir_popcount;
2243
2244         return *popcount;
2245 }
2246
2247 static inline int
2248 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2249                                  struct process_local_port_data *port_data)
2250 {
2251         if (unlikely(qm_port->cached_ldb_credits == 0)) {
2252                 uint16_t pc;
2253
2254                 pc = dlb_read_pc(port_data, true);
2255
2256                 qm_port->cached_ldb_credits = pc -
2257                         qm_port->ldb_pushcount_at_credit_expiry;
2258                 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2259                         DLB_INC_STAT(
2260                         qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2261                         1);
2262
2263                         DLB_LOG_DBG("ldb credits exhausted\n");
2264                         return 1;
2265                 }
2266                 qm_port->ldb_pushcount_at_credit_expiry +=
2267                         qm_port->cached_ldb_credits;
2268         }
2269
2270         return 0;
2271 }
2272
2273 static inline int
2274 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2275                                  struct process_local_port_data *port_data)
2276 {
2277         if (unlikely(qm_port->cached_dir_credits == 0)) {
2278                 uint16_t pc;
2279
2280                 pc = dlb_read_pc(port_data, false);
2281
2282                 qm_port->cached_dir_credits = pc -
2283                         qm_port->dir_pushcount_at_credit_expiry;
2284
2285                 if (unlikely(qm_port->cached_dir_credits == 0)) {
2286                         DLB_INC_STAT(
2287                         qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2288                         1);
2289
2290                         DLB_LOG_DBG("dir credits exhausted\n");
2291                         return 1;
2292                 }
2293                 qm_port->dir_pushcount_at_credit_expiry +=
2294                         qm_port->cached_dir_credits;
2295         }
2296
2297         return 0;
2298 }
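
/* Worked example of the popcount-based credit refresh above (illustrative
 * numbers only): dlb_read_pc() returns a free-running 16-bit push count, and
 * the credits gained since the last refresh are the unsigned 16-bit
 * difference from the value saved when the cache last ran dry. If
 * ldb_pushcount_at_credit_expiry is 65530 and the popcount has wrapped to 4,
 * then 4 - 65530 wraps to 10, so ten LDB credits are cached and the expiry
 * marker advances to 4. A difference of zero means nothing has been
 * replenished yet.
 */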
2299
2300 static inline int
2301 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2302                        struct dlb_port *qm_port,
2303                        const struct rte_event ev[],
2304                        struct process_local_port_data *port_data,
2305                        uint8_t *sched_type,
2306                        uint8_t *queue_id)
2307 {
2308         struct dlb_eventdev *dlb = ev_port->dlb;
2309         struct dlb_eventdev_queue *ev_queue;
2310         uint16_t *cached_credits = NULL;
2311         struct dlb_queue *qm_queue;
2312
2313         ev_queue = &dlb->ev_queues[ev->queue_id];
2314         qm_queue = &ev_queue->qm_queue;
2315         *queue_id = qm_queue->id;
2316
2317         /* Ignore sched_type and hardware credits on release events */
2318         if (ev->op == RTE_EVENT_OP_RELEASE)
2319                 goto op_check;
2320
2321         if (!qm_queue->is_directed) {
2322                 /* Load balanced destination queue */
2323
2324                 if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2325                         rte_errno = -ENOSPC;
2326                         return 1;
2327                 }
2328                 cached_credits = &qm_port->cached_ldb_credits;
2329
2330                 switch (ev->sched_type) {
2331                 case RTE_SCHED_TYPE_ORDERED:
2332                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2333                         if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2334                                 DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2335                                             *queue_id);
2336                                 rte_errno = -EINVAL;
2337                                 return 1;
2338                         }
2339                         *sched_type = DLB_SCHED_ORDERED;
2340                         break;
2341                 case RTE_SCHED_TYPE_ATOMIC:
2342                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2343                         *sched_type = DLB_SCHED_ATOMIC;
2344                         break;
2345                 case RTE_SCHED_TYPE_PARALLEL:
2346                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2347                         if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2348                                 *sched_type = DLB_SCHED_ORDERED;
2349                         else
2350                                 *sched_type = DLB_SCHED_UNORDERED;
2351                         break;
2352                 default:
2353                         DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2354                         DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2355                         rte_errno = -EINVAL;
2356                         return 1;
2357                 }
2358         } else {
2359                 /* Directed destination queue */
2360
2361                 if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2362                         rte_errno = -ENOSPC;
2363                         return 1;
2364                 }
2365                 cached_credits = &qm_port->cached_dir_credits;
2366
2367                 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2368
2369                 *sched_type = DLB_SCHED_DIRECTED;
2370         }
2371
2372 op_check:
2373         switch (ev->op) {
2374         case RTE_EVENT_OP_NEW:
2375                 /* Check that a sw credit is available */
2376                 if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2377                         rte_errno = -ENOSPC;
2378                         return 1;
2379                 }
2380                 ev_port->inflight_credits--;
2381                 (*cached_credits)--;
2382                 break;
2383         case RTE_EVENT_OP_FORWARD:
2384                 /* Check for outstanding_releases underflow. If this occurs,
2385                  * the application is not using the EVENT_OPs correctly; for
2386                  * example, forwarding or releasing events that were not
2387                  * dequeued.
2388                  */
2389                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2390                 ev_port->outstanding_releases--;
2391                 qm_port->issued_releases++;
2392                 (*cached_credits)--;
2393                 break;
2394         case RTE_EVENT_OP_RELEASE:
2395                 ev_port->inflight_credits++;
2396                 /* Check for outstanding_releases underflow. If this occurs,
2397                  * the application is not using the EVENT_OPs correctly; for
2398                  * example, forwarding or releasing events that were not
2399                  * dequeued.
2400                  */
2401                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2402                 ev_port->outstanding_releases--;
2403                 qm_port->issued_releases++;
2404                 /* Replenish s/w credits if enough are cached */
2405                 dlb_replenish_sw_credits(dlb, ev_port);
2406                 break;
2407         }
2408
2409         DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2410         DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2411
2412 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2413         if (ev->op != RTE_EVENT_OP_RELEASE) {
2414                 DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2415                 DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2416         }
2417 #endif
2418
2419         return 0;
2420 }
2421
2422 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2423         {
2424                 /* Load-balanced cmd bytes */
2425                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2426                 [RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2427                 [RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2428         },
2429         {
2430                 /* Directed cmd bytes */
2431                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2432                 [RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2433                 [RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2434         },
2435 };
2436
2437 static inline void
2438 dlb_event_build_hcws(struct dlb_port *qm_port,
2439                      const struct rte_event ev[],
2440                      int num,
2441                      uint8_t *sched_type,
2442                      uint8_t *queue_id)
2443 {
2444         struct dlb_enqueue_qe *qe;
2445         uint16_t sched_word[4];
2446         __m128i sse_qe[2];
2447         int i;
2448
2449         qe = qm_port->qe4;
2450
2451         sse_qe[0] = _mm_setzero_si128();
2452         sse_qe[1] = _mm_setzero_si128();
2453
2454         switch (num) {
2455         case 4:
2456                 /* Construct the metadata portion of two HCWs in one 128b SSE
2457                  * register. HCW metadata is constructed in the SSE registers
2458                  * like so:
2459                  * sse_qe[0][63:0]:   qe[0]'s metadata
2460                  * sse_qe[0][127:64]: qe[1]'s metadata
2461                  * sse_qe[1][63:0]:   qe[2]'s metadata
2462                  * sse_qe[1][127:64]: qe[3]'s metadata
2463                  */
2464
2465                 /* Convert the event operation into a command byte and store it
2466                  * in the metadata:
2467                  * sse_qe[0][63:56]   = cmd_byte_map[is_directed][ev[0].op]
2468                  * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2469                  * sse_qe[1][63:56]   = cmd_byte_map[is_directed][ev[2].op]
2470                  * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2471                  */
2472 #define DLB_QE_CMD_BYTE 7
2473                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2474                                 cmd_byte_map[qm_port->is_directed][ev[0].op],
2475                                 DLB_QE_CMD_BYTE);
2476                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2477                                 cmd_byte_map[qm_port->is_directed][ev[1].op],
2478                                 DLB_QE_CMD_BYTE + 8);
2479                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2480                                 cmd_byte_map[qm_port->is_directed][ev[2].op],
2481                                 DLB_QE_CMD_BYTE);
2482                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2483                                 cmd_byte_map[qm_port->is_directed][ev[3].op],
2484                                 DLB_QE_CMD_BYTE + 8);
2485
2486                 /* Store priority, scheduling type, and queue ID in the sched
2487                  * word array because these values are re-used when the
2488                  * destination is a directed queue.
2489                  */
2490                 sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2491                                 sched_type[0] << 8 |
2492                                 queue_id[0];
2493                 sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2494                                 sched_type[1] << 8 |
2495                                 queue_id[1];
2496                 sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2497                                 sched_type[2] << 8 |
2498                                 queue_id[2];
2499                 sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2500                                 sched_type[3] << 8 |
2501                                 queue_id[3];
2502
2503                 /* Store the event priority, scheduling type, and queue ID in
2504                  * the metadata:
2505                  * sse_qe[0][31:16] = sched_word[0]
2506                  * sse_qe[0][95:80] = sched_word[1]
2507                  * sse_qe[1][31:16] = sched_word[2]
2508                  * sse_qe[1][95:80] = sched_word[3]
2509                  */
2510 #define DLB_QE_QID_SCHED_WORD 1
2511                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2512                                              sched_word[0],
2513                                              DLB_QE_QID_SCHED_WORD);
2514                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2515                                              sched_word[1],
2516                                              DLB_QE_QID_SCHED_WORD + 4);
2517                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2518                                              sched_word[2],
2519                                              DLB_QE_QID_SCHED_WORD);
2520                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2521                                              sched_word[3],
2522                                              DLB_QE_QID_SCHED_WORD + 4);
2523
2524                 /* If the destination is a load-balanced queue, store the lock
2525                  * ID. If it is a directed queue, DLB places this field in
2526                  * bytes 10-11 of the received QE, so we format it accordingly:
2527                  * sse_qe[0][47:32]  = dir queue ? sched_word[0] : flow_id[0]
2528                  * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2529                  * sse_qe[1][47:32]  = dir queue ? sched_word[2] : flow_id[2]
2530                  * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2531                  */
2532 #define DLB_QE_LOCK_ID_WORD 2
2533                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2534                                 (sched_type[0] == DLB_SCHED_DIRECTED) ?
2535                                         sched_word[0] : ev[0].flow_id,
2536                                 DLB_QE_LOCK_ID_WORD);
2537                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2538                                 (sched_type[1] == DLB_SCHED_DIRECTED) ?
2539                                         sched_word[1] : ev[1].flow_id,
2540                                 DLB_QE_LOCK_ID_WORD + 4);
2541                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2542                                 (sched_type[2] == DLB_SCHED_DIRECTED) ?
2543                                         sched_word[2] : ev[2].flow_id,
2544                                 DLB_QE_LOCK_ID_WORD);
2545                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2546                                 (sched_type[3] == DLB_SCHED_DIRECTED) ?
2547                                         sched_word[3] : ev[3].flow_id,
2548                                 DLB_QE_LOCK_ID_WORD + 4);
2549
2550                 /* Store the event type and sub event type in the metadata:
2551                  * sse_qe[0][15:0]  = flow_id[0]
2552                  * sse_qe[0][79:64] = flow_id[1]
2553                  * sse_qe[1][15:0]  = flow_id[2]
2554                  * sse_qe[1][79:64] = flow_id[3]
2555                  */
2556 #define DLB_QE_EV_TYPE_WORD 0
2557                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2558                                              ev[0].sub_event_type << 8 |
2559                                                 ev[0].event_type,
2560                                              DLB_QE_EV_TYPE_WORD);
2561                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2562                                              ev[1].sub_event_type << 8 |
2563                                                 ev[1].event_type,
2564                                              DLB_QE_EV_TYPE_WORD + 4);
2565                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2566                                              ev[2].sub_event_type << 8 |
2567                                                 ev[2].event_type,
2568                                              DLB_QE_EV_TYPE_WORD);
2569                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2570                                              ev[3].sub_event_type << 8 |
2571                                                 ev[3].event_type,
2572                                              DLB_QE_EV_TYPE_WORD + 4);
2573
2574                 /* Store the metadata to memory (use the double-precision
2575                  * _mm_storeh_pd because there is no integer function for
2576                  * storing the upper 64b):
2577                  * qe[0] metadata = sse_qe[0][63:0]
2578                  * qe[1] metadata = sse_qe[0][127:64]
2579                  * qe[2] metadata = sse_qe[1][63:0]
2580                  * qe[3] metadata = sse_qe[1][127:64]
2581                  */
2582                 _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2583                 _mm_storeh_pd((double *)&qe[1].u.opaque_data,
2584                               (__m128d) sse_qe[0]);
2585                 _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2586                 _mm_storeh_pd((double *)&qe[3].u.opaque_data,
2587                               (__m128d) sse_qe[1]);
2588
2589                 qe[0].data = ev[0].u64;
2590                 qe[1].data = ev[1].u64;
2591                 qe[2].data = ev[2].u64;
2592                 qe[3].data = ev[3].u64;
2593
2594                 break;
2595         case 3:
2596         case 2:
2597         case 1:
2598                 for (i = 0; i < num; i++) {
2599                         qe[i].cmd_byte =
2600                                 cmd_byte_map[qm_port->is_directed][ev[i].op];
2601                         qe[i].sched_type = sched_type[i];
2602                         qe[i].data = ev[i].u64;
2603                         qe[i].qid = queue_id[i];
2604                         qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2605                         qe[i].lock_id = ev[i].flow_id;
2606                         if (sched_type[i] == DLB_SCHED_DIRECTED) {
2607                                 struct dlb_msg_info *info =
2608                                         (struct dlb_msg_info *)&qe[i].lock_id;
2609
2610                                 info->qid = queue_id[i];
2611                                 info->sched_type = DLB_SCHED_DIRECTED;
2612                                 info->priority = qe[i].priority;
2613                         }
2614                         qe[i].u.event_type.major = ev[i].event_type;
2615                         qe[i].u.event_type.sub = ev[i].sub_event_type;
2616                 }
2617                 break;
2618         case 0:
2619                 break;
2620         }
2621 }
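
/* Worked example of the sched_word encoding used above (illustrative values
 * only): for an event with priority 0 (RTE_EVENT_DEV_PRIORITY_HIGHEST),
 * DLB_SCHED_ATOMIC scheduling and QM queue ID 5, the 16-bit word is
 *
 *     EV_TO_DLB_PRIO(0) << 10 | DLB_SCHED_ATOMIC << 8 | 5
 *
 * i.e. the priority field starting at bit 10, the hardware scheduling type
 * starting at bit 8 and the QM queue ID in the low byte. For a directed
 * destination the same word is also written into the lock ID position, since
 * the hardware expects the QID and priority in bytes 10-11 of the QE in that
 * case.
 */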
2622
2623 static inline void
2624 dlb_construct_token_pop_qe(struct dlb_port *qm_port, int idx)
2625 {
2626         struct dlb_cq_pop_qe *qe = (void *)qm_port->qe4;
2627         int num = qm_port->owed_tokens;
2628
2629         if (qm_port->use_rsvd_token_scheme) {
2630                 /* Check if there's a deficit of reserved tokens, and return
2631                  * early if there are no (unreserved) tokens to consume.
2632                  */
2633                 if (num <= qm_port->cq_rsvd_token_deficit) {
2634                         qm_port->cq_rsvd_token_deficit -= num;
2635                         qm_port->owed_tokens = 0;
2636                         return;
2637                 }
2638                 num -= qm_port->cq_rsvd_token_deficit;
2639                 qm_port->cq_rsvd_token_deficit = 0;
2640         }
2641
2642         qe[idx].cmd_byte = DLB_POP_CMD_BYTE;
2643         qe[idx].tokens = num - 1;
2644         qm_port->owed_tokens = 0;
2645 }
2646
2647 static __rte_always_inline void
2648 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2649              struct process_local_port_data *port_data)
2650 {
2651         dlb_movdir64b(port_data->pp_addr, qe4);
2652 }
2653
2654 static inline void
2655 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2656                   bool do_sfence,
2657                   struct process_local_port_data *port_data)
2658 {
2659         DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2660
2661         /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2662          * application writes complete before enqueueing the release HCW.
2663          */
2664         if (do_sfence)
2665                 rte_wmb();
2666
2667         dlb_pp_write(qm_port->qe4, port_data);
2668 }
2669
2670 static inline int
2671 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2672 {
2673         struct process_local_port_data *port_data;
2674         struct dlb_cq_pop_qe *qe;
2675
2676         RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2677
2678         if (qm_port->use_rsvd_token_scheme) {
2679                 /* Check if there's a deficit of reserved tokens, and return
2680                  * early if there are no (unreserved) tokens to consume.
2681                  */
2682                 if (num <= qm_port->cq_rsvd_token_deficit) {
2683                         qm_port->cq_rsvd_token_deficit -= num;
2684                         qm_port->owed_tokens = 0;
2685                         return 0;
2686                 }
2687                 num -= qm_port->cq_rsvd_token_deficit;
2688                 qm_port->cq_rsvd_token_deficit = 0;
2689         }
2690
2691         qe = qm_port->consume_qe;
2692
2693         qe->tokens = num - 1;
2694         qe->int_arm = 0;
2695
2696         /* No store fence needed since no pointer is being sent, and CQ token
2697          * pops can be safely reordered with other HCWs.
2698          */
2699         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2700
2701         dlb_movntdq_single(port_data->pp_addr, qe);
2702
2703         DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2704
2705         qm_port->owed_tokens = 0;
2706
2707         return 0;
2708 }
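
/* Worked example of the reserved token accounting shared by
 * dlb_construct_token_pop_qe() and dlb_consume_qe_immediate() (illustrative
 * numbers only): with cq_rsvd_token_deficit = 4 and 3 tokens owed, the
 * deficit absorbs all 3 tokens, drops to 1, and no pop QE is issued. With 10
 * tokens owed, 1 token repays the remaining deficit and a pop QE returning
 * the other 9 is written, with its tokens field set to 9 - 1 = 8 to match
 * the num - 1 encoding above.
 */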
2709
2710 static inline uint16_t
2711 __dlb_event_enqueue_burst(void *event_port,
2712                           const struct rte_event events[],
2713                           uint16_t num,
2714                           bool use_delayed)
2715 {
2716         struct dlb_eventdev_port *ev_port = event_port;
2717         struct dlb_port *qm_port = &ev_port->qm_port;
2718         struct process_local_port_data *port_data;
2719         int i;
2720
2721         RTE_ASSERT(ev_port->enq_configured);
2722         RTE_ASSERT(events != NULL);
2723
2724         rte_errno = 0;
2725         i = 0;
2726
2727         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2728
2729         while (i < num) {
2730                 uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2731                 uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2732                 int pop_offs = 0;
2733                 int j = 0;
2734
2735                 memset(qm_port->qe4,
2736                        0,
2737                        DLB_NUM_QES_PER_CACHE_LINE *
2738                        sizeof(struct dlb_enqueue_qe));
2739
2740                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2741                         const struct rte_event *ev = &events[i + j];
2742                         int16_t thresh = qm_port->token_pop_thresh;
2743
2744                         if (use_delayed &&
2745                             qm_port->token_pop_mode == DELAYED_POP &&
2746                             (ev->op == RTE_EVENT_OP_FORWARD ||
2747                              ev->op == RTE_EVENT_OP_RELEASE) &&
2748                             qm_port->issued_releases >= thresh - 1) {
2749                                 /* Insert the token pop QE and break out. This
2750                                  * may result in a partial HCW, but that is
2751                                  * simpler than supporting arbitrary QE
2752                                  * insertion.
2753                                  */
2754                                 dlb_construct_token_pop_qe(qm_port, j);
2755
2756                                 /* Reset the releases for the next QE batch */
2757                                 qm_port->issued_releases -= thresh;
2758
2759                                 /* When using delayed token pop mode, the
2760                                  * initial token threshold is the full CQ
2761                                  * depth. After the first token pop, we need to
2762                                  * reset it to the dequeue_depth.
2763                                  */
2764                                 qm_port->token_pop_thresh =
2765                                         qm_port->dequeue_depth;
2766
2767                                 pop_offs = 1;
2768                                 j++;
2769                                 break;
2770                         }
2771
2772                         if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2773                                                    port_data, &sched_types[j],
2774                                                    &queue_ids[j]))
2775                                 break;
2776                 }
2777
2778                 if (j == 0)
2779                         break;
2780
2781                 dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2782                                      sched_types, queue_ids);
2783
2784                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2785
2786                 /* Don't include the token pop QE in the enqueue count */
2787                 i += j - pop_offs;
2788
2789                 /* Don't interpret j < DLB_NUM_... as out-of-credits if
2790                  * pop_offs != 0
2791                  */
2792                 if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2793                         break;
2794         }
2795
2796         RTE_ASSERT(!(i == 0 && rte_errno != -ENOSPC));
2797
2798         return i;
2799 }
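
/* Illustrative application-side sketch (not part of this PMD) of the enqueue
 * path above: forwarding a previously dequeued event and injecting a new one
 * in a single burst. dev_id, port_id, next_queue_id, dequeued_ev and
 * some_buffer are placeholders.
 *
 *     struct rte_event ev[2] = {0};
 *
 *     ev[0] = dequeued_ev;                 // as returned by dequeue
 *     ev[0].op = RTE_EVENT_OP_FORWARD;
 *     ev[0].queue_id = next_queue_id;
 *
 *     ev[1].op = RTE_EVENT_OP_NEW;         // consumes s/w and h/w credits
 *     ev[1].queue_id = next_queue_id;
 *     ev[1].sched_type = RTE_SCHED_TYPE_ATOMIC;
 *     ev[1].flow_id = 7;
 *     ev[1].priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *     ev[1].u64 = (uintptr_t)some_buffer;
 *
 *     uint16_t n = rte_event_enqueue_burst(dev_id, port_id, ev, 2);
 *     // n < 2 with rte_errno == -ENOSPC indicates credit exhaustion
 */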
2800
2801 static inline uint16_t
2802 dlb_event_enqueue_burst(void *event_port,
2803                         const struct rte_event events[],
2804                         uint16_t num)
2805 {
2806         return __dlb_event_enqueue_burst(event_port, events, num, false);
2807 }
2808
2809 static inline uint16_t
2810 dlb_event_enqueue_burst_delayed(void *event_port,
2811                                 const struct rte_event events[],
2812                                 uint16_t num)
2813 {
2814         return __dlb_event_enqueue_burst(event_port, events, num, true);
2815 }
2816
2817 static inline uint16_t
2818 dlb_event_enqueue(void *event_port,
2819                   const struct rte_event events[])
2820 {
2821         return __dlb_event_enqueue_burst(event_port, events, 1, false);
2822 }
2823
2824 static inline uint16_t
2825 dlb_event_enqueue_delayed(void *event_port,
2826                           const struct rte_event events[])
2827 {
2828         return __dlb_event_enqueue_burst(event_port, events, 1, true);
2829 }
2830
2831 static uint16_t
2832 dlb_event_enqueue_new_burst(void *event_port,
2833                             const struct rte_event events[],
2834                             uint16_t num)
2835 {
2836         return __dlb_event_enqueue_burst(event_port, events, num, false);
2837 }
2838
2839 static uint16_t
2840 dlb_event_enqueue_new_burst_delayed(void *event_port,
2841                                     const struct rte_event events[],
2842                                     uint16_t num)
2843 {
2844         return __dlb_event_enqueue_burst(event_port, events, num, true);
2845 }
2846
2847 static uint16_t
2848 dlb_event_enqueue_forward_burst(void *event_port,
2849                                 const struct rte_event events[],
2850                                 uint16_t num)
2851 {
2852         return __dlb_event_enqueue_burst(event_port, events, num, false);
2853 }
2854
2855 static uint16_t
2856 dlb_event_enqueue_forward_burst_delayed(void *event_port,
2857                                         const struct rte_event events[],
2858                                         uint16_t num)
2859 {
2860         return __dlb_event_enqueue_burst(event_port, events, num, true);
2861 }
2862
2863 static __rte_always_inline int
2864 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2865             uint8_t *offset)
2866 {
2867         uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2868                                    {0x00, 0x01, 0x03, 0x07} };
2869         uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2870         volatile struct dlb_dequeue_qe *cq_addr;
2871         __m128i *qes = (__m128i *)qe;
2872         uint64_t *cache_line_base;
2873         uint8_t gen_bits;
2874
2875         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2876         cq_addr = &cq_addr[qm_port->cq_idx];
2877
2878         cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2879         *offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2880
2881         /* Load the next CQ cache line from memory. Pack these reads as tight
2882          * as possible to reduce the chance that DLB invalidates the line while
2883          * the CPU is reading it. Read the cache line backwards to ensure that
2884          * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2885          *
2886          * (Valid QEs start at &qe[offset])
2887          */
2888         qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2889         qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2890         qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2891         qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2892
2893         /* Evict the cache line ASAP */
2894         rte_cldemote(cache_line_base);
2895
2896         /* Extract and combine the gen bits */
2897         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2898                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2899                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2900                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2901
2902         /* XOR the combined bits such that a 1 represents a valid QE */
2903         gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2904
2905         /* Mask off gen bits we don't care about */
2906         gen_bits &= and_mask[*offset];
2907
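             /*
              * Worked example: offset == 2 and gen_bit == 1 select
              * xor_mask 0x03 and and_mask 0x0C. If QE[2] is newly valid but
              * QE[3] still holds the previous generation, gen_bits might
              * read 0b0111 (bits 0-1 are stale and masked off anyway):
              * XOR with 0x03 gives 0b0100, AND with 0x0C leaves 0b0100,
              * so exactly one valid QE is reported.
              */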
2908         return __builtin_popcount(gen_bits);
2909 }
2910
2911 static inline void
2912 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2913 {
2914         uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2915
2916         qm_port->cq_idx_unmasked = idx;
2917         qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2918         qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
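             /*
              * Example: with cq_depth_mask == 7 and gen_bit_shift == 3
              * (an 8-entry CQ), unmasked indexes 0-7 map to cq_idx 0-7 with
              * gen_bit 1, indexes 8-15 wrap to cq_idx 0-7 with gen_bit 0,
              * and the gen bit flips back to 1 at index 16.
              */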
2919 }
2920
2921 static inline int
2922 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2923                         struct dlb_port *qm_port,
2924                         struct rte_event *events,
2925                         struct dlb_dequeue_qe *qes,
2926                         int cnt)
2927 {
2928         uint8_t *qid_mappings = qm_port->qid_mappings;
2929         int i, num;
2930
2931         RTE_SET_USED(ev_port);  /* avoids unused variable error */
2932
2933         for (i = 0, num = 0; i < cnt; i++) {
2934                 struct dlb_dequeue_qe *qe = &qes[i];
2935                 int sched_type_map[4] = {
2936                         [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2937                         [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2938                         [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2939                         [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2940                 };
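                     /*
                      * Note: eventdev has no "directed" schedule type, so
                      * directed (single-link) traffic is reported as
                      * RTE_SCHED_TYPE_ATOMIC.
                      */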
2941
2942                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2943                             (long long)qe->data, qe->qid,
2944                             qe->u.event_type.major,
2945                             qe->u.event_type.sub,
2946                             qe->pp_id, qe->sched_type, qe->qid, qe->error);
2947
2948                 /* Fill in event information.
2949                  * Note that flow_id must be embedded in the data by
2950                  * the app, such as the mbuf RSS hash field if the data
2951                  * buffer is a mbuf.
2952                  */
2953                 if (unlikely(qe->error)) {
2954                         DLB_LOG_ERR("QE error bit ON\n");
2955                         DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2956                         dlb_consume_qe_immediate(qm_port, 1);
2957                         continue; /* Ignore */
2958                 }
2959
2960                 events[num].u64 = qe->data;
2961                 events[num].queue_id = qid_mappings[qe->qid];
2962                 events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2963                 events[num].event_type = qe->u.event_type.major;
2964                 events[num].sub_event_type = qe->u.event_type.sub;
2965                 events[num].sched_type = sched_type_map[qe->sched_type];
2966                 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2967                 num++;
2968         }
2969         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2970
2971         return num;
2972 }
2973
2974 static inline int
2975 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2976                              struct dlb_port *qm_port,
2977                              struct rte_event *events,
2978                              struct dlb_dequeue_qe *qes)
2979 {
2980         int sched_type_map[] = {
2981                 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2982                 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2983                 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2984                 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2985         };
2986         const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2987         uint8_t *qid_mappings = qm_port->qid_mappings;
2988         __m128i sse_evt[2];
2989         int i;
2990
2991         /* In the unlikely case that any of the QE error bits are set, process
2992          * them one at a time.
2993          */
2994         if (unlikely(qes[0].error || qes[1].error ||
2995                      qes[2].error || qes[3].error))
2996                 return dlb_process_dequeue_qes(ev_port, qm_port, events,
2997                                                qes, num_events);
2998
2999         for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
3000                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
3001                             (long long)qes[i].data, qes[i].qid,
3002                             qes[i].u.event_type.major,
3003                             qes[i].u.event_type.sub,
3004                             qes[i].pp_id, qes[i].sched_type, qes[i].qid,
3005                             qes[i].error);
3006         }
3007
3008         events[0].u64 = qes[0].data;
3009         events[1].u64 = qes[1].data;
3010         events[2].u64 = qes[2].data;
3011         events[3].u64 = qes[3].data;
3012
3013         /* Construct the metadata portion of two struct rte_events
3014          * in one 128b SSE register. Event metadata is constructed in the SSE
3015          * registers like so:
3016          * sse_evt[0][63:0]:   event[0]'s metadata
3017          * sse_evt[0][127:64]: event[1]'s metadata
3018          * sse_evt[1][63:0]:   event[2]'s metadata
3019          * sse_evt[1][127:64]: event[3]'s metadata
3020          */
3021         sse_evt[0] = _mm_setzero_si128();
3022         sse_evt[1] = _mm_setzero_si128();
3023
3024         /* Convert the hardware queue ID to an event queue ID and store it in
3025          * the metadata:
3026          * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
3027          * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
3028          * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
3029          * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
3030          */
3031 #define DLB_EVENT_QUEUE_ID_BYTE 5
3032         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3033                                      qid_mappings[qes[0].qid],
3034                                      DLB_EVENT_QUEUE_ID_BYTE);
3035         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3036                                      qid_mappings[qes[1].qid],
3037                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3038         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3039                                      qid_mappings[qes[2].qid],
3040                                      DLB_EVENT_QUEUE_ID_BYTE);
3041         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3042                                      qid_mappings[qes[3].qid],
3043                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3044
3045         /* Convert the hardware priority to an event priority and store it in
3046          * the metadata:
3047          * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
3048          * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
3049          * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
3050          * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
3051          */
3052 #define DLB_EVENT_PRIO_BYTE 6
3053         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3054                                      DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
3055                                      DLB_EVENT_PRIO_BYTE);
3056         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3057                                      DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
3058                                      DLB_EVENT_PRIO_BYTE + 8);
3059         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3060                                      DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
3061                                      DLB_EVENT_PRIO_BYTE);
3062         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3063                                      DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
3064                                      DLB_EVENT_PRIO_BYTE + 8);
3065
3066         /* Write the event type and sub event type to the event metadata. Leave
3067          * flow ID unspecified, since the hardware does not maintain it during
3068          * scheduling:
3069          * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
3070          *                      qes[0].u.event_type.sub << 20;
3071          * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
3072          *                      qes[1].u.event_type.sub << 20;
3073          * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
3074          *                      qes[2].u.event_type.sub << 20;
3075          * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
3076          *                      qes[3].u.event_type.sub << 20;
3077          */
3078 #define DLB_EVENT_EV_TYPE_DW 0
3079 #define DLB_EVENT_EV_TYPE_SHIFT 28
3080 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
3081         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3082                         qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3083                         qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3084                         DLB_EVENT_EV_TYPE_DW);
3085         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3086                         qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3087                         qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3088                         DLB_EVENT_EV_TYPE_DW + 2);
3089         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3090                         qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3091                         qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3092                         DLB_EVENT_EV_TYPE_DW);
3093         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3094                         qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
3095                         qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3096                         DLB_EVENT_EV_TYPE_DW + 2);
3097
3098         /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
3099          * set:
3100          * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
3101          * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
3102          * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
3103          * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
3104          */
3105 #define DLB_EVENT_SCHED_TYPE_BYTE 4
3106 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
3107         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3108                 sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3109                 DLB_EVENT_SCHED_TYPE_BYTE);
3110         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3111                 sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3112                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3113         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3114                 sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3115                 DLB_EVENT_SCHED_TYPE_BYTE);
3116         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3117                 sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3118                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3119
3120         /* Store the metadata to the event (use the double-precision
3121          * _mm_storeh_pd because there is no integer function for storing the
3122          * upper 64b):
3123          * events[0].event = sse_evt[0][63:0]
3124          * events[1].event = sse_evt[0][127:64]
3125          * events[2].event = sse_evt[1][63:0]
3126          * events[3].event = sse_evt[1][127:64]
3127          */
3128         _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3129         _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3130         _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3131         _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
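             /*
              * For reference, each assembled 64-bit metadata word now holds,
              * per the rte_event bit layout:
              *   bits [19:0]  flow_id        = 0
              *   bits [27:20] sub_event_type
              *   bits [31:28] event_type
              *   bits [39:38] sched_type     (op and rsvd left 0)
              *   bits [47:40] queue_id
              *   bits [55:48] priority
              *   bits [63:56] impl_opaque    = 0
              */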
3132
3133         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3134         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3135         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3136         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3137
3138         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3139
3140         return num_events;
3141 }
3142
3143 static inline int
3144 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3145                  struct dlb_eventdev_port *ev_port,
3146                  struct dlb_port *qm_port,
3147                  uint64_t timeout,
3148                  uint64_t start_ticks)
3149 {
3150         struct process_local_port_data *port_data;
3151         uint64_t elapsed_ticks;
3152
3153         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3154
3155         elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3156
3157         /* Wait/poll time expired */
3158         if (elapsed_ticks >= timeout) {
3159                 /* Interrupts not supported by PF PMD */
3160                 return 1;
3161         } else if (dlb->umwait_allowed) {
3162                 volatile struct dlb_dequeue_qe *cq_base;
3163                 union {
3164                         uint64_t raw_qe[2];
3165                         struct dlb_dequeue_qe qe;
3166                 } qe_mask = { {0} };
3167                 uint64_t expected_value;
3168                 volatile uint64_t *monitor_addr;
3169
3170                 qe_mask.qe.cq_gen = 1; /* set mask */
3171
3172                 cq_base = port_data->cq_base;
3173                 monitor_addr = (volatile uint64_t *)(volatile void *)
3174                         &cq_base[qm_port->cq_idx];
3175                 monitor_addr++; /* cq_gen bit is in the second 64-bit word */
3176
3177                 if (qm_port->gen_bit)
3178                         expected_value = qe_mask.raw_qe[1];
3179                 else
3180                         expected_value = 0;
3181
3182                 rte_power_monitor(monitor_addr, expected_value,
3183                                   qe_mask.raw_qe[1], timeout + start_ticks,
3184                                   sizeof(uint64_t));
3185
3186                 DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3187         } else {
3188                 uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3189                 uint64_t curr_ticks = rte_get_timer_cycles();
3190                 uint64_t init_ticks = curr_ticks;
3191
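                     /* Busy-poll for at most RTE_LIBRTE_PMD_DLB_POLL_INTERVAL
                      * cycles (or until the overall timeout expires), then
                      * return 0 so the caller re-checks the CQ.
                      */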
3192                 while ((curr_ticks - start_ticks < timeout) &&
3193                        (curr_ticks - init_ticks < poll_interval))
3194                         curr_ticks = rte_get_timer_cycles();
3195         }
3196
3197         return 0;
3198 }
3199
3200 static inline int16_t
3201 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3202                struct dlb_eventdev_port *ev_port,
3203                struct rte_event *events,
3204                uint16_t max_num,
3205                uint64_t dequeue_timeout_ticks)
3206 {
3207         uint64_t timeout;
3208         uint64_t start_ticks = 0ULL;
3209         struct dlb_port *qm_port;
3210         int num = 0;
3211
3212         qm_port = &ev_port->qm_port;
3213
3214         /* If configured for per-dequeue wait, use the wait value
3215          * provided to this API. Otherwise, use the global value set at
3216          * eventdev configuration time.
3217          */
3218         if (!dlb->global_dequeue_wait)
3219                 timeout = dequeue_timeout_ticks;
3220         else
3221                 timeout = dlb->global_dequeue_wait_ticks;
3222
3223         if (timeout)
3224                 start_ticks = rte_get_timer_cycles();
3225
3226         while (num < max_num) {
3227                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3228                 uint8_t offset;
3229                 int num_avail;
3230
3231                 /* Copy up to 4 QEs from the current cache line into qes */
3232                 num_avail = dlb_recv_qe(qm_port, qes, &offset);
3233
3234                 /* But don't process more than the user requested */
3235                 num_avail = RTE_MIN(num_avail, max_num - num);
3236
3237                 dlb_inc_cq_idx(qm_port, num_avail);
3238
3239                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3240                         num += dlb_process_dequeue_four_qes(ev_port,
3241                                                              qm_port,
3242                                                              &events[num],
3243                                                              &qes[offset]);
3244                 else if (num_avail)
3245                         num += dlb_process_dequeue_qes(ev_port,
3246                                                         qm_port,
3247                                                         &events[num],
3248                                                         &qes[offset],
3249                                                         num_avail);
3250                 else if ((timeout == 0) || (num > 0))
3251                         /* Not waiting in any form, or 1+ events received? */
3252                         break;
3253                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3254                                           timeout, start_ticks))
3255                         break;
3256         }
3257
3258         qm_port->owed_tokens += num;
3259
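             /*
              * Token pop handling depends on the port's mode: AUTO_POP pops
              * the CQ tokens immediately below, DEFERRED_POP pops them at
              * the start of the next dequeue call (see
              * dlb_event_dequeue_burst()), and DELAYED_POP pops them via a
              * token pop QE built on the enqueue/release path (see
              * dlb_event_release()).
              */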
3260         if (num && qm_port->token_pop_mode == AUTO_POP)
3261                 dlb_consume_qe_immediate(qm_port, num);
3262
3263         ev_port->outstanding_releases += num;
3264
3265         return num;
3266 }
3267
3268 static __rte_always_inline int
3269 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3270 {
3271         volatile struct dlb_dequeue_qe *cq_addr;
3272         uint8_t xor_mask[2] = {0x0F, 0x00};
3273         const uint8_t and_mask = 0x0F;
3274         __m128i *qes = (__m128i *)qe;
3275         uint8_t gen_bits, gen_bit;
3276         uintptr_t addr[4];
3277         uint16_t idx;
3278
3279         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3280
3281         idx = qm_port->cq_idx;
3282
3283         /* Load the next 4 QEs */
3284         addr[0] = (uintptr_t)&cq_addr[idx];
3285         addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3286         addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3287         addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3288
3289         /* Prefetch next batch of QEs (all CQs occupy at least 8 cache lines) */
3290         rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3291         rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3292         rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3293         rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3294
3295         /* Correct the xor_mask for wrap-around QEs */
3296         gen_bit = qm_port->gen_bit;
3297         xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3298         xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3299         xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
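             /*
              * Example: with cq_depth_mask == 15 and idx == 8, addr[2] and
              * addr[3] wrap back to the start of the CQ. The wrapped slots
              * are written with the opposite generation bit, so bits 2 and
              * 3 of the xor mask are flipped to account for it.
              */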
3300
3301         /* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3302          * valid, then QEs[0:N-1] are too.
3303          */
3304         qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3305         rte_compiler_barrier();
3306         qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3307         rte_compiler_barrier();
3308         qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3309         rte_compiler_barrier();
3310         qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3311
3312         /* Extract and combine the gen bits */
3313         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3314                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3315                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3316                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3317
3318         /* XOR the combined bits such that a 1 represents a valid QE */
3319         gen_bits ^= xor_mask[gen_bit];
3320
3321         /* Mask off gen bits we don't care about */
3322         gen_bits &= and_mask;
3323
3324         return __builtin_popcount(gen_bits);
3325 }
3326
3327 static inline int16_t
3328 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3329                       struct dlb_eventdev_port *ev_port,
3330                       struct rte_event *events,
3331                       uint16_t max_num,
3332                       uint64_t dequeue_timeout_ticks)
3333 {
3334         uint64_t timeout;
3335         uint64_t start_ticks = 0ULL;
3336         struct dlb_port *qm_port;
3337         int num = 0;
3338
3339         qm_port = &ev_port->qm_port;
3340
3341         /* If configured for per-dequeue wait, use the wait value
3342          * provided to this API. Otherwise, use the global value set at
3343          * eventdev configuration time.
3344          */
3345         if (!dlb->global_dequeue_wait)
3346                 timeout = dequeue_timeout_ticks;
3347         else
3348                 timeout = dlb->global_dequeue_wait_ticks;
3349
3350         if (timeout)
3351                 start_ticks = rte_get_timer_cycles();
3352
3353         while (num < max_num) {
3354                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3355                 int num_avail;
3356
3357                 /* Copy up to 4 QEs from the current cache line into qes */
3358                 num_avail = dlb_recv_qe_sparse(qm_port, qes);
3359
3360                 /* But don't process more than the user requested */
3361                 num_avail = RTE_MIN(num_avail, max_num - num);
3362
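                     /* In sparse CQ mode, valid QEs occupy every fourth 16B
                      * slot (one per cache line), so each QE consumed
                      * advances the CQ index by four entries.
                      */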
3363                 dlb_inc_cq_idx(qm_port, num_avail << 2);
3364
3365                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3366                         num += dlb_process_dequeue_four_qes(ev_port,
3367                                                              qm_port,
3368                                                              &events[num],
3369                                                              &qes[0]);
3370                 else if (num_avail)
3371                         num += dlb_process_dequeue_qes(ev_port,
3372                                                         qm_port,
3373                                                         &events[num],
3374                                                         &qes[0],
3375                                                         num_avail);
3376                 else if ((timeout == 0) || (num > 0))
3377                         /* Not waiting in any form, or 1+ events received? */
3378                         break;
3379                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3380                                           timeout, start_ticks))
3381                         break;
3382         }
3383
3384         qm_port->owed_tokens += num;
3385
3386         if (num && qm_port->token_pop_mode == AUTO_POP)
3387                 dlb_consume_qe_immediate(qm_port, num);
3388
3389         ev_port->outstanding_releases += num;
3390
3391         return num;
3392 }
3393
3394 static int
3395 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3396 {
3397         struct process_local_port_data *port_data;
3398         struct dlb_eventdev_port *ev_port;
3399         struct dlb_port *qm_port;
3400         int i;
3401
3402         if (port_id >= dlb->num_ports) {
3403                 DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3404                             port_id);
3405                 rte_errno = -EINVAL;
3406                 return rte_errno;
3407         }
3408
3409         ev_port = &dlb->ev_ports[port_id];
3410         qm_port = &ev_port->qm_port;
3411         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3412
3413         i = 0;
3414
3415         if (qm_port->is_directed) {
3416                 i = n;
3417                 goto sw_credit_update;
3418         }
3419
3420         while (i < n) {
3421                 int pop_offs = 0;
3422                 int j = 0;
3423
3424                 /* Zero-out QEs */
3425                 qm_port->qe4[0].cmd_byte = 0;
3426                 qm_port->qe4[1].cmd_byte = 0;
3427                 qm_port->qe4[2].cmd_byte = 0;
3428                 qm_port->qe4[3].cmd_byte = 0;
3429
3430                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3431                         int16_t thresh = qm_port->token_pop_thresh;
3432
3433                         if (qm_port->token_pop_mode == DELAYED_POP &&
3434                             qm_port->issued_releases >= thresh - 1) {
3435                                 /* Insert the token pop QE */
3436                                 dlb_construct_token_pop_qe(qm_port, j);
3437
3438                                 /* Reset the releases for the next QE batch */
3439                                 qm_port->issued_releases -= thresh;
3440
3441                                 /* When using delayed token pop mode, the
3442                                  * initial token threshold is the full CQ
3443                                  * depth. After the first token pop, we need to
3444                                  * reset it to the dequeue_depth.
3445                                  */
3446                                 qm_port->token_pop_thresh =
3447                                         qm_port->dequeue_depth;
3448
3449                                 pop_offs = 1;
3450                                 j++;
3451                                 break;
3452                         }
3453
3454                         qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3455                         qm_port->issued_releases++;
3456                 }
3457
3458                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3459
3460                 /* Don't include the token pop QE in the release count */
3461                 i += j - pop_offs;
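                     /*
                      * Example: if the token pop QE was inserted at slot 2,
                      * slots 0-1 hold completion QEs, j ends at 3 with
                      * pop_offs == 1, so i advances by 2 and only the real
                      * releases are counted.
                      */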
3462         }
3463
3464 sw_credit_update:
3465         /* each release returns one credit */
3466         if (!ev_port->outstanding_releases) {
3467                 DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3468                 rte_errno = -ENOTRECOVERABLE;
3469                 return rte_errno;
3470         }
3471
3472         ev_port->outstanding_releases -= i;
3473         ev_port->inflight_credits += i;
3474
3475         /* Replenish s/w credits if enough releases are performed */
3476         dlb_replenish_sw_credits(dlb, ev_port);
3477         return 0;
3478 }
3479
3480 static uint16_t
3481 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3482                         uint64_t wait)
3483 {
3484         struct dlb_eventdev_port *ev_port = event_port;
3485         struct dlb_port *qm_port = &ev_port->qm_port;
3486         struct dlb_eventdev *dlb = ev_port->dlb;
3487         uint16_t cnt;
3488         int ret;
3489
3490         rte_errno = 0;
3491
3492         RTE_ASSERT(ev_port->setup_done);
3493         RTE_ASSERT(ev != NULL);
3494
3495         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3496                 uint16_t out_rels = ev_port->outstanding_releases;
3497
3498                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3499                 if (ret)
3500                         return ret;
3501
3502                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3503         }
3504
3505         if (qm_port->token_pop_mode == DEFERRED_POP &&
3506             qm_port->owed_tokens)
3507                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3508
3509         cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3510
3511         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3512         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3513         return cnt;
3514 }
3515
3516 static uint16_t
3517 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3518 {
3519         return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3520 }
3521
3522 static uint16_t
3523 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3524                                uint16_t num, uint64_t wait)
3525 {
3526         struct dlb_eventdev_port *ev_port = event_port;
3527         struct dlb_port *qm_port = &ev_port->qm_port;
3528         struct dlb_eventdev *dlb = ev_port->dlb;
3529         uint16_t cnt;
3530         int ret;
3531
3532         rte_errno = 0;
3533
3534         RTE_ASSERT(ev_port->setup_done);
3535         RTE_ASSERT(ev != NULL);
3536
3537         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3538                 uint16_t out_rels = ev_port->outstanding_releases;
3539
3540                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3541                 if (ret)
3542                         return ret;
3543
3544                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3545         }
3546
3547         if (qm_port->token_pop_mode == DEFERRED_POP &&
3548             qm_port->owed_tokens)
3549                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3550
3551         cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3552
3553         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3554         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3555         return cnt;
3556 }
3557
3558 static uint16_t
3559 dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
3560 {
3561         return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
3562 }
3563
3564 static uint32_t
3565 dlb_get_ldb_queue_depth(struct dlb_eventdev *dlb,
3566                         struct dlb_eventdev_queue *queue)
3567 {
3568         struct dlb_hw_dev *handle = &dlb->qm_instance;
3569         struct dlb_get_ldb_queue_depth_args cfg;
3570         struct dlb_cmd_response response;
3571         int ret;
3572
3573         cfg.queue_id = queue->qm_queue.id;
3574         cfg.response = (uintptr_t)&response;
3575
3576         ret = dlb_iface_get_ldb_queue_depth(handle, &cfg);
3577         if (ret < 0) {
3578                 DLB_LOG_ERR("dlb: get_ldb_queue_depth ret=%d (driver status: %s)\n",
3579                             ret, dlb_error_strings[response.status]);
3580                 return ret;
3581         }
3582
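             /* The queue depth is reported back in the response's id field. */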
3583         return response.id;
3584 }
3585
3586 static uint32_t
3587 dlb_get_dir_queue_depth(struct dlb_eventdev *dlb,
3588                         struct dlb_eventdev_queue *queue)
3589 {
3590         struct dlb_hw_dev *handle = &dlb->qm_instance;
3591         struct dlb_get_dir_queue_depth_args cfg;
3592         struct dlb_cmd_response response;
3593         int ret;
3594
3595         cfg.queue_id = queue->qm_queue.id;
3596         cfg.response = (uintptr_t)&response;
3597
3598         ret = dlb_iface_get_dir_queue_depth(handle, &cfg);
3599         if (ret < 0) {
3600                 DLB_LOG_ERR("dlb: get_dir_queue_depth ret=%d (driver status: %s)\n",
3601                             ret, dlb_error_strings[response.status]);
3602                 return ret;
3603         }
3604
3605         return response.id;
3606 }
3607
3608 uint32_t
3609 dlb_get_queue_depth(struct dlb_eventdev *dlb,
3610                     struct dlb_eventdev_queue *queue)
3611 {
3612         if (queue->qm_queue.is_directed)
3613                 return dlb_get_dir_queue_depth(dlb, queue);
3614         else
3615                 return dlb_get_ldb_queue_depth(dlb, queue);
3616 }
3617
3618 static bool
3619 dlb_queue_is_empty(struct dlb_eventdev *dlb,
3620                    struct dlb_eventdev_queue *queue)
3621 {
3622         return dlb_get_queue_depth(dlb, queue) == 0;
3623 }
3624
3625 static bool
3626 dlb_linked_queues_empty(struct dlb_eventdev *dlb)
3627 {
3628         int i;
3629
3630         for (i = 0; i < dlb->num_queues; i++) {
3631                 if (dlb->ev_queues[i].num_links == 0)
3632                         continue;
3633                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3634                         return false;
3635         }
3636
3637         return true;
3638 }
3639
3640 static bool
3641 dlb_queues_empty(struct dlb_eventdev *dlb)
3642 {
3643         int i;
3644
3645         for (i = 0; i < dlb->num_queues; i++) {
3646                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3647                         return false;
3648         }
3649
3650         return true;
3651 }
3652
3653 static void
3654 dlb_flush_port(struct rte_eventdev *dev, int port_id)
3655 {
3656         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3657         eventdev_stop_flush_t flush;
3658         struct rte_event ev;
3659         uint8_t dev_id;
3660         void *arg;
3661         int i;
3662
3663         flush = dev->dev_ops->dev_stop_flush;
3664         dev_id = dev->data->dev_id;
3665         arg = dev->data->dev_stop_flush_arg;
3666
3667         while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
3668                 if (flush)
3669                         flush(dev_id, ev, arg);
3670
3671                 if (dlb->ev_ports[port_id].qm_port.is_directed)
3672                         continue;
3673
3674                 ev.op = RTE_EVENT_OP_RELEASE;
3675
3676                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3677         }
3678
3679         /* Enqueue any additional outstanding releases */
3680         ev.op = RTE_EVENT_OP_RELEASE;
3681
3682         for (i = dlb->ev_ports[port_id].outstanding_releases; i > 0; i--)
3683                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3684 }
3685
3686 static void
3687 dlb_drain(struct rte_eventdev *dev)
3688 {
3689         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3690         struct dlb_eventdev_port *ev_port = NULL;
3691         uint8_t dev_id;
3692         int i;
3693
3694         dev_id = dev->data->dev_id;
3695
3696         while (!dlb_linked_queues_empty(dlb)) {
3697                 /* Flush all the ev_ports, which will drain all their connected
3698                  * queues.
3699                  */
3700                 for (i = 0; i < dlb->num_ports; i++)
3701                         dlb_flush_port(dev, i);
3702         }
3703
3704         /* The queues are empty, but there may be events left in the ports. */
3705         for (i = 0; i < dlb->num_ports; i++)
3706                 dlb_flush_port(dev, i);
3707
3708         /* If the domain's queues are empty, we're done. */
3709         if (dlb_queues_empty(dlb))
3710                 return;
3711
3712         /* Else, there must be at least one unlinked load-balanced queue.
3713          * Select a load-balanced port with which to drain the unlinked
3714          * queue(s).
3715          */
3716         for (i = 0; i < dlb->num_ports; i++) {
3717                 ev_port = &dlb->ev_ports[i];
3718
3719                 if (!ev_port->qm_port.is_directed)
3720                         break;
3721         }
3722
3723         if (i == dlb->num_ports) {
3724                 DLB_LOG_ERR("internal error: no LDB ev_ports\n");
3725                 return;
3726         }
3727
3728         rte_errno = 0;
3729         rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);
3730
3731         if (rte_errno) {
3732                 DLB_LOG_ERR("internal error: failed to unlink ev_port %d\n",
3733                             ev_port->id);
3734                 return;
3735         }
3736
3737         for (i = 0; i < dlb->num_queues; i++) {
3738                 uint8_t qid, prio;
3739                 int ret;
3740
3741                 if (dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3742                         continue;
3743
3744                 qid = i;
3745                 prio = 0;
3746
3747                 /* Link the ev_port to the queue */
3748                 ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
3749                 if (ret != 1) {
3750                         DLB_LOG_ERR("internal error: failed to link ev_port %d to queue %d\n",
3751                                     ev_port->id, qid);
3752                         return;
3753                 }
3754
3755                 /* Flush the queue */
3756                 while (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3757                         dlb_flush_port(dev, ev_port->id);
3758
3759                 /* Drain any extant events in the ev_port. */
3760                 dlb_flush_port(dev, ev_port->id);
3761
3762                 /* Unlink the ev_port from the queue */
3763                 ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
3764                 if (ret != 1) {
3765                         DLB_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d\n",
3766                                     ev_port->id, qid);
3767                         return;
3768                 }
3769         }
3770 }
3771
3772 static void
3773 dlb_eventdev_stop(struct rte_eventdev *dev)
3774 {
3775         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3776
3777         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
3778
3779         if (dlb->run_state == DLB_RUN_STATE_STOPPED) {
3780                 DLB_LOG_DBG("Internal error: already stopped\n");
3781                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3782                 return;
3783         } else if (dlb->run_state != DLB_RUN_STATE_STARTED) {
3784                 DLB_LOG_ERR("Internal error: bad state %d for dev_stop\n",
3785                             (int)dlb->run_state);
3786                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3787                 return;
3788         }
3789
3790         dlb->run_state = DLB_RUN_STATE_STOPPING;
3791
3792         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3793
3794         dlb_drain(dev);
3795
3796         dlb->run_state = DLB_RUN_STATE_STOPPED;
3797 }
3798
3799 static int
3800 dlb_eventdev_close(struct rte_eventdev *dev)
3801 {
3802         dlb_hw_reset_sched_domain(dev, false);
3803
3804         return 0;
3805 }
3806
3807 void
3808 dlb_entry_points_init(struct rte_eventdev *dev)
3809 {
3810         struct dlb_eventdev *dlb;
3811
3812         static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
3813                 .dev_infos_get    = dlb_eventdev_info_get,
3814                 .dev_configure    = dlb_eventdev_configure,
3815                 .dev_start        = dlb_eventdev_start,
3816                 .dev_stop         = dlb_eventdev_stop,
3817                 .dev_close        = dlb_eventdev_close,
3818                 .queue_def_conf   = dlb_eventdev_queue_default_conf_get,
3819                 .port_def_conf    = dlb_eventdev_port_default_conf_get,
3820                 .queue_setup      = dlb_eventdev_queue_setup,
3821                 .port_setup       = dlb_eventdev_port_setup,
3822                 .port_link        = dlb_eventdev_port_link,
3823                 .port_unlink      = dlb_eventdev_port_unlink,
3824                 .port_unlinks_in_progress =
3825                                     dlb_eventdev_port_unlinks_in_progress,
3826                 .dump             = dlb_eventdev_dump,
3827                 .xstats_get       = dlb_eventdev_xstats_get,
3828                 .xstats_get_names = dlb_eventdev_xstats_get_names,
3829                 .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
3830                 .xstats_reset       = dlb_eventdev_xstats_reset,
3831         };
3832
3833         /* Expose PMD's eventdev interface */
3834         dev->dev_ops = &dlb_eventdev_entry_ops;
3835
3836         dev->enqueue = dlb_event_enqueue;
3837         dev->enqueue_burst = dlb_event_enqueue_burst;
3838         dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
3839         dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
3840         dev->dequeue = dlb_event_dequeue;
3841         dev->dequeue_burst = dlb_event_dequeue_burst;
3842
3843         dlb = dev->data->dev_private;
3844
3845         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
3846                 dev->dequeue = dlb_event_dequeue_sparse;
3847                 dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
3848         }
3849 }
3850
3851 int
3852 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
3853                            const char *name,
3854                            struct dlb_devargs *dlb_args)
3855 {
3856         struct dlb_eventdev *dlb;
3857         int err, i;
3858
3859         dlb = dev->data->dev_private;
3860
3861         dlb->event_dev = dev; /* backlink */
3862
3863         evdev_dlb_default_info.driver_name = name;
3864
3865         dlb->max_num_events_override = dlb_args->max_num_events;
3866         dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
3867         dlb->defer_sched = dlb_args->defer_sched;
3868         dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
3869
3870         /* Open the interface.
3871          * For vdev mode, this means open the dlb kernel module.
3872          */
3873         err = dlb_iface_open(&dlb->qm_instance, name);
3874         if (err < 0) {
3875                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3876                             err);
3877                 return err;
3878         }
3879
3880         err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
3881         if (err < 0) {
3882                 DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
3883                             err);
3884                 return err;
3885         }
3886
3887         err = dlb_hw_query_resources(dlb);
3888         if (err) {
3889                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3890                 return err;
3891         }
3892
3893         err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
3894         if (err < 0) {
3895                 DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
3896                 return err;
3897         }
3898
3899         /* Complete xstats runtime initialization */
3900         err = dlb_xstats_init(dlb);
3901         if (err) {
3902                 DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
3903                 return err;
3904         }
3905
3906         /* Initialize each port's token pop mode */
3907         for (i = 0; i < DLB_MAX_NUM_PORTS; i++)
3908                 dlb->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;
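             /* AUTO_POP (the default) pops CQ tokens during dequeue; see
              * dlb_hw_dequeue(). The other modes (DEFERRED_POP, DELAYED_POP)
              * are selected per port through the token pop API this patch
              * adds.
              */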
3909
3910         rte_spinlock_init(&dlb->qm_instance.resource_lock);
3911
3912         dlb_iface_low_level_io_init(dlb);
3913
3914         dlb_entry_points_init(dev);
3915
3916         return 0;
3917 }
3918
3919 int
3920 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
3921                              const char *name)
3922 {
3923         struct dlb_eventdev *dlb;
3924         int err;
3925
3926         dlb = dev->data->dev_private;
3927
3928         evdev_dlb_default_info.driver_name = name;
3929
3930         err = dlb_iface_open(&dlb->qm_instance, name);
3931         if (err < 0) {
3932                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3933                             err);
3934                 return err;
3935         }
3936
3937         err = dlb_hw_query_resources(dlb);
3938         if (err) {
3939                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3940                 return err;
3941         }
3942
3943         dlb_iface_low_level_io_init(dlb);
3944
3945         dlb_entry_points_init(dev);
3946
3947         return 0;
3948 }
3949
3950 int
3951 dlb_parse_params(const char *params,
3952                  const char *name,
3953                  struct dlb_devargs *dlb_args)
3954 {
3955         int ret = 0;
3956         static const char * const args[] = { NUMA_NODE_ARG,
3957                                              DLB_MAX_NUM_EVENTS,
3958                                              DLB_NUM_DIR_CREDITS,
3959                                              DEV_ID_ARG,
3960                                              DLB_DEFER_SCHED_ARG,
3961                                              DLB_NUM_ATM_INFLIGHTS_ARG,
3962                                              NULL };
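             /*
              * "params" is the device's comma-separated key=value devargs
              * string; only the keys listed in args[] above are recognized,
              * and each one is handled by its set_* callback below.
              */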
3963
3964         if (params && params[0] != '\0') {
3965                 struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
3966
3967                 if (kvlist == NULL) {
3968                         DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
3969                                      name);
3970                 } else {
3971                         ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
3972                                                  set_numa_node,
3973                                                  &dlb_args->socket_id);
3974                         if (ret != 0) {
3975                                 DLB_LOG_ERR("%s: Error parsing numa node parameter",
3976                                             name);
3977                                 rte_kvargs_free(kvlist);
3978                                 return ret;
3979                         }
3980
3981                         ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
3982                                                  set_max_num_events,
3983                                                  &dlb_args->max_num_events);
3984                         if (ret != 0) {
3985                                 DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
3986                                             name);
3987                                 rte_kvargs_free(kvlist);
3988                                 return ret;
3989                         }
3990
3991                         ret = rte_kvargs_process(kvlist,
3992                                         DLB_NUM_DIR_CREDITS,
3993                                         set_num_dir_credits,
3994                                         &dlb_args->num_dir_credits_override);
3995                         if (ret != 0) {
3996                                 DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
3997                                             name);
3998                                 rte_kvargs_free(kvlist);
3999                                 return ret;
4000                         }
4001
4002                         ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
4003                                                  set_dev_id,
4004                                                  &dlb_args->dev_id);
4005                         if (ret != 0) {
4006                                 DLB_LOG_ERR("%s: Error parsing dev_id parameter",
4007                                             name);
4008                                 rte_kvargs_free(kvlist);
4009                                 return ret;
4010                         }
4011
4012                         ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
4013                                                  set_defer_sched,
4014                                                  &dlb_args->defer_sched);
4015                         if (ret != 0) {
4016                                 DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
4017                                             name);
4018                                 rte_kvargs_free(kvlist);
4019                                 return ret;
4020                         }
4021
4022                         ret = rte_kvargs_process(kvlist,
4023                                                  DLB_NUM_ATM_INFLIGHTS_ARG,
4024                                                  set_num_atm_inflights,
4025                                                  &dlb_args->num_atm_inflights);
4026                         if (ret != 0) {
4027                                 DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
4028                                             name);
4029                                 rte_kvargs_free(kvlist);
4030                                 return ret;
4031                         }
4032
4033                         rte_kvargs_free(kvlist);
4034                 }
4035         }
4036         return ret;
4037 }
4038 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);