event/dlb: add queue and port release
[dpdk.git] drivers/event/dlb/dlb.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2016-2020 Intel Corporation
3  */
4
5 #include <assert.h>
6 #include <errno.h>
7 #include <nmmintrin.h>
8 #include <pthread.h>
9 #include <stdbool.h>
10 #include <stdint.h>
11 #include <stdio.h>
12 #include <string.h>
13 #include <sys/fcntl.h>
14 #include <sys/mman.h>
15 #include <unistd.h>
16
17 #include <rte_common.h>
18 #include <rte_config.h>
19 #include <rte_cycles.h>
20 #include <rte_debug.h>
21 #include <rte_dev.h>
22 #include <rte_errno.h>
23 #include <rte_io.h>
24 #include <rte_kvargs.h>
25 #include <rte_log.h>
26 #include <rte_malloc.h>
27 #include <rte_mbuf.h>
28 #include <rte_power_intrinsics.h>
29 #include <rte_prefetch.h>
30 #include <rte_ring.h>
31 #include <rte_string_fns.h>
32
33 #include <rte_eventdev.h>
34 #include <rte_eventdev_pmd.h>
35
36 #include "dlb_priv.h"
37 #include "dlb_iface.h"
38 #include "dlb_inline_fns.h"
39
40 /*
41  * Resources exposed to eventdev.
42  */
43 #if (RTE_EVENT_MAX_QUEUES_PER_DEV > UINT8_MAX)
44 #error "RTE_EVENT_MAX_QUEUES_PER_DEV cannot fit in member max_event_queues"
45 #endif
46 static struct rte_event_dev_info evdev_dlb_default_info = {
47         .driver_name = "", /* probe will set */
48         .min_dequeue_timeout_ns = DLB_MIN_DEQUEUE_TIMEOUT_NS,
49         .max_dequeue_timeout_ns = DLB_MAX_DEQUEUE_TIMEOUT_NS,
50 #if (RTE_EVENT_MAX_QUEUES_PER_DEV < DLB_MAX_NUM_LDB_QUEUES)
51         .max_event_queues = RTE_EVENT_MAX_QUEUES_PER_DEV,
52 #else
53         .max_event_queues = DLB_MAX_NUM_LDB_QUEUES,
54 #endif
55         .max_event_queue_flows = DLB_MAX_NUM_FLOWS,
56         .max_event_queue_priority_levels = DLB_QID_PRIORITIES,
57         .max_event_priority_levels = DLB_QID_PRIORITIES,
58         .max_event_ports = DLB_MAX_NUM_LDB_PORTS,
59         .max_event_port_dequeue_depth = DLB_MAX_CQ_DEPTH,
60         .max_event_port_enqueue_depth = DLB_MAX_ENQUEUE_DEPTH,
61         .max_event_port_links = DLB_MAX_NUM_QIDS_PER_LDB_CQ,
62         .max_num_events = DLB_MAX_NUM_LDB_CREDITS,
63         .max_single_link_event_port_queue_pairs = DLB_MAX_NUM_DIR_PORTS,
64         .event_dev_cap = (RTE_EVENT_DEV_CAP_QUEUE_QOS |
65                           RTE_EVENT_DEV_CAP_EVENT_QOS |
66                           RTE_EVENT_DEV_CAP_BURST_MODE |
67                           RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED |
68                           RTE_EVENT_DEV_CAP_IMPLICIT_RELEASE_DISABLE |
69                           RTE_EVENT_DEV_CAP_QUEUE_ALL_TYPES),
70 };
71
72 struct process_local_port_data
73 dlb_port[DLB_MAX_NUM_PORTS][NUM_DLB_PORT_TYPES];
74
75 static inline uint16_t
76 dlb_event_enqueue_delayed(void *event_port,
77                           const struct rte_event events[]);
78
79 static inline uint16_t
80 dlb_event_enqueue_burst_delayed(void *event_port,
81                                 const struct rte_event events[],
82                                 uint16_t num);
83
84 static inline uint16_t
85 dlb_event_enqueue_new_burst_delayed(void *event_port,
86                                     const struct rte_event events[],
87                                     uint16_t num);
88
89 static inline uint16_t
90 dlb_event_enqueue_forward_burst_delayed(void *event_port,
91                                         const struct rte_event events[],
92                                         uint16_t num);
93
94 static int
95 dlb_hw_query_resources(struct dlb_eventdev *dlb)
96 {
97         struct dlb_hw_dev *handle = &dlb->qm_instance;
98         struct dlb_hw_resource_info *dlb_info = &handle->info;
99         int ret;
100
101         ret = dlb_iface_get_num_resources(handle,
102                                           &dlb->hw_rsrc_query_results);
103         if (ret) {
104                 DLB_LOG_ERR("get dlb num resources, err=%d\n", ret);
105                 return ret;
106         }
107
108         /* Complete filling in device resource info returned to evdev app,
109          * overriding any default values.
110          * The capabilities (CAPs) were set at compile time.
111          */
112
113         evdev_dlb_default_info.max_event_queues =
114                 dlb->hw_rsrc_query_results.num_ldb_queues;
115
116         evdev_dlb_default_info.max_event_ports =
117                 dlb->hw_rsrc_query_results.num_ldb_ports;
118
119         evdev_dlb_default_info.max_num_events =
120                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
121
122         /* Save off values used when creating the scheduling domain. */
123
124         handle->info.num_sched_domains =
125                 dlb->hw_rsrc_query_results.num_sched_domains;
126
127         handle->info.hw_rsrc_max.nb_events_limit =
128                 dlb->hw_rsrc_query_results.max_contiguous_ldb_credits;
129
130         handle->info.hw_rsrc_max.num_queues =
131                 dlb->hw_rsrc_query_results.num_ldb_queues +
132                 dlb->hw_rsrc_query_results.num_dir_ports;
133
134         handle->info.hw_rsrc_max.num_ldb_queues =
135                 dlb->hw_rsrc_query_results.num_ldb_queues;
136
137         handle->info.hw_rsrc_max.num_ldb_ports =
138                 dlb->hw_rsrc_query_results.num_ldb_ports;
139
140         handle->info.hw_rsrc_max.num_dir_ports =
141                 dlb->hw_rsrc_query_results.num_dir_ports;
142
143         handle->info.hw_rsrc_max.reorder_window_size =
144                 dlb->hw_rsrc_query_results.num_hist_list_entries;
145
146         rte_memcpy(dlb_info, &handle->info.hw_rsrc_max, sizeof(*dlb_info));
147
148         return 0;
149 }
150
151 static void
152 dlb_free_qe_mem(struct dlb_port *qm_port)
153 {
154         if (qm_port == NULL)
155                 return;
156
157         rte_free(qm_port->qe4);
158         qm_port->qe4 = NULL;
159
160         rte_free(qm_port->consume_qe);
161         qm_port->consume_qe = NULL;
162
163         rte_memzone_free(dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz);
164         dlb_port[qm_port->id][PORT_TYPE(qm_port)].mz = NULL;
165 }
166
167 static int
168 dlb_init_consume_qe(struct dlb_port *qm_port, char *mz_name)
169 {
170         struct dlb_cq_pop_qe *qe;
171
172         qe = rte_zmalloc(mz_name,
173                         DLB_NUM_QES_PER_CACHE_LINE *
174                                 sizeof(struct dlb_cq_pop_qe),
175                         RTE_CACHE_LINE_SIZE);
176
177         if (qe == NULL) {
178                 DLB_LOG_ERR("dlb: no memory for consume_qe\n");
179                 return -ENOMEM;
180         }
181
182         qm_port->consume_qe = qe;
183
184         qe->qe_valid = 0;
185         qe->qe_frag = 0;
186         qe->qe_comp = 0;
187         qe->cq_token = 1;
188         /* Tokens value is 0-based; i.e. '0' returns 1 token, '1' returns 2,
189          * and so on.
190          */
191         qe->tokens = 0; /* set at run time */
192         qe->meas_lat = 0;
193         qe->no_dec = 0;
194         /* Completion IDs are disabled */
195         qe->cmp_id = 0;
196
197         return 0;
198 }
199
200 static int
201 dlb_init_qe_mem(struct dlb_port *qm_port, char *mz_name)
202 {
203         int ret, sz;
204
205         sz = DLB_NUM_QES_PER_CACHE_LINE * sizeof(struct dlb_enqueue_qe);
206
207         qm_port->qe4 = rte_zmalloc(mz_name, sz, RTE_CACHE_LINE_SIZE);
208
209         if (qm_port->qe4 == NULL) {
210                 DLB_LOG_ERR("dlb: no qe4 memory\n");
211                 ret = -ENOMEM;
212                 goto error_exit;
213         }
214
215         ret = dlb_init_consume_qe(qm_port, mz_name);
216         if (ret < 0) {
217                 DLB_LOG_ERR("dlb: dlb_init_consume_qe ret=%d\n", ret);
218                 goto error_exit;
219         }
220
221         return 0;
222
223 error_exit:
224
225         dlb_free_qe_mem(qm_port);
226
227         return ret;
228 }
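
/*
 * Sizing note (illustrative, assuming the 16-byte QE layout used by this
 * device): DLB_NUM_QES_PER_CACHE_LINE QEs are grouped so that a single
 * enqueue write covers one full cache line, e.g.
 *
 *	4 QEs/line * 16 B/QE = 64 B = RTE_CACHE_LINE_SIZE
 *
 * which is why both qe4 and consume_qe above are allocated with
 * RTE_CACHE_LINE_SIZE alignment.
 */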
229
230 /* Wrapper for string to int conversion. Substituted for atoi(...), which is
231  * unsafe.
232  */
233 #define DLB_BASE_10 10
234
235 static int
236 dlb_string_to_int(int *result, const char *str)
237 {
238         long ret;
239         char *endstr;
240
241         if (str == NULL || result == NULL)
242                 return -EINVAL;
243
244         errno = 0;
245         ret = strtol(str, &endstr, DLB_BASE_10);
246         if (errno)
247                 return -errno;
248
249         /* long int and int may have different widths on some architectures */
250         if (ret < INT_MIN || ret > INT_MAX || endstr == str)
251                 return -EINVAL;
252
253         *result = ret;
254         return 0;
255 }
256
257 static int
258 set_numa_node(const char *key __rte_unused, const char *value, void *opaque)
259 {
260         int *socket_id = opaque;
261         int ret;
262
263         ret = dlb_string_to_int(socket_id, value);
264         if (ret < 0)
265                 return ret;
266
267         if (*socket_id > RTE_MAX_NUMA_NODES)
268                 return -EINVAL;
269
270         return 0;
271 }
272
273 static int
274 set_max_num_events(const char *key __rte_unused,
275                    const char *value,
276                    void *opaque)
277 {
278         int *max_num_events = opaque;
279         int ret;
280
281         if (value == NULL || opaque == NULL) {
282                 DLB_LOG_ERR("NULL pointer\n");
283                 return -EINVAL;
284         }
285
286         ret = dlb_string_to_int(max_num_events, value);
287         if (ret < 0)
288                 return ret;
289
290         if (*max_num_events < 0 || *max_num_events > DLB_MAX_NUM_LDB_CREDITS) {
291                 DLB_LOG_ERR("dlb: max_num_events must be between 0 and %d\n",
292                             DLB_MAX_NUM_LDB_CREDITS);
293                 return -EINVAL;
294         }
295
296         return 0;
297 }
298
299 static int
300 set_num_dir_credits(const char *key __rte_unused,
301                     const char *value,
302                     void *opaque)
303 {
304         int *num_dir_credits = opaque;
305         int ret;
306
307         if (value == NULL || opaque == NULL) {
308                 DLB_LOG_ERR("NULL pointer\n");
309                 return -EINVAL;
310         }
311
312         ret = dlb_string_to_int(num_dir_credits, value);
313         if (ret < 0)
314                 return ret;
315
316         if (*num_dir_credits < 0 ||
317             *num_dir_credits > DLB_MAX_NUM_DIR_CREDITS) {
318                 DLB_LOG_ERR("dlb: num_dir_credits must be between 0 and %d\n",
319                             DLB_MAX_NUM_DIR_CREDITS);
320                 return -EINVAL;
321         }
322         return 0;
323 }
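
/*
 * Illustrative sketch (the devargs parsing itself lives elsewhere in this
 * PMD; names such as "params", "valid_args" and "dlb_args" are examples):
 * the callbacks above are the handlers one would register with
 * rte_kvargs_process(), e.g.
 *
 *	struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid_args);
 *
 *	if (kvlist != NULL) {
 *		rte_kvargs_process(kvlist, "max_num_events",
 *				   set_max_num_events,
 *				   &dlb_args->max_num_events);
 *		rte_kvargs_free(kvlist);
 *	}
 */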
324
325 /* VDEV-only notes:
326  * This function first unmaps all memory mappings and closes the
327  * domain's file descriptor, which causes the driver to reset the
328  * scheduling domain. Once that completes (when close() returns), we
329  * can safely free the dynamically allocated memory used by the
330  * scheduling domain.
331  *
332  * PF-only notes:
333  * We will maintain a use count and use that to determine when
334  * a reset is required. In PF mode, we never mmap or munmap
335  * device memory, and we own the entire physical PCI device.
336  */
337
338 static void
339 dlb_hw_reset_sched_domain(const struct rte_eventdev *dev, bool reconfig)
340 {
341         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
342         enum dlb_configuration_state config_state;
343         int i, j;
344
345         /* Close and reset the domain */
346         dlb_iface_domain_close(dlb);
347
348         /* Free all dynamically allocated port memory */
349         for (i = 0; i < dlb->num_ports; i++)
350                 dlb_free_qe_mem(&dlb->ev_ports[i].qm_port);
351
352         /* If reconfiguring, mark the device's queues and ports as "previously
353          * configured." If the user does not reconfigure them, the PMD will
354          * reapply their previous configuration when the device is started.
355          */
356         config_state = (reconfig) ? DLB_PREV_CONFIGURED : DLB_NOT_CONFIGURED;
357
358         for (i = 0; i < dlb->num_ports; i++) {
359                 dlb->ev_ports[i].qm_port.config_state = config_state;
360                 /* Reset setup_done so ports can be reconfigured */
361                 dlb->ev_ports[i].setup_done = false;
362                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
363                         dlb->ev_ports[i].link[j].mapped = false;
364         }
365
366         for (i = 0; i < dlb->num_queues; i++)
367                 dlb->ev_queues[i].qm_queue.config_state = config_state;
368
369         for (i = 0; i < DLB_MAX_NUM_QUEUES; i++)
370                 dlb->ev_queues[i].setup_done = false;
371
372         dlb->num_ports = 0;
373         dlb->num_ldb_ports = 0;
374         dlb->num_dir_ports = 0;
375         dlb->num_queues = 0;
376         dlb->num_ldb_queues = 0;
377         dlb->num_dir_queues = 0;
378         dlb->configured = false;
379 }
380
381 static int
382 dlb_ldb_credit_pool_create(struct dlb_hw_dev *handle)
383 {
384         struct dlb_create_ldb_pool_args cfg;
385         struct dlb_cmd_response response;
386         int ret;
387
388         if (handle == NULL)
389                 return -EINVAL;
390
391         if (!handle->cfg.resources.num_ldb_credits) {
392                 handle->cfg.ldb_credit_pool_id = 0;
393                 handle->cfg.num_ldb_credits = 0;
394                 return 0;
395         }
396
397         cfg.response = (uintptr_t)&response;
398         cfg.num_ldb_credits = handle->cfg.resources.num_ldb_credits;
399
400         ret = dlb_iface_ldb_credit_pool_create(handle,
401                                                &cfg);
402         if (ret < 0) {
403                 DLB_LOG_ERR("dlb: ldb_credit_pool_create ret=%d (driver status: %s)\n",
404                             ret, dlb_error_strings[response.status]);
405         }
406
407         handle->cfg.ldb_credit_pool_id = response.id;
408         handle->cfg.num_ldb_credits = cfg.num_ldb_credits;
409
410         return ret;
411 }
412
413 static int
414 dlb_dir_credit_pool_create(struct dlb_hw_dev *handle)
415 {
416         struct dlb_create_dir_pool_args cfg;
417         struct dlb_cmd_response response;
418         int ret;
419
420         if (handle == NULL)
421                 return -EINVAL;
422
423         if (!handle->cfg.resources.num_dir_credits) {
424                 handle->cfg.dir_credit_pool_id = 0;
425                 handle->cfg.num_dir_credits = 0;
426                 return 0;
427         }
428
429         cfg.response = (uintptr_t)&response;
430         cfg.num_dir_credits = handle->cfg.resources.num_dir_credits;
431
432         ret = dlb_iface_dir_credit_pool_create(handle, &cfg);
433         if (ret < 0)
434                 DLB_LOG_ERR("dlb: dir_credit_pool_create ret=%d (driver status: %s)\n",
435                             ret, dlb_error_strings[response.status]);
436
437         handle->cfg.dir_credit_pool_id = response.id;
438         handle->cfg.num_dir_credits = cfg.num_dir_credits;
439
440         return ret;
441 }
442
443 static int
444 dlb_hw_create_sched_domain(struct dlb_hw_dev *handle,
445                            struct dlb_eventdev *dlb,
446                            const struct dlb_hw_rsrcs *resources_asked)
447 {
448         int ret = 0;
449         struct dlb_create_sched_domain_args *config_params;
450         struct dlb_cmd_response response;
451
452         if (resources_asked == NULL) {
453                 DLB_LOG_ERR("dlb: dlb_create NULL parameter\n");
454                 ret = -EINVAL;
455                 goto error_exit;
456         }
457
458         /* Map generic qm resources to dlb resources */
459         config_params = &handle->cfg.resources;
460
461         config_params->response = (uintptr_t)&response;
462
463         /* DIR ports and queues */
464
465         config_params->num_dir_ports =
466                 resources_asked->num_dir_ports;
467
468         config_params->num_dir_credits =
469                 resources_asked->num_dir_credits;
470
471         /* LDB ports and queues */
472
473         config_params->num_ldb_queues =
474                 resources_asked->num_ldb_queues;
475
476         config_params->num_ldb_ports =
477                 resources_asked->num_ldb_ports;
478
479         config_params->num_ldb_credits =
480                 resources_asked->num_ldb_credits;
481
482         config_params->num_atomic_inflights =
483                 dlb->num_atm_inflights_per_queue *
484                 config_params->num_ldb_queues;
485
486         config_params->num_hist_list_entries = config_params->num_ldb_ports *
487                 DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
488
489         /* dlb limited to 1 credit pool per queue type */
490         config_params->num_ldb_credit_pools = 1;
491         config_params->num_dir_credit_pools = 1;
492
493         DLB_LOG_DBG("sched domain create - ldb_qs=%d, ldb_ports=%d, dir_ports=%d, atomic_inflights=%d, hist_list_entries=%d, ldb_credits=%d, dir_credits=%d, ldb_credit_pools=%d, dir_credit_pools=%d\n",
494                     config_params->num_ldb_queues,
495                     config_params->num_ldb_ports,
496                     config_params->num_dir_ports,
497                     config_params->num_atomic_inflights,
498                     config_params->num_hist_list_entries,
499                     config_params->num_ldb_credits,
500                     config_params->num_dir_credits,
501                     config_params->num_ldb_credit_pools,
502                     config_params->num_dir_credit_pools);
503
504         /* Configure the QM */
505
506         ret = dlb_iface_sched_domain_create(handle, config_params);
507         if (ret < 0) {
508                 DLB_LOG_ERR("dlb: domain create failed, device_id = %d, (driver ret = %d, extra status: %s)\n",
509                             handle->device_id,
510                             ret,
511                             dlb_error_strings[response.status]);
512                 goto error_exit;
513         }
514
515         handle->domain_id = response.id;
516         handle->domain_id_valid = 1;
517
518         config_params->response = 0;
519
520         ret = dlb_ldb_credit_pool_create(handle);
521         if (ret < 0) {
522                 DLB_LOG_ERR("dlb: create ldb credit pool failed\n");
523                 goto error_exit2;
524         }
525
526         ret = dlb_dir_credit_pool_create(handle);
527         if (ret < 0) {
528                 DLB_LOG_ERR("dlb: create dir credit pool failed\n");
529                 goto error_exit2;
530         }
531
532         handle->cfg.configured = true;
533
534         return 0;
535
536 error_exit2:
537         dlb_iface_domain_close(dlb);
538
539 error_exit:
540         return ret;
541 }
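
/*
 * Worked example (numbers are illustrative): with 8 LDB queues, 8 LDB ports
 * and a per-queue atomic-inflight budget of 2048, the request built above
 * would contain
 *
 *	num_atomic_inflights  = 2048 * 8 = 16384
 *	num_hist_list_entries = 8 * DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT
 *
 * plus exactly one LDB and one DIR credit pool, since this PMD uses a
 * single credit pool per queue type.
 */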
542
543 /* End HW specific */
544 static void
545 dlb_eventdev_info_get(struct rte_eventdev *dev,
546                       struct rte_event_dev_info *dev_info)
547 {
548         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
549         int ret;
550
551         ret = dlb_hw_query_resources(dlb);
552         if (ret) {
553                 const struct rte_eventdev_data *data = dev->data;
554
555                 DLB_LOG_ERR("get resources err=%d, devid=%d\n",
556                             ret, data->dev_id);
557                 /* fn is void, so fall through and return values set up in
558                  * probe
559                  */
560         }
561
562         /* Add num resources currently owned by this domain.
563          * These would become available if the scheduling domain were reset due
564          * to the application recalling eventdev_configure to *reconfigure* the
565          * domain.
566          */
567         evdev_dlb_default_info.max_event_ports += dlb->num_ldb_ports;
568         evdev_dlb_default_info.max_event_queues += dlb->num_ldb_queues;
569         evdev_dlb_default_info.max_num_events += dlb->num_ldb_credits;
570
571         /* In DLB A-stepping hardware, applications are limited to 128
572          * configured ports (load-balanced or directed). The reported number of
573          * available ports must reflect this.
574          */
575         if (dlb->revision < DLB_REV_B0) {
576                 int used_ports;
577
578                 used_ports = DLB_MAX_NUM_LDB_PORTS + DLB_MAX_NUM_DIR_PORTS -
579                         dlb->hw_rsrc_query_results.num_ldb_ports -
580                         dlb->hw_rsrc_query_results.num_dir_ports;
581
582                 evdev_dlb_default_info.max_event_ports =
583                         RTE_MIN(evdev_dlb_default_info.max_event_ports,
584                                 128 - used_ports);
585         }
586
587         evdev_dlb_default_info.max_event_queues =
588                 RTE_MIN(evdev_dlb_default_info.max_event_queues,
589                         RTE_EVENT_MAX_QUEUES_PER_DEV);
590
591         evdev_dlb_default_info.max_num_events =
592                 RTE_MIN(evdev_dlb_default_info.max_num_events,
593                         dlb->max_num_events_override);
594
595         *dev_info = evdev_dlb_default_info;
596 }
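
/*
 * Illustrative application usage: the limits reported here are typically
 * used to clamp the requested configuration, e.g.
 *
 *	struct rte_event_dev_info info;
 *
 *	if (rte_event_dev_info_get(dev_id, &info) == 0)
 *		nb_ports = RTE_MIN(nb_ports, (int)info.max_event_ports);
 */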
597
598 /* Note: 1 QM instance per QM device, QM instance/device == event device */
599 static int
600 dlb_eventdev_configure(const struct rte_eventdev *dev)
601 {
602         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
603         struct dlb_hw_dev *handle = &dlb->qm_instance;
604         struct dlb_hw_rsrcs *rsrcs = &handle->info.hw_rsrc_max;
605         const struct rte_eventdev_data *data = dev->data;
606         const struct rte_event_dev_config *config = &data->dev_conf;
607         int ret;
608
609         /* If this eventdev is already configured, we must release the current
610          * scheduling domain before attempting to configure a new one.
611          */
612         if (dlb->configured) {
613                 dlb_hw_reset_sched_domain(dev, true);
614
615                 ret = dlb_hw_query_resources(dlb);
616                 if (ret) {
617                         DLB_LOG_ERR("get resources err=%d, devid=%d\n",
618                                     ret, data->dev_id);
619                         return ret;
620                 }
621         }
622
623         if (config->nb_event_queues > rsrcs->num_queues) {
624                 DLB_LOG_ERR("nb_event_queues parameter (%d) exceeds the QM device's capabilities (%d).\n",
625                             config->nb_event_queues,
626                             rsrcs->num_queues);
627                 return -EINVAL;
628         }
629         if (config->nb_event_ports > (rsrcs->num_ldb_ports
630                         + rsrcs->num_dir_ports)) {
631                 DLB_LOG_ERR("nb_event_ports parameter (%d) exceeds the QM device's capabilities (%d).\n",
632                             config->nb_event_ports,
633                             (rsrcs->num_ldb_ports + rsrcs->num_dir_ports));
634                 return -EINVAL;
635         }
636         if (config->nb_events_limit > rsrcs->nb_events_limit) {
637                 DLB_LOG_ERR("nb_events_limit parameter (%d) exceeds the QM device's capabilities (%d).\n",
638                             config->nb_events_limit,
639                             rsrcs->nb_events_limit);
640                 return -EINVAL;
641         }
642
643         if (config->event_dev_cfg & RTE_EVENT_DEV_CFG_PER_DEQUEUE_TIMEOUT)
644                 dlb->global_dequeue_wait = false;
645         else {
646                 uint32_t timeout32;
647
648                 dlb->global_dequeue_wait = true;
649
650                 timeout32 = config->dequeue_timeout_ns;
651
652                 dlb->global_dequeue_wait_ticks =
653                         timeout32 * (rte_get_timer_hz() / 1E9);
654         }
655
656         /* Does this platform support umonitor/umwait? */
657         if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_WAITPKG)) {
658                 if (RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 0 &&
659                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE != 1) {
660                         DLB_LOG_ERR("invalid value (%d) for RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE, must be 0 or 1.\n",
661                                     RTE_LIBRTE_PMD_DLB_UMWAIT_CTL_STATE);
662                         return -EINVAL;
663                 }
664                 dlb->umwait_allowed = true;
665         }
666
667         rsrcs->num_dir_ports = config->nb_single_link_event_port_queues;
668         rsrcs->num_ldb_ports = config->nb_event_ports - rsrcs->num_dir_ports;
669         /* 1 dir queue per dir port */
670         rsrcs->num_ldb_queues = config->nb_event_queues - rsrcs->num_dir_ports;
671
672         /* Scale down nb_events_limit by 4 for directed credits, since there
673          * are 4x as many load-balanced credits.
674          */
675         rsrcs->num_ldb_credits = 0;
676         rsrcs->num_dir_credits = 0;
677
678         if (rsrcs->num_ldb_queues)
679                 rsrcs->num_ldb_credits = config->nb_events_limit;
680         if (rsrcs->num_dir_ports)
681                 rsrcs->num_dir_credits = config->nb_events_limit / 4;
682         if (dlb->num_dir_credits_override != -1)
683                 rsrcs->num_dir_credits = dlb->num_dir_credits_override;
684
685         if (dlb_hw_create_sched_domain(handle, dlb, rsrcs) < 0) {
686                 DLB_LOG_ERR("dlb_hw_create_sched_domain failed\n");
687                 return -ENODEV;
688         }
689
690         dlb->new_event_limit = config->nb_events_limit;
691         __atomic_store_n(&dlb->inflights, 0, __ATOMIC_SEQ_CST);
692
693         /* Save number of ports/queues for this event dev */
694         dlb->num_ports = config->nb_event_ports;
695         dlb->num_queues = config->nb_event_queues;
696         dlb->num_dir_ports = rsrcs->num_dir_ports;
697         dlb->num_ldb_ports = dlb->num_ports - dlb->num_dir_ports;
698         dlb->num_ldb_queues = dlb->num_queues - dlb->num_dir_ports;
699         dlb->num_dir_queues = dlb->num_dir_ports;
700         dlb->num_ldb_credits = rsrcs->num_ldb_credits;
701         dlb->num_dir_credits = rsrcs->num_dir_credits;
702
703         dlb->configured = true;
704
705         return 0;
706 }
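
/*
 * Illustrative application-side configuration (field values are examples
 * only): nb_events_limit becomes the LDB credit count, and one quarter of
 * it becomes the DIR credit count, per the split above.
 *
 *	struct rte_event_dev_config cfg = {
 *		.nb_event_queues = 4,
 *		.nb_event_ports = 4,
 *		.nb_events_limit = 4096,
 *		.nb_event_queue_flows = 1024,
 *		.nb_event_port_dequeue_depth = 32,
 *		.nb_event_port_enqueue_depth = 32,
 *		.nb_single_link_event_port_queues = 1,
 *	};
 *
 *	ret = rte_event_dev_configure(dev_id, &cfg);
 *
 *	(LDB credits = 4096, DIR credits = 4096 / 4 = 1024)
 */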
707
708 static int16_t
709 dlb_hw_unmap_ldb_qid_from_port(struct dlb_hw_dev *handle,
710                                uint32_t qm_port_id,
711                                uint16_t qm_qid)
712 {
713         struct dlb_unmap_qid_args cfg;
714         struct dlb_cmd_response response;
715         int32_t ret;
716
717         if (handle == NULL)
718                 return -EINVAL;
719
720         cfg.response = (uintptr_t)&response;
721         cfg.port_id = qm_port_id;
722         cfg.qid = qm_qid;
723
724         ret = dlb_iface_unmap_qid(handle, &cfg);
725         if (ret < 0)
726                 DLB_LOG_ERR("dlb: unmap qid error, ret=%d (driver status: %s)\n",
727                             ret, dlb_error_strings[response.status]);
728
729         return ret;
730 }
731
732 static int
733 dlb_event_queue_detach_ldb(struct dlb_eventdev *dlb,
734                            struct dlb_eventdev_port *ev_port,
735                            struct dlb_eventdev_queue *ev_queue)
736 {
737         int ret, i;
738
739         /* Don't unlink until start time. */
740         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
741                 return 0;
742
743         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
744                 if (ev_port->link[i].valid &&
745                     ev_port->link[i].queue_id == ev_queue->id)
746                         break; /* found */
747         }
748
749         /* This is expected with eventdev API!
750          * It blindly attempts to unmap all queues.
751          */
752         if (i == DLB_MAX_NUM_QIDS_PER_LDB_CQ) {
753                 DLB_LOG_DBG("dlb: ignoring LB QID %d not mapped for qm_port %d.\n",
754                             ev_queue->qm_queue.id,
755                             ev_port->qm_port.id);
756                 return 0;
757         }
758
759         ret = dlb_hw_unmap_ldb_qid_from_port(&dlb->qm_instance,
760                                              ev_port->qm_port.id,
761                                              ev_queue->qm_queue.id);
762         if (!ret)
763                 ev_port->link[i].mapped = false;
764
765         return ret;
766 }
767
768 static int
769 dlb_eventdev_port_unlink(struct rte_eventdev *dev, void *event_port,
770                          uint8_t queues[], uint16_t nb_unlinks)
771 {
772         struct dlb_eventdev_port *ev_port = event_port;
773         struct dlb_eventdev *dlb;
774         int i;
775
776         RTE_SET_USED(dev);
777
778         if (!ev_port->setup_done) {
779                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
780                             ev_port->id);
781                 rte_errno = -EINVAL;
782                 return 0;
783         }
784
785         if (queues == NULL || nb_unlinks == 0) {
786                 DLB_LOG_DBG("dlb: queues is NULL or nb_unlinks is 0\n");
787                 return 0; /* Ignore and return success */
788         }
789
790         if (ev_port->qm_port.is_directed) {
791                 DLB_LOG_DBG("dlb: ignore unlink from dir port %d\n",
792                             ev_port->id);
793                 rte_errno = 0;
794                 return nb_unlinks; /* as if success */
795         }
796
797         dlb = ev_port->dlb;
798
799         for (i = 0; i < nb_unlinks; i++) {
800                 struct dlb_eventdev_queue *ev_queue;
801                 int ret, j;
802
803                 if (queues[i] >= dlb->num_queues) {
804                         DLB_LOG_ERR("dlb: invalid queue id %d\n", queues[i]);
805                         rte_errno = -EINVAL;
806                         return i; /* return index of offending queue */
807                 }
808
809                 ev_queue = &dlb->ev_queues[queues[i]];
810
811                 /* Does a link exist? */
812                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
813                         if (ev_port->link[j].queue_id == queues[i] &&
814                             ev_port->link[j].valid)
815                                 break;
816
817                 if (j == DLB_MAX_NUM_QIDS_PER_LDB_CQ)
818                         continue;
819
820                 ret = dlb_event_queue_detach_ldb(dlb, ev_port, ev_queue);
821                 if (ret) {
822                         DLB_LOG_ERR("unlink err=%d for port %d queue %d\n",
823                                     ret, ev_port->id, queues[i]);
824                         rte_errno = -ENOENT;
825                         return i; /* return index of offending queue */
826                 }
827
828                 ev_port->link[j].valid = false;
829                 ev_port->num_links--;
830                 ev_queue->num_links--;
831         }
832
833         return nb_unlinks;
834 }
835
836 static int
837 dlb_eventdev_port_unlinks_in_progress(struct rte_eventdev *dev,
838                                       void *event_port)
839 {
840         struct dlb_eventdev_port *ev_port = event_port;
841         struct dlb_eventdev *dlb;
842         struct dlb_hw_dev *handle;
843         struct dlb_pending_port_unmaps_args cfg;
844         struct dlb_cmd_response response;
845         int ret;
846
847         RTE_SET_USED(dev);
848
849         if (!ev_port->setup_done) {
850                 DLB_LOG_ERR("dlb: evport %d is not configured\n",
851                             ev_port->id);
852                 rte_errno = -EINVAL;
853                 return 0;
854         }
855
856         cfg.port_id = ev_port->qm_port.id;
857         cfg.response = (uintptr_t)&response;
858         dlb = ev_port->dlb;
859         handle = &dlb->qm_instance;
860         ret = dlb_iface_pending_port_unmaps(handle, &cfg);
861
862         if (ret < 0) {
863                 DLB_LOG_ERR("dlb: num_unlinks_in_progress ret=%d (driver status: %s)\n",
864                             ret, dlb_error_strings[response.status]);
865                 return ret;
866         }
867
868         return response.id;
869 }
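
/*
 * Illustrative application usage: before releasing or reconfiguring a
 * load-balanced port, the application typically unlinks its queues and then
 * polls the in-progress count while continuing to dequeue (and release)
 * events so the hardware can complete the unmaps, e.g.
 *
 *	rte_event_port_unlink(dev_id, port_id, NULL, 0);
 *
 *	while (rte_event_port_unlinks_in_progress(dev_id, port_id) > 0) {
 *		struct rte_event ev[32];
 *
 *		rte_event_dequeue_burst(dev_id, port_id, ev, 32, 0);
 *	}
 */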
870
871 static void
872 dlb_eventdev_port_default_conf_get(struct rte_eventdev *dev,
873                                    uint8_t port_id,
874                                    struct rte_event_port_conf *port_conf)
875 {
876         RTE_SET_USED(port_id);
877         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
878
879         port_conf->new_event_threshold = dlb->new_event_limit;
880         port_conf->dequeue_depth = 32;
881         port_conf->enqueue_depth = DLB_MAX_ENQUEUE_DEPTH;
882         port_conf->event_port_cfg = 0;
883 }
884
885 static void
886 dlb_eventdev_queue_default_conf_get(struct rte_eventdev *dev,
887                                     uint8_t queue_id,
888                                     struct rte_event_queue_conf *queue_conf)
889 {
890         RTE_SET_USED(dev);
891         RTE_SET_USED(queue_id);
892         queue_conf->nb_atomic_flows = 1024;
893         queue_conf->nb_atomic_order_sequences = 32;
894         queue_conf->event_queue_cfg = 0;
895         queue_conf->priority = 0;
896 }
897
898 static int
899 dlb_hw_create_ldb_port(struct dlb_eventdev *dlb,
900                        struct dlb_eventdev_port *ev_port,
901                        uint32_t dequeue_depth,
902                        uint32_t cq_depth,
903                        uint32_t enqueue_depth,
904                        uint16_t rsvd_tokens,
905                        bool use_rsvd_token_scheme)
906 {
907         struct dlb_hw_dev *handle = &dlb->qm_instance;
908         struct dlb_create_ldb_port_args cfg = {0};
909         struct dlb_cmd_response response = {0};
910         int ret;
911         struct dlb_port *qm_port = NULL;
912         char mz_name[RTE_MEMZONE_NAMESIZE];
913         uint32_t qm_port_id;
914
915         if (handle == NULL)
916                 return -EINVAL;
917
918         if (cq_depth < DLB_MIN_LDB_CQ_DEPTH) {
919                 DLB_LOG_ERR("dlb: invalid cq_depth, must be %d-%d\n",
920                         DLB_MIN_LDB_CQ_DEPTH, DLB_MAX_INPUT_QUEUE_DEPTH);
921                 return -EINVAL;
922         }
923
924         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
925                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
926                             DLB_MIN_ENQUEUE_DEPTH);
927                 return -EINVAL;
928         }
929
930         rte_spinlock_lock(&handle->resource_lock);
931
932         cfg.response = (uintptr_t)&response;
933
934         /* We round up to the next power of 2 if necessary */
935         cfg.cq_depth = rte_align32pow2(cq_depth);
936         cfg.cq_depth_threshold = rsvd_tokens;
937
938         cfg.cq_history_list_size = DLB_NUM_HIST_LIST_ENTRIES_PER_LDB_PORT;
939
940         /* User controls the LDB high watermark via enqueue depth. The DIR high
941          * watermark is equal, unless the directed credit pool is too small.
942          */
943         cfg.ldb_credit_high_watermark = enqueue_depth;
944
945         /* If there are no directed ports, the kernel driver will ignore this
946          * port's directed credit settings. Don't use enqueue_depth if it would
947          * require more directed credits than are available.
948          */
949         cfg.dir_credit_high_watermark =
950                 RTE_MIN(enqueue_depth,
951                         handle->cfg.num_dir_credits / dlb->num_ports);
952
953         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
954         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
955
956         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
957         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
958
959         /* Per QM values */
960
961         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
962         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
963
964         ret = dlb_iface_ldb_port_create(handle, &cfg, dlb->poll_mode);
965         if (ret < 0) {
966                 DLB_LOG_ERR("dlb: dlb_ldb_port_create error, ret=%d (driver status: %s)\n",
967                             ret, dlb_error_strings[response.status]);
968                 goto error_exit;
969         }
970
971         qm_port_id = response.id;
972
973         DLB_LOG_DBG("dlb: ev_port %d uses qm LB port %d <<<<<\n",
974                     ev_port->id, qm_port_id);
975
976         qm_port = &ev_port->qm_port;
977         qm_port->ev_port = ev_port; /* back ptr */
978         qm_port->dlb = dlb; /* back ptr */
979
980         /*
981          * Allocate and init local qe struct(s).
982          * Note: MOVDIR64 requires the enqueue QE (qe4) to be aligned.
983          */
984
985         snprintf(mz_name, sizeof(mz_name), "ldb_port%d",
986                  ev_port->id);
987
988         ret = dlb_init_qe_mem(qm_port, mz_name);
989         if (ret < 0) {
990                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
991                 goto error_exit;
992         }
993
994         qm_port->pp_mmio_base = DLB_LDB_PP_BASE + PAGE_SIZE * qm_port_id;
995         qm_port->id = qm_port_id;
996
997         /* The credit window is one high water mark of QEs */
998         qm_port->ldb_pushcount_at_credit_expiry = 0;
999         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1000         /* The credit window is one high water mark of QEs */
1001         qm_port->dir_pushcount_at_credit_expiry = 0;
1002         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1003         qm_port->cq_depth = cfg.cq_depth;
1004         /* CQs with depth < 8 use an 8-entry queue, but withhold credits so
1005          * the effective depth is smaller.
1006          */
1007         qm_port->cq_depth = cfg.cq_depth <= 8 ? 8 : cfg.cq_depth;
1008         qm_port->cq_idx = 0;
1009         qm_port->cq_idx_unmasked = 0;
1010         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1011                 qm_port->cq_depth_mask = (qm_port->cq_depth * 4) - 1;
1012         else
1013                 qm_port->cq_depth_mask = qm_port->cq_depth - 1;
1014
1015         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1016         /* starting value of gen bit - it toggles at wrap time */
1017         qm_port->gen_bit = 1;
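	/* Worked example (illustrative): with cq_depth = 8, sparse poll mode
	 * yields cq_depth_mask = (8 * 4) - 1 = 0x1f and gen_bit_shift =
	 * popcount(0x1f) = 5, while standard mode yields mask 0x7 and shift
	 * 3; the gen bit then toggles each time cq_idx_unmasked wraps past
	 * the mask.
	 */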
1018
1019         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1020         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1021         qm_port->int_armed = false;
1022
1023         /* Save off for later use in info and lookup APIs. */
1024         qm_port->qid_mappings = &dlb->qm_ldb_to_ev_queue_id[0];
1025
1026         qm_port->dequeue_depth = dequeue_depth;
1027
1028         /* When using the reserved token scheme, token_pop_thresh is
1029          * initially 2 * dequeue_depth. Once the tokens are reserved,
1030          * the enqueue code re-assigns it to dequeue_depth.
1031          */
1032         qm_port->token_pop_thresh = cq_depth;
1033
1034         /* When the deferred scheduling vdev arg is selected, use deferred pop
1035          * for all single-entry CQs.
1036          */
1037         if (cfg.cq_depth == 1 || (cfg.cq_depth == 2 && use_rsvd_token_scheme)) {
1038                 if (dlb->defer_sched)
1039                         qm_port->token_pop_mode = DEFERRED_POP;
1040         }
1041
1042         /* The default enqueue functions do not include delayed-pop support for
1043          * performance reasons.
1044          */
1045         if (qm_port->token_pop_mode == DELAYED_POP) {
1046                 dlb->event_dev->enqueue = dlb_event_enqueue_delayed;
1047                 dlb->event_dev->enqueue_burst =
1048                         dlb_event_enqueue_burst_delayed;
1049                 dlb->event_dev->enqueue_new_burst =
1050                         dlb_event_enqueue_new_burst_delayed;
1051                 dlb->event_dev->enqueue_forward_burst =
1052                         dlb_event_enqueue_forward_burst_delayed;
1053         }
1054
1055         qm_port->owed_tokens = 0;
1056         qm_port->issued_releases = 0;
1057
1058         /* update state */
1059         qm_port->state = PORT_STARTED; /* enabled at create time */
1060         qm_port->config_state = DLB_CONFIGURED;
1061
1062         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1063         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1064
1065         DLB_LOG_DBG("dlb: created ldb port %d, depth = %d, ldb credits=%d, dir credits=%d\n",
1066                     qm_port_id,
1067                     cq_depth,
1068                     qm_port->ldb_credits,
1069                     qm_port->dir_credits);
1070
1071         rte_spinlock_unlock(&handle->resource_lock);
1072
1073         return 0;
1074
1075 error_exit:
1076         if (qm_port) {
1077                 dlb_free_qe_mem(qm_port);
1078                 qm_port->pp_mmio_base = 0;
1079         }
1080
1081         rte_spinlock_unlock(&handle->resource_lock);
1082
1083         DLB_LOG_ERR("dlb: create ldb port failed!\n");
1084
1085         return ret;
1086 }
1087
1088 static int
1089 dlb_hw_create_dir_port(struct dlb_eventdev *dlb,
1090                        struct dlb_eventdev_port *ev_port,
1091                        uint32_t dequeue_depth,
1092                        uint32_t cq_depth,
1093                        uint32_t enqueue_depth,
1094                        uint16_t rsvd_tokens,
1095                        bool use_rsvd_token_scheme)
1096 {
1097         struct dlb_hw_dev *handle = &dlb->qm_instance;
1098         struct dlb_create_dir_port_args cfg = {0};
1099         struct dlb_cmd_response response = {0};
1100         int ret;
1101         struct dlb_port *qm_port = NULL;
1102         char mz_name[RTE_MEMZONE_NAMESIZE];
1103         uint32_t qm_port_id;
1104
1105         if (dlb == NULL || handle == NULL)
1106                 return -EINVAL;
1107
1108         if (cq_depth < DLB_MIN_DIR_CQ_DEPTH) {
1109                 DLB_LOG_ERR("dlb: invalid cq_depth, must be at least %d\n",
1110                             DLB_MIN_DIR_CQ_DEPTH);
1111                 return -EINVAL;
1112         }
1113
1114         if (enqueue_depth < DLB_MIN_ENQUEUE_DEPTH) {
1115                 DLB_LOG_ERR("dlb: invalid enqueue_depth, must be at least %d\n",
1116                             DLB_MIN_ENQUEUE_DEPTH);
1117                 return -EINVAL;
1118         }
1119
1120         rte_spinlock_lock(&handle->resource_lock);
1121
1122         /* Directed queues are configured at link time. */
1123         cfg.queue_id = -1;
1124
1125         cfg.response = (uintptr_t)&response;
1126
1127         /* We round up to the next power of 2 if necessary */
1128         cfg.cq_depth = rte_align32pow2(cq_depth);
1129         cfg.cq_depth_threshold = rsvd_tokens;
1130
1131         /* User controls the LDB high watermark via enqueue depth. The DIR high
1132          * watermark is equal, unless the directed credit pool is too small.
1133          */
1134         cfg.ldb_credit_high_watermark = enqueue_depth;
1135
1136         /* Don't use enqueue_depth if it would require more directed credits
1137          * than are available.
1138          */
1139         cfg.dir_credit_high_watermark =
1140                 RTE_MIN(enqueue_depth,
1141                         handle->cfg.num_dir_credits / dlb->num_ports);
1142
1143         cfg.ldb_credit_quantum = cfg.ldb_credit_high_watermark / 2;
1144         cfg.ldb_credit_low_watermark = RTE_MIN(16, cfg.ldb_credit_quantum);
1145
1146         cfg.dir_credit_quantum = cfg.dir_credit_high_watermark / 2;
1147         cfg.dir_credit_low_watermark = RTE_MIN(16, cfg.dir_credit_quantum);
1148
1149         /* Per QM values */
1150
1151         cfg.ldb_credit_pool_id = handle->cfg.ldb_credit_pool_id;
1152         cfg.dir_credit_pool_id = handle->cfg.dir_credit_pool_id;
1153
1154         ret = dlb_iface_dir_port_create(handle, &cfg, dlb->poll_mode);
1155         if (ret < 0) {
1156                 DLB_LOG_ERR("dlb: dlb_dir_port_create error, ret=%d (driver status: %s)\n",
1157                             ret, dlb_error_strings[response.status]);
1158                 goto error_exit;
1159         }
1160
1161         qm_port_id = response.id;
1162
1163         DLB_LOG_DBG("dlb: ev_port %d uses qm DIR port %d <<<<<\n",
1164                     ev_port->id, qm_port_id);
1165
1166         qm_port = &ev_port->qm_port;
1167         qm_port->ev_port = ev_port; /* back ptr */
1168         qm_port->dlb = dlb;  /* back ptr */
1169
1170         /*
1171          * Init local qe struct(s).
1172          * Note: MOVDIR64 requires the enqueue QE to be aligned
1173          */
1174
1175         snprintf(mz_name, sizeof(mz_name), "dir_port%d",
1176                  ev_port->id);
1177
1178         ret = dlb_init_qe_mem(qm_port, mz_name);
1179
1180         if (ret < 0) {
1181                 DLB_LOG_ERR("dlb: init_qe_mem failed, ret=%d\n", ret);
1182                 goto error_exit;
1183         }
1184
1185         qm_port->pp_mmio_base = DLB_DIR_PP_BASE + PAGE_SIZE * qm_port_id;
1186         qm_port->id = qm_port_id;
1187
1188         /* The credit window is one high water mark of QEs */
1189         qm_port->ldb_pushcount_at_credit_expiry = 0;
1190         qm_port->cached_ldb_credits = cfg.ldb_credit_high_watermark;
1191         /* The credit window is one high water mark of QEs */
1192         qm_port->dir_pushcount_at_credit_expiry = 0;
1193         qm_port->cached_dir_credits = cfg.dir_credit_high_watermark;
1194         qm_port->cq_depth = cfg.cq_depth;
1195         qm_port->cq_idx = 0;
1196         qm_port->cq_idx_unmasked = 0;
1197         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE)
1198                 qm_port->cq_depth_mask = (cfg.cq_depth * 4) - 1;
1199         else
1200                 qm_port->cq_depth_mask = cfg.cq_depth - 1;
1201
1202         qm_port->gen_bit_shift = __builtin_popcount(qm_port->cq_depth_mask);
1203         /* starting value of gen bit - it toggles at wrap time */
1204         qm_port->gen_bit = 1;
1205
1206         qm_port->use_rsvd_token_scheme = use_rsvd_token_scheme;
1207         qm_port->cq_rsvd_token_deficit = rsvd_tokens;
1208         qm_port->int_armed = false;
1209
1210         /* Save off for later use in info and lookup APIs. */
1211         qm_port->qid_mappings = &dlb->qm_dir_to_ev_queue_id[0];
1212
1213         qm_port->dequeue_depth = dequeue_depth;
1214
1215         /* Directed ports are auto-pop, by default. */
1216         qm_port->token_pop_mode = AUTO_POP;
1217         qm_port->owed_tokens = 0;
1218         qm_port->issued_releases = 0;
1219
1220         /* update state */
1221         qm_port->state = PORT_STARTED; /* enabled at create time */
1222         qm_port->config_state = DLB_CONFIGURED;
1223
1224         qm_port->dir_credits = cfg.dir_credit_high_watermark;
1225         qm_port->ldb_credits = cfg.ldb_credit_high_watermark;
1226
1227         DLB_LOG_DBG("dlb: created dir port %d, depth = %d cr=%d,%d\n",
1228                     qm_port_id,
1229                     cq_depth,
1230                     cfg.dir_credit_high_watermark,
1231                     cfg.ldb_credit_high_watermark);
1232
1233         rte_spinlock_unlock(&handle->resource_lock);
1234
1235         return 0;
1236
1237 error_exit:
1238         if (qm_port) {
1239                 qm_port->pp_mmio_base = 0;
1240                 dlb_free_qe_mem(qm_port);
1241         }
1242
1243         rte_spinlock_unlock(&handle->resource_lock);
1244
1245         DLB_LOG_ERR("dlb: create dir port failed!\n");
1246
1247         return ret;
1248 }
1249
1250 static int32_t
1251 dlb_hw_create_ldb_queue(struct dlb_eventdev *dlb,
1252                         struct dlb_queue *queue,
1253                         const struct rte_event_queue_conf *evq_conf)
1254 {
1255         struct dlb_hw_dev *handle = &dlb->qm_instance;
1256         struct dlb_create_ldb_queue_args cfg;
1257         struct dlb_cmd_response response;
1258         int32_t ret;
1259         uint32_t qm_qid;
1260         int sched_type = -1;
1261
1262         if (evq_conf == NULL)
1263                 return -EINVAL;
1264
1265         if (evq_conf->event_queue_cfg & RTE_EVENT_QUEUE_CFG_ALL_TYPES) {
1266                 if (evq_conf->nb_atomic_order_sequences != 0)
1267                         sched_type = RTE_SCHED_TYPE_ORDERED;
1268                 else
1269                         sched_type = RTE_SCHED_TYPE_PARALLEL;
1270         } else
1271                 sched_type = evq_conf->schedule_type;
1272
1273         cfg.response = (uintptr_t)&response;
1274         cfg.num_atomic_inflights = dlb->num_atm_inflights_per_queue;
1275         cfg.num_sequence_numbers = evq_conf->nb_atomic_order_sequences;
1276         cfg.num_qid_inflights = evq_conf->nb_atomic_order_sequences;
1277
1278         if (sched_type != RTE_SCHED_TYPE_ORDERED) {
1279                 cfg.num_sequence_numbers = 0;
1280                 cfg.num_qid_inflights = DLB_DEF_UNORDERED_QID_INFLIGHTS;
1281         }
1282
1283         ret = dlb_iface_ldb_queue_create(handle, &cfg);
1284         if (ret < 0) {
1285                 DLB_LOG_ERR("dlb: create LB event queue error, ret=%d (driver status: %s)\n",
1286                             ret, dlb_error_strings[response.status]);
1287                 return -EINVAL;
1288         }
1289
1290         qm_qid = response.id;
1291
1292         /* Save off queue config for debug, resource lookups, and reconfig */
1293         queue->num_qid_inflights = cfg.num_qid_inflights;
1294         queue->num_atm_inflights = cfg.num_atomic_inflights;
1295
1296         queue->sched_type = sched_type;
1297         queue->config_state = DLB_CONFIGURED;
1298
1299         DLB_LOG_DBG("Created LB event queue %d, nb_inflights=%d, nb_seq=%d, qid inflights=%d\n",
1300                     qm_qid,
1301                     cfg.num_atomic_inflights,
1302                     cfg.num_sequence_numbers,
1303                     cfg.num_qid_inflights);
1304
1305         return qm_qid;
1306 }
1307
1308 static int32_t
1309 dlb_get_sn_allocation(struct dlb_eventdev *dlb, int group)
1310 {
1311         struct dlb_hw_dev *handle = &dlb->qm_instance;
1312         struct dlb_get_sn_allocation_args cfg;
1313         struct dlb_cmd_response response;
1314         int ret;
1315
1316         cfg.group = group;
1317         cfg.response = (uintptr_t)&response;
1318
1319         ret = dlb_iface_get_sn_allocation(handle, &cfg);
1320         if (ret < 0) {
1321                 DLB_LOG_ERR("dlb: get_sn_allocation ret=%d (driver status: %s)\n",
1322                             ret, dlb_error_strings[response.status]);
1323                 return ret;
1324         }
1325
1326         return response.id;
1327 }
1328
1329 static int
1330 dlb_set_sn_allocation(struct dlb_eventdev *dlb, int group, int num)
1331 {
1332         struct dlb_hw_dev *handle = &dlb->qm_instance;
1333         struct dlb_set_sn_allocation_args cfg;
1334         struct dlb_cmd_response response;
1335         int ret;
1336
1337         cfg.num = num;
1338         cfg.group = group;
1339         cfg.response = (uintptr_t)&response;
1340
1341         ret = dlb_iface_set_sn_allocation(handle, &cfg);
1342         if (ret < 0) {
1343                 DLB_LOG_ERR("dlb: set_sn_allocation ret=%d (driver status: %s)\n",
1344                             ret, dlb_error_strings[response.status]);
1345                 return ret;
1346         }
1347
1348         return ret;
1349 }
1350
1351 static int32_t
1352 dlb_get_sn_occupancy(struct dlb_eventdev *dlb, int group)
1353 {
1354         struct dlb_hw_dev *handle = &dlb->qm_instance;
1355         struct dlb_get_sn_occupancy_args cfg;
1356         struct dlb_cmd_response response;
1357         int ret;
1358
1359         cfg.group = group;
1360         cfg.response = (uintptr_t)&response;
1361
1362         ret = dlb_iface_get_sn_occupancy(handle, &cfg);
1363         if (ret < 0) {
1364                 DLB_LOG_ERR("dlb: get_sn_occupancy ret=%d (driver status: %s)\n",
1365                             ret, dlb_error_strings[response.status]);
1366                 return ret;
1367         }
1368
1369         return response.id;
1370 }
1371
1372 /* Query the current sequence number allocations and, if they conflict with the
1373  * requested LDB queue configuration, attempt to re-allocate sequence numbers.
1374  * This is best-effort; if it fails, the PMD will attempt to configure the
1375  * load-balanced queue and return an error.
1376  */
1377 static void
1378 dlb_program_sn_allocation(struct dlb_eventdev *dlb,
1379                           const struct rte_event_queue_conf *queue_conf)
1380 {
1381         int grp_occupancy[DLB_NUM_SN_GROUPS];
1382         int grp_alloc[DLB_NUM_SN_GROUPS];
1383         int i, sequence_numbers;
1384
1385         sequence_numbers = (int)queue_conf->nb_atomic_order_sequences;
1386
1387         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1388                 int total_slots;
1389
1390                 grp_alloc[i] = dlb_get_sn_allocation(dlb, i);
1391                 if (grp_alloc[i] < 0)
1392                         return;
1393
1394                 total_slots = DLB_MAX_LDB_SN_ALLOC / grp_alloc[i];
1395
1396                 grp_occupancy[i] = dlb_get_sn_occupancy(dlb, i);
1397                 if (grp_occupancy[i] < 0)
1398                         return;
1399
1400                 /* DLB has at least one available slot for the requested
1401                  * sequence numbers, so no further configuration required.
1402                  */
1403                 if (grp_alloc[i] == sequence_numbers &&
1404                     grp_occupancy[i] < total_slots)
1405                         return;
1406         }
1407
1408         /* None of the sequence number groups are configured for the requested
1409          * sequence numbers, so we have to reconfigure one of them. This is
1410          * only possible if a group is not in use.
1411          */
1412         for (i = 0; i < DLB_NUM_SN_GROUPS; i++) {
1413                 if (grp_occupancy[i] == 0)
1414                         break;
1415         }
1416
1417         if (i == DLB_NUM_SN_GROUPS) {
1418                 DLB_LOG_ERR("[%s()] No groups with %d sequence_numbers are available or have free slots\n",
1419                        __func__, sequence_numbers);
1420                 return;
1421         }
1422
1423         /* Attempt to configure slot i with the requested number of sequence
1424          * numbers. Ignore the return value -- if this fails, the error will be
1425          * caught during subsequent queue configuration.
1426          */
1427         dlb_set_sn_allocation(dlb, i, sequence_numbers);
1428 }
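
/*
 * Worked example (values are illustrative): if DLB_MAX_LDB_SN_ALLOC were
 * 1024 and a group is currently allocated 64 sequence numbers per queue,
 * that group offers total_slots = 1024 / 64 = 16 queue slots. A request for
 * a 64-SN ordered queue is satisfied by that group while fewer than 16 such
 * queues occupy it; otherwise an empty group is re-programmed via
 * dlb_set_sn_allocation().
 */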
1429
1430 static int
1431 dlb_eventdev_ldb_queue_setup(struct rte_eventdev *dev,
1432                              struct dlb_eventdev_queue *ev_queue,
1433                              const struct rte_event_queue_conf *queue_conf)
1434 {
1435         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1436         int32_t qm_qid;
1437
1438         if (queue_conf->nb_atomic_order_sequences)
1439                 dlb_program_sn_allocation(dlb, queue_conf);
1440
1441         qm_qid = dlb_hw_create_ldb_queue(dlb,
1442                                          &ev_queue->qm_queue,
1443                                          queue_conf);
1444         if (qm_qid < 0) {
1445                 DLB_LOG_ERR("Failed to create the load-balanced queue\n");
1446
1447                 return qm_qid;
1448         }
1449
1450         dlb->qm_ldb_to_ev_queue_id[qm_qid] = ev_queue->id;
1451
1452         ev_queue->qm_queue.id = qm_qid;
1453
1454         return 0;
1455 }
1456
1457 static int dlb_num_dir_queues_setup(struct dlb_eventdev *dlb)
1458 {
1459         int i, num = 0;
1460
1461         for (i = 0; i < dlb->num_queues; i++) {
1462                 if (dlb->ev_queues[i].setup_done &&
1463                     dlb->ev_queues[i].qm_queue.is_directed)
1464                         num++;
1465         }
1466
1467         return num;
1468 }
1469
1470 static void
1471 dlb_queue_link_teardown(struct dlb_eventdev *dlb,
1472                         struct dlb_eventdev_queue *ev_queue)
1473 {
1474         struct dlb_eventdev_port *ev_port;
1475         int i, j;
1476
1477         for (i = 0; i < dlb->num_ports; i++) {
1478                 ev_port = &dlb->ev_ports[i];
1479
1480                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
1481                         if (!ev_port->link[j].valid ||
1482                             ev_port->link[j].queue_id != ev_queue->id)
1483                                 continue;
1484
1485                         ev_port->link[j].valid = false;
1486                         ev_port->num_links--;
1487                 }
1488         }
1489
1490         ev_queue->num_links = 0;
1491 }
1492
1493 static int
1494 dlb_eventdev_queue_setup(struct rte_eventdev *dev,
1495                          uint8_t ev_qid,
1496                          const struct rte_event_queue_conf *queue_conf)
1497 {
1498         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1499         struct dlb_eventdev_queue *ev_queue;
1500         int ret;
1501
1502         if (queue_conf == NULL)
1503                 return -EINVAL;
1504
1505         if (ev_qid >= dlb->num_queues)
1506                 return -EINVAL;
1507
1508         ev_queue = &dlb->ev_queues[ev_qid];
1509
1510         ev_queue->qm_queue.is_directed = queue_conf->event_queue_cfg &
1511                 RTE_EVENT_QUEUE_CFG_SINGLE_LINK;
1512         ev_queue->id = ev_qid;
1513         ev_queue->conf = *queue_conf;
1514
1515         if (!ev_queue->qm_queue.is_directed) {
1516                 ret = dlb_eventdev_ldb_queue_setup(dev, ev_queue, queue_conf);
1517         } else {
1518                 /* The directed queue isn't setup until link time, at which
1519                  * point we know its directed port ID. Directed queue setup
1520                  * will only fail if this queue is already setup or there are
1521                  * no directed queues left to configure.
1522                  */
1523                 ret = 0;
1524
1525                 ev_queue->qm_queue.config_state = DLB_NOT_CONFIGURED;
1526
1527                 if (ev_queue->setup_done ||
1528                     dlb_num_dir_queues_setup(dlb) == dlb->num_dir_queues)
1529                         ret = -EINVAL;
1530         }
1531
1532         /* Tear down pre-existing port->queue links */
1533         if (!ret && dlb->run_state == DLB_RUN_STATE_STOPPED)
1534                 dlb_queue_link_teardown(dlb, ev_queue);
1535
1536         if (!ret)
1537                 ev_queue->setup_done = true;
1538
1539         return ret;
1540 }
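
/*
 * Usage sketch (illustrative only, not part of the PMD): an application
 * reaches this queue-setup op through the public eventdev API. The dev_id
 * and queue id below are placeholders.
 *
 *	struct rte_event_queue_conf conf = {
 *		.schedule_type = RTE_SCHED_TYPE_ATOMIC,
 *		.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.nb_atomic_flows = 1024,
 *		.nb_atomic_order_sequences = 64,
 *	};
 *
 *	if (rte_event_queue_setup(dev_id, 0, &conf) < 0)
 *		rte_panic("queue setup failed\n");
 */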
1541
1542 static void
1543 dlb_port_link_teardown(struct dlb_eventdev *dlb,
1544                        struct dlb_eventdev_port *ev_port)
1545 {
1546         struct dlb_eventdev_queue *ev_queue;
1547         int i;
1548
1549         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1550                 if (!ev_port->link[i].valid)
1551                         continue;
1552
1553                 ev_queue = &dlb->ev_queues[ev_port->link[i].queue_id];
1554
1555                 ev_port->link[i].valid = false;
1556                 ev_port->num_links--;
1557                 ev_queue->num_links--;
1558         }
1559 }
1560
1561 static int
1562 dlb_eventdev_port_setup(struct rte_eventdev *dev,
1563                         uint8_t ev_port_id,
1564                         const struct rte_event_port_conf *port_conf)
1565 {
1566         struct dlb_eventdev *dlb;
1567         struct dlb_eventdev_port *ev_port;
1568         bool use_rsvd_token_scheme;
1569         uint32_t adj_cq_depth;
1570         uint16_t rsvd_tokens;
1571         int ret;
1572
1573         if (dev == NULL || port_conf == NULL) {
1574                 DLB_LOG_ERR("Null parameter\n");
1575                 return -EINVAL;
1576         }
1577
1578         dlb = dlb_pmd_priv(dev);
1579
1580         if (ev_port_id >= DLB_MAX_NUM_PORTS)
1581                 return -EINVAL;
1582
1583         if (port_conf->dequeue_depth >
1584                 evdev_dlb_default_info.max_event_port_dequeue_depth ||
1585             port_conf->enqueue_depth >
1586                 evdev_dlb_default_info.max_event_port_enqueue_depth)
1587                 return -EINVAL;
1588
1589         ev_port = &dlb->ev_ports[ev_port_id];
1590         /* configured? */
1591         if (ev_port->setup_done) {
1592                 DLB_LOG_ERR("evport %d is already configured\n", ev_port_id);
1593                 return -EINVAL;
1594         }
1595
1596         /* The reserved token interrupt arming scheme requires that one or more
1597          * CQ tokens be reserved by the PMD. This limits the amount of CQ space
1598          * usable by the DLB, so in order to give an *effective* CQ depth equal
1599          * to the user-requested value, we double CQ depth and reserve half of
1600          * its tokens. If the user requests the max CQ depth (256) then we
1601          * cannot double it, so we reserve one token and give an effective
1602          * depth of 255 entries.
1603          */
1604         use_rsvd_token_scheme = true;
1605         rsvd_tokens = 1;
1606         adj_cq_depth = port_conf->dequeue_depth;
1607
1608         if (use_rsvd_token_scheme && adj_cq_depth < 256) {
1609                 rsvd_tokens = adj_cq_depth;
1610                 adj_cq_depth *= 2;
1611         }
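
        /* Illustrative arithmetic for the reserved-token scheme above
         * (the numbers follow directly from the code, nothing extra is
         * configured):
         *   dequeue_depth = 128 -> rsvd_tokens = 128, adj_cq_depth = 256,
         *                          effective CQ depth = 128
         *   dequeue_depth = 256 -> rsvd_tokens = 1, adj_cq_depth = 256,
         *                          effective CQ depth = 255
         */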
1612
1613         ev_port->qm_port.is_directed = port_conf->event_port_cfg &
1614                 RTE_EVENT_PORT_CFG_SINGLE_LINK;
1615
1616         if (!ev_port->qm_port.is_directed) {
1617                 ret = dlb_hw_create_ldb_port(dlb,
1618                                              ev_port,
1619                                              port_conf->dequeue_depth,
1620                                              adj_cq_depth,
1621                                              port_conf->enqueue_depth,
1622                                              rsvd_tokens,
1623                                              use_rsvd_token_scheme);
1624                 if (ret < 0) {
1625                         DLB_LOG_ERR("Failed to create the LDB port, ev_port_id=%d\n",
1626                                     ev_port_id);
1627                         return ret;
1628                 }
1629         } else {
1630                 ret = dlb_hw_create_dir_port(dlb,
1631                                              ev_port,
1632                                              port_conf->dequeue_depth,
1633                                              adj_cq_depth,
1634                                              port_conf->enqueue_depth,
1635                                              rsvd_tokens,
1636                                              use_rsvd_token_scheme);
1637                 if (ret < 0) {
1638                         DLB_LOG_ERR("Failed to create the DIR port\n");
1639                         return ret;
1640                 }
1641         }
1642
1643         /* Save off port config for reconfig */
1644         dlb->ev_ports[ev_port_id].conf = *port_conf;
1645
1646         dlb->ev_ports[ev_port_id].id = ev_port_id;
1647         dlb->ev_ports[ev_port_id].enq_configured = true;
1648         dlb->ev_ports[ev_port_id].setup_done = true;
1649         dlb->ev_ports[ev_port_id].inflight_max =
1650                 port_conf->new_event_threshold;
1651         dlb->ev_ports[ev_port_id].implicit_release =
1652                 !(port_conf->event_port_cfg &
1653                   RTE_EVENT_PORT_CFG_DISABLE_IMPL_REL);
1654         dlb->ev_ports[ev_port_id].outstanding_releases = 0;
1655         dlb->ev_ports[ev_port_id].inflight_credits = 0;
1656         dlb->ev_ports[ev_port_id].credit_update_quanta =
1657                 RTE_LIBRTE_PMD_DLB_SW_CREDIT_QUANTA;
1658         dlb->ev_ports[ev_port_id].dlb = dlb; /* reverse link */
1659
1660         /* Tear down pre-existing port->queue links */
1661         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1662                 dlb_port_link_teardown(dlb, &dlb->ev_ports[ev_port_id]);
1663
1664         dev->data->ports[ev_port_id] = &dlb->ev_ports[ev_port_id];
1665
1666         return 0;
1667 }
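
/*
 * Usage sketch (illustrative only): the corresponding application-side
 * call, with a placeholder dev_id/port id and depths within the limits
 * advertised via rte_event_dev_info.
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 2048,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *
 *	if (rte_event_port_setup(dev_id, 0, &pconf) < 0)
 *		rte_panic("port setup failed\n");
 */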
1668
1669 static int
1670 dlb_eventdev_reapply_configuration(struct rte_eventdev *dev)
1671 {
1672         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1673         int ret, i;
1674
1675         /* If an event queue or port was previously configured, but hasn't been
1676          * reconfigured, reapply its original configuration.
1677          */
1678         for (i = 0; i < dlb->num_queues; i++) {
1679                 struct dlb_eventdev_queue *ev_queue;
1680
1681                 ev_queue = &dlb->ev_queues[i];
1682
1683                 if (ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED)
1684                         continue;
1685
1686                 ret = dlb_eventdev_queue_setup(dev, i, &ev_queue->conf);
1687                 if (ret < 0) {
1688                         DLB_LOG_ERR("dlb: failed to reconfigure queue %d\n", i);
1689                         return ret;
1690                 }
1691         }
1692
1693         for (i = 0; i < dlb->num_ports; i++) {
1694                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
1695
1696                 if (ev_port->qm_port.config_state != DLB_PREV_CONFIGURED)
1697                         continue;
1698
1699                 ret = dlb_eventdev_port_setup(dev, i, &ev_port->conf);
1700                 if (ret < 0) {
1701                         DLB_LOG_ERR("dlb: failed to reconfigure ev_port %d\n",
1702                                     i);
1703                         return ret;
1704                 }
1705         }
1706
1707         return 0;
1708 }
1709
1710 static int
1711 set_dev_id(const char *key __rte_unused,
1712            const char *value,
1713            void *opaque)
1714 {
1715         int *dev_id = opaque;
1716         int ret;
1717
1718         if (value == NULL || opaque == NULL) {
1719                 DLB_LOG_ERR("NULL pointer\n");
1720                 return -EINVAL;
1721         }
1722
1723         ret = dlb_string_to_int(dev_id, value);
1724         if (ret < 0)
1725                 return ret;
1726
1727         return 0;
1728 }
1729
1730 static int
1731 set_defer_sched(const char *key __rte_unused,
1732                 const char *value,
1733                 void *opaque)
1734 {
1735         int *defer_sched = opaque;
1736
1737         if (value == NULL || opaque == NULL) {
1738                 DLB_LOG_ERR("NULL pointer\n");
1739                 return -EINVAL;
1740         }
1741
1742         if (strncmp(value, "on", 2) != 0) {
1743                 DLB_LOG_ERR("Invalid defer_sched argument \"%s\" (expected \"on\")\n",
1744                             value);
1745                 return -EINVAL;
1746         }
1747
1748         *defer_sched = 1;
1749
1750         return 0;
1751 }
1752
1753 static int
1754 set_num_atm_inflights(const char *key __rte_unused,
1755                       const char *value,
1756                       void *opaque)
1757 {
1758         int *num_atm_inflights = opaque;
1759         int ret;
1760
1761         if (value == NULL || opaque == NULL) {
1762                 DLB_LOG_ERR("NULL pointer\n");
1763                 return -EINVAL;
1764         }
1765
1766         ret = dlb_string_to_int(num_atm_inflights, value);
1767         if (ret < 0)
1768                 return ret;
1769
1770         if (*num_atm_inflights < 0 ||
1771             *num_atm_inflights > DLB_MAX_NUM_ATM_INFLIGHTS) {
1772                 DLB_LOG_ERR("dlb: atm_inflights must be between 0 and %d\n",
1773                             DLB_MAX_NUM_ATM_INFLIGHTS);
1774                 return -EINVAL;
1775         }
1776
1777         return 0;
1778 }
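
/*
 * Illustrative only: these kvargs callbacks are typically registered at
 * probe time roughly as sketched below. The key string and destination
 * field are placeholders here; the real names are defined elsewhere in
 * the PMD.
 *
 *	struct rte_kvargs *kvlist = rte_kvargs_parse(params, valid_args);
 *
 *	if (kvlist != NULL) {
 *		rte_kvargs_process(kvlist, "atm_inflights",
 *				   set_num_atm_inflights,
 *				   &dlb_args->num_atm_inflights);
 *		rte_kvargs_free(kvlist);
 *	}
 */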
1779
1780 static int
1781 dlb_validate_port_link(struct dlb_eventdev_port *ev_port,
1782                        uint8_t queue_id,
1783                        bool link_exists,
1784                        int index)
1785 {
1786         struct dlb_eventdev *dlb = ev_port->dlb;
1787         struct dlb_eventdev_queue *ev_queue;
1788         bool port_is_dir, queue_is_dir;
1789
1790         if (queue_id > dlb->num_queues) {
1791                 DLB_LOG_ERR("queue_id %d > num queues %d\n",
1792                             queue_id, dlb->num_queues);
1793                 rte_errno = -EINVAL;
1794                 return -1;
1795         }
1796
1797         ev_queue = &dlb->ev_queues[queue_id];
1798
1799         if (!ev_queue->setup_done &&
1800             ev_queue->qm_queue.config_state != DLB_PREV_CONFIGURED) {
1801                 DLB_LOG_ERR("setup not done and not previously configured\n");
1802                 rte_errno = -EINVAL;
1803                 return -1;
1804         }
1805
1806         port_is_dir = ev_port->qm_port.is_directed;
1807         queue_is_dir = ev_queue->qm_queue.is_directed;
1808
1809         if (port_is_dir != queue_is_dir) {
1810                 DLB_LOG_ERR("%s queue %u can't link to %s port %u\n",
1811                             queue_is_dir ? "DIR" : "LDB", ev_queue->id,
1812                             port_is_dir ? "DIR" : "LDB", ev_port->id);
1813
1814                 rte_errno = -EINVAL;
1815                 return -1;
1816         }
1817
1818         /* Check if there is space for the requested link */
1819         if (!link_exists && index == -1) {
1820                 DLB_LOG_ERR("no space for new link\n");
1821                 rte_errno = -ENOSPC;
1822                 return -1;
1823         }
1824
1825         /* Check if the directed port is already linked */
1826         if (ev_port->qm_port.is_directed && ev_port->num_links > 0 &&
1827             !link_exists) {
1828                 DLB_LOG_ERR("Can't link DIR port %d to >1 queues\n",
1829                             ev_port->id);
1830                 rte_errno = -EINVAL;
1831                 return -1;
1832         }
1833
1834         /* Check if the directed queue is already linked */
1835         if (ev_queue->qm_queue.is_directed && ev_queue->num_links > 0 &&
1836             !link_exists) {
1837                 DLB_LOG_ERR("Can't link DIR queue %d to >1 ports\n",
1838                             ev_queue->id);
1839                 rte_errno = -EINVAL;
1840                 return -1;
1841         }
1842
1843         return 0;
1844 }
1845
1846 static int32_t
1847 dlb_hw_create_dir_queue(struct dlb_eventdev *dlb, int32_t qm_port_id)
1848 {
1849         struct dlb_hw_dev *handle = &dlb->qm_instance;
1850         struct dlb_create_dir_queue_args cfg;
1851         struct dlb_cmd_response response;
1852         int32_t ret;
1853
1854         cfg.response = (uintptr_t)&response;
1855
1856         /* The directed port is always configured before its queue */
1857         cfg.port_id = qm_port_id;
1858
1859         ret = dlb_iface_dir_queue_create(handle, &cfg);
1860         if (ret < 0) {
1861                 DLB_LOG_ERR("dlb: create DIR event queue error, ret=%d (driver status: %s)\n",
1862                             ret, dlb_error_strings[response.status]);
1863                 return -EINVAL;
1864         }
1865
1866         return response.id;
1867 }
1868
1869 static int
1870 dlb_eventdev_dir_queue_setup(struct dlb_eventdev *dlb,
1871                              struct dlb_eventdev_queue *ev_queue,
1872                              struct dlb_eventdev_port *ev_port)
1873 {
1874         int32_t qm_qid;
1875
1876         qm_qid = dlb_hw_create_dir_queue(dlb, ev_port->qm_port.id);
1877
1878         if (qm_qid < 0) {
1879                 DLB_LOG_ERR("Failed to create the DIR queue\n");
1880                 return qm_qid;
1881         }
1882
1883         dlb->qm_dir_to_ev_queue_id[qm_qid] = ev_queue->id;
1884
1885         ev_queue->qm_queue.id = qm_qid;
1886
1887         return 0;
1888 }
1889
1890 static int16_t
1891 dlb_hw_map_ldb_qid_to_port(struct dlb_hw_dev *handle,
1892                            uint32_t qm_port_id,
1893                            uint16_t qm_qid,
1894                            uint8_t priority)
1895 {
1896         struct dlb_map_qid_args cfg;
1897         struct dlb_cmd_response response;
1898         int32_t ret;
1899
1900         if (handle == NULL)
1901                 return -EINVAL;
1902
1903         /* Build message */
1904         cfg.response = (uintptr_t)&response;
1905         cfg.port_id = qm_port_id;
1906         cfg.qid = qm_qid;
1907         cfg.priority = EV_TO_DLB_PRIO(priority);
1908
1909         ret = dlb_iface_map_qid(handle, &cfg);
1910         if (ret < 0) {
1911                 DLB_LOG_ERR("dlb: map qid error, ret=%d (driver status: %s)\n",
1912                             ret, dlb_error_strings[response.status]);
1913                 DLB_LOG_ERR("dlb: device_id=%d domain_id=%d, qm_port=%d, qm_qid=%d prio=%d\n",
1914                             handle->device_id,
1915                             handle->domain_id, cfg.port_id,
1916                             cfg.qid,
1917                             cfg.priority);
1918         } else {
1919                 DLB_LOG_DBG("dlb: mapped queue %d to qm_port %d\n",
1920                             qm_qid, qm_port_id);
1921         }
1922
1923         return ret;
1924 }
1925
1926 static int
1927 dlb_event_queue_join_ldb(struct dlb_eventdev *dlb,
1928                          struct dlb_eventdev_port *ev_port,
1929                          struct dlb_eventdev_queue *ev_queue,
1930                          uint8_t priority)
1931 {
1932         int first_avail = -1;
1933         int ret, i;
1934
1935         for (i = 0; i < DLB_MAX_NUM_QIDS_PER_LDB_CQ; i++) {
1936                 if (ev_port->link[i].valid) {
1937                         if (ev_port->link[i].queue_id == ev_queue->id &&
1938                             ev_port->link[i].priority == priority) {
1939                                 if (ev_port->link[i].mapped)
1940                                         return 0; /* already mapped */
1941                                 first_avail = i;
1942                         }
1943                 } else {
1944                         if (first_avail == -1)
1945                                 first_avail = i;
1946                 }
1947         }
1948         if (first_avail == -1) {
1949                 DLB_LOG_ERR("dlb: qm_port %d has no available QID slots.\n",
1950                             ev_port->qm_port.id);
1951                 return -EINVAL;
1952         }
1953
1954         ret = dlb_hw_map_ldb_qid_to_port(&dlb->qm_instance,
1955                                          ev_port->qm_port.id,
1956                                          ev_queue->qm_queue.id,
1957                                          priority);
1958
1959         if (!ret)
1960                 ev_port->link[first_avail].mapped = true;
1961
1962         return ret;
1963 }
1964
1965 static int
1966 dlb_do_port_link(struct rte_eventdev *dev,
1967                  struct dlb_eventdev_queue *ev_queue,
1968                  struct dlb_eventdev_port *ev_port,
1969                  uint8_t prio)
1970 {
1971         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1972         int err;
1973
1974         /* Don't link until start time. */
1975         if (dlb->run_state == DLB_RUN_STATE_STOPPED)
1976                 return 0;
1977
1978         if (ev_queue->qm_queue.is_directed)
1979                 err = dlb_eventdev_dir_queue_setup(dlb, ev_queue, ev_port);
1980         else
1981                 err = dlb_event_queue_join_ldb(dlb, ev_port, ev_queue, prio);
1982
1983         if (err) {
1984                 DLB_LOG_ERR("port link failure for %s ev_q %d, ev_port %d\n",
1985                             ev_queue->qm_queue.is_directed ? "DIR" : "LDB",
1986                             ev_queue->id, ev_port->id);
1987
1988                 rte_errno = err;
1989                 return -1;
1990         }
1991
1992         return 0;
1993 }
1994
1995 static int
1996 dlb_eventdev_apply_port_links(struct rte_eventdev *dev)
1997 {
1998         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
1999         int i;
2000
2001         /* Perform requested port->queue links */
2002         for (i = 0; i < dlb->num_ports; i++) {
2003                 struct dlb_eventdev_port *ev_port = &dlb->ev_ports[i];
2004                 int j;
2005
2006                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++) {
2007                         struct dlb_eventdev_queue *ev_queue;
2008                         uint8_t prio, queue_id;
2009
2010                         if (!ev_port->link[j].valid)
2011                                 continue;
2012
2013                         prio = ev_port->link[j].priority;
2014                         queue_id = ev_port->link[j].queue_id;
2015
2016                         if (dlb_validate_port_link(ev_port, queue_id, true, j))
2017                                 return -EINVAL;
2018
2019                         ev_queue = &dlb->ev_queues[queue_id];
2020
2021                         if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2022                                 return -EINVAL;
2023                 }
2024         }
2025
2026         return 0;
2027 }
2028
2029 static int
2030 dlb_eventdev_port_link(struct rte_eventdev *dev, void *event_port,
2031                        const uint8_t queues[], const uint8_t priorities[],
2032                        uint16_t nb_links)
2033
2034 {
2035         struct dlb_eventdev_port *ev_port = event_port;
2036         struct dlb_eventdev *dlb;
2037         int i, j;
2038
2039         RTE_SET_USED(dev);
2040
2041         if (ev_port == NULL) {
2042                 DLB_LOG_ERR("dlb: event_port is NULL\n");
2043                 rte_errno = -EINVAL;
2044                 return 0;
2045         }
2046
2047         if (!ev_port->setup_done &&
2048             ev_port->qm_port.config_state != DLB_PREV_CONFIGURED) {
2049                 DLB_LOG_ERR("dlb: evport not setup\n");
2050                 rte_errno = -EINVAL;
2051                 return 0;
2052         }
2053
2054         /* Note: rte_event_port_link() ensures the PMD won't receive a NULL
2055          * queues pointer.
2056          */
2057         if (nb_links == 0) {
2058                 DLB_LOG_DBG("dlb: nb_links is 0\n");
2059                 return 0; /* Ignore and return success */
2060         }
2061
2062         dlb = ev_port->dlb;
2063
2064         DLB_LOG_DBG("Linking %u queues to %s port %d\n",
2065                     nb_links,
2066                     ev_port->qm_port.is_directed ? "DIR" : "LDB",
2067                     ev_port->id);
2068
2069         for (i = 0; i < nb_links; i++) {
2070                 struct dlb_eventdev_queue *ev_queue;
2071                 uint8_t queue_id, prio;
2072                 bool found = false;
2073                 int index = -1;
2074
2075                 queue_id = queues[i];
2076                 prio = priorities[i];
2077
2078                 /* Check if the link already exists. */
2079                 for (j = 0; j < DLB_MAX_NUM_QIDS_PER_LDB_CQ; j++)
2080                         if (ev_port->link[j].valid) {
2081                                 if (ev_port->link[j].queue_id == queue_id) {
2082                                         found = true;
2083                                         index = j;
2084                                         break;
2085                                 }
2086                         } else {
2087                                 if (index == -1)
2088                                         index = j;
2089                         }
2090
2091                 /* could not link */
2092                 if (index == -1)
2093                         break;
2094
2095                 /* Check if already linked at the requested priority */
2096                 if (found && ev_port->link[index].priority == prio)
2097                         continue;
2098
2099                 if (dlb_validate_port_link(ev_port, queue_id, found, index))
2100                         break; /* return index of offending queue */
2101
2102                 ev_queue = &dlb->ev_queues[queue_id];
2103
2104                 if (dlb_do_port_link(dev, ev_queue, ev_port, prio))
2105                         break; /* return index of offending queue */
2106
2107                 ev_queue->num_links++;
2108
2109                 ev_port->link[index].queue_id = queue_id;
2110                 ev_port->link[index].priority = prio;
2111                 ev_port->link[index].valid = true;
2112                 /* A pre-existing link means this was only a priority change */
2113                 if (!found)
2114                         ev_port->num_links++;
2115         }
2116         return i;
2117 }
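
/*
 * Usage sketch (illustrative only): applications link queues to a port
 * through rte_event_port_link(); the ids below are placeholders.
 *
 *	uint8_t queues[2] = {0, 1};
 *	uint8_t prios[2] = {RTE_EVENT_DEV_PRIORITY_NORMAL,
 *			    RTE_EVENT_DEV_PRIORITY_HIGHEST};
 *	int n = rte_event_port_link(dev_id, port_id, queues, prios, 2);
 *
 *	if (n != 2)
 *		printf("only %d of 2 links made, rte_errno %d\n",
 *		       n, rte_errno);
 */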
2118
2119 static int
2120 dlb_eventdev_start(struct rte_eventdev *dev)
2121 {
2122         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
2123         struct dlb_hw_dev *handle = &dlb->qm_instance;
2124         struct dlb_start_domain_args cfg;
2125         struct dlb_cmd_response response;
2126         int ret, i;
2127
2128         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
2129         if (dlb->run_state != DLB_RUN_STATE_STOPPED) {
2130                 DLB_LOG_ERR("bad state %d for dev_start\n",
2131                             (int)dlb->run_state);
2132                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2133                 return -EINVAL;
2134         }
2135         dlb->run_state = DLB_RUN_STATE_STARTING;
2136         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
2137
2138         /* If the device was configured more than once, some event ports and/or
2139          * queues may need to be reconfigured.
2140          */
2141         ret = dlb_eventdev_reapply_configuration(dev);
2142         if (ret)
2143                 return ret;
2144
2145         /* The DLB PMD delays port links until the device is started. */
2146         ret = dlb_eventdev_apply_port_links(dev);
2147         if (ret)
2148                 return ret;
2149
2150         cfg.response = (uintptr_t)&response;
2151
2152         for (i = 0; i < dlb->num_ports; i++) {
2153                 if (!dlb->ev_ports[i].setup_done) {
2154                         DLB_LOG_ERR("dlb: port %d not setup\n", i);
2155                         return -ESTALE;
2156                 }
2157         }
2158
2159         for (i = 0; i < dlb->num_queues; i++) {
2160                 if (dlb->ev_queues[i].num_links == 0) {
2161                         DLB_LOG_ERR("dlb: queue %d is not linked\n", i);
2162                         return -ENOLINK;
2163                 }
2164         }
2165
2166         ret = dlb_iface_sched_domain_start(handle, &cfg);
2167         if (ret < 0) {
2168                 DLB_LOG_ERR("dlb: sched_domain_start ret=%d (driver status: %s)\n",
2169                             ret, dlb_error_strings[response.status]);
2170                 return ret;
2171         }
2172
2173         dlb->run_state = DLB_RUN_STATE_STARTED;
2174         DLB_LOG_DBG("dlb: sched_domain_start completed OK\n");
2175
2176         return 0;
2177 }
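
/*
 * Usage sketch (illustrative only): because the DLB PMD defers port links
 * until start time, the usual application ordering is configure ->
 * queue/port setup -> link -> start:
 *
 *	rte_event_dev_configure(dev_id, &dev_conf);
 *	rte_event_queue_setup(dev_id, qid, &qconf);
 *	rte_event_port_setup(dev_id, pid, &pconf);
 *	rte_event_port_link(dev_id, pid, &qid, NULL, 1);
 *
 *	if (rte_event_dev_start(dev_id) < 0)
 *		rte_panic("failed to start eventdev\n");
 */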
2178
2179 static inline int
2180 dlb_check_enqueue_sw_credits(struct dlb_eventdev *dlb,
2181                              struct dlb_eventdev_port *ev_port)
2182 {
2183         uint32_t sw_inflights = __atomic_load_n(&dlb->inflights,
2184                                                 __ATOMIC_SEQ_CST);
2185         const int num = 1;
2186
2187         if (unlikely(ev_port->inflight_max < sw_inflights)) {
2188                 DLB_INC_STAT(ev_port->stats.traffic.tx_nospc_inflight_max, 1);
2189                 rte_errno = -ENOSPC;
2190                 return 1;
2191         }
2192
2193         if (ev_port->inflight_credits < num) {
2194                 /* check if event enqueue brings ev_port over max threshold */
2195                 uint32_t credit_update_quanta = ev_port->credit_update_quanta;
2196
2197                 if (sw_inflights + credit_update_quanta >
2198                     dlb->new_event_limit) {
2199                         DLB_INC_STAT(
2200                                 ev_port->stats.traffic.tx_nospc_new_event_limit,
2201                                 1);
2202                         rte_errno = -ENOSPC;
2203                         return 1;
2204                 }
2205
2206                 __atomic_fetch_add(&dlb->inflights, credit_update_quanta,
2207                                    __ATOMIC_SEQ_CST);
2208                 ev_port->inflight_credits += credit_update_quanta;
2209
2210                 if (ev_port->inflight_credits < num) {
2211                         DLB_INC_STAT(
2212                             ev_port->stats.traffic.tx_nospc_inflight_credits,
2213                             1);
2214                         rte_errno = -ENOSPC;
2215                         return 1;
2216                 }
2217         }
2218
2219         return 0;
2220 }
2221
2222 static inline void
2223 dlb_replenish_sw_credits(struct dlb_eventdev *dlb,
2224                          struct dlb_eventdev_port *ev_port)
2225 {
2226         uint16_t quanta = ev_port->credit_update_quanta;
2227
2228         if (ev_port->inflight_credits >= quanta * 2) {
2229                 /* Replenish credits, saving one quantum for future enqueues */
2230                 uint16_t val = ev_port->inflight_credits - quanta;
2231
2232                 __atomic_fetch_sub(&dlb->inflights, val, __ATOMIC_SEQ_CST);
2233                 ev_port->inflight_credits -= val;
2234         }
2235 }
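
/* Illustrative numbers for the replenish logic above: with a quanta of 32
 * and 70 locally cached inflight credits, 38 credits are returned to the
 * device-wide pool and 32 remain cached for future enqueues. With 60
 * cached credits (less than 2 * quanta) nothing is returned.
 */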
2236
2237 static __rte_always_inline uint16_t
2238 dlb_read_pc(struct process_local_port_data *port_data, bool ldb)
2239 {
2240         volatile uint16_t *popcount;
2241
2242         if (ldb)
2243                 popcount = port_data->ldb_popcount;
2244         else
2245                 popcount = port_data->dir_popcount;
2246
2247         return *popcount;
2248 }
2249
2250 static inline int
2251 dlb_check_enqueue_hw_ldb_credits(struct dlb_port *qm_port,
2252                                  struct process_local_port_data *port_data)
2253 {
2254         if (unlikely(qm_port->cached_ldb_credits == 0)) {
2255                 uint16_t pc;
2256
2257                 pc = dlb_read_pc(port_data, true);
2258
2259                 qm_port->cached_ldb_credits = pc -
2260                         qm_port->ldb_pushcount_at_credit_expiry;
2261                 if (unlikely(qm_port->cached_ldb_credits == 0)) {
2262                         DLB_INC_STAT(
2263                         qm_port->ev_port->stats.traffic.tx_nospc_ldb_hw_credits,
2264                         1);
2265
2266                         DLB_LOG_DBG("ldb credits exhausted\n");
2267                         return 1;
2268                 }
2269                 qm_port->ldb_pushcount_at_credit_expiry +=
2270                         qm_port->cached_ldb_credits;
2271         }
2272
2273         return 0;
2274 }
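
/* Illustrative numbers for the credit refresh above: if the push count
 * recorded at the last credit expiry was 100 and the hardware popcount
 * now reads 108, then 8 LDB credits become locally cached and the
 * recorded push count advances to 108. A popcount equal to the recorded
 * value means no credits are available and the enqueue is rejected.
 */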
2275
2276 static inline int
2277 dlb_check_enqueue_hw_dir_credits(struct dlb_port *qm_port,
2278                                  struct process_local_port_data *port_data)
2279 {
2280         if (unlikely(qm_port->cached_dir_credits == 0)) {
2281                 uint16_t pc;
2282
2283                 pc = dlb_read_pc(port_data, false);
2284
2285                 qm_port->cached_dir_credits = pc -
2286                         qm_port->dir_pushcount_at_credit_expiry;
2287
2288                 if (unlikely(qm_port->cached_dir_credits == 0)) {
2289                         DLB_INC_STAT(
2290                         qm_port->ev_port->stats.traffic.tx_nospc_dir_hw_credits,
2291                         1);
2292
2293                         DLB_LOG_DBG("dir credits exhausted\n");
2294                         return 1;
2295                 }
2296                 qm_port->dir_pushcount_at_credit_expiry +=
2297                         qm_port->cached_dir_credits;
2298         }
2299
2300         return 0;
2301 }
2302
2303 static inline int
2304 dlb_event_enqueue_prep(struct dlb_eventdev_port *ev_port,
2305                        struct dlb_port *qm_port,
2306                        const struct rte_event ev[],
2307                        struct process_local_port_data *port_data,
2308                        uint8_t *sched_type,
2309                        uint8_t *queue_id)
2310 {
2311         struct dlb_eventdev *dlb = ev_port->dlb;
2312         struct dlb_eventdev_queue *ev_queue;
2313         uint16_t *cached_credits = NULL;
2314         struct dlb_queue *qm_queue;
2315
2316         ev_queue = &dlb->ev_queues[ev->queue_id];
2317         qm_queue = &ev_queue->qm_queue;
2318         *queue_id = qm_queue->id;
2319
2320         /* Ignore sched_type and hardware credits on release events */
2321         if (ev->op == RTE_EVENT_OP_RELEASE)
2322                 goto op_check;
2323
2324         if (!qm_queue->is_directed) {
2325                 /* Load balanced destination queue */
2326
2327                 if (dlb_check_enqueue_hw_ldb_credits(qm_port, port_data)) {
2328                         rte_errno = -ENOSPC;
2329                         return 1;
2330                 }
2331                 cached_credits = &qm_port->cached_ldb_credits;
2332
2333                 switch (ev->sched_type) {
2334                 case RTE_SCHED_TYPE_ORDERED:
2335                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ORDERED\n");
2336                         if (qm_queue->sched_type != RTE_SCHED_TYPE_ORDERED) {
2337                                 DLB_LOG_ERR("dlb: tried to send ordered event to unordered queue %d\n",
2338                                             *queue_id);
2339                                 rte_errno = -EINVAL;
2340                                 return 1;
2341                         }
2342                         *sched_type = DLB_SCHED_ORDERED;
2343                         break;
2344                 case RTE_SCHED_TYPE_ATOMIC:
2345                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_ATOMIC\n");
2346                         *sched_type = DLB_SCHED_ATOMIC;
2347                         break;
2348                 case RTE_SCHED_TYPE_PARALLEL:
2349                         DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_PARALLEL\n");
2350                         if (qm_queue->sched_type == RTE_SCHED_TYPE_ORDERED)
2351                                 *sched_type = DLB_SCHED_ORDERED;
2352                         else
2353                                 *sched_type = DLB_SCHED_UNORDERED;
2354                         break;
2355                 default:
2356                         DLB_LOG_ERR("Unsupported LDB sched type in put_qe\n");
2357                         DLB_INC_STAT(ev_port->stats.tx_invalid, 1);
2358                         rte_errno = -EINVAL;
2359                         return 1;
2360                 }
2361         } else {
2362                 /* Directed destination queue */
2363
2364                 if (dlb_check_enqueue_hw_dir_credits(qm_port, port_data)) {
2365                         rte_errno = -ENOSPC;
2366                         return 1;
2367                 }
2368                 cached_credits = &qm_port->cached_dir_credits;
2369
2370                 DLB_LOG_DBG("dlb: put_qe: RTE_SCHED_TYPE_DIRECTED\n");
2371
2372                 *sched_type = DLB_SCHED_DIRECTED;
2373         }
2374
2375 op_check:
2376         switch (ev->op) {
2377         case RTE_EVENT_OP_NEW:
2378                 /* Check that a sw credit is available */
2379                 if (dlb_check_enqueue_sw_credits(dlb, ev_port)) {
2380                         rte_errno = -ENOSPC;
2381                         return 1;
2382                 }
2383                 ev_port->inflight_credits--;
2384                 (*cached_credits)--;
2385                 break;
2386         case RTE_EVENT_OP_FORWARD:
2387                 /* Check for outstanding_releases underflow. If this occurs,
2388                  * the application is not using the EVENT_OPs correctly; for
2389                  * example, forwarding or releasing events that were not
2390                  * dequeued.
2391                  */
2392                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2393                 ev_port->outstanding_releases--;
2394                 qm_port->issued_releases++;
2395                 (*cached_credits)--;
2396                 break;
2397         case RTE_EVENT_OP_RELEASE:
2398                 ev_port->inflight_credits++;
2399                 /* Check for outstanding_releases underflow. If this occurs,
2400                  * the application is not using the EVENT_OPs correctly; for
2401                  * example, forwarding or releasing events that were not
2402                  * dequeued.
2403                  */
2404                 RTE_ASSERT(ev_port->outstanding_releases > 0);
2405                 ev_port->outstanding_releases--;
2406                 qm_port->issued_releases++;
2407                 /* Replenish s/w credits if enough are cached */
2408                 dlb_replenish_sw_credits(dlb, ev_port);
2409                 break;
2410         }
2411
2412         DLB_INC_STAT(ev_port->stats.tx_op_cnt[ev->op], 1);
2413         DLB_INC_STAT(ev_port->stats.traffic.tx_ok, 1);
2414
2415 #ifndef RTE_LIBRTE_PMD_DLB_QUELL_STATS
2416         if (ev->op != RTE_EVENT_OP_RELEASE) {
2417                 DLB_INC_STAT(ev_port->stats.enq_ok[ev->queue_id], 1);
2418                 DLB_INC_STAT(ev_port->stats.tx_sched_cnt[*sched_type], 1);
2419         }
2420 #endif
2421
2422         return 0;
2423 }
2424
2425 static uint8_t cmd_byte_map[NUM_DLB_PORT_TYPES][DLB_NUM_HW_SCHED_TYPES] = {
2426         {
2427                 /* Load-balanced cmd bytes */
2428                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2429                 [RTE_EVENT_OP_FORWARD] = DLB_FWD_CMD_BYTE,
2430                 [RTE_EVENT_OP_RELEASE] = DLB_COMP_CMD_BYTE,
2431         },
2432         {
2433                 /* Directed cmd bytes */
2434                 [RTE_EVENT_OP_NEW] = DLB_NEW_CMD_BYTE,
2435                 [RTE_EVENT_OP_FORWARD] = DLB_NEW_CMD_BYTE,
2436                 [RTE_EVENT_OP_RELEASE] = DLB_NOOP_CMD_BYTE,
2437         },
2438 };
2439
2440 static inline void
2441 dlb_event_build_hcws(struct dlb_port *qm_port,
2442                      const struct rte_event ev[],
2443                      int num,
2444                      uint8_t *sched_type,
2445                      uint8_t *queue_id)
2446 {
2447         struct dlb_enqueue_qe *qe;
2448         uint16_t sched_word[4];
2449         __m128i sse_qe[2];
2450         int i;
2451
2452         qe = qm_port->qe4;
2453
2454         sse_qe[0] = _mm_setzero_si128();
2455         sse_qe[1] = _mm_setzero_si128();
2456
2457         switch (num) {
2458         case 4:
2459                 /* Construct the metadata portion of two HCWs in one 128b SSE
2460                  * register. HCW metadata is constructed in the SSE registers
2461                  * like so:
2462                  * sse_qe[0][63:0]:   qe[0]'s metadata
2463                  * sse_qe[0][127:64]: qe[1]'s metadata
2464                  * sse_qe[1][63:0]:   qe[2]'s metadata
2465                  * sse_qe[1][127:64]: qe[3]'s metadata
2466                  */
2467
2468                 /* Convert the event operation into a command byte and store it
2469                  * in the metadata:
2470                  * sse_qe[0][63:56]   = cmd_byte_map[is_directed][ev[0].op]
2471                  * sse_qe[0][127:120] = cmd_byte_map[is_directed][ev[1].op]
2472                  * sse_qe[1][63:56]   = cmd_byte_map[is_directed][ev[2].op]
2473                  * sse_qe[1][127:120] = cmd_byte_map[is_directed][ev[3].op]
2474                  */
2475 #define DLB_QE_CMD_BYTE 7
2476                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2477                                 cmd_byte_map[qm_port->is_directed][ev[0].op],
2478                                 DLB_QE_CMD_BYTE);
2479                 sse_qe[0] = _mm_insert_epi8(sse_qe[0],
2480                                 cmd_byte_map[qm_port->is_directed][ev[1].op],
2481                                 DLB_QE_CMD_BYTE + 8);
2482                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2483                                 cmd_byte_map[qm_port->is_directed][ev[2].op],
2484                                 DLB_QE_CMD_BYTE);
2485                 sse_qe[1] = _mm_insert_epi8(sse_qe[1],
2486                                 cmd_byte_map[qm_port->is_directed][ev[3].op],
2487                                 DLB_QE_CMD_BYTE + 8);
2488
2489                 /* Store priority, scheduling type, and queue ID in the sched
2490                  * word array because these values are re-used when the
2491                  * destination is a directed queue.
2492                  */
2493                 sched_word[0] = EV_TO_DLB_PRIO(ev[0].priority) << 10 |
2494                                 sched_type[0] << 8 |
2495                                 queue_id[0];
2496                 sched_word[1] = EV_TO_DLB_PRIO(ev[1].priority) << 10 |
2497                                 sched_type[1] << 8 |
2498                                 queue_id[1];
2499                 sched_word[2] = EV_TO_DLB_PRIO(ev[2].priority) << 10 |
2500                                 sched_type[2] << 8 |
2501                                 queue_id[2];
2502                 sched_word[3] = EV_TO_DLB_PRIO(ev[3].priority) << 10 |
2503                                 sched_type[3] << 8 |
2504                                 queue_id[3];
2505
2506                 /* Store the event priority, scheduling type, and queue ID in
2507                  * the metadata:
2508                  * sse_qe[0][31:16] = sched_word[0]
2509                  * sse_qe[0][95:80] = sched_word[1]
2510                  * sse_qe[1][31:16] = sched_word[2]
2511                  * sse_qe[1][95:80] = sched_word[3]
2512                  */
2513 #define DLB_QE_QID_SCHED_WORD 1
2514                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2515                                              sched_word[0],
2516                                              DLB_QE_QID_SCHED_WORD);
2517                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2518                                              sched_word[1],
2519                                              DLB_QE_QID_SCHED_WORD + 4);
2520                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2521                                              sched_word[2],
2522                                              DLB_QE_QID_SCHED_WORD);
2523                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2524                                              sched_word[3],
2525                                              DLB_QE_QID_SCHED_WORD + 4);
2526
2527                 /* If the destination is a load-balanced queue, store the lock
2528                  * ID. If it is a directed queue, DLB places this field in
2529                  * bytes 10-11 of the received QE, so we format it accordingly:
2530                  * sse_qe[0][47:32]  = dir queue ? sched_word[0] : flow_id[0]
2531                  * sse_qe[0][111:96] = dir queue ? sched_word[1] : flow_id[1]
2532                  * sse_qe[1][47:32]  = dir queue ? sched_word[2] : flow_id[2]
2533                  * sse_qe[1][111:96] = dir queue ? sched_word[3] : flow_id[3]
2534                  */
2535 #define DLB_QE_LOCK_ID_WORD 2
2536                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2537                                 (sched_type[0] == DLB_SCHED_DIRECTED) ?
2538                                         sched_word[0] : ev[0].flow_id,
2539                                 DLB_QE_LOCK_ID_WORD);
2540                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2541                                 (sched_type[1] == DLB_SCHED_DIRECTED) ?
2542                                         sched_word[1] : ev[1].flow_id,
2543                                 DLB_QE_LOCK_ID_WORD + 4);
2544                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2545                                 (sched_type[2] == DLB_SCHED_DIRECTED) ?
2546                                         sched_word[2] : ev[2].flow_id,
2547                                 DLB_QE_LOCK_ID_WORD);
2548                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2549                                 (sched_type[3] == DLB_SCHED_DIRECTED) ?
2550                                         sched_word[3] : ev[3].flow_id,
2551                                 DLB_QE_LOCK_ID_WORD + 4);
2552
2553                 /* Store the event type and sub event type in the metadata:
2554                  * sse_qe[0][15:0]  = sub_event_type[0] << 8 | event_type[0]
2555                  * sse_qe[0][79:64] = sub_event_type[1] << 8 | event_type[1]
2556                  * sse_qe[1][15:0]  = sub_event_type[2] << 8 | event_type[2]
2557                  * sse_qe[1][79:64] = sub_event_type[3] << 8 | event_type[3]
2558                  */
2559 #define DLB_QE_EV_TYPE_WORD 0
2560                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2561                                              ev[0].sub_event_type << 8 |
2562                                                 ev[0].event_type,
2563                                              DLB_QE_EV_TYPE_WORD);
2564                 sse_qe[0] = _mm_insert_epi16(sse_qe[0],
2565                                              ev[1].sub_event_type << 8 |
2566                                                 ev[1].event_type,
2567                                              DLB_QE_EV_TYPE_WORD + 4);
2568                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2569                                              ev[2].sub_event_type << 8 |
2570                                                 ev[2].event_type,
2571                                              DLB_QE_EV_TYPE_WORD);
2572                 sse_qe[1] = _mm_insert_epi16(sse_qe[1],
2573                                              ev[3].sub_event_type << 8 |
2574                                                 ev[3].event_type,
2575                                              DLB_QE_EV_TYPE_WORD + 4);
2576
2577                 /* Store the metadata to memory (use the double-precision
2578                  * _mm_storeh_pd because there is no integer function for
2579                  * storing the upper 64b):
2580                  * qe[0] metadata = sse_qe[0][63:0]
2581                  * qe[1] metadata = sse_qe[0][127:64]
2582                  * qe[2] metadata = sse_qe[1][63:0]
2583                  * qe[3] metadata = sse_qe[1][127:64]
2584                  */
2585                 _mm_storel_epi64((__m128i *)&qe[0].u.opaque_data, sse_qe[0]);
2586                 _mm_storeh_pd((double *)&qe[1].u.opaque_data,
2587                               (__m128d) sse_qe[0]);
2588                 _mm_storel_epi64((__m128i *)&qe[2].u.opaque_data, sse_qe[1]);
2589                 _mm_storeh_pd((double *)&qe[3].u.opaque_data,
2590                               (__m128d) sse_qe[1]);
2591
2592                 qe[0].data = ev[0].u64;
2593                 qe[1].data = ev[1].u64;
2594                 qe[2].data = ev[2].u64;
2595                 qe[3].data = ev[3].u64;
2596
2597                 break;
2598         case 3:
2599         case 2:
2600         case 1:
2601                 for (i = 0; i < num; i++) {
2602                         qe[i].cmd_byte =
2603                                 cmd_byte_map[qm_port->is_directed][ev[i].op];
2604                         qe[i].sched_type = sched_type[i];
2605                         qe[i].data = ev[i].u64;
2606                         qe[i].qid = queue_id[i];
2607                         qe[i].priority = EV_TO_DLB_PRIO(ev[i].priority);
2608                         qe[i].lock_id = ev[i].flow_id;
2609                         if (sched_type[i] == DLB_SCHED_DIRECTED) {
2610                                 struct dlb_msg_info *info =
2611                                         (struct dlb_msg_info *)&qe[i].lock_id;
2612
2613                                 info->qid = queue_id[i];
2614                                 info->sched_type = DLB_SCHED_DIRECTED;
2615                                 info->priority = qe[i].priority;
2616                         }
2617                         qe[i].u.event_type.major = ev[i].event_type;
2618                         qe[i].u.event_type.sub = ev[i].sub_event_type;
2619                 }
2620                 break;
2621         case 0:
2622                 break;
2623         }
2624 }
2625
2626 static inline void
2627 dlb_construct_token_pop_qe(struct dlb_port *qm_port, int idx)
2628 {
2629         struct dlb_cq_pop_qe *qe = (void *)qm_port->qe4;
2630         int num = qm_port->owed_tokens;
2631
2632         if (qm_port->use_rsvd_token_scheme) {
2633                 /* Check if there's a deficit of reserved tokens, and return
2634                  * early if there are no (unreserved) tokens to consume.
2635                  */
2636                 if (num <= qm_port->cq_rsvd_token_deficit) {
2637                         qm_port->cq_rsvd_token_deficit -= num;
2638                         qm_port->owed_tokens = 0;
2639                         return;
2640                 }
2641                 num -= qm_port->cq_rsvd_token_deficit;
2642                 qm_port->cq_rsvd_token_deficit = 0;
2643         }
2644
2645         qe[idx].cmd_byte = DLB_POP_CMD_BYTE;
2646         qe[idx].tokens = num - 1;
2647         qm_port->owed_tokens = 0;
2648 }
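
/* Illustrative numbers for the reserved-token handling above: with
 * owed_tokens = 4 and a reserved-token deficit of 6, the deficit drops to
 * 2 and no pop QE is built; with owed_tokens = 4 and a deficit of 1, a
 * pop QE returning 3 tokens is constructed (tokens field = 2).
 */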
2649
2650 static __rte_always_inline void
2651 dlb_pp_write(struct dlb_enqueue_qe *qe4,
2652              struct process_local_port_data *port_data)
2653 {
2654         dlb_movdir64b(port_data->pp_addr, qe4);
2655 }
2656
2657 static inline void
2658 dlb_hw_do_enqueue(struct dlb_port *qm_port,
2659                   bool do_sfence,
2660                   struct process_local_port_data *port_data)
2661 {
2662         DLB_LOG_DBG("dlb: Flushing QE(s) to DLB\n");
2663
2664         /* Since MOVDIR64B is weakly-ordered, use an SFENCE to ensure that
2665          * application writes complete before enqueueing the release HCW.
2666          */
2667         if (do_sfence)
2668                 rte_wmb();
2669
2670         dlb_pp_write(qm_port->qe4, port_data);
2671 }
2672
2673 static inline int
2674 dlb_consume_qe_immediate(struct dlb_port *qm_port, int num)
2675 {
2676         struct process_local_port_data *port_data;
2677         struct dlb_cq_pop_qe *qe;
2678
2679         RTE_ASSERT(qm_port->config_state == DLB_CONFIGURED);
2680
2681         if (qm_port->use_rsvd_token_scheme) {
2682                 /* Check if there's a deficit of reserved tokens, and return
2683                  * early if there are no (unreserved) tokens to consume.
2684                  */
2685                 if (num <= qm_port->cq_rsvd_token_deficit) {
2686                         qm_port->cq_rsvd_token_deficit -= num;
2687                         qm_port->owed_tokens = 0;
2688                         return 0;
2689                 }
2690                 num -= qm_port->cq_rsvd_token_deficit;
2691                 qm_port->cq_rsvd_token_deficit = 0;
2692         }
2693
2694         qe = qm_port->consume_qe;
2695
2696         qe->tokens = num - 1;
2697         qe->int_arm = 0;
2698
2699         /* No store fence needed since no pointer is being sent, and CQ token
2700          * pops can be safely reordered with other HCWs.
2701          */
2702         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2703
2704         dlb_movntdq_single(port_data->pp_addr, qe);
2705
2706         DLB_LOG_DBG("dlb: consume immediate - %d QEs\n", num);
2707
2708         qm_port->owed_tokens = 0;
2709
2710         return 0;
2711 }
2712
2713 static inline uint16_t
2714 __dlb_event_enqueue_burst(void *event_port,
2715                           const struct rte_event events[],
2716                           uint16_t num,
2717                           bool use_delayed)
2718 {
2719         struct dlb_eventdev_port *ev_port = event_port;
2720         struct dlb_port *qm_port = &ev_port->qm_port;
2721         struct process_local_port_data *port_data;
2722         int i;
2723
2724         RTE_ASSERT(ev_port->enq_configured);
2725         RTE_ASSERT(events != NULL);
2726
2727         rte_errno = 0;
2728         i = 0;
2729
2730         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
2731
2732         while (i < num) {
2733                 uint8_t sched_types[DLB_NUM_QES_PER_CACHE_LINE];
2734                 uint8_t queue_ids[DLB_NUM_QES_PER_CACHE_LINE];
2735                 int pop_offs = 0;
2736                 int j = 0;
2737
2738                 memset(qm_port->qe4,
2739                        0,
2740                        DLB_NUM_QES_PER_CACHE_LINE *
2741                        sizeof(struct dlb_enqueue_qe));
2742
2743                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < num; j++) {
2744                         const struct rte_event *ev = &events[i + j];
2745                         int16_t thresh = qm_port->token_pop_thresh;
2746
2747                         if (use_delayed &&
2748                             qm_port->token_pop_mode == DELAYED_POP &&
2749                             (ev->op == RTE_EVENT_OP_FORWARD ||
2750                              ev->op == RTE_EVENT_OP_RELEASE) &&
2751                             qm_port->issued_releases >= thresh - 1) {
2752                                 /* Insert the token pop QE and break out. This
2753                                  * may result in a partial HCW, but that is
2754                                  * simpler than supporting arbitrary QE
2755                                  * insertion.
2756                                  */
2757                                 dlb_construct_token_pop_qe(qm_port, j);
2758
2759                                 /* Reset the releases for the next QE batch */
2760                                 qm_port->issued_releases -= thresh;
2761
2762                                 /* When using delayed token pop mode, the
2763                                  * initial token threshold is the full CQ
2764                                  * depth. After the first token pop, we need to
2765                                  * reset it to the dequeue_depth.
2766                                  */
2767                                 qm_port->token_pop_thresh =
2768                                         qm_port->dequeue_depth;
2769
2770                                 pop_offs = 1;
2771                                 j++;
2772                                 break;
2773                         }
2774
2775                         if (dlb_event_enqueue_prep(ev_port, qm_port, ev,
2776                                                    port_data, &sched_types[j],
2777                                                    &queue_ids[j]))
2778                                 break;
2779                 }
2780
2781                 if (j == 0)
2782                         break;
2783
2784                 dlb_event_build_hcws(qm_port, &events[i], j - pop_offs,
2785                                      sched_types, queue_ids);
2786
2787                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
2788
2789                 /* Don't include the token pop QE in the enqueue count */
2790                 i += j - pop_offs;
2791
2792                 /* Don't interpret j < DLB_NUM_... as out-of-credits if
2793                  * pop_offs != 0
2794                  */
2795                 if (j < DLB_NUM_QES_PER_CACHE_LINE && pop_offs == 0)
2796                         break;
2797         }
2798
2799         RTE_ASSERT(!(i == 0 && rte_errno != -ENOSPC));
2800
2801         return i;
2802 }
2803
2804 static inline uint16_t
2805 dlb_event_enqueue_burst(void *event_port,
2806                         const struct rte_event events[],
2807                         uint16_t num)
2808 {
2809         return __dlb_event_enqueue_burst(event_port, events, num, false);
2810 }
2811
2812 static inline uint16_t
2813 dlb_event_enqueue_burst_delayed(void *event_port,
2814                                 const struct rte_event events[],
2815                                 uint16_t num)
2816 {
2817         return __dlb_event_enqueue_burst(event_port, events, num, true);
2818 }
2819
2820 static inline uint16_t
2821 dlb_event_enqueue(void *event_port,
2822                   const struct rte_event events[])
2823 {
2824         return __dlb_event_enqueue_burst(event_port, events, 1, false);
2825 }
2826
2827 static inline uint16_t
2828 dlb_event_enqueue_delayed(void *event_port,
2829                           const struct rte_event events[])
2830 {
2831         return __dlb_event_enqueue_burst(event_port, events, 1, true);
2832 }
2833
2834 static uint16_t
2835 dlb_event_enqueue_new_burst(void *event_port,
2836                             const struct rte_event events[],
2837                             uint16_t num)
2838 {
2839         return __dlb_event_enqueue_burst(event_port, events, num, false);
2840 }
2841
2842 static uint16_t
2843 dlb_event_enqueue_new_burst_delayed(void *event_port,
2844                                     const struct rte_event events[],
2845                                     uint16_t num)
2846 {
2847         return __dlb_event_enqueue_burst(event_port, events, num, true);
2848 }
2849
2850 static uint16_t
2851 dlb_event_enqueue_forward_burst(void *event_port,
2852                                 const struct rte_event events[],
2853                                 uint16_t num)
2854 {
2855         return __dlb_event_enqueue_burst(event_port, events, num, false);
2856 }
2857
2858 static uint16_t
2859 dlb_event_enqueue_forward_burst_delayed(void *event_port,
2860                                         const struct rte_event events[],
2861                                         uint16_t num)
2862 {
2863         return __dlb_event_enqueue_burst(event_port, events, num, true);
2864 }
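
/*
 * Usage sketch (illustrative only): the burst enqueue paths above are
 * reached through rte_event_enqueue_burst(); the values below are
 * placeholders.
 *
 *	struct rte_event ev = {
 *		.op = RTE_EVENT_OP_NEW,
 *		.queue_id = 0,
 *		.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.flow_id = 0xabc,
 *		.u64 = 0, // application data
 *	};
 *	uint16_t n = rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
 *
 *	if (n == 0 && rte_errno == -ENOSPC)
 *		; // back-pressure: retry later
 */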
2865
2866 static __rte_always_inline int
2867 dlb_recv_qe(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe,
2868             uint8_t *offset)
2869 {
2870         uint8_t xor_mask[2][4] = { {0x0F, 0x0E, 0x0C, 0x08},
2871                                    {0x00, 0x01, 0x03, 0x07} };
2872         uint8_t and_mask[4] = {0x0F, 0x0E, 0x0C, 0x08};
2873         volatile struct dlb_dequeue_qe *cq_addr;
2874         __m128i *qes = (__m128i *)qe;
2875         uint64_t *cache_line_base;
2876         uint8_t gen_bits;
2877
2878         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
2879         cq_addr = &cq_addr[qm_port->cq_idx];
2880
2881         cache_line_base = (void *)(((uintptr_t)cq_addr) & ~0x3F);
2882         *offset = ((uintptr_t)cq_addr & 0x30) >> 4;
2883
2884         /* Load the next CQ cache line from memory. Pack these reads as tight
2885          * as possible to reduce the chance that DLB invalidates the line while
2886          * the CPU is reading it. Read the cache line backwards to ensure that
2887          * if QE[N] (N > 0) is valid, then QEs[0:N-1] are too.
2888          *
2889          * (Valid QEs start at &qe[offset])
2890          */
2891         qes[3] = _mm_load_si128((__m128i *)&cache_line_base[6]);
2892         qes[2] = _mm_load_si128((__m128i *)&cache_line_base[4]);
2893         qes[1] = _mm_load_si128((__m128i *)&cache_line_base[2]);
2894         qes[0] = _mm_load_si128((__m128i *)&cache_line_base[0]);
2895
2896         /* Evict the cache line ASAP */
2897         rte_cldemote(cache_line_base);
2898
2899         /* Extract and combine the gen bits */
2900         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
2901                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
2902                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
2903                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
2904
2905         /* XOR the combined bits such that a 1 represents a valid QE */
2906         gen_bits ^= xor_mask[qm_port->gen_bit][*offset];
2907
2908         /* Mask off gen bits we don't care about */
2909         gen_bits &= and_mask[*offset];
2910
2911         return __builtin_popcount(gen_bits);
2912 }
2913
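     /*
      * Advance the CQ consumer index by cnt QE slots. The unmasked index
      * grows monotonically, the masked index wraps at the CQ depth, and the
      * gen bit is re-derived from the unmasked index so that it flips each
      * time the CQ wraps.
      */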
2914 static inline void
2915 dlb_inc_cq_idx(struct dlb_port *qm_port, int cnt)
2916 {
2917         uint16_t idx = qm_port->cq_idx_unmasked + cnt;
2918
2919         qm_port->cq_idx_unmasked = idx;
2920         qm_port->cq_idx = idx & qm_port->cq_depth_mask;
2921         qm_port->gen_bit = (~(idx >> qm_port->gen_bit_shift)) & 0x1;
2922 }
2923
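     /*
      * Scalar conversion path: translate up to cnt dequeued QEs into
      * rte_events, mapping the hardware QID, priority and sched type to
      * their eventdev equivalents. QEs with the error bit set are counted
      * as rx_drop, their CQ token is returned immediately, and they are
      * skipped.
      */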
2924 static inline int
2925 dlb_process_dequeue_qes(struct dlb_eventdev_port *ev_port,
2926                         struct dlb_port *qm_port,
2927                         struct rte_event *events,
2928                         struct dlb_dequeue_qe *qes,
2929                         int cnt)
2930 {
2931         uint8_t *qid_mappings = qm_port->qid_mappings;
2932         int i, num;
2933
2934         RTE_SET_USED(ev_port);  /* avoids unused variable error */
2935
2936         for (i = 0, num = 0; i < cnt; i++) {
2937                 struct dlb_dequeue_qe *qe = &qes[i];
2938                 int sched_type_map[4] = {
2939                         [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2940                         [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2941                         [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2942                         [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2943                 };
2944
2945                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
2946                             (long long)qe->data, qe->qid,
2947                             qe->u.event_type.major,
2948                             qe->u.event_type.sub,
2949                             qe->pp_id, qe->sched_type, qe->qid, qe->error);
2950
2951                 /* Fill in event information.
2952                  * Note that flow_id must be embedded in the data by
2953                  * the app, such as the mbuf RSS hash field if the data
2954                  * buffer is a mbuf.
2955                  */
2956                 if (unlikely(qe->error)) {
2957                         DLB_LOG_ERR("QE error bit ON\n");
2958                         DLB_INC_STAT(ev_port->stats.traffic.rx_drop, 1);
2959                         dlb_consume_qe_immediate(qm_port, 1);
2960                         continue; /* Ignore */
2961                 }
2962
2963                 events[num].u64 = qe->data;
2964                 events[num].queue_id = qid_mappings[qe->qid];
2965                 events[num].priority = DLB_TO_EV_PRIO((uint8_t)qe->priority);
2966                 events[num].event_type = qe->u.event_type.major;
2967                 events[num].sub_event_type = qe->u.event_type.sub;
2968                 events[num].sched_type = sched_type_map[qe->sched_type];
2969                 DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qe->sched_type], 1);
2970                 num++;
2971         }
2972         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num);
2973
2974         return num;
2975 }
2976
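     /*
      * Vectorized conversion path: build the metadata of four rte_events in
      * two SSE registers and store them alongside the event payloads. If any
      * of the four QEs has its error bit set, fall back to the scalar path
      * above.
      */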
2977 static inline int
2978 dlb_process_dequeue_four_qes(struct dlb_eventdev_port *ev_port,
2979                              struct dlb_port *qm_port,
2980                              struct rte_event *events,
2981                              struct dlb_dequeue_qe *qes)
2982 {
2983         int sched_type_map[] = {
2984                 [DLB_SCHED_ATOMIC] = RTE_SCHED_TYPE_ATOMIC,
2985                 [DLB_SCHED_UNORDERED] = RTE_SCHED_TYPE_PARALLEL,
2986                 [DLB_SCHED_ORDERED] = RTE_SCHED_TYPE_ORDERED,
2987                 [DLB_SCHED_DIRECTED] = RTE_SCHED_TYPE_ATOMIC,
2988         };
2989         const int num_events = DLB_NUM_QES_PER_CACHE_LINE;
2990         uint8_t *qid_mappings = qm_port->qid_mappings;
2991         __m128i sse_evt[2];
2992         int i;
2993
2994         /* In the unlikely case that any of the QE error bits are set, process
2995          * them one at a time.
2996          */
2997         if (unlikely(qes[0].error || qes[1].error ||
2998                      qes[2].error || qes[3].error))
2999                 return dlb_process_dequeue_qes(ev_port, qm_port, events,
3000                                                qes, num_events);
3001
3002         for (i = 0; i < DLB_NUM_QES_PER_CACHE_LINE; i++) {
3003                 DLB_LOG_DBG("dequeue success, data = 0x%llx, qid=%d, event_type=%d, subevent=%d\npp_id = %d, sched_type = %d, qid = %d, err=%d\n",
3004                             (long long)qes[i].data, qes[i].qid,
3005                             qes[i].u.event_type.major,
3006                             qes[i].u.event_type.sub,
3007                             qes[i].pp_id, qes[i].sched_type, qes[i].qid,
3008                             qes[i].error);
3009         }
3010
3011         events[0].u64 = qes[0].data;
3012         events[1].u64 = qes[1].data;
3013         events[2].u64 = qes[2].data;
3014         events[3].u64 = qes[3].data;
3015
3016         /* Construct the metadata portion of two struct rte_events
3017          * in one 128b SSE register. Event metadata is constructed in the SSE
3018          * registers like so:
3019          * sse_evt[0][63:0]:   event[0]'s metadata
3020          * sse_evt[0][127:64]: event[1]'s metadata
3021          * sse_evt[1][63:0]:   event[2]'s metadata
3022          * sse_evt[1][127:64]: event[3]'s metadata
3023          */
3024         sse_evt[0] = _mm_setzero_si128();
3025         sse_evt[1] = _mm_setzero_si128();
3026
3027         /* Convert the hardware queue ID to an event queue ID and store it in
3028          * the metadata:
3029          * sse_evt[0][47:40]   = qid_mappings[qes[0].qid]
3030          * sse_evt[0][111:104] = qid_mappings[qes[1].qid]
3031          * sse_evt[1][47:40]   = qid_mappings[qes[2].qid]
3032          * sse_evt[1][111:104] = qid_mappings[qes[3].qid]
3033          */
3034 #define DLB_EVENT_QUEUE_ID_BYTE 5
3035         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3036                                      qid_mappings[qes[0].qid],
3037                                      DLB_EVENT_QUEUE_ID_BYTE);
3038         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3039                                      qid_mappings[qes[1].qid],
3040                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3041         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3042                                      qid_mappings[qes[2].qid],
3043                                      DLB_EVENT_QUEUE_ID_BYTE);
3044         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3045                                      qid_mappings[qes[3].qid],
3046                                      DLB_EVENT_QUEUE_ID_BYTE + 8);
3047
3048         /* Convert the hardware priority to an event priority and store it in
3049          * the metadata:
3050          * sse_evt[0][55:48]   = DLB_TO_EV_PRIO(qes[0].priority)
3051          * sse_evt[0][119:112] = DLB_TO_EV_PRIO(qes[1].priority)
3052          * sse_evt[1][55:48]   = DLB_TO_EV_PRIO(qes[2].priority)
3053          * sse_evt[1][119:112] = DLB_TO_EV_PRIO(qes[3].priority)
3054          */
3055 #define DLB_EVENT_PRIO_BYTE 6
3056         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3057                                      DLB_TO_EV_PRIO((uint8_t)qes[0].priority),
3058                                      DLB_EVENT_PRIO_BYTE);
3059         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3060                                      DLB_TO_EV_PRIO((uint8_t)qes[1].priority),
3061                                      DLB_EVENT_PRIO_BYTE + 8);
3062         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3063                                      DLB_TO_EV_PRIO((uint8_t)qes[2].priority),
3064                                      DLB_EVENT_PRIO_BYTE);
3065         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3066                                      DLB_TO_EV_PRIO((uint8_t)qes[3].priority),
3067                                      DLB_EVENT_PRIO_BYTE + 8);
3068
3069         /* Write the event type and sub event type to the event metadata. Leave
3070          * flow ID unspecified, since the hardware does not maintain it during
3071          * scheduling:
3072          * sse_evt[0][31:0]   = qes[0].u.event_type.major << 28 |
3073          *                      qes[0].u.event_type.sub << 20;
3074          * sse_evt[0][95:64]  = qes[1].u.event_type.major << 28 |
3075          *                      qes[1].u.event_type.sub << 20;
3076          * sse_evt[1][31:0]   = qes[2].u.event_type.major << 28 |
3077          *                      qes[2].u.event_type.sub << 20;
3078          * sse_evt[1][95:64]  = qes[3].u.event_type.major << 28 |
3079          *                      qes[3].u.event_type.sub << 20;
3080          */
3081 #define DLB_EVENT_EV_TYPE_DW 0
3082 #define DLB_EVENT_EV_TYPE_SHIFT 28
3083 #define DLB_EVENT_SUB_EV_TYPE_SHIFT 20
3084         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3085                         qes[0].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3086                         qes[0].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3087                         DLB_EVENT_EV_TYPE_DW);
3088         sse_evt[0] = _mm_insert_epi32(sse_evt[0],
3089                         qes[1].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3090                         qes[1].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3091                         DLB_EVENT_EV_TYPE_DW + 2);
3092         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3093                         qes[2].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT |
3094                         qes[2].u.event_type.sub <<  DLB_EVENT_SUB_EV_TYPE_SHIFT,
3095                         DLB_EVENT_EV_TYPE_DW);
3096         sse_evt[1] = _mm_insert_epi32(sse_evt[1],
3097                         qes[3].u.event_type.major << DLB_EVENT_EV_TYPE_SHIFT  |
3098                         qes[3].u.event_type.sub << DLB_EVENT_SUB_EV_TYPE_SHIFT,
3099                         DLB_EVENT_EV_TYPE_DW + 2);
3100
3101         /* Write the sched type to the event metadata. 'op' and 'rsvd' are not
3102          * set:
3103          * sse_evt[0][39:32]  = sched_type_map[qes[0].sched_type] << 6
3104          * sse_evt[0][103:96] = sched_type_map[qes[1].sched_type] << 6
3105          * sse_evt[1][39:32]  = sched_type_map[qes[2].sched_type] << 6
3106          * sse_evt[1][103:96] = sched_type_map[qes[3].sched_type] << 6
3107          */
3108 #define DLB_EVENT_SCHED_TYPE_BYTE 4
3109 #define DLB_EVENT_SCHED_TYPE_SHIFT 6
3110         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3111                 sched_type_map[qes[0].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3112                 DLB_EVENT_SCHED_TYPE_BYTE);
3113         sse_evt[0] = _mm_insert_epi8(sse_evt[0],
3114                 sched_type_map[qes[1].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3115                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3116         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3117                 sched_type_map[qes[2].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3118                 DLB_EVENT_SCHED_TYPE_BYTE);
3119         sse_evt[1] = _mm_insert_epi8(sse_evt[1],
3120                 sched_type_map[qes[3].sched_type] << DLB_EVENT_SCHED_TYPE_SHIFT,
3121                 DLB_EVENT_SCHED_TYPE_BYTE + 8);
3122
3123         /* Store the metadata to the event (use the double-precision
3124          * _mm_storeh_pd because there is no integer function for storing the
3125          * upper 64b):
3126          * events[0].event = sse_evt[0][63:0]
3127          * events[1].event = sse_evt[0][127:64]
3128          * events[2].event = sse_evt[1][63:0]
3129          * events[3].event = sse_evt[1][127:64]
3130          */
3131         _mm_storel_epi64((__m128i *)&events[0].event, sse_evt[0]);
3132         _mm_storeh_pd((double *)&events[1].event, (__m128d) sse_evt[0]);
3133         _mm_storel_epi64((__m128i *)&events[2].event, sse_evt[1]);
3134         _mm_storeh_pd((double *)&events[3].event, (__m128d) sse_evt[1]);
3135
3136         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[0].sched_type], 1);
3137         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[1].sched_type], 1);
3138         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[2].sched_type], 1);
3139         DLB_INC_STAT(ev_port->stats.rx_sched_cnt[qes[3].sched_type], 1);
3140
3141         DLB_INC_STAT(ev_port->stats.traffic.rx_ok, num_events);
3142
3143         return num_events;
3144 }
3145
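     /*
      * Wait for a QE to arrive or for the dequeue timeout to expire. Returns
      * 1 once the timeout has elapsed, otherwise 0 after either arming
      * umonitor/umwait on the next CQ entry (when supported) or busy-polling
      * for at most one poll interval.
      */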
3146 static inline int
3147 dlb_dequeue_wait(struct dlb_eventdev *dlb,
3148                  struct dlb_eventdev_port *ev_port,
3149                  struct dlb_port *qm_port,
3150                  uint64_t timeout,
3151                  uint64_t start_ticks)
3152 {
3153         struct process_local_port_data *port_data;
3154         uint64_t elapsed_ticks;
3155
3156         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3157
3158         elapsed_ticks = rte_get_timer_cycles() - start_ticks;
3159
3160         /* Wait/poll time expired */
3161         if (elapsed_ticks >= timeout) {
3162                 /* Interrupts not supported by PF PMD */
3163                 return 1;
3164         } else if (dlb->umwait_allowed) {
3165                 volatile struct dlb_dequeue_qe *cq_base;
3166                 union {
3167                         uint64_t raw_qe[2];
3168                         struct dlb_dequeue_qe qe;
3169                 } qe_mask;
3170                 uint64_t expected_value;
3171                 volatile uint64_t *monitor_addr;
3172
3173                 qe_mask.qe.cq_gen = 1; /* set mask */
3174
3175                 cq_base = port_data->cq_base;
3176                 monitor_addr = (volatile uint64_t *)(volatile void *)
3177                         &cq_base[qm_port->cq_idx];
3178                 monitor_addr++; /* cq_gen bit is in second 64bit location */
3179
3180                 if (qm_port->gen_bit)
3181                         expected_value = qe_mask.raw_qe[1];
3182                 else
3183                         expected_value = 0;
3184
3185                 rte_power_monitor(monitor_addr, expected_value,
3186                                   qe_mask.raw_qe[1], timeout + start_ticks,
3187                                   sizeof(uint64_t));
3188
3189                 DLB_INC_STAT(ev_port->stats.traffic.rx_umonitor_umwait, 1);
3190         } else {
3191                 uint64_t poll_interval = RTE_LIBRTE_PMD_DLB_POLL_INTERVAL;
3192                 uint64_t curr_ticks = rte_get_timer_cycles();
3193                 uint64_t init_ticks = curr_ticks;
3194
3195                 while ((curr_ticks - start_ticks < timeout) &&
3196                        (curr_ticks - init_ticks < poll_interval))
3197                         curr_ticks = rte_get_timer_cycles();
3198         }
3199
3200         return 0;
3201 }
3202
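     /*
      * Dequeue up to max_num events from a port using the standard
      * (non-sparse) CQ format. Each iteration converts up to one cache line
      * of QEs, optionally waiting when the CQ is empty, and the CQ tokens
      * and outstanding releases are accounted for at the end.
      */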
3203 static inline int16_t
3204 dlb_hw_dequeue(struct dlb_eventdev *dlb,
3205                struct dlb_eventdev_port *ev_port,
3206                struct rte_event *events,
3207                uint16_t max_num,
3208                uint64_t dequeue_timeout_ticks)
3209 {
3210         uint64_t timeout;
3211         uint64_t start_ticks = 0ULL;
3212         struct dlb_port *qm_port;
3213         int num = 0;
3214
3215         qm_port = &ev_port->qm_port;
3216
3217         /* If configured for per-dequeue waits, use the wait value provided
3218          * to this API. Otherwise use the global value set at eventdev
3219          * configuration time.
3220          */
3221         if (!dlb->global_dequeue_wait)
3222                 timeout = dequeue_timeout_ticks;
3223         else
3224                 timeout = dlb->global_dequeue_wait_ticks;
3225
3226         if (timeout)
3227                 start_ticks = rte_get_timer_cycles();
3228
3229         while (num < max_num) {
3230                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3231                 uint8_t offset;
3232                 int num_avail;
3233
3234                 /* Copy up to 4 QEs from the current cache line into qes */
3235                 num_avail = dlb_recv_qe(qm_port, qes, &offset);
3236
3237                 /* But don't process more than the user requested */
3238                 num_avail = RTE_MIN(num_avail, max_num - num);
3239
3240                 dlb_inc_cq_idx(qm_port, num_avail);
3241
3242                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3243                         num += dlb_process_dequeue_four_qes(ev_port,
3244                                                              qm_port,
3245                                                              &events[num],
3246                                                              &qes[offset]);
3247                 else if (num_avail)
3248                         num += dlb_process_dequeue_qes(ev_port,
3249                                                         qm_port,
3250                                                         &events[num],
3251                                                         &qes[offset],
3252                                                         num_avail);
3253                 else if ((timeout == 0) || (num > 0))
3254                         /* Not waiting in any form, or 1+ events received? */
3255                         break;
3256                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3257                                           timeout, start_ticks))
3258                         break;
3259         }
3260
3261         qm_port->owed_tokens += num;
3262
3263         if (num && qm_port->token_pop_mode == AUTO_POP)
3264                 dlb_consume_qe_immediate(qm_port, num);
3265
3266         ev_port->outstanding_releases += num;
3267
3268         return num;
3269 }
3270
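     /*
      * Sparse CQ variant of dlb_recv_qe(): in sparse mode the device writes
      * one QE per cache line, so the next four QEs sit at four-slot strides
      * from the current CQ index. Gather them into qe[], correct the xor
      * mask for entries that wrapped past the end of the CQ, and count the
      * valid QEs via their gen bits.
      */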
3271 static __rte_always_inline int
3272 dlb_recv_qe_sparse(struct dlb_port *qm_port, struct dlb_dequeue_qe *qe)
3273 {
3274         volatile struct dlb_dequeue_qe *cq_addr;
3275         uint8_t xor_mask[2] = {0x0F, 0x00};
3276         const uint8_t and_mask = 0x0F;
3277         __m128i *qes = (__m128i *)qe;
3278         uint8_t gen_bits, gen_bit;
3279         uintptr_t addr[4];
3280         uint16_t idx;
3281
3282         cq_addr = dlb_port[qm_port->id][PORT_TYPE(qm_port)].cq_base;
3283
3284         idx = qm_port->cq_idx;
3285
3286         /* Load the next 4 QEs */
3287         addr[0] = (uintptr_t)&cq_addr[idx];
3288         addr[1] = (uintptr_t)&cq_addr[(idx +  4) & qm_port->cq_depth_mask];
3289         addr[2] = (uintptr_t)&cq_addr[(idx +  8) & qm_port->cq_depth_mask];
3290         addr[3] = (uintptr_t)&cq_addr[(idx + 12) & qm_port->cq_depth_mask];
3291
3292         /* Prefetch next batch of QEs (all CQs occupy minimum 8 cache lines) */
3293         rte_prefetch0(&cq_addr[(idx + 16) & qm_port->cq_depth_mask]);
3294         rte_prefetch0(&cq_addr[(idx + 20) & qm_port->cq_depth_mask]);
3295         rte_prefetch0(&cq_addr[(idx + 24) & qm_port->cq_depth_mask]);
3296         rte_prefetch0(&cq_addr[(idx + 28) & qm_port->cq_depth_mask]);
3297
3298         /* Correct the xor_mask for wrap-around QEs */
3299         gen_bit = qm_port->gen_bit;
3300         xor_mask[gen_bit] ^= !!((idx +  4) > qm_port->cq_depth_mask) << 1;
3301         xor_mask[gen_bit] ^= !!((idx +  8) > qm_port->cq_depth_mask) << 2;
3302         xor_mask[gen_bit] ^= !!((idx + 12) > qm_port->cq_depth_mask) << 3;
3303
3304         /* Read the cache lines backwards to ensure that if QE[N] (N > 0) is
3305          * valid, then QEs[0:N-1] are too.
3306          */
3307         qes[3] = _mm_load_si128((__m128i *)(void *)addr[3]);
3308         rte_compiler_barrier();
3309         qes[2] = _mm_load_si128((__m128i *)(void *)addr[2]);
3310         rte_compiler_barrier();
3311         qes[1] = _mm_load_si128((__m128i *)(void *)addr[1]);
3312         rte_compiler_barrier();
3313         qes[0] = _mm_load_si128((__m128i *)(void *)addr[0]);
3314
3315         /* Extract and combine the gen bits */
3316         gen_bits = ((_mm_extract_epi8(qes[0], 15) & 0x1) << 0) |
3317                    ((_mm_extract_epi8(qes[1], 15) & 0x1) << 1) |
3318                    ((_mm_extract_epi8(qes[2], 15) & 0x1) << 2) |
3319                    ((_mm_extract_epi8(qes[3], 15) & 0x1) << 3);
3320
3321         /* XOR the combined bits such that a 1 represents a valid QE */
3322         gen_bits ^= xor_mask[gen_bit];
3323
3324         /* Mask off gen bits we don't care about */
3325         gen_bits &= and_mask;
3326
3327         return __builtin_popcount(gen_bits);
3328 }
3329
3330 static inline int16_t
3331 dlb_hw_dequeue_sparse(struct dlb_eventdev *dlb,
3332                       struct dlb_eventdev_port *ev_port,
3333                       struct rte_event *events,
3334                       uint16_t max_num,
3335                       uint64_t dequeue_timeout_ticks)
3336 {
3337         uint64_t timeout;
3338         uint64_t start_ticks = 0ULL;
3339         struct dlb_port *qm_port;
3340         int num = 0;
3341
3342         qm_port = &ev_port->qm_port;
3343
3344         /* If configured for per-dequeue waits, use the wait value provided
3345          * to this API. Otherwise use the global value set at eventdev
3346          * configuration time.
3347          */
3348         if (!dlb->global_dequeue_wait)
3349                 timeout = dequeue_timeout_ticks;
3350         else
3351                 timeout = dlb->global_dequeue_wait_ticks;
3352
3353         if (timeout)
3354                 start_ticks = rte_get_timer_cycles();
3355
3356         while (num < max_num) {
3357                 struct dlb_dequeue_qe qes[DLB_NUM_QES_PER_CACHE_LINE];
3358                 int num_avail;
3359
3360                 /* Copy up to 4 QEs from the current cache line into qes */
3361                 num_avail = dlb_recv_qe_sparse(qm_port, qes);
3362
3363                 /* But don't process more than the user requested */
3364                 num_avail = RTE_MIN(num_avail, max_num - num);
3365
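                     /* In sparse mode each QE occupies a full cache line
                      * (four 16B slots), so advance the CQ index by four
                      * slots per QE received.
                      */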
3366                 dlb_inc_cq_idx(qm_port, num_avail << 2);
3367
3368                 if (num_avail == DLB_NUM_QES_PER_CACHE_LINE)
3369                         num += dlb_process_dequeue_four_qes(ev_port,
3370                                                              qm_port,
3371                                                              &events[num],
3372                                                              &qes[0]);
3373                 else if (num_avail)
3374                         num += dlb_process_dequeue_qes(ev_port,
3375                                                         qm_port,
3376                                                         &events[num],
3377                                                         &qes[0],
3378                                                         num_avail);
3379                 else if ((timeout == 0) || (num > 0))
3380                         /* Not waiting in any form, or 1+ events received? */
3381                         break;
3382                 else if (dlb_dequeue_wait(dlb, ev_port, qm_port,
3383                                           timeout, start_ticks))
3384                         break;
3385         }
3386
3387         qm_port->owed_tokens += num;
3388
3389         if (num && qm_port->token_pop_mode == AUTO_POP)
3390                 dlb_consume_qe_immediate(qm_port, num);
3391
3392         ev_port->outstanding_releases += num;
3393
3394         return num;
3395 }
3396
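     /*
      * Return n event releases to the device on behalf of a port. For
      * load-balanced ports this builds batches of completion (release) QEs,
      * inserting a token pop QE when the port uses delayed pop mode, and
      * enqueues them; directed ports skip the hardware enqueue and go
      * straight to the software credit update.
      */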
3397 static int
3398 dlb_event_release(struct dlb_eventdev *dlb, uint8_t port_id, int n)
3399 {
3400         struct process_local_port_data *port_data;
3401         struct dlb_eventdev_port *ev_port;
3402         struct dlb_port *qm_port;
3403         int i;
3404
3405         if (port_id >= dlb->num_ports) {
3406                 DLB_LOG_ERR("Invalid port id %d in dlb_event_release\n",
3407                             port_id);
3408                 rte_errno = -EINVAL;
3409                 return rte_errno;
3410         }
3411
3412         ev_port = &dlb->ev_ports[port_id];
3413         qm_port = &ev_port->qm_port;
3414         port_data = &dlb_port[qm_port->id][PORT_TYPE(qm_port)];
3415
3416         i = 0;
3417
3418         if (qm_port->is_directed) {
3419                 i = n;
3420                 goto sw_credit_update;
3421         }
3422
3423         while (i < n) {
3424                 int pop_offs = 0;
3425                 int j = 0;
3426
3427                 /* Zero-out QEs */
3428                 qm_port->qe4[0].cmd_byte = 0;
3429                 qm_port->qe4[1].cmd_byte = 0;
3430                 qm_port->qe4[2].cmd_byte = 0;
3431                 qm_port->qe4[3].cmd_byte = 0;
3432
3433                 for (; j < DLB_NUM_QES_PER_CACHE_LINE && (i + j) < n; j++) {
3434                         int16_t thresh = qm_port->token_pop_thresh;
3435
3436                         if (qm_port->token_pop_mode == DELAYED_POP &&
3437                             qm_port->issued_releases >= thresh - 1) {
3438                                 /* Insert the token pop QE */
3439                                 dlb_construct_token_pop_qe(qm_port, j);
3440
3441                                 /* Reset the releases for the next QE batch */
3442                                 qm_port->issued_releases -= thresh;
3443
3444                                 /* When using delayed token pop mode, the
3445                                  * initial token threshold is the full CQ
3446                                  * depth. After the first token pop, we need to
3447                                  * reset it to the dequeue_depth.
3448                                  */
3449                                 qm_port->token_pop_thresh =
3450                                         qm_port->dequeue_depth;
3451
3452                                 pop_offs = 1;
3453                                 j++;
3454                                 break;
3455                         }
3456
3457                         qm_port->qe4[j].cmd_byte = DLB_COMP_CMD_BYTE;
3458                         qm_port->issued_releases++;
3459                 }
3460
3461                 dlb_hw_do_enqueue(qm_port, i == 0, port_data);
3462
3463                 /* Don't include the token pop QE in the release count */
3464                 i += j - pop_offs;
3465         }
3466
3467 sw_credit_update:
3468         /* each release returns one credit */
3469         if (!ev_port->outstanding_releases) {
3470                 DLB_LOG_ERR("Unrecoverable application error. Outstanding releases underflowed.\n");
3471                 rte_errno = -ENOTRECOVERABLE;
3472                 return rte_errno;
3473         }
3474
3475         ev_port->outstanding_releases -= i;
3476         ev_port->inflight_credits += i;
3477
3478         /* Replenish s/w credits if enough releases are performed */
3479         dlb_replenish_sw_credits(dlb, ev_port);
3480         return 0;
3481 }
3482
3483 static uint16_t
3484 dlb_event_dequeue_burst(void *event_port, struct rte_event *ev, uint16_t num,
3485                         uint64_t wait)
3486 {
3487         struct dlb_eventdev_port *ev_port = event_port;
3488         struct dlb_port *qm_port = &ev_port->qm_port;
3489         struct dlb_eventdev *dlb = ev_port->dlb;
3490         uint16_t cnt;
3491         int ret;
3492
3493         rte_errno = 0;
3494
3495         RTE_ASSERT(ev_port->setup_done);
3496         RTE_ASSERT(ev != NULL);
3497
3498         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3499                 uint16_t out_rels = ev_port->outstanding_releases;
3500
3501                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3502                 if (ret)
3503                         return ret;
3504
3505                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3506         }
3507
3508         if (qm_port->token_pop_mode == DEFERRED_POP &&
3509             qm_port->owed_tokens)
3510                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3511
3512         cnt = dlb_hw_dequeue(dlb, ev_port, ev, num, wait);
3513
3514         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3515         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3516         return cnt;
3517 }
3518
3519 static uint16_t
3520 dlb_event_dequeue(void *event_port, struct rte_event *ev, uint64_t wait)
3521 {
3522         return dlb_event_dequeue_burst(event_port, ev, 1, wait);
3523 }
3524
3525 static uint16_t
3526 dlb_event_dequeue_burst_sparse(void *event_port, struct rte_event *ev,
3527                                uint16_t num, uint64_t wait)
3528 {
3529         struct dlb_eventdev_port *ev_port = event_port;
3530         struct dlb_port *qm_port = &ev_port->qm_port;
3531         struct dlb_eventdev *dlb = ev_port->dlb;
3532         uint16_t cnt;
3533         int ret;
3534
3535         rte_errno = 0;
3536
3537         RTE_ASSERT(ev_port->setup_done);
3538         RTE_ASSERT(ev != NULL);
3539
3540         if (ev_port->implicit_release && ev_port->outstanding_releases > 0) {
3541                 uint16_t out_rels = ev_port->outstanding_releases;
3542
3543                 ret = dlb_event_release(dlb, ev_port->id, out_rels);
3544                 if (ret)
3545                         return ret;
3546
3547                 DLB_INC_STAT(ev_port->stats.tx_implicit_rel, out_rels);
3548         }
3549
3550         if (qm_port->token_pop_mode == DEFERRED_POP &&
3551             qm_port->owed_tokens)
3552                 dlb_consume_qe_immediate(qm_port, qm_port->owed_tokens);
3553
3554         cnt = dlb_hw_dequeue_sparse(dlb, ev_port, ev, num, wait);
3555
3556         DLB_INC_STAT(ev_port->stats.traffic.total_polls, 1);
3557         DLB_INC_STAT(ev_port->stats.traffic.zero_polls, ((cnt == 0) ? 1 : 0));
3558         return cnt;
3559 }
3560
3561 static uint16_t
3562 dlb_event_dequeue_sparse(void *event_port, struct rte_event *ev, uint64_t wait)
3563 {
3564         return dlb_event_dequeue_burst_sparse(event_port, ev, 1, wait);
3565 }
3566
3567 static uint32_t
3568 dlb_get_ldb_queue_depth(struct dlb_eventdev *dlb,
3569                         struct dlb_eventdev_queue *queue)
3570 {
3571         struct dlb_hw_dev *handle = &dlb->qm_instance;
3572         struct dlb_get_ldb_queue_depth_args cfg;
3573         struct dlb_cmd_response response;
3574         int ret;
3575
3576         cfg.queue_id = queue->qm_queue.id;
3577         cfg.response = (uintptr_t)&response;
3578
3579         ret = dlb_iface_get_ldb_queue_depth(handle, &cfg);
3580         if (ret < 0) {
3581                 DLB_LOG_ERR("dlb: get_ldb_queue_depth ret=%d (driver status: %s)\n",
3582                             ret, dlb_error_strings[response.status]);
3583                 return ret;
3584         }
3585
3586         return response.id;
3587 }
3588
3589 static uint32_t
3590 dlb_get_dir_queue_depth(struct dlb_eventdev *dlb,
3591                         struct dlb_eventdev_queue *queue)
3592 {
3593         struct dlb_hw_dev *handle = &dlb->qm_instance;
3594         struct dlb_get_dir_queue_depth_args cfg;
3595         struct dlb_cmd_response response;
3596         int ret;
3597
3598         cfg.queue_id = queue->qm_queue.id;
3599         cfg.response = (uintptr_t)&response;
3600
3601         ret = dlb_iface_get_dir_queue_depth(handle, &cfg);
3602         if (ret < 0) {
3603                 DLB_LOG_ERR("dlb: get_dir_queue_depth ret=%d (driver status: %s)\n",
3604                             ret, dlb_error_strings[response.status]);
3605                 return ret;
3606         }
3607
3608         return response.id;
3609 }
3610
3611 uint32_t
3612 dlb_get_queue_depth(struct dlb_eventdev *dlb,
3613                     struct dlb_eventdev_queue *queue)
3614 {
3615         if (queue->qm_queue.is_directed)
3616                 return dlb_get_dir_queue_depth(dlb, queue);
3617         else
3618                 return dlb_get_ldb_queue_depth(dlb, queue);
3619 }
3620
3621 static bool
3622 dlb_queue_is_empty(struct dlb_eventdev *dlb,
3623                    struct dlb_eventdev_queue *queue)
3624 {
3625         return dlb_get_queue_depth(dlb, queue) == 0;
3626 }
3627
3628 static bool
3629 dlb_linked_queues_empty(struct dlb_eventdev *dlb)
3630 {
3631         int i;
3632
3633         for (i = 0; i < dlb->num_queues; i++) {
3634                 if (dlb->ev_queues[i].num_links == 0)
3635                         continue;
3636                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3637                         return false;
3638         }
3639
3640         return true;
3641 }
3642
3643 static bool
3644 dlb_queues_empty(struct dlb_eventdev *dlb)
3645 {
3646         int i;
3647
3648         for (i = 0; i < dlb->num_queues; i++) {
3649                 if (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3650                         return false;
3651         }
3652
3653         return true;
3654 }
3655
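     /*
      * Drain one port: dequeue events one at a time, hand each to the
      * application's stop-flush callback (if registered), release it back to
      * the device for load-balanced ports, and finally issue releases for
      * any events still outstanding on the port.
      */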
3656 static void
3657 dlb_flush_port(struct rte_eventdev *dev, int port_id)
3658 {
3659         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3660         eventdev_stop_flush_t flush;
3661         struct rte_event ev;
3662         uint8_t dev_id;
3663         void *arg;
3664         int i;
3665
3666         flush = dev->dev_ops->dev_stop_flush;
3667         dev_id = dev->data->dev_id;
3668         arg = dev->data->dev_stop_flush_arg;
3669
3670         while (rte_event_dequeue_burst(dev_id, port_id, &ev, 1, 0)) {
3671                 if (flush)
3672                         flush(dev_id, ev, arg);
3673
3674                 if (dlb->ev_ports[port_id].qm_port.is_directed)
3675                         continue;
3676
3677                 ev.op = RTE_EVENT_OP_RELEASE;
3678
3679                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3680         }
3681
3682         /* Enqueue any additional outstanding releases */
3683         ev.op = RTE_EVENT_OP_RELEASE;
3684
3685         for (i = dlb->ev_ports[port_id].outstanding_releases; i > 0; i--)
3686                 rte_event_enqueue_burst(dev_id, port_id, &ev, 1);
3687 }
3688
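     /*
      * Drain the device at stop time: flush every port until all linked
      * queues are empty, then, if any unlinked load-balanced queues still
      * hold events, temporarily link a load-balanced port to each such
      * queue, flush it, and unlink again.
      */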
3689 static void
3690 dlb_drain(struct rte_eventdev *dev)
3691 {
3692         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3693         struct dlb_eventdev_port *ev_port = NULL;
3694         uint8_t dev_id;
3695         int i;
3696
3697         dev_id = dev->data->dev_id;
3698
3699         while (!dlb_linked_queues_empty(dlb)) {
3700                 /* Flush all the ev_ports, which will drain all their connected
3701                  * queues.
3702                  */
3703                 for (i = 0; i < dlb->num_ports; i++)
3704                         dlb_flush_port(dev, i);
3705         }
3706
3707         /* The queues are empty, but there may be events left in the ports. */
3708         for (i = 0; i < dlb->num_ports; i++)
3709                 dlb_flush_port(dev, i);
3710
3711         /* If the domain's queues are empty, we're done. */
3712         if (dlb_queues_empty(dlb))
3713                 return;
3714
3715         /* Else, there must be at least one unlinked load-balanced queue.
3716          * Select a load-balanced port with which to drain the unlinked
3717          * queue(s).
3718          */
3719         for (i = 0; i < dlb->num_ports; i++) {
3720                 ev_port = &dlb->ev_ports[i];
3721
3722                 if (!ev_port->qm_port.is_directed)
3723                         break;
3724         }
3725
3726         if (i == dlb->num_ports) {
3727                 DLB_LOG_ERR("internal error: no LDB ev_ports\n");
3728                 return;
3729         }
3730
3731         rte_errno = 0;
3732         rte_event_port_unlink(dev_id, ev_port->id, NULL, 0);
3733
3734         if (rte_errno) {
3735                 DLB_LOG_ERR("internal error: failed to unlink ev_port %d\n",
3736                             ev_port->id);
3737                 return;
3738         }
3739
3740         for (i = 0; i < dlb->num_queues; i++) {
3741                 uint8_t qid, prio;
3742                 int ret;
3743
3744                 if (dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3745                         continue;
3746
3747                 qid = i;
3748                 prio = 0;
3749
3750                 /* Link the ev_port to the queue */
3751                 ret = rte_event_port_link(dev_id, ev_port->id, &qid, &prio, 1);
3752                 if (ret != 1) {
3753                         DLB_LOG_ERR("internal error: failed to link ev_port %d to queue %d\n",
3754                                     ev_port->id, qid);
3755                         return;
3756                 }
3757
3758                 /* Flush the queue */
3759                 while (!dlb_queue_is_empty(dlb, &dlb->ev_queues[i]))
3760                         dlb_flush_port(dev, ev_port->id);
3761
3762                 /* Drain any extant events in the ev_port. */
3763                 dlb_flush_port(dev, ev_port->id);
3764
3765                 /* Unlink the ev_port from the queue */
3766                 ret = rte_event_port_unlink(dev_id, ev_port->id, &qid, 1);
3767                 if (ret != 1) {
3768                         DLB_LOG_ERR("internal error: failed to unlink ev_port %d from queue %d\n",
3769                                     ev_port->id, qid);
3770                         return;
3771                 }
3772         }
3773 }
3774
3775 static void
3776 dlb_eventdev_stop(struct rte_eventdev *dev)
3777 {
3778         struct dlb_eventdev *dlb = dlb_pmd_priv(dev);
3779
3780         rte_spinlock_lock(&dlb->qm_instance.resource_lock);
3781
3782         if (dlb->run_state == DLB_RUN_STATE_STOPPED) {
3783                 DLB_LOG_DBG("Internal error: already stopped\n");
3784                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3785                 return;
3786         } else if (dlb->run_state != DLB_RUN_STATE_STARTED) {
3787                 DLB_LOG_ERR("Internal error: bad state %d for dev_stop\n",
3788                             (int)dlb->run_state);
3789                 rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3790                 return;
3791         }
3792
3793         dlb->run_state = DLB_RUN_STATE_STOPPING;
3794
3795         rte_spinlock_unlock(&dlb->qm_instance.resource_lock);
3796
3797         dlb_drain(dev);
3798
3799         dlb->run_state = DLB_RUN_STATE_STOPPED;
3800 }
3801
3802 static int
3803 dlb_eventdev_close(struct rte_eventdev *dev)
3804 {
3805         dlb_hw_reset_sched_domain(dev, false);
3806
3807         return 0;
3808 }
3809
3810 static void
3811 dlb_eventdev_port_release(void *port)
3812 {
3813         struct dlb_eventdev_port *ev_port = port;
3814
3815         if (ev_port) {
3816                 struct dlb_port *qm_port = &ev_port->qm_port;
3817
3818                 if (qm_port->config_state == DLB_CONFIGURED)
3819                         dlb_free_qe_mem(qm_port);
3820         }
3821 }
3822
3823 static void
3824 dlb_eventdev_queue_release(struct rte_eventdev *dev, uint8_t id)
3825 {
3826         RTE_SET_USED(dev);
3827         RTE_SET_USED(id);
3828
3829         /* This function intentionally left blank. */
3830 }
3831
3832 void
3833 dlb_entry_points_init(struct rte_eventdev *dev)
3834 {
3835         struct dlb_eventdev *dlb;
3836
3837         static struct rte_eventdev_ops dlb_eventdev_entry_ops = {
3838                 .dev_infos_get    = dlb_eventdev_info_get,
3839                 .dev_configure    = dlb_eventdev_configure,
3840                 .dev_start        = dlb_eventdev_start,
3841                 .dev_stop         = dlb_eventdev_stop,
3842                 .dev_close        = dlb_eventdev_close,
3843                 .queue_def_conf   = dlb_eventdev_queue_default_conf_get,
3844                 .port_def_conf    = dlb_eventdev_port_default_conf_get,
3845                 .queue_setup      = dlb_eventdev_queue_setup,
3846                 .queue_release    = dlb_eventdev_queue_release,
3847                 .port_setup       = dlb_eventdev_port_setup,
3848                 .port_release     = dlb_eventdev_port_release,
3849                 .port_link        = dlb_eventdev_port_link,
3850                 .port_unlink      = dlb_eventdev_port_unlink,
3851                 .port_unlinks_in_progress =
3852                                     dlb_eventdev_port_unlinks_in_progress,
3853                 .dump             = dlb_eventdev_dump,
3854                 .xstats_get       = dlb_eventdev_xstats_get,
3855                 .xstats_get_names = dlb_eventdev_xstats_get_names,
3856                 .xstats_get_by_name = dlb_eventdev_xstats_get_by_name,
3857                 .xstats_reset       = dlb_eventdev_xstats_reset,
3858                 .dev_selftest     = test_dlb_eventdev,
3859         };
3860
3861         /* Expose PMD's eventdev interface */
3862         dev->dev_ops = &dlb_eventdev_entry_ops;
3863
3864         dev->enqueue = dlb_event_enqueue;
3865         dev->enqueue_burst = dlb_event_enqueue_burst;
3866         dev->enqueue_new_burst = dlb_event_enqueue_new_burst;
3867         dev->enqueue_forward_burst = dlb_event_enqueue_forward_burst;
3868         dev->dequeue = dlb_event_dequeue;
3869         dev->dequeue_burst = dlb_event_dequeue_burst;
3870
3871         dlb = dev->data->dev_private;
3872
3873         if (dlb->poll_mode == DLB_CQ_POLL_MODE_SPARSE) {
3874                 dev->dequeue = dlb_event_dequeue_sparse;
3875                 dev->dequeue_burst = dlb_event_dequeue_burst_sparse;
3876         }
3877 }
3878
3879 int
3880 dlb_primary_eventdev_probe(struct rte_eventdev *dev,
3881                            const char *name,
3882                            struct dlb_devargs *dlb_args)
3883 {
3884         struct dlb_eventdev *dlb;
3885         int err, i;
3886
3887         dlb = dev->data->dev_private;
3888
3889         dlb->event_dev = dev; /* backlink */
3890
3891         evdev_dlb_default_info.driver_name = name;
3892
3893         dlb->max_num_events_override = dlb_args->max_num_events;
3894         dlb->num_dir_credits_override = dlb_args->num_dir_credits_override;
3895         dlb->defer_sched = dlb_args->defer_sched;
3896         dlb->num_atm_inflights_per_queue = dlb_args->num_atm_inflights;
3897
3898         /* Open the interface.
3899          * For vdev mode, this means open the dlb kernel module.
3900          */
3901         err = dlb_iface_open(&dlb->qm_instance, name);
3902         if (err < 0) {
3903                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3904                             err);
3905                 return err;
3906         }
3907
3908         err = dlb_iface_get_device_version(&dlb->qm_instance, &dlb->revision);
3909         if (err < 0) {
3910                 DLB_LOG_ERR("dlb: failed to get the device version, err=%d\n",
3911                             err);
3912                 return err;
3913         }
3914
3915         err = dlb_hw_query_resources(dlb);
3916         if (err) {
3917                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3918                 return err;
3919         }
3920
3921         err = dlb_iface_get_cq_poll_mode(&dlb->qm_instance, &dlb->poll_mode);
3922         if (err < 0) {
3923                 DLB_LOG_ERR("dlb: failed to get the poll mode, err=%d\n", err);
3924                 return err;
3925         }
3926
3927         /* Complete xstats runtime initialization */
3928         err = dlb_xstats_init(dlb);
3929         if (err) {
3930                 DLB_LOG_ERR("dlb: failed to init xstats, err=%d\n", err);
3931                 return err;
3932         }
3933
3934         /* Initialize each port's token pop mode */
3935         for (i = 0; i < DLB_MAX_NUM_PORTS; i++)
3936                 dlb->ev_ports[i].qm_port.token_pop_mode = AUTO_POP;
3937
3938         rte_spinlock_init(&dlb->qm_instance.resource_lock);
3939
3940         dlb_iface_low_level_io_init(dlb);
3941
3942         dlb_entry_points_init(dev);
3943
3944         return 0;
3945 }
3946
3947 int
3948 dlb_secondary_eventdev_probe(struct rte_eventdev *dev,
3949                              const char *name)
3950 {
3951         struct dlb_eventdev *dlb;
3952         int err;
3953
3954         dlb = dev->data->dev_private;
3955
3956         evdev_dlb_default_info.driver_name = name;
3957
3958         err = dlb_iface_open(&dlb->qm_instance, name);
3959         if (err < 0) {
3960                 DLB_LOG_ERR("could not open event hardware device, err=%d\n",
3961                             err);
3962                 return err;
3963         }
3964
3965         err = dlb_hw_query_resources(dlb);
3966         if (err) {
3967                 DLB_LOG_ERR("get resources err=%d for %s\n", err, name);
3968                 return err;
3969         }
3970
3971         dlb_iface_low_level_io_init(dlb);
3972
3973         dlb_entry_points_init(dev);
3974
3975         return 0;
3976 }
3977
3978 int
3979 dlb_parse_params(const char *params,
3980                  const char *name,
3981                  struct dlb_devargs *dlb_args)
3982 {
3983         int ret = 0;
3984         static const char * const args[] = { NUMA_NODE_ARG,
3985                                              DLB_MAX_NUM_EVENTS,
3986                                              DLB_NUM_DIR_CREDITS,
3987                                              DEV_ID_ARG,
3988                                              DLB_DEFER_SCHED_ARG,
3989                                              DLB_NUM_ATM_INFLIGHTS_ARG,
3990                                              NULL };
3991
3992         if (params && params[0] != '\0') {
3993                 struct rte_kvargs *kvlist = rte_kvargs_parse(params, args);
3994
3995                 if (kvlist == NULL) {
3996                         DLB_LOG_INFO("Ignoring unsupported parameters when creating device '%s'\n",
3997                                      name);
3998                 } else {
3999                         ret = rte_kvargs_process(kvlist, NUMA_NODE_ARG,
4000                                                  set_numa_node,
4001                                                  &dlb_args->socket_id);
4002                         if (ret != 0) {
4003                                 DLB_LOG_ERR("%s: Error parsing numa node parameter",
4004                                             name);
4005                                 rte_kvargs_free(kvlist);
4006                                 return ret;
4007                         }
4008
4009                         ret = rte_kvargs_process(kvlist, DLB_MAX_NUM_EVENTS,
4010                                                  set_max_num_events,
4011                                                  &dlb_args->max_num_events);
4012                         if (ret != 0) {
4013                                 DLB_LOG_ERR("%s: Error parsing max_num_events parameter",
4014                                             name);
4015                                 rte_kvargs_free(kvlist);
4016                                 return ret;
4017                         }
4018
4019                         ret = rte_kvargs_process(kvlist,
4020                                         DLB_NUM_DIR_CREDITS,
4021                                         set_num_dir_credits,
4022                                         &dlb_args->num_dir_credits_override);
4023                         if (ret != 0) {
4024                                 DLB_LOG_ERR("%s: Error parsing num_dir_credits parameter",
4025                                             name);
4026                                 rte_kvargs_free(kvlist);
4027                                 return ret;
4028                         }
4029
4030                         ret = rte_kvargs_process(kvlist, DEV_ID_ARG,
4031                                                  set_dev_id,
4032                                                  &dlb_args->dev_id);
4033                         if (ret != 0) {
4034                                 DLB_LOG_ERR("%s: Error parsing dev_id parameter",
4035                                             name);
4036                                 rte_kvargs_free(kvlist);
4037                                 return ret;
4038                         }
4039
4040                         ret = rte_kvargs_process(kvlist, DLB_DEFER_SCHED_ARG,
4041                                                  set_defer_sched,
4042                                                  &dlb_args->defer_sched);
4043                         if (ret != 0) {
4044                                 DLB_LOG_ERR("%s: Error parsing defer_sched parameter",
4045                                             name);
4046                                 rte_kvargs_free(kvlist);
4047                                 return ret;
4048                         }
4049
4050                         ret = rte_kvargs_process(kvlist,
4051                                                  DLB_NUM_ATM_INFLIGHTS_ARG,
4052                                                  set_num_atm_inflights,
4053                                                  &dlb_args->num_atm_inflights);
4054                         if (ret != 0) {
4055                                 DLB_LOG_ERR("%s: Error parsing atm_inflights parameter",
4056                                             name);
4057                                 rte_kvargs_free(kvlist);
4058                                 return ret;
4059                         }
4060
4061                         rte_kvargs_free(kvlist);
4062                 }
4063         }
4064         return ret;
4065 }
4066 RTE_LOG_REGISTER(eventdev_dlb_log_level, pmd.event.dlb, NOTICE);