e6de036ca05441f48ad4680b4111527463e90627
[dpdk.git] / lib / eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21
22 #include "rte_eventdev.h"
23 #include "eventdev_pmd.h"
24 #include "rte_eventdev_trace.h"
25 #include "rte_event_eth_rx_adapter.h"
26
27 #define BATCH_SIZE              32
28 #define BLOCK_CNT_THRESHOLD     10
29 #define ETH_EVENT_BUFFER_SIZE   (6*BATCH_SIZE)
30 #define MAX_VECTOR_SIZE         1024
31 #define MIN_VECTOR_SIZE         4
32 #define MAX_VECTOR_NS           1E9
33 #define MIN_VECTOR_NS           1E5
34
35 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
36 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
37
38 #define RSS_KEY_SIZE    40
39 /* value written to intr thread pipe to signal thread exit */
40 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
41 /* Sentinel value to detect initialized file handle */
42 #define INIT_FD         -1
43
44 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
45
46 /*
47  * Used to store port and queue ID of interrupting Rx queue
48  */
49 union queue_data {
50         RTE_STD_C11
51         void *ptr;
52         struct {
53                 uint16_t port;
54                 uint16_t queue;
55         };
56 };
57
58 /*
59  * There is an instance of this struct per polled Rx queue added to the
60  * adapter
61  */
62 struct eth_rx_poll_entry {
63         /* Eth port to poll */
64         uint16_t eth_dev_id;
65         /* Eth rx queue to poll */
66         uint16_t eth_rx_qid;
67 };
68
69 struct eth_rx_vector_data {
70         TAILQ_ENTRY(eth_rx_vector_data) next;
71         uint16_t port;
72         uint16_t queue;
73         uint16_t max_vector_count;
74         uint64_t event;
75         uint64_t ts;
76         uint64_t vector_timeout_ticks;
77         struct rte_mempool *vector_pool;
78         struct rte_event_vector *vector_ev;
79 } __rte_cache_aligned;
80
81 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
82
83 /* Instance per adapter */
84 struct rte_eth_event_enqueue_buffer {
85         /* Count of events in this buffer */
86         uint16_t count;
87         /* Array of events in this buffer */
88         struct rte_event *events;
89         /* size of event buffer */
90         uint16_t events_size;
91         /* Event enqueue happens from head */
92         uint16_t head;
93         /* New packets from rte_eth_rx_burst is enqued from tail */
94         uint16_t tail;
95         /* last element in the buffer before rollover */
96         uint16_t last;
97         uint16_t last_mask;
98 };
99
100 struct rte_event_eth_rx_adapter {
101         /* RSS key */
102         uint8_t rss_key_be[RSS_KEY_SIZE];
103         /* Event device identifier */
104         uint8_t eventdev_id;
105         /* Per ethernet device structure */
106         struct eth_device_info *eth_devices;
107         /* Event port identifier */
108         uint8_t event_port_id;
109         /* Lock to serialize config updates with service function */
110         rte_spinlock_t rx_lock;
111         /* Max mbufs processed in any service function invocation */
112         uint32_t max_nb_rx;
113         /* Receive queues that need to be polled */
114         struct eth_rx_poll_entry *eth_rx_poll;
115         /* Size of the eth_rx_poll array */
116         uint16_t num_rx_polled;
117         /* Weighted round robin schedule */
118         uint32_t *wrr_sched;
119         /* wrr_sched[] size */
120         uint32_t wrr_len;
121         /* Next entry in wrr[] to begin polling */
122         uint32_t wrr_pos;
123         /* Event burst buffer */
124         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
125         /* Vector enable flag */
126         uint8_t ena_vector;
127         /* Timestamp of previous vector expiry list traversal */
128         uint64_t prev_expiry_ts;
129         /* Minimum ticks to wait before traversing expiry list */
130         uint64_t vector_tmo_ticks;
131         /* vector list */
132         struct eth_rx_vector_data_list vector_list;
133         /* Per adapter stats */
134         struct rte_event_eth_rx_adapter_stats stats;
135         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
136         uint16_t enq_block_count;
137         /* Block start ts */
138         uint64_t rx_enq_block_start_ts;
139         /* epoll fd used to wait for Rx interrupts */
140         int epd;
141         /* Num of interrupt driven interrupt queues */
142         uint32_t num_rx_intr;
143         /* Used to send <dev id, queue id> of interrupting Rx queues from
144          * the interrupt thread to the Rx thread
145          */
146         struct rte_ring *intr_ring;
147         /* Rx Queue data (dev id, queue id) for the last non-empty
148          * queue polled
149          */
150         union queue_data qd;
151         /* queue_data is valid */
152         int qd_valid;
153         /* Interrupt ring lock, synchronizes Rx thread
154          * and interrupt thread
155          */
156         rte_spinlock_t intr_ring_lock;
157         /* event array passed to rte_poll_wait */
158         struct rte_epoll_event *epoll_events;
159         /* Count of interrupt vectors in use */
160         uint32_t num_intr_vec;
161         /* Thread blocked on Rx interrupts */
162         pthread_t rx_intr_thread;
163         /* Configuration callback for rte_service configuration */
164         rte_event_eth_rx_adapter_conf_cb conf_cb;
165         /* Configuration callback argument */
166         void *conf_arg;
167         /* Set if  default_cb is being used */
168         int default_cb_arg;
169         /* Service initialization state */
170         uint8_t service_inited;
171         /* Total count of Rx queues in adapter */
172         uint32_t nb_queues;
173         /* Memory allocation name */
174         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
175         /* Socket identifier cached from eventdev */
176         int socket_id;
177         /* Per adapter EAL service */
178         uint32_t service_id;
179         /* Adapter started flag */
180         uint8_t rxa_started;
181         /* Adapter ID */
182         uint8_t id;
183 } __rte_cache_aligned;
184
185 /* Per eth device */
186 struct eth_device_info {
187         struct rte_eth_dev *dev;
188         struct eth_rx_queue_info *rx_queue;
189         /* Rx callback */
190         rte_event_eth_rx_adapter_cb_fn cb_fn;
191         /* Rx callback argument */
192         void *cb_arg;
193         /* Set if ethdev->eventdev packet transfer uses a
194          * hardware mechanism
195          */
196         uint8_t internal_event_port;
197         /* Set if the adapter is processing rx queues for
198          * this eth device and packet processing has been
199          * started, allows for the code to know if the PMD
200          * rx_adapter_stop callback needs to be invoked
201          */
202         uint8_t dev_rx_started;
203         /* Number of queues added for this device */
204         uint16_t nb_dev_queues;
205         /* Number of poll based queues
206          * If nb_rx_poll > 0, the start callback will
207          * be invoked if not already invoked
208          */
209         uint16_t nb_rx_poll;
210         /* Number of interrupt based queues
211          * If nb_rx_intr > 0, the start callback will
212          * be invoked if not already invoked.
213          */
214         uint16_t nb_rx_intr;
215         /* Number of queues that use the shared interrupt */
216         uint16_t nb_shared_intr;
217         /* sum(wrr(q)) for all queues within the device
218          * useful when deleting all device queues
219          */
220         uint32_t wrr_len;
221         /* Intr based queue index to start polling from, this is used
222          * if the number of shared interrupts is non-zero
223          */
224         uint16_t next_q_idx;
225         /* Intr based queue indices */
226         uint16_t *intr_queue;
227         /* device generates per Rx queue interrupt for queue index
228          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
229          */
230         int multi_intr_cap;
231         /* shared interrupt enabled */
232         int shared_intr_enabled;
233 };
234
235 /* Per Rx queue */
236 struct eth_rx_queue_info {
237         int queue_enabled;      /* True if added */
238         int intr_enabled;
239         uint8_t ena_vector;
240         uint16_t wt;            /* Polling weight */
241         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
242         uint64_t event;
243         struct eth_rx_vector_data vector_data;
244 };
245
246 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
247
248 /* Enable dynamic timestamp field in mbuf */
249 static uint64_t event_eth_rx_timestamp_dynflag;
250 static int event_eth_rx_timestamp_dynfield_offset = -1;
251
252 static inline rte_mbuf_timestamp_t *
253 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
254 {
255         return RTE_MBUF_DYNFIELD(mbuf,
256                 event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
257 }
258
259 static inline int
260 rxa_validate_id(uint8_t id)
261 {
262         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
263 }
264
265 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
266         if (!rxa_validate_id(id)) { \
267                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
268                 return retval; \
269         } \
270 } while (0)
271
272 static inline int
273 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
274 {
275         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
276 }
277
278 /* Greatest common divisor */
279 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
280 {
281         uint16_t r = a % b;
282
283         return r ? rxa_gcd_u16(b, r) : b;
284 }
285
286 /* Returns the next queue in the polling sequence
287  *
288  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
289  */
290 static int
291 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
292          unsigned int n, int *cw,
293          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
294          uint16_t gcd, int prev)
295 {
296         int i = prev;
297         uint16_t w;
298
299         while (1) {
300                 uint16_t q;
301                 uint16_t d;
302
303                 i = (i + 1) % n;
304                 if (i == 0) {
305                         *cw = *cw - gcd;
306                         if (*cw <= 0)
307                                 *cw = max_wt;
308                 }
309
310                 q = eth_rx_poll[i].eth_rx_qid;
311                 d = eth_rx_poll[i].eth_dev_id;
312                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
313
314                 if ((int)w >= *cw)
315                         return i;
316         }
317 }
318
319 static inline int
320 rxa_shared_intr(struct eth_device_info *dev_info,
321         int rx_queue_id)
322 {
323         int multi_intr_cap;
324
325         if (dev_info->dev->intr_handle == NULL)
326                 return 0;
327
328         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
329         return !multi_intr_cap ||
330                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
331 }
332
333 static inline int
334 rxa_intr_queue(struct eth_device_info *dev_info,
335         int rx_queue_id)
336 {
337         struct eth_rx_queue_info *queue_info;
338
339         queue_info = &dev_info->rx_queue[rx_queue_id];
340         return dev_info->rx_queue &&
341                 !dev_info->internal_event_port &&
342                 queue_info->queue_enabled && queue_info->wt == 0;
343 }
344
345 static inline int
346 rxa_polled_queue(struct eth_device_info *dev_info,
347         int rx_queue_id)
348 {
349         struct eth_rx_queue_info *queue_info;
350
351         queue_info = &dev_info->rx_queue[rx_queue_id];
352         return !dev_info->internal_event_port &&
353                 dev_info->rx_queue &&
354                 queue_info->queue_enabled && queue_info->wt != 0;
355 }
356
357 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
358 static int
359 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
360 {
361         uint16_t i;
362         int n, s;
363         uint16_t nbq;
364
365         nbq = dev_info->dev->data->nb_rx_queues;
366         n = 0; /* non shared count */
367         s = 0; /* shared count */
368
369         if (rx_queue_id == -1) {
370                 for (i = 0; i < nbq; i++) {
371                         if (!rxa_shared_intr(dev_info, i))
372                                 n += add ? !rxa_intr_queue(dev_info, i) :
373                                         rxa_intr_queue(dev_info, i);
374                         else
375                                 s += add ? !rxa_intr_queue(dev_info, i) :
376                                         rxa_intr_queue(dev_info, i);
377                 }
378
379                 if (s > 0) {
380                         if ((add && dev_info->nb_shared_intr == 0) ||
381                                 (!add && dev_info->nb_shared_intr))
382                                 n += 1;
383                 }
384         } else {
385                 if (!rxa_shared_intr(dev_info, rx_queue_id))
386                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
387                                 rxa_intr_queue(dev_info, rx_queue_id);
388                 else
389                         n = add ? !dev_info->nb_shared_intr :
390                                 dev_info->nb_shared_intr == 1;
391         }
392
393         return add ? n : -n;
394 }
395
396 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
397  */
398 static void
399 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
400                         struct eth_device_info *dev_info,
401                         int rx_queue_id,
402                         uint32_t *nb_rx_intr)
403 {
404         uint32_t intr_diff;
405
406         if (rx_queue_id == -1)
407                 intr_diff = dev_info->nb_rx_intr;
408         else
409                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
410
411         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
412 }
413
414 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
415  * interrupt queues could currently be poll mode Rx queues
416  */
417 static void
418 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
419                         struct eth_device_info *dev_info,
420                         int rx_queue_id,
421                         uint32_t *nb_rx_poll,
422                         uint32_t *nb_rx_intr,
423                         uint32_t *nb_wrr)
424 {
425         uint32_t intr_diff;
426         uint32_t poll_diff;
427         uint32_t wrr_len_diff;
428
429         if (rx_queue_id == -1) {
430                 intr_diff = dev_info->dev->data->nb_rx_queues -
431                                                 dev_info->nb_rx_intr;
432                 poll_diff = dev_info->nb_rx_poll;
433                 wrr_len_diff = dev_info->wrr_len;
434         } else {
435                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
436                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
437                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
438                                         0;
439         }
440
441         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
442         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
443         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
444 }
445
446 /* Calculate size of the eth_rx_poll and wrr_sched arrays
447  * after deleting poll mode rx queues
448  */
449 static void
450 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
451                         struct eth_device_info *dev_info,
452                         int rx_queue_id,
453                         uint32_t *nb_rx_poll,
454                         uint32_t *nb_wrr)
455 {
456         uint32_t poll_diff;
457         uint32_t wrr_len_diff;
458
459         if (rx_queue_id == -1) {
460                 poll_diff = dev_info->nb_rx_poll;
461                 wrr_len_diff = dev_info->wrr_len;
462         } else {
463                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
464                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
465                                         0;
466         }
467
468         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
469         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
470 }
471
472 /* Calculate nb_rx_* after adding poll mode rx queues
473  */
474 static void
475 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
476                         struct eth_device_info *dev_info,
477                         int rx_queue_id,
478                         uint16_t wt,
479                         uint32_t *nb_rx_poll,
480                         uint32_t *nb_rx_intr,
481                         uint32_t *nb_wrr)
482 {
483         uint32_t intr_diff;
484         uint32_t poll_diff;
485         uint32_t wrr_len_diff;
486
487         if (rx_queue_id == -1) {
488                 intr_diff = dev_info->nb_rx_intr;
489                 poll_diff = dev_info->dev->data->nb_rx_queues -
490                                                 dev_info->nb_rx_poll;
491                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
492                                 - dev_info->wrr_len;
493         } else {
494                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
495                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
496                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
497                                 wt - dev_info->rx_queue[rx_queue_id].wt :
498                                 wt;
499         }
500
501         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
502         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
503         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
504 }
505
506 /* Calculate nb_rx_* after adding rx_queue_id */
507 static void
508 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
509                 struct eth_device_info *dev_info,
510                 int rx_queue_id,
511                 uint16_t wt,
512                 uint32_t *nb_rx_poll,
513                 uint32_t *nb_rx_intr,
514                 uint32_t *nb_wrr)
515 {
516         if (wt != 0)
517                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
518                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
519         else
520                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
521                                         nb_rx_poll, nb_rx_intr, nb_wrr);
522 }
523
524 /* Calculate nb_rx_* after deleting rx_queue_id */
525 static void
526 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
527                 struct eth_device_info *dev_info,
528                 int rx_queue_id,
529                 uint32_t *nb_rx_poll,
530                 uint32_t *nb_rx_intr,
531                 uint32_t *nb_wrr)
532 {
533         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
534                                 nb_wrr);
535         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
536                                 nb_rx_intr);
537 }
538
539 /*
540  * Allocate the rx_poll array
541  */
542 static struct eth_rx_poll_entry *
543 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
544         uint32_t num_rx_polled)
545 {
546         size_t len;
547
548         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
549                                                         RTE_CACHE_LINE_SIZE);
550         return  rte_zmalloc_socket(rx_adapter->mem_name,
551                                 len,
552                                 RTE_CACHE_LINE_SIZE,
553                                 rx_adapter->socket_id);
554 }
555
556 /*
557  * Allocate the WRR array
558  */
559 static uint32_t *
560 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
561 {
562         size_t len;
563
564         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
565                         RTE_CACHE_LINE_SIZE);
566         return  rte_zmalloc_socket(rx_adapter->mem_name,
567                                 len,
568                                 RTE_CACHE_LINE_SIZE,
569                                 rx_adapter->socket_id);
570 }
571
572 static int
573 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
574                 uint32_t nb_poll,
575                 uint32_t nb_wrr,
576                 struct eth_rx_poll_entry **rx_poll,
577                 uint32_t **wrr_sched)
578 {
579
580         if (nb_poll == 0) {
581                 *rx_poll = NULL;
582                 *wrr_sched = NULL;
583                 return 0;
584         }
585
586         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
587         if (*rx_poll == NULL) {
588                 *wrr_sched = NULL;
589                 return -ENOMEM;
590         }
591
592         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
593         if (*wrr_sched == NULL) {
594                 rte_free(*rx_poll);
595                 return -ENOMEM;
596         }
597         return 0;
598 }
599
600 /* Precalculate WRR polling sequence for all queues in rx_adapter */
601 static void
602 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
603                 struct eth_rx_poll_entry *rx_poll,
604                 uint32_t *rx_wrr)
605 {
606         uint16_t d;
607         uint16_t q;
608         unsigned int i;
609         int prev = -1;
610         int cw = -1;
611
612         /* Initialize variables for calculation of wrr schedule */
613         uint16_t max_wrr_pos = 0;
614         unsigned int poll_q = 0;
615         uint16_t max_wt = 0;
616         uint16_t gcd = 0;
617
618         if (rx_poll == NULL)
619                 return;
620
621         /* Generate array of all queues to poll, the size of this
622          * array is poll_q
623          */
624         RTE_ETH_FOREACH_DEV(d) {
625                 uint16_t nb_rx_queues;
626                 struct eth_device_info *dev_info =
627                                 &rx_adapter->eth_devices[d];
628                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
629                 if (dev_info->rx_queue == NULL)
630                         continue;
631                 if (dev_info->internal_event_port)
632                         continue;
633                 dev_info->wrr_len = 0;
634                 for (q = 0; q < nb_rx_queues; q++) {
635                         struct eth_rx_queue_info *queue_info =
636                                 &dev_info->rx_queue[q];
637                         uint16_t wt;
638
639                         if (!rxa_polled_queue(dev_info, q))
640                                 continue;
641                         wt = queue_info->wt;
642                         rx_poll[poll_q].eth_dev_id = d;
643                         rx_poll[poll_q].eth_rx_qid = q;
644                         max_wrr_pos += wt;
645                         dev_info->wrr_len += wt;
646                         max_wt = RTE_MAX(max_wt, wt);
647                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
648                         poll_q++;
649                 }
650         }
651
652         /* Generate polling sequence based on weights */
653         prev = -1;
654         cw = -1;
655         for (i = 0; i < max_wrr_pos; i++) {
656                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
657                                      rx_poll, max_wt, gcd, prev);
658                 prev = rx_wrr[i];
659         }
660 }
661
662 static inline void
663 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
664         struct rte_ipv6_hdr **ipv6_hdr)
665 {
666         struct rte_ether_hdr *eth_hdr =
667                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
668         struct rte_vlan_hdr *vlan_hdr;
669
670         *ipv4_hdr = NULL;
671         *ipv6_hdr = NULL;
672
673         switch (eth_hdr->ether_type) {
674         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
675                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
676                 break;
677
678         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
679                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
680                 break;
681
682         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
683                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
684                 switch (vlan_hdr->eth_proto) {
685                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
686                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
687                         break;
688                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
689                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
690                         break;
691                 default:
692                         break;
693                 }
694                 break;
695
696         default:
697                 break;
698         }
699 }
700
701 /* Calculate RSS hash for IPv4/6 */
702 static inline uint32_t
703 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
704 {
705         uint32_t input_len;
706         void *tuple;
707         struct rte_ipv4_tuple ipv4_tuple;
708         struct rte_ipv6_tuple ipv6_tuple;
709         struct rte_ipv4_hdr *ipv4_hdr;
710         struct rte_ipv6_hdr *ipv6_hdr;
711
712         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
713
714         if (ipv4_hdr) {
715                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
716                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
717                 tuple = &ipv4_tuple;
718                 input_len = RTE_THASH_V4_L3_LEN;
719         } else if (ipv6_hdr) {
720                 rte_thash_load_v6_addrs(ipv6_hdr,
721                                         (union rte_thash_tuple *)&ipv6_tuple);
722                 tuple = &ipv6_tuple;
723                 input_len = RTE_THASH_V6_L3_LEN;
724         } else
725                 return 0;
726
727         return rte_softrss_be(tuple, input_len, rss_key_be);
728 }
729
730 static inline int
731 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
732 {
733         return !!rx_adapter->enq_block_count;
734 }
735
736 static inline void
737 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
738 {
739         if (rx_adapter->rx_enq_block_start_ts)
740                 return;
741
742         rx_adapter->enq_block_count++;
743         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
744                 return;
745
746         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
747 }
748
749 static inline void
750 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
751                     struct rte_event_eth_rx_adapter_stats *stats)
752 {
753         if (unlikely(!stats->rx_enq_start_ts))
754                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
755
756         if (likely(!rxa_enq_blocked(rx_adapter)))
757                 return;
758
759         rx_adapter->enq_block_count = 0;
760         if (rx_adapter->rx_enq_block_start_ts) {
761                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
762                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
763                     rx_adapter->rx_enq_block_start_ts;
764                 rx_adapter->rx_enq_block_start_ts = 0;
765         }
766 }
767
768 /* Enqueue buffered events to event device */
769 static inline uint16_t
770 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
771 {
772         struct rte_eth_event_enqueue_buffer *buf =
773             &rx_adapter->event_enqueue_buffer;
774         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
775         uint16_t count = buf->last ? buf->last - buf->head : buf->count;
776
777         if (!count)
778                 return 0;
779
780         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
781                                         rx_adapter->event_port_id,
782                                         &buf->events[buf->head],
783                                         count);
784         if (n != count)
785                 stats->rx_enq_retry++;
786
787         buf->head += n;
788
789         if (buf->last && n == count) {
790                 uint16_t n1;
791
792                 n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
793                                         rx_adapter->event_port_id,
794                                         &buf->events[0],
795                                         buf->tail);
796
797                 if (n1 != buf->tail)
798                         stats->rx_enq_retry++;
799
800                 buf->last = 0;
801                 buf->head = n1;
802                 buf->last_mask = 0;
803                 n += n1;
804         }
805
806         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
807                 rxa_enq_block_start_ts(rx_adapter);
808
809         buf->count -= n;
810         stats->rx_enq_count += n;
811
812         return n;
813 }
814
815 static inline void
816 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
817                 struct eth_rx_vector_data *vec)
818 {
819         vec->vector_ev->nb_elem = 0;
820         vec->vector_ev->port = vec->port;
821         vec->vector_ev->queue = vec->queue;
822         vec->vector_ev->attr_valid = true;
823         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
824 }
825
826 static inline uint16_t
827 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
828                         struct eth_rx_queue_info *queue_info,
829                         struct rte_eth_event_enqueue_buffer *buf,
830                         struct rte_mbuf **mbufs, uint16_t num)
831 {
832         struct rte_event *ev = &buf->events[buf->count];
833         struct eth_rx_vector_data *vec;
834         uint16_t filled, space, sz;
835
836         filled = 0;
837         vec = &queue_info->vector_data;
838
839         if (vec->vector_ev == NULL) {
840                 if (rte_mempool_get(vec->vector_pool,
841                                     (void **)&vec->vector_ev) < 0) {
842                         rte_pktmbuf_free_bulk(mbufs, num);
843                         return 0;
844                 }
845                 rxa_init_vector(rx_adapter, vec);
846         }
847         while (num) {
848                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
849                         /* Event ready. */
850                         ev->event = vec->event;
851                         ev->vec = vec->vector_ev;
852                         ev++;
853                         filled++;
854                         vec->vector_ev = NULL;
855                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
856                         if (rte_mempool_get(vec->vector_pool,
857                                             (void **)&vec->vector_ev) < 0) {
858                                 rte_pktmbuf_free_bulk(mbufs, num);
859                                 return 0;
860                         }
861                         rxa_init_vector(rx_adapter, vec);
862                 }
863
864                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
865                 sz = num > space ? space : num;
866                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
867                        sizeof(void *) * sz);
868                 vec->vector_ev->nb_elem += sz;
869                 num -= sz;
870                 mbufs += sz;
871                 vec->ts = rte_rdtsc();
872         }
873
874         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
875                 ev->event = vec->event;
876                 ev->vec = vec->vector_ev;
877                 ev++;
878                 filled++;
879                 vec->vector_ev = NULL;
880                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
881         }
882
883         return filled;
884 }
885
886 static inline void
887 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
888                 uint16_t eth_dev_id,
889                 uint16_t rx_queue_id,
890                 struct rte_mbuf **mbufs,
891                 uint16_t num)
892 {
893         uint32_t i;
894         struct eth_device_info *dev_info =
895                                         &rx_adapter->eth_devices[eth_dev_id];
896         struct eth_rx_queue_info *eth_rx_queue_info =
897                                         &dev_info->rx_queue[rx_queue_id];
898         struct rte_eth_event_enqueue_buffer *buf =
899                                         &rx_adapter->event_enqueue_buffer;
900         uint16_t new_tail = buf->tail;
901         uint64_t event = eth_rx_queue_info->event;
902         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
903         struct rte_mbuf *m = mbufs[0];
904         uint32_t rss_mask;
905         uint32_t rss;
906         int do_rss;
907         uint16_t nb_cb;
908         uint16_t dropped;
909         uint64_t ts, ts_mask;
910
911         if (!eth_rx_queue_info->ena_vector) {
912                 ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
913                                                 0 : rte_get_tsc_cycles();
914
915                 /* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
916                  * otherwise 0
917                  */
918                 ts_mask = (uint64_t)(!(m->ol_flags &
919                                        event_eth_rx_timestamp_dynflag)) - 1ULL;
920
921                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
922                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
923                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
924                 for (i = 0; i < num; i++) {
925                         struct rte_event *ev;
926
927                         m = mbufs[i];
928                         *rxa_timestamp_dynfield(m) = ts |
929                                         (*rxa_timestamp_dynfield(m) & ts_mask);
930
931                         ev = &buf->events[new_tail];
932
933                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
934                                      : m->hash.rss;
935                         ev->event = event;
936                         ev->flow_id = (rss & ~flow_id_mask) |
937                                       (ev->flow_id & flow_id_mask);
938                         ev->mbuf = m;
939                         new_tail++;
940                 }
941         } else {
942                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
943                                               buf, mbufs, num);
944         }
945
946         if (num && dev_info->cb_fn) {
947
948                 dropped = 0;
949                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
950                                        buf->last |
951                                        (buf->events_size & ~buf->last_mask),
952                                        buf->count >= BATCH_SIZE ?
953                                                 buf->count - BATCH_SIZE : 0,
954                                        &buf->events[buf->tail],
955                                        num,
956                                        dev_info->cb_arg,
957                                        &dropped);
958                 if (unlikely(nb_cb > num))
959                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
960                                 nb_cb, num);
961                 else
962                         num = nb_cb;
963                 if (dropped)
964                         rx_adapter->stats.rx_dropped += dropped;
965         }
966
967         buf->count += num;
968         buf->tail += num;
969 }
970
971 static inline bool
972 rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
973 {
974         uint32_t nb_req = buf->tail + BATCH_SIZE;
975
976         if (!buf->last) {
977                 if (nb_req <= buf->events_size)
978                         return true;
979
980                 if (buf->head >= BATCH_SIZE) {
981                         buf->last_mask = ~0;
982                         buf->last = buf->tail;
983                         buf->tail = 0;
984                         return true;
985                 }
986         }
987
988         return nb_req <= buf->head;
989 }
990
991 /* Enqueue packets from  <port, q>  to event buffer */
992 static inline uint32_t
993 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
994         uint16_t port_id,
995         uint16_t queue_id,
996         uint32_t rx_count,
997         uint32_t max_rx,
998         int *rxq_empty)
999 {
1000         struct rte_mbuf *mbufs[BATCH_SIZE];
1001         struct rte_eth_event_enqueue_buffer *buf =
1002                                         &rx_adapter->event_enqueue_buffer;
1003         struct rte_event_eth_rx_adapter_stats *stats =
1004                                         &rx_adapter->stats;
1005         uint16_t n;
1006         uint32_t nb_rx = 0;
1007
1008         if (rxq_empty)
1009                 *rxq_empty = 0;
1010         /* Don't do a batch dequeue from the rx queue if there isn't
1011          * enough space in the enqueue buffer.
1012          */
1013         while (rxa_pkt_buf_available(buf)) {
1014                 if (buf->count >= BATCH_SIZE)
1015                         rxa_flush_event_buffer(rx_adapter);
1016
1017                 stats->rx_poll_count++;
1018                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1019                 if (unlikely(!n)) {
1020                         if (rxq_empty)
1021                                 *rxq_empty = 1;
1022                         break;
1023                 }
1024                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
1025                 nb_rx += n;
1026                 if (rx_count + nb_rx > max_rx)
1027                         break;
1028         }
1029
1030         if (buf->count > 0)
1031                 rxa_flush_event_buffer(rx_adapter);
1032
1033         return nb_rx;
1034 }
1035
1036 static inline void
1037 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
1038                 void *data)
1039 {
1040         uint16_t port_id;
1041         uint16_t queue;
1042         int err;
1043         union queue_data qd;
1044         struct eth_device_info *dev_info;
1045         struct eth_rx_queue_info *queue_info;
1046         int *intr_enabled;
1047
1048         qd.ptr = data;
1049         port_id = qd.port;
1050         queue = qd.queue;
1051
1052         dev_info = &rx_adapter->eth_devices[port_id];
1053         queue_info = &dev_info->rx_queue[queue];
1054         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1055         if (rxa_shared_intr(dev_info, queue))
1056                 intr_enabled = &dev_info->shared_intr_enabled;
1057         else
1058                 intr_enabled = &queue_info->intr_enabled;
1059
1060         if (*intr_enabled) {
1061                 *intr_enabled = 0;
1062                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1063                 /* Entry should always be available.
1064                  * The ring size equals the maximum number of interrupt
1065                  * vectors supported (an interrupt vector is shared in
1066                  * case of shared interrupts)
1067                  */
1068                 if (err)
1069                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1070                                 " to ring: %s", strerror(-err));
1071                 else
1072                         rte_eth_dev_rx_intr_disable(port_id, queue);
1073         }
1074         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1075 }
1076
1077 static int
1078 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
1079                         uint32_t num_intr_vec)
1080 {
1081         if (rx_adapter->num_intr_vec + num_intr_vec >
1082                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1083                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1084                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1085                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1086                 return -ENOSPC;
1087         }
1088
1089         return 0;
1090 }
1091
1092 /* Delete entries for (dev, queue) from the interrupt ring */
1093 static void
1094 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1095                         struct eth_device_info *dev_info,
1096                         uint16_t rx_queue_id)
1097 {
1098         int i, n;
1099         union queue_data qd;
1100
1101         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1102
1103         n = rte_ring_count(rx_adapter->intr_ring);
1104         for (i = 0; i < n; i++) {
1105                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1106                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1107                         if (qd.port == dev_info->dev->data->port_id &&
1108                                 qd.queue == rx_queue_id)
1109                                 continue;
1110                 } else {
1111                         if (qd.port == dev_info->dev->data->port_id)
1112                                 continue;
1113                 }
1114                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1115         }
1116
1117         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1118 }
1119
1120 /* pthread callback handling interrupt mode receive queues
1121  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1122  * interrupting queue to the adapter's ring buffer for interrupt events.
1123  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1124  * the adapter service function.
1125  */
1126 static void *
1127 rxa_intr_thread(void *arg)
1128 {
1129         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1130         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1131         int n, i;
1132
1133         while (1) {
1134                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1135                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1136                 if (unlikely(n < 0))
1137                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1138                                         n);
1139                 for (i = 0; i < n; i++) {
1140                         rxa_intr_ring_enqueue(rx_adapter,
1141                                         epoll_events[i].epdata.data);
1142                 }
1143         }
1144
1145         return NULL;
1146 }
1147
1148 /* Dequeue <port, q> from interrupt ring and enqueue received
1149  * mbufs to eventdev
1150  */
1151 static inline uint32_t
1152 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1153 {
1154         uint32_t n;
1155         uint32_t nb_rx = 0;
1156         int rxq_empty;
1157         struct rte_eth_event_enqueue_buffer *buf;
1158         rte_spinlock_t *ring_lock;
1159         uint8_t max_done = 0;
1160
1161         if (rx_adapter->num_rx_intr == 0)
1162                 return 0;
1163
1164         if (rte_ring_count(rx_adapter->intr_ring) == 0
1165                 && !rx_adapter->qd_valid)
1166                 return 0;
1167
1168         buf = &rx_adapter->event_enqueue_buffer;
1169         ring_lock = &rx_adapter->intr_ring_lock;
1170
1171         if (buf->count >= BATCH_SIZE)
1172                 rxa_flush_event_buffer(rx_adapter);
1173
1174         while (rxa_pkt_buf_available(buf)) {
1175                 struct eth_device_info *dev_info;
1176                 uint16_t port;
1177                 uint16_t queue;
1178                 union queue_data qd  = rx_adapter->qd;
1179                 int err;
1180
1181                 if (!rx_adapter->qd_valid) {
1182                         struct eth_rx_queue_info *queue_info;
1183
1184                         rte_spinlock_lock(ring_lock);
1185                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1186                         if (err) {
1187                                 rte_spinlock_unlock(ring_lock);
1188                                 break;
1189                         }
1190
1191                         port = qd.port;
1192                         queue = qd.queue;
1193                         rx_adapter->qd = qd;
1194                         rx_adapter->qd_valid = 1;
1195                         dev_info = &rx_adapter->eth_devices[port];
1196                         if (rxa_shared_intr(dev_info, queue))
1197                                 dev_info->shared_intr_enabled = 1;
1198                         else {
1199                                 queue_info = &dev_info->rx_queue[queue];
1200                                 queue_info->intr_enabled = 1;
1201                         }
1202                         rte_eth_dev_rx_intr_enable(port, queue);
1203                         rte_spinlock_unlock(ring_lock);
1204                 } else {
1205                         port = qd.port;
1206                         queue = qd.queue;
1207
1208                         dev_info = &rx_adapter->eth_devices[port];
1209                 }
1210
1211                 if (rxa_shared_intr(dev_info, queue)) {
1212                         uint16_t i;
1213                         uint16_t nb_queues;
1214
1215                         nb_queues = dev_info->dev->data->nb_rx_queues;
1216                         n = 0;
1217                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1218                                 uint8_t enq_buffer_full;
1219
1220                                 if (!rxa_intr_queue(dev_info, i))
1221                                         continue;
1222                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1223                                         rx_adapter->max_nb_rx,
1224                                         &rxq_empty);
1225                                 nb_rx += n;
1226
1227                                 enq_buffer_full = !rxq_empty && n == 0;
1228                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1229
1230                                 if (enq_buffer_full || max_done) {
1231                                         dev_info->next_q_idx = i;
1232                                         goto done;
1233                                 }
1234                         }
1235
1236                         rx_adapter->qd_valid = 0;
1237
1238                         /* Reinitialize for next interrupt */
1239                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1240                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1241                                                 0;
1242                 } else {
1243                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1244                                 rx_adapter->max_nb_rx,
1245                                 &rxq_empty);
1246                         rx_adapter->qd_valid = !rxq_empty;
1247                         nb_rx += n;
1248                         if (nb_rx > rx_adapter->max_nb_rx)
1249                                 break;
1250                 }
1251         }
1252
1253 done:
1254         rx_adapter->stats.rx_intr_packets += nb_rx;
1255         return nb_rx;
1256 }
1257
1258 /*
1259  * Polls receive queues added to the event adapter and enqueues received
1260  * packets to the event device.
1261  *
1262  * The receive code enqueues initially to a temporary buffer, the
1263  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1264  *
1265  * If there isn't space available in the temporary buffer, packets from the
1266  * Rx queue aren't dequeued from the eth device, this back pressures the
1267  * eth device, in virtual device environments this back pressure is relayed to
1268  * the hypervisor's switching layer where adjustments can be made to deal with
1269  * it.
1270  */
1271 static inline uint32_t
1272 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1273 {
1274         uint32_t num_queue;
1275         uint32_t nb_rx = 0;
1276         struct rte_eth_event_enqueue_buffer *buf;
1277         uint32_t wrr_pos;
1278         uint32_t max_nb_rx;
1279
1280         wrr_pos = rx_adapter->wrr_pos;
1281         max_nb_rx = rx_adapter->max_nb_rx;
1282         buf = &rx_adapter->event_enqueue_buffer;
1283
1284         /* Iterate through a WRR sequence */
1285         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1286                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1287                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1288                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1289
1290                 /* Don't do a batch dequeue from the rx queue if there isn't
1291                  * enough space in the enqueue buffer.
1292                  */
1293                 if (buf->count >= BATCH_SIZE)
1294                         rxa_flush_event_buffer(rx_adapter);
1295                 if (!rxa_pkt_buf_available(buf)) {
1296                         rx_adapter->wrr_pos = wrr_pos;
1297                         return nb_rx;
1298                 }
1299
1300                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1301                                 NULL);
1302                 if (nb_rx > max_nb_rx) {
1303                         rx_adapter->wrr_pos =
1304                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1305                         break;
1306                 }
1307
1308                 if (++wrr_pos == rx_adapter->wrr_len)
1309                         wrr_pos = 0;
1310         }
1311         return nb_rx;
1312 }
1313
1314 static void
1315 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1316 {
1317         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1318         struct rte_eth_event_enqueue_buffer *buf =
1319                 &rx_adapter->event_enqueue_buffer;
1320         struct rte_event *ev;
1321
1322         if (buf->count)
1323                 rxa_flush_event_buffer(rx_adapter);
1324
1325         if (vec->vector_ev->nb_elem == 0)
1326                 return;
1327         ev = &buf->events[buf->count];
1328
1329         /* Event ready. */
1330         ev->event = vec->event;
1331         ev->vec = vec->vector_ev;
1332         buf->count++;
1333
1334         vec->vector_ev = NULL;
1335         vec->ts = 0;
1336 }
1337
1338 static int
1339 rxa_service_func(void *args)
1340 {
1341         struct rte_event_eth_rx_adapter *rx_adapter = args;
1342         struct rte_event_eth_rx_adapter_stats *stats;
1343
1344         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1345                 return 0;
1346         if (!rx_adapter->rxa_started) {
1347                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1348                 return 0;
1349         }
1350
1351         if (rx_adapter->ena_vector) {
1352                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1353                     rx_adapter->vector_tmo_ticks) {
1354                         struct eth_rx_vector_data *vec;
1355
1356                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1357                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1358
1359                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1360                                         rxa_vector_expire(vec, rx_adapter);
1361                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1362                                                      vec, next);
1363                                 }
1364                         }
1365                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1366                 }
1367         }
1368
1369         stats = &rx_adapter->stats;
1370         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1371         stats->rx_packets += rxa_poll(rx_adapter);
1372         rte_spinlock_unlock(&rx_adapter->rx_lock);
1373         return 0;
1374 }
1375
1376 static int
1377 rte_event_eth_rx_adapter_init(void)
1378 {
1379         const char *name = RXA_ADAPTER_ARRAY;
1380         const struct rte_memzone *mz;
1381         unsigned int sz;
1382
1383         sz = sizeof(*event_eth_rx_adapter) *
1384             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1385         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1386
1387         mz = rte_memzone_lookup(name);
1388         if (mz == NULL) {
1389                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1390                                                  RTE_CACHE_LINE_SIZE);
1391                 if (mz == NULL) {
1392                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1393                                         PRId32, rte_errno);
1394                         return -rte_errno;
1395                 }
1396         }
1397
1398         event_eth_rx_adapter = mz->addr;
1399         return 0;
1400 }
1401
1402 static int
1403 rxa_memzone_lookup(void)
1404 {
1405         const struct rte_memzone *mz;
1406
1407         if (event_eth_rx_adapter == NULL) {
1408                 mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1409                 if (mz == NULL)
1410                         return -ENOMEM;
1411                 event_eth_rx_adapter = mz->addr;
1412         }
1413
1414         return 0;
1415 }
1416
1417 static inline struct rte_event_eth_rx_adapter *
1418 rxa_id_to_adapter(uint8_t id)
1419 {
1420         return event_eth_rx_adapter ?
1421                 event_eth_rx_adapter[id] : NULL;
1422 }
1423
1424 static int
1425 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1426                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1427 {
1428         int ret;
1429         struct rte_eventdev *dev;
1430         struct rte_event_dev_config dev_conf;
1431         int started;
1432         uint8_t port_id;
1433         struct rte_event_port_conf *port_conf = arg;
1434         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1435
1436         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1437         dev_conf = dev->data->dev_conf;
1438
1439         started = dev->data->dev_started;
1440         if (started)
1441                 rte_event_dev_stop(dev_id);
1442         port_id = dev_conf.nb_event_ports;
1443         dev_conf.nb_event_ports += 1;
1444         ret = rte_event_dev_configure(dev_id, &dev_conf);
1445         if (ret) {
1446                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1447                                                 dev_id);
1448                 if (started) {
1449                         if (rte_event_dev_start(dev_id))
1450                                 return -EIO;
1451                 }
1452                 return ret;
1453         }
1454
1455         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1456         if (ret) {
1457                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1458                                         port_id);
1459                 return ret;
1460         }
1461
1462         conf->event_port_id = port_id;
1463         conf->max_nb_rx = 128;
1464         if (started)
1465                 ret = rte_event_dev_start(dev_id);
1466         rx_adapter->default_cb_arg = 1;
1467         return ret;
1468 }
1469
1470 static int
1471 rxa_epoll_create1(void)
1472 {
1473 #if defined(LINUX)
1474         int fd;
1475         fd = epoll_create1(EPOLL_CLOEXEC);
1476         return fd < 0 ? -errno : fd;
1477 #elif defined(BSD)
1478         return -ENOTSUP;
1479 #endif
1480 }
1481
1482 static int
1483 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1484 {
1485         if (rx_adapter->epd != INIT_FD)
1486                 return 0;
1487
1488         rx_adapter->epd = rxa_epoll_create1();
1489         if (rx_adapter->epd < 0) {
1490                 int err = rx_adapter->epd;
1491                 rx_adapter->epd = INIT_FD;
1492                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1493                 return err;
1494         }
1495
1496         return 0;
1497 }
1498
1499 static int
1500 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1501 {
1502         int err;
1503         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1504
1505         if (rx_adapter->intr_ring)
1506                 return 0;
1507
1508         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1509                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1510                                         rte_socket_id(), 0);
1511         if (!rx_adapter->intr_ring)
1512                 return -ENOMEM;
1513
1514         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1515                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1516                                         sizeof(struct rte_epoll_event),
1517                                         RTE_CACHE_LINE_SIZE,
1518                                         rx_adapter->socket_id);
1519         if (!rx_adapter->epoll_events) {
1520                 err = -ENOMEM;
1521                 goto error;
1522         }
1523
1524         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1525
1526         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1527                         "rx-intr-thread-%d", rx_adapter->id);
1528
1529         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1530                                 NULL, rxa_intr_thread, rx_adapter);
1531         if (!err)
1532                 return 0;
1533
1534         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1535         rte_free(rx_adapter->epoll_events);
1536 error:
1537         rte_ring_free(rx_adapter->intr_ring);
1538         rx_adapter->intr_ring = NULL;
1539         rx_adapter->epoll_events = NULL;
1540         return err;
1541 }
1542
1543 static int
1544 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1545 {
1546         int err;
1547
1548         err = pthread_cancel(rx_adapter->rx_intr_thread);
1549         if (err)
1550                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1551                                 err);
1552
1553         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1554         if (err)
1555                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1556
1557         rte_free(rx_adapter->epoll_events);
1558         rte_ring_free(rx_adapter->intr_ring);
1559         rx_adapter->intr_ring = NULL;
1560         rx_adapter->epoll_events = NULL;
1561         return 0;
1562 }
1563
1564 static int
1565 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1566 {
1567         int ret;
1568
1569         if (rx_adapter->num_rx_intr == 0)
1570                 return 0;
1571
1572         ret = rxa_destroy_intr_thread(rx_adapter);
1573         if (ret)
1574                 return ret;
1575
1576         close(rx_adapter->epd);
1577         rx_adapter->epd = INIT_FD;
1578
1579         return ret;
1580 }
1581
1582 static int
1583 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1584         struct eth_device_info *dev_info,
1585         uint16_t rx_queue_id)
1586 {
1587         int err;
1588         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1589         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1590
1591         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1592         if (err) {
1593                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1594                         rx_queue_id);
1595                 return err;
1596         }
1597
1598         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1599                                         rx_adapter->epd,
1600                                         RTE_INTR_EVENT_DEL,
1601                                         0);
1602         if (err)
1603                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1604
1605         if (sintr)
1606                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1607         else
1608                 dev_info->shared_intr_enabled = 0;
1609         return err;
1610 }
1611
1612 static int
1613 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1614                 struct eth_device_info *dev_info,
1615                 int rx_queue_id)
1616 {
1617         int err;
1618         int i;
1619         int s;
1620
1621         if (dev_info->nb_rx_intr == 0)
1622                 return 0;
1623
1624         err = 0;
1625         if (rx_queue_id == -1) {
1626                 s = dev_info->nb_shared_intr;
1627                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1628                         int sintr;
1629                         uint16_t q;
1630
1631                         q = dev_info->intr_queue[i];
1632                         sintr = rxa_shared_intr(dev_info, q);
1633                         s -= sintr;
1634
1635                         if (!sintr || s == 0) {
1636
1637                                 err = rxa_disable_intr(rx_adapter, dev_info,
1638                                                 q);
1639                                 if (err)
1640                                         return err;
1641                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1642                                                         q);
1643                         }
1644                 }
1645         } else {
1646                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1647                         return 0;
1648                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1649                                 dev_info->nb_shared_intr == 1) {
1650                         err = rxa_disable_intr(rx_adapter, dev_info,
1651                                         rx_queue_id);
1652                         if (err)
1653                                 return err;
1654                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1655                                                 rx_queue_id);
1656                 }
1657
1658                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1659                         if (dev_info->intr_queue[i] == rx_queue_id) {
1660                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1661                                         dev_info->intr_queue[i] =
1662                                                 dev_info->intr_queue[i + 1];
1663                                 break;
1664                         }
1665                 }
1666         }
1667
1668         return err;
1669 }
1670
1671 static int
1672 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1673         struct eth_device_info *dev_info,
1674         uint16_t rx_queue_id)
1675 {
1676         int err, err1;
1677         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1678         union queue_data qd;
1679         int init_fd;
1680         uint16_t *intr_queue;
1681         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1682
1683         if (rxa_intr_queue(dev_info, rx_queue_id))
1684                 return 0;
1685
1686         intr_queue = dev_info->intr_queue;
1687         if (dev_info->intr_queue == NULL) {
1688                 size_t len =
1689                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1690                 dev_info->intr_queue =
1691                         rte_zmalloc_socket(
1692                                 rx_adapter->mem_name,
1693                                 len,
1694                                 0,
1695                                 rx_adapter->socket_id);
1696                 if (dev_info->intr_queue == NULL)
1697                         return -ENOMEM;
1698         }
1699
1700         init_fd = rx_adapter->epd;
1701         err = rxa_init_epd(rx_adapter);
1702         if (err)
1703                 goto err_free_queue;
1704
1705         qd.port = eth_dev_id;
1706         qd.queue = rx_queue_id;
1707
1708         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1709                                         rx_adapter->epd,
1710                                         RTE_INTR_EVENT_ADD,
1711                                         qd.ptr);
1712         if (err) {
1713                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1714                         " Rx Queue %u err %d", rx_queue_id, err);
1715                 goto err_del_fd;
1716         }
1717
1718         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1719         if (err) {
1720                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1721                                 " Rx Queue %u err %d", rx_queue_id, err);
1722
1723                 goto err_del_event;
1724         }
1725
1726         err = rxa_create_intr_thread(rx_adapter);
1727         if (!err)  {
1728                 if (sintr)
1729                         dev_info->shared_intr_enabled = 1;
1730                 else
1731                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1732                 return 0;
1733         }
1734
1735
1736         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1737         if (err)
1738                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1739                                 " Rx Queue %u err %d", rx_queue_id, err);
1740 err_del_event:
1741         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1742                                         rx_adapter->epd,
1743                                         RTE_INTR_EVENT_DEL,
1744                                         0);
1745         if (err1) {
1746                 RTE_EDEV_LOG_ERR("Could not delete event for"
1747                                 " Rx Queue %u err %d", rx_queue_id, err1);
1748         }
1749 err_del_fd:
1750         if (init_fd == INIT_FD) {
1751                 close(rx_adapter->epd);
1752                 rx_adapter->epd = -1;
1753         }
1754 err_free_queue:
1755         if (intr_queue == NULL)
1756                 rte_free(dev_info->intr_queue);
1757
1758         return err;
1759 }
1760
1761 static int
1762 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1763         struct eth_device_info *dev_info,
1764         int rx_queue_id)
1765
1766 {
1767         int i, j, err;
1768         int si = -1;
1769         int shared_done = (dev_info->nb_shared_intr > 0);
1770
1771         if (rx_queue_id != -1) {
1772                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1773                         return 0;
1774                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1775         }
1776
1777         err = 0;
1778         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1779
1780                 if (rxa_shared_intr(dev_info, i) && shared_done)
1781                         continue;
1782
1783                 err = rxa_config_intr(rx_adapter, dev_info, i);
1784
1785                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1786                 if (shared_done) {
1787                         si = i;
1788                         dev_info->shared_intr_enabled = 1;
1789                 }
1790                 if (err)
1791                         break;
1792         }
1793
1794         if (err == 0)
1795                 return 0;
1796
1797         shared_done = (dev_info->nb_shared_intr > 0);
1798         for (j = 0; j < i; j++) {
1799                 if (rxa_intr_queue(dev_info, j))
1800                         continue;
1801                 if (rxa_shared_intr(dev_info, j) && si != j)
1802                         continue;
1803                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1804                 if (err)
1805                         break;
1806
1807         }
1808
1809         return err;
1810 }
1811
1812
1813 static int
1814 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1815 {
1816         int ret;
1817         struct rte_service_spec service;
1818         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1819
1820         if (rx_adapter->service_inited)
1821                 return 0;
1822
1823         memset(&service, 0, sizeof(service));
1824         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1825                 "rte_event_eth_rx_adapter_%d", id);
1826         service.socket_id = rx_adapter->socket_id;
1827         service.callback = rxa_service_func;
1828         service.callback_userdata = rx_adapter;
1829         /* Service function handles locking for queue add/del updates */
1830         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1831         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1832         if (ret) {
1833                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1834                         service.name, ret);
1835                 return ret;
1836         }
1837
1838         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1839                 &rx_adapter_conf, rx_adapter->conf_arg);
1840         if (ret) {
1841                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1842                         ret);
1843                 goto err_done;
1844         }
1845         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1846         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1847         rx_adapter->service_inited = 1;
1848         rx_adapter->epd = INIT_FD;
1849         return 0;
1850
1851 err_done:
1852         rte_service_component_unregister(rx_adapter->service_id);
1853         return ret;
1854 }
1855
1856 static void
1857 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1858                 struct eth_device_info *dev_info,
1859                 int32_t rx_queue_id,
1860                 uint8_t add)
1861 {
1862         struct eth_rx_queue_info *queue_info;
1863         int enabled;
1864         uint16_t i;
1865
1866         if (dev_info->rx_queue == NULL)
1867                 return;
1868
1869         if (rx_queue_id == -1) {
1870                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1871                         rxa_update_queue(rx_adapter, dev_info, i, add);
1872         } else {
1873                 queue_info = &dev_info->rx_queue[rx_queue_id];
1874                 enabled = queue_info->queue_enabled;
1875                 if (add) {
1876                         rx_adapter->nb_queues += !enabled;
1877                         dev_info->nb_dev_queues += !enabled;
1878                 } else {
1879                         rx_adapter->nb_queues -= enabled;
1880                         dev_info->nb_dev_queues -= enabled;
1881                 }
1882                 queue_info->queue_enabled = !!add;
1883         }
1884 }
1885
1886 static void
1887 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1888                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1889                     uint16_t port_id)
1890 {
1891 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1892         struct eth_rx_vector_data *vector_data;
1893         uint32_t flow_id;
1894
1895         vector_data = &queue_info->vector_data;
1896         vector_data->max_vector_count = vector_count;
1897         vector_data->port = port_id;
1898         vector_data->queue = qid;
1899         vector_data->vector_pool = mp;
1900         vector_data->vector_timeout_ticks =
1901                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1902         vector_data->ts = 0;
1903         flow_id = queue_info->event & 0xFFFFF;
1904         flow_id =
1905                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1906         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1907 }
1908
1909 static void
1910 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1911         struct eth_device_info *dev_info,
1912         int32_t rx_queue_id)
1913 {
1914         struct eth_rx_vector_data *vec;
1915         int pollq;
1916         int intrq;
1917         int sintrq;
1918
1919
1920         if (rx_adapter->nb_queues == 0)
1921                 return;
1922
1923         if (rx_queue_id == -1) {
1924                 uint16_t nb_rx_queues;
1925                 uint16_t i;
1926
1927                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1928                 for (i = 0; i < nb_rx_queues; i++)
1929                         rxa_sw_del(rx_adapter, dev_info, i);
1930                 return;
1931         }
1932
1933         /* Push all the partial event vectors to event device. */
1934         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1935                 if (vec->queue != rx_queue_id)
1936                         continue;
1937                 rxa_vector_expire(vec, rx_adapter);
1938                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1939         }
1940
1941         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1942         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1943         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1944         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1945         rx_adapter->num_rx_polled -= pollq;
1946         dev_info->nb_rx_poll -= pollq;
1947         rx_adapter->num_rx_intr -= intrq;
1948         dev_info->nb_rx_intr -= intrq;
1949         dev_info->nb_shared_intr -= intrq && sintrq;
1950 }
1951
1952 static void
1953 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1954         struct eth_device_info *dev_info,
1955         int32_t rx_queue_id,
1956         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1957 {
1958         struct eth_rx_queue_info *queue_info;
1959         const struct rte_event *ev = &conf->ev;
1960         int pollq;
1961         int intrq;
1962         int sintrq;
1963         struct rte_event *qi_ev;
1964
1965         if (rx_queue_id == -1) {
1966                 uint16_t nb_rx_queues;
1967                 uint16_t i;
1968
1969                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1970                 for (i = 0; i < nb_rx_queues; i++)
1971                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1972                 return;
1973         }
1974
1975         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1976         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1977         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1978
1979         queue_info = &dev_info->rx_queue[rx_queue_id];
1980         queue_info->wt = conf->servicing_weight;
1981
1982         qi_ev = (struct rte_event *)&queue_info->event;
1983         qi_ev->event = ev->event;
1984         qi_ev->op = RTE_EVENT_OP_NEW;
1985         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1986         qi_ev->sub_event_type = 0;
1987
1988         if (conf->rx_queue_flags &
1989                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1990                 queue_info->flow_id_mask = ~0;
1991         } else
1992                 qi_ev->flow_id = 0;
1993
1994         if (conf->rx_queue_flags &
1995             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
1996                 queue_info->ena_vector = 1;
1997                 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1998                 rxa_set_vector_data(queue_info, conf->vector_sz,
1999                                     conf->vector_timeout_ns, conf->vector_mp,
2000                                     rx_queue_id, dev_info->dev->data->port_id);
2001                 rx_adapter->ena_vector = 1;
2002                 rx_adapter->vector_tmo_ticks =
2003                         rx_adapter->vector_tmo_ticks ?
2004                                       RTE_MIN(queue_info->vector_data
2005                                                         .vector_timeout_ticks >>
2006                                                 1,
2007                                         rx_adapter->vector_tmo_ticks) :
2008                                 queue_info->vector_data.vector_timeout_ticks >>
2009                                         1;
2010         }
2011
2012         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2013         if (rxa_polled_queue(dev_info, rx_queue_id)) {
2014                 rx_adapter->num_rx_polled += !pollq;
2015                 dev_info->nb_rx_poll += !pollq;
2016                 rx_adapter->num_rx_intr -= intrq;
2017                 dev_info->nb_rx_intr -= intrq;
2018                 dev_info->nb_shared_intr -= intrq && sintrq;
2019         }
2020
2021         if (rxa_intr_queue(dev_info, rx_queue_id)) {
2022                 rx_adapter->num_rx_polled -= pollq;
2023                 dev_info->nb_rx_poll -= pollq;
2024                 rx_adapter->num_rx_intr += !intrq;
2025                 dev_info->nb_rx_intr += !intrq;
2026                 dev_info->nb_shared_intr += !intrq && sintrq;
2027                 if (dev_info->nb_shared_intr == 1) {
2028                         if (dev_info->multi_intr_cap)
2029                                 dev_info->next_q_idx =
2030                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
2031                         else
2032                                 dev_info->next_q_idx = 0;
2033                 }
2034         }
2035 }
2036
2037 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
2038                 uint16_t eth_dev_id,
2039                 int rx_queue_id,
2040                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2041 {
2042         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2043         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2044         int ret;
2045         struct eth_rx_poll_entry *rx_poll;
2046         struct eth_rx_queue_info *rx_queue;
2047         uint32_t *rx_wrr;
2048         uint16_t nb_rx_queues;
2049         uint32_t nb_rx_poll, nb_wrr;
2050         uint32_t nb_rx_intr;
2051         int num_intr_vec;
2052         uint16_t wt;
2053
2054         if (queue_conf->servicing_weight == 0) {
2055                 struct rte_eth_dev_data *data = dev_info->dev->data;
2056
2057                 temp_conf = *queue_conf;
2058                 if (!data->dev_conf.intr_conf.rxq) {
2059                         /* If Rx interrupts are disabled set wt = 1 */
2060                         temp_conf.servicing_weight = 1;
2061                 }
2062                 queue_conf = &temp_conf;
2063         }
2064
2065         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2066         rx_queue = dev_info->rx_queue;
2067         wt = queue_conf->servicing_weight;
2068
2069         if (dev_info->rx_queue == NULL) {
2070                 dev_info->rx_queue =
2071                     rte_zmalloc_socket(rx_adapter->mem_name,
2072                                        nb_rx_queues *
2073                                        sizeof(struct eth_rx_queue_info), 0,
2074                                        rx_adapter->socket_id);
2075                 if (dev_info->rx_queue == NULL)
2076                         return -ENOMEM;
2077         }
2078         rx_wrr = NULL;
2079         rx_poll = NULL;
2080
2081         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2082                         queue_conf->servicing_weight,
2083                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2084
2085         if (dev_info->dev->intr_handle)
2086                 dev_info->multi_intr_cap =
2087                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2088
2089         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2090                                 &rx_poll, &rx_wrr);
2091         if (ret)
2092                 goto err_free_rxqueue;
2093
2094         if (wt == 0) {
2095                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2096
2097                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2098                 if (ret)
2099                         goto err_free_rxqueue;
2100
2101                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2102                 if (ret)
2103                         goto err_free_rxqueue;
2104         } else {
2105
2106                 num_intr_vec = 0;
2107                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2108                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2109                                                 rx_queue_id, 0);
2110                         /* interrupt based queues are being converted to
2111                          * poll mode queues, delete the interrupt configuration
2112                          * for those.
2113                          */
2114                         ret = rxa_del_intr_queue(rx_adapter,
2115                                                 dev_info, rx_queue_id);
2116                         if (ret)
2117                                 goto err_free_rxqueue;
2118                 }
2119         }
2120
2121         if (nb_rx_intr == 0) {
2122                 ret = rxa_free_intr_resources(rx_adapter);
2123                 if (ret)
2124                         goto err_free_rxqueue;
2125         }
2126
2127         if (wt == 0) {
2128                 uint16_t i;
2129
2130                 if (rx_queue_id  == -1) {
2131                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2132                                 dev_info->intr_queue[i] = i;
2133                 } else {
2134                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2135                                 dev_info->intr_queue[nb_rx_intr - 1] =
2136                                         rx_queue_id;
2137                 }
2138         }
2139
2140
2141
2142         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2143         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2144
2145         rte_free(rx_adapter->eth_rx_poll);
2146         rte_free(rx_adapter->wrr_sched);
2147
2148         rx_adapter->eth_rx_poll = rx_poll;
2149         rx_adapter->wrr_sched = rx_wrr;
2150         rx_adapter->wrr_len = nb_wrr;
2151         rx_adapter->num_intr_vec += num_intr_vec;
2152         return 0;
2153
2154 err_free_rxqueue:
2155         if (rx_queue == NULL) {
2156                 rte_free(dev_info->rx_queue);
2157                 dev_info->rx_queue = NULL;
2158         }
2159
2160         rte_free(rx_poll);
2161         rte_free(rx_wrr);
2162
2163         return 0;
2164 }
2165
2166 static int
2167 rxa_ctrl(uint8_t id, int start)
2168 {
2169         struct rte_event_eth_rx_adapter *rx_adapter;
2170         struct rte_eventdev *dev;
2171         struct eth_device_info *dev_info;
2172         uint32_t i;
2173         int use_service = 0;
2174         int stop = !start;
2175
2176         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2177         rx_adapter = rxa_id_to_adapter(id);
2178         if (rx_adapter == NULL)
2179                 return -EINVAL;
2180
2181         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2182
2183         RTE_ETH_FOREACH_DEV(i) {
2184                 dev_info = &rx_adapter->eth_devices[i];
2185                 /* if start  check for num dev queues */
2186                 if (start && !dev_info->nb_dev_queues)
2187                         continue;
2188                 /* if stop check if dev has been started */
2189                 if (stop && !dev_info->dev_rx_started)
2190                         continue;
2191                 use_service |= !dev_info->internal_event_port;
2192                 dev_info->dev_rx_started = start;
2193                 if (dev_info->internal_event_port == 0)
2194                         continue;
2195                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2196                                                 &rte_eth_devices[i]) :
2197                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2198                                                 &rte_eth_devices[i]);
2199         }
2200
2201         if (use_service) {
2202                 rte_spinlock_lock(&rx_adapter->rx_lock);
2203                 rx_adapter->rxa_started = start;
2204                 rte_service_runstate_set(rx_adapter->service_id, start);
2205                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2206         }
2207
2208         return 0;
2209 }
2210
2211 static int
2212 rxa_create(uint8_t id, uint8_t dev_id,
2213            struct rte_event_eth_rx_adapter_params *rxa_params,
2214            rte_event_eth_rx_adapter_conf_cb conf_cb,
2215            void *conf_arg)
2216 {
2217         struct rte_event_eth_rx_adapter *rx_adapter;
2218         struct rte_eth_event_enqueue_buffer *buf;
2219         struct rte_event *events;
2220         int ret;
2221         int socket_id;
2222         uint16_t i;
2223         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2224         const uint8_t default_rss_key[] = {
2225                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2226                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2227                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2228                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2229                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2230         };
2231
2232         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2233         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2234
2235         if (conf_cb == NULL)
2236                 return -EINVAL;
2237
2238         if (event_eth_rx_adapter == NULL) {
2239                 ret = rte_event_eth_rx_adapter_init();
2240                 if (ret)
2241                         return ret;
2242         }
2243
2244         rx_adapter = rxa_id_to_adapter(id);
2245         if (rx_adapter != NULL) {
2246                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2247                 return -EEXIST;
2248         }
2249
2250         socket_id = rte_event_dev_socket_id(dev_id);
2251         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2252                 "rte_event_eth_rx_adapter_%d",
2253                 id);
2254
2255         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2256                         RTE_CACHE_LINE_SIZE, socket_id);
2257         if (rx_adapter == NULL) {
2258                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2259                 return -ENOMEM;
2260         }
2261
2262         rx_adapter->eventdev_id = dev_id;
2263         rx_adapter->socket_id = socket_id;
2264         rx_adapter->conf_cb = conf_cb;
2265         rx_adapter->conf_arg = conf_arg;
2266         rx_adapter->id = id;
2267         TAILQ_INIT(&rx_adapter->vector_list);
2268         strcpy(rx_adapter->mem_name, mem_name);
2269         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2270                                         RTE_MAX_ETHPORTS *
2271                                         sizeof(struct eth_device_info), 0,
2272                                         socket_id);
2273         rte_convert_rss_key((const uint32_t *)default_rss_key,
2274                         (uint32_t *)rx_adapter->rss_key_be,
2275                             RTE_DIM(default_rss_key));
2276
2277         if (rx_adapter->eth_devices == NULL) {
2278                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2279                 rte_free(rx_adapter);
2280                 return -ENOMEM;
2281         }
2282
2283         rte_spinlock_init(&rx_adapter->rx_lock);
2284
2285         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2286                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2287
2288         /* Rx adapter event buffer allocation */
2289         buf = &rx_adapter->event_enqueue_buffer;
2290         buf->events_size = rxa_params->event_buf_size;
2291
2292         events = rte_zmalloc_socket(rx_adapter->mem_name,
2293                                     buf->events_size * sizeof(*events),
2294                                     0, socket_id);
2295         if (events == NULL) {
2296                 RTE_EDEV_LOG_ERR("Failed to allocate mem for event buffer\n");
2297                 rte_free(rx_adapter->eth_devices);
2298                 rte_free(rx_adapter);
2299                 return -ENOMEM;
2300         }
2301
2302         rx_adapter->event_enqueue_buffer.events = events;
2303
2304         event_eth_rx_adapter[id] = rx_adapter;
2305
2306         if (conf_cb == rxa_default_conf_cb)
2307                 rx_adapter->default_cb_arg = 1;
2308
2309         if (rte_mbuf_dyn_rx_timestamp_register(
2310                         &event_eth_rx_timestamp_dynfield_offset,
2311                         &event_eth_rx_timestamp_dynflag) != 0) {
2312                 RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2313                 return -rte_errno;
2314         }
2315
2316         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2317                 conf_arg);
2318         return 0;
2319 }
2320
2321 int
2322 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2323                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2324                                 void *conf_arg)
2325 {
2326         struct rte_event_eth_rx_adapter_params rxa_params = {0};
2327
2328         /* use default values for adapter params */
2329         rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2330
2331         return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2332 }
2333
2334 int
2335 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2336                         struct rte_event_port_conf *port_config,
2337                         struct rte_event_eth_rx_adapter_params *rxa_params)
2338 {
2339         struct rte_event_port_conf *pc;
2340         int ret;
2341         struct rte_event_eth_rx_adapter_params temp_params = {0};
2342
2343         if (port_config == NULL)
2344                 return -EINVAL;
2345
2346         /* use default values if rxa_params is NULL */
2347         if (rxa_params == NULL) {
2348                 rxa_params = &temp_params;
2349                 rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2350         }
2351
2352         if (rxa_params->event_buf_size == 0)
2353                 return -EINVAL;
2354
2355         pc = rte_malloc(NULL, sizeof(*pc), 0);
2356         if (pc == NULL)
2357                 return -ENOMEM;
2358
2359         *pc = *port_config;
2360
2361         /* adjust event buff size with BATCH_SIZE used for fetching packets
2362          * from NIC rx queues to get full buffer utilization and prevent
2363          * unnecessary rollovers.
2364          */
2365         rxa_params->event_buf_size = RTE_ALIGN(rxa_params->event_buf_size,
2366                                                BATCH_SIZE);
2367         rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2368
2369         ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2370         if (ret)
2371                 rte_free(pc);
2372
2373         return ret;
2374 }
2375
2376 int
2377 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2378                 struct rte_event_port_conf *port_config)
2379 {
2380         struct rte_event_port_conf *pc;
2381         int ret;
2382
2383         if (port_config == NULL)
2384                 return -EINVAL;
2385
2386         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2387
2388         pc = rte_malloc(NULL, sizeof(*pc), 0);
2389         if (pc == NULL)
2390                 return -ENOMEM;
2391         *pc = *port_config;
2392
2393         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2394                                         rxa_default_conf_cb,
2395                                         pc);
2396         if (ret)
2397                 rte_free(pc);
2398         return ret;
2399 }
2400
2401 int
2402 rte_event_eth_rx_adapter_free(uint8_t id)
2403 {
2404         struct rte_event_eth_rx_adapter *rx_adapter;
2405
2406         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2407
2408         rx_adapter = rxa_id_to_adapter(id);
2409         if (rx_adapter == NULL)
2410                 return -EINVAL;
2411
2412         if (rx_adapter->nb_queues) {
2413                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2414                                 rx_adapter->nb_queues);
2415                 return -EBUSY;
2416         }
2417
2418         if (rx_adapter->default_cb_arg)
2419                 rte_free(rx_adapter->conf_arg);
2420         rte_free(rx_adapter->eth_devices);
2421         rte_free(rx_adapter->event_enqueue_buffer.events);
2422         rte_free(rx_adapter);
2423         event_eth_rx_adapter[id] = NULL;
2424
2425         rte_eventdev_trace_eth_rx_adapter_free(id);
2426         return 0;
2427 }
2428
2429 int
2430 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2431                 uint16_t eth_dev_id,
2432                 int32_t rx_queue_id,
2433                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2434 {
2435         int ret;
2436         uint32_t cap;
2437         struct rte_event_eth_rx_adapter *rx_adapter;
2438         struct rte_eventdev *dev;
2439         struct eth_device_info *dev_info;
2440         struct rte_event_eth_rx_adapter_vector_limits limits;
2441
2442         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2443         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2444
2445         rx_adapter = rxa_id_to_adapter(id);
2446         if ((rx_adapter == NULL) || (queue_conf == NULL))
2447                 return -EINVAL;
2448
2449         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2450         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2451                                                 eth_dev_id,
2452                                                 &cap);
2453         if (ret) {
2454                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2455                         "eth port %" PRIu16, id, eth_dev_id);
2456                 return ret;
2457         }
2458
2459         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2460                 && (queue_conf->rx_queue_flags &
2461                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2462                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2463                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2464                                 eth_dev_id, id);
2465                 return -EINVAL;
2466         }
2467
2468         if (queue_conf->rx_queue_flags &
2469             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2470
2471                 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2472                         RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2473                                          " eth port: %" PRIu16
2474                                          " adapter id: %" PRIu8,
2475                                          eth_dev_id, id);
2476                         return -EINVAL;
2477                 }
2478
2479                 ret = rte_event_eth_rx_adapter_vector_limits_get(
2480                         rx_adapter->eventdev_id, eth_dev_id, &limits);
2481                 if (ret < 0) {
2482                         RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2483                                          " eth port: %" PRIu16
2484                                          " adapter id: %" PRIu8,
2485                                          eth_dev_id, id);
2486                         return -EINVAL;
2487                 }
2488                 if (queue_conf->vector_sz < limits.min_sz ||
2489                     queue_conf->vector_sz > limits.max_sz ||
2490                     queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2491                     queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2492                     queue_conf->vector_mp == NULL) {
2493                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2494                                          " eth port: %" PRIu16
2495                                          " adapter id: %" PRIu8,
2496                                          eth_dev_id, id);
2497                         return -EINVAL;
2498                 }
2499                 if (queue_conf->vector_mp->elt_size <
2500                     (sizeof(struct rte_event_vector) +
2501                      (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2502                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2503                                          " eth port: %" PRIu16
2504                                          " adapter id: %" PRIu8,
2505                                          eth_dev_id, id);
2506                         return -EINVAL;
2507                 }
2508         }
2509
2510         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2511                 (rx_queue_id != -1)) {
2512                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2513                         "event queue, eth port: %" PRIu16 " adapter id: %"
2514                         PRIu8, eth_dev_id, id);
2515                 return -EINVAL;
2516         }
2517
2518         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2519                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2520                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2521                          (uint16_t)rx_queue_id);
2522                 return -EINVAL;
2523         }
2524
2525         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2526
2527         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2528                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2529                                         -ENOTSUP);
2530                 if (dev_info->rx_queue == NULL) {
2531                         dev_info->rx_queue =
2532                             rte_zmalloc_socket(rx_adapter->mem_name,
2533                                         dev_info->dev->data->nb_rx_queues *
2534                                         sizeof(struct eth_rx_queue_info), 0,
2535                                         rx_adapter->socket_id);
2536                         if (dev_info->rx_queue == NULL)
2537                                 return -ENOMEM;
2538                 }
2539
2540                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2541                                 &rte_eth_devices[eth_dev_id],
2542                                 rx_queue_id, queue_conf);
2543                 if (ret == 0) {
2544                         dev_info->internal_event_port = 1;
2545                         rxa_update_queue(rx_adapter,
2546                                         &rx_adapter->eth_devices[eth_dev_id],
2547                                         rx_queue_id,
2548                                         1);
2549                 }
2550         } else {
2551                 rte_spinlock_lock(&rx_adapter->rx_lock);
2552                 dev_info->internal_event_port = 0;
2553                 ret = rxa_init_service(rx_adapter, id);
2554                 if (ret == 0) {
2555                         uint32_t service_id = rx_adapter->service_id;
2556                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2557                                         queue_conf);
2558                         rte_service_component_runstate_set(service_id,
2559                                 rxa_sw_adapter_queue_count(rx_adapter));
2560                 }
2561                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2562         }
2563
2564         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2565                 rx_queue_id, queue_conf, ret);
2566         if (ret)
2567                 return ret;
2568
2569         return 0;
2570 }
2571
2572 static int
2573 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2574 {
2575         limits->max_sz = MAX_VECTOR_SIZE;
2576         limits->min_sz = MIN_VECTOR_SIZE;
2577         limits->max_timeout_ns = MAX_VECTOR_NS;
2578         limits->min_timeout_ns = MIN_VECTOR_NS;
2579
2580         return 0;
2581 }
2582
2583 int
2584 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2585                                 int32_t rx_queue_id)
2586 {
2587         int ret = 0;
2588         struct rte_eventdev *dev;
2589         struct rte_event_eth_rx_adapter *rx_adapter;
2590         struct eth_device_info *dev_info;
2591         uint32_t cap;
2592         uint32_t nb_rx_poll = 0;
2593         uint32_t nb_wrr = 0;
2594         uint32_t nb_rx_intr;
2595         struct eth_rx_poll_entry *rx_poll = NULL;
2596         uint32_t *rx_wrr = NULL;
2597         int num_intr_vec;
2598
2599         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2600         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2601
2602         rx_adapter = rxa_id_to_adapter(id);
2603         if (rx_adapter == NULL)
2604                 return -EINVAL;
2605
2606         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2607         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2608                                                 eth_dev_id,
2609                                                 &cap);
2610         if (ret)
2611                 return ret;
2612
2613         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2614                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2615                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2616                          (uint16_t)rx_queue_id);
2617                 return -EINVAL;
2618         }
2619
2620         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2621
2622         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2623                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2624                                  -ENOTSUP);
2625                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2626                                                 &rte_eth_devices[eth_dev_id],
2627                                                 rx_queue_id);
2628                 if (ret == 0) {
2629                         rxa_update_queue(rx_adapter,
2630                                         &rx_adapter->eth_devices[eth_dev_id],
2631                                         rx_queue_id,
2632                                         0);
2633                         if (dev_info->nb_dev_queues == 0) {
2634                                 rte_free(dev_info->rx_queue);
2635                                 dev_info->rx_queue = NULL;
2636                         }
2637                 }
2638         } else {
2639                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2640                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2641
2642                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2643                         &rx_poll, &rx_wrr);
2644                 if (ret)
2645                         return ret;
2646
2647                 rte_spinlock_lock(&rx_adapter->rx_lock);
2648
2649                 num_intr_vec = 0;
2650                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2651
2652                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2653                                                 rx_queue_id, 0);
2654                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2655                                         rx_queue_id);
2656                         if (ret)
2657                                 goto unlock_ret;
2658                 }
2659
2660                 if (nb_rx_intr == 0) {
2661                         ret = rxa_free_intr_resources(rx_adapter);
2662                         if (ret)
2663                                 goto unlock_ret;
2664                 }
2665
2666                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2667                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2668
2669                 rte_free(rx_adapter->eth_rx_poll);
2670                 rte_free(rx_adapter->wrr_sched);
2671
2672                 if (nb_rx_intr == 0) {
2673                         rte_free(dev_info->intr_queue);
2674                         dev_info->intr_queue = NULL;
2675                 }
2676
2677                 rx_adapter->eth_rx_poll = rx_poll;
2678                 rx_adapter->wrr_sched = rx_wrr;
2679                 rx_adapter->wrr_len = nb_wrr;
2680                 rx_adapter->num_intr_vec += num_intr_vec;
2681
2682                 if (dev_info->nb_dev_queues == 0) {
2683                         rte_free(dev_info->rx_queue);
2684                         dev_info->rx_queue = NULL;
2685                 }
2686 unlock_ret:
2687                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2688                 if (ret) {
2689                         rte_free(rx_poll);
2690                         rte_free(rx_wrr);
2691                         return ret;
2692                 }
2693
2694                 rte_service_component_runstate_set(rx_adapter->service_id,
2695                                 rxa_sw_adapter_queue_count(rx_adapter));
2696         }
2697
2698         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2699                 rx_queue_id, ret);
2700         return ret;
2701 }
2702
2703 int
2704 rte_event_eth_rx_adapter_vector_limits_get(
2705         uint8_t dev_id, uint16_t eth_port_id,
2706         struct rte_event_eth_rx_adapter_vector_limits *limits)
2707 {
2708         struct rte_eventdev *dev;
2709         uint32_t cap;
2710         int ret;
2711
2712         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2713         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2714
2715         if (limits == NULL)
2716                 return -EINVAL;
2717
2718         dev = &rte_eventdevs[dev_id];
2719
2720         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2721         if (ret) {
2722                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2723                                  "eth port %" PRIu16,
2724                                  dev_id, eth_port_id);
2725                 return ret;
2726         }
2727
2728         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2729                 RTE_FUNC_PTR_OR_ERR_RET(
2730                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2731                         -ENOTSUP);
2732                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2733                         dev, &rte_eth_devices[eth_port_id], limits);
2734         } else {
2735                 ret = rxa_sw_vector_limits(limits);
2736         }
2737
2738         return ret;
2739 }
2740
2741 int
2742 rte_event_eth_rx_adapter_start(uint8_t id)
2743 {
2744         rte_eventdev_trace_eth_rx_adapter_start(id);
2745         return rxa_ctrl(id, 1);
2746 }
2747
2748 int
2749 rte_event_eth_rx_adapter_stop(uint8_t id)
2750 {
2751         rte_eventdev_trace_eth_rx_adapter_stop(id);
2752         return rxa_ctrl(id, 0);
2753 }
2754
2755 int
2756 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2757                                struct rte_event_eth_rx_adapter_stats *stats)
2758 {
2759         struct rte_event_eth_rx_adapter *rx_adapter;
2760         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2761         struct rte_event_eth_rx_adapter_stats dev_stats;
2762         struct rte_eventdev *dev;
2763         struct eth_device_info *dev_info;
2764         uint32_t i;
2765         int ret;
2766
2767         if (rxa_memzone_lookup())
2768                 return -ENOMEM;
2769
2770         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2771
2772         rx_adapter = rxa_id_to_adapter(id);
2773         if (rx_adapter  == NULL || stats == NULL)
2774                 return -EINVAL;
2775
2776         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2777         memset(stats, 0, sizeof(*stats));
2778         RTE_ETH_FOREACH_DEV(i) {
2779                 dev_info = &rx_adapter->eth_devices[i];
2780                 if (dev_info->internal_event_port == 0 ||
2781                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2782                         continue;
2783                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2784                                                 &rte_eth_devices[i],
2785                                                 &dev_stats);
2786                 if (ret)
2787                         continue;
2788                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2789                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2790         }
2791
2792         if (rx_adapter->service_inited)
2793                 *stats = rx_adapter->stats;
2794
2795         stats->rx_packets += dev_stats_sum.rx_packets;
2796         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2797
2798         return 0;
2799 }
2800
2801 int
2802 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2803 {
2804         struct rte_event_eth_rx_adapter *rx_adapter;
2805         struct rte_eventdev *dev;
2806         struct eth_device_info *dev_info;
2807         uint32_t i;
2808
2809         if (rxa_memzone_lookup())
2810                 return -ENOMEM;
2811
2812         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2813
2814         rx_adapter = rxa_id_to_adapter(id);
2815         if (rx_adapter == NULL)
2816                 return -EINVAL;
2817
2818         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2819         RTE_ETH_FOREACH_DEV(i) {
2820                 dev_info = &rx_adapter->eth_devices[i];
2821                 if (dev_info->internal_event_port == 0 ||
2822                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2823                         continue;
2824                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2825                                                         &rte_eth_devices[i]);
2826         }
2827
2828         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2829         return 0;
2830 }
2831
2832 int
2833 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2834 {
2835         struct rte_event_eth_rx_adapter *rx_adapter;
2836
2837         if (rxa_memzone_lookup())
2838                 return -ENOMEM;
2839
2840         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2841
2842         rx_adapter = rxa_id_to_adapter(id);
2843         if (rx_adapter == NULL || service_id == NULL)
2844                 return -EINVAL;
2845
2846         if (rx_adapter->service_inited)
2847                 *service_id = rx_adapter->service_id;
2848
2849         return rx_adapter->service_inited ? 0 : -ESRCH;
2850 }
2851
2852 int
2853 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2854                                         uint16_t eth_dev_id,
2855                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2856                                         void *cb_arg)
2857 {
2858         struct rte_event_eth_rx_adapter *rx_adapter;
2859         struct eth_device_info *dev_info;
2860         uint32_t cap;
2861         int ret;
2862
2863         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2864         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2865
2866         rx_adapter = rxa_id_to_adapter(id);
2867         if (rx_adapter == NULL)
2868                 return -EINVAL;
2869
2870         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2871         if (dev_info->rx_queue == NULL)
2872                 return -EINVAL;
2873
2874         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2875                                                 eth_dev_id,
2876                                                 &cap);
2877         if (ret) {
2878                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2879                         "eth port %" PRIu16, id, eth_dev_id);
2880                 return ret;
2881         }
2882
2883         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2884                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2885                                 PRIu16, eth_dev_id);
2886                 return -EINVAL;
2887         }
2888
2889         rte_spinlock_lock(&rx_adapter->rx_lock);
2890         dev_info->cb_fn = cb_fn;
2891         dev_info->cb_arg = cb_arg;
2892         rte_spinlock_unlock(&rx_adapter->rx_lock);
2893
2894         return 0;
2895 }
2896
2897 int
2898 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
2899                         uint16_t eth_dev_id,
2900                         uint16_t rx_queue_id,
2901                         struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2902 {
2903         struct rte_eventdev *dev;
2904         struct rte_event_eth_rx_adapter *rx_adapter;
2905         struct eth_device_info *dev_info;
2906         struct eth_rx_queue_info *queue_info;
2907         struct rte_event *qi_ev;
2908         int ret;
2909
2910         if (rxa_memzone_lookup())
2911                 return -ENOMEM;
2912
2913         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2914         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2915
2916         if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2917                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
2918                 return -EINVAL;
2919         }
2920
2921         if (queue_conf == NULL) {
2922                 RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
2923                 return -EINVAL;
2924         }
2925
2926         rx_adapter = rxa_id_to_adapter(id);
2927         if (rx_adapter == NULL)
2928                 return -EINVAL;
2929
2930         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2931         if (dev_info->rx_queue == NULL ||
2932             !dev_info->rx_queue[rx_queue_id].queue_enabled) {
2933                 RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
2934                 return -EINVAL;
2935         }
2936
2937         queue_info = &dev_info->rx_queue[rx_queue_id];
2938         qi_ev = (struct rte_event *)&queue_info->event;
2939
2940         memset(queue_conf, 0, sizeof(*queue_conf));
2941         queue_conf->rx_queue_flags = 0;
2942         if (queue_info->flow_id_mask != 0)
2943                 queue_conf->rx_queue_flags |=
2944                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
2945         queue_conf->servicing_weight = queue_info->wt;
2946
2947         memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
2948
2949         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2950         if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
2951                 ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
2952                                                 &rte_eth_devices[eth_dev_id],
2953                                                 rx_queue_id,
2954                                                 queue_conf);
2955                 return ret;
2956         }
2957
2958         return 0;
2959 }