dpdk.git: lib/eventdev/rte_event_eth_rx_adapter.c (commit cc93fcc10b7765e076be8ab8f11be09aff2cf513)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (6*BATCH_SIZE)
29 #define MAX_VECTOR_SIZE         1024
30 #define MIN_VECTOR_SIZE         4
31 #define MAX_VECTOR_NS           1E9
32 #define MIN_VECTOR_NS           1E5
33
34 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
35 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
36
37 #define RSS_KEY_SIZE    40
38 /* value written to intr thread pipe to signal thread exit */
39 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
40 /* Sentinel value that marks an uninitialized file descriptor */
41 #define INIT_FD         -1
42
43 /*
44  * Used to store port and queue ID of interrupting Rx queue
45  */
46 union queue_data {
47         RTE_STD_C11
48         void *ptr;
49         struct {
50                 uint16_t port;
51                 uint16_t queue;
52         };
53 };
54
55 /*
56  * There is an instance of this struct per polled Rx queue added to the
57  * adapter
58  */
59 struct eth_rx_poll_entry {
60         /* Eth port to poll */
61         uint16_t eth_dev_id;
62         /* Eth rx queue to poll */
63         uint16_t eth_rx_qid;
64 };
65
66 struct eth_rx_vector_data {
67         TAILQ_ENTRY(eth_rx_vector_data) next;
68         uint16_t port;
69         uint16_t queue;
70         uint16_t max_vector_count;
71         uint64_t event;
72         uint64_t ts;
73         uint64_t vector_timeout_ticks;
74         struct rte_mempool *vector_pool;
75         struct rte_event_vector *vector_ev;
76 } __rte_cache_aligned;
77
78 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
79
80 /* Instance per adapter */
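/*
 * Note on buffer management: events are appended at 'tail' and drained from
 * 'head' by rxa_flush_event_buffer(). When the linear space at the end of
 * events[] is exhausted and at least BATCH_SIZE events have already been
 * drained from the front, 'last' records the wrap point, 'last_mask' is set
 * to ~0 and 'tail' restarts at index 0 (see rxa_pkt_buf_available()).
 */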
81 struct rte_eth_event_enqueue_buffer {
82         /* Count of events in this buffer */
83         uint16_t count;
84         /* Array of events in this buffer */
85         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
86         /* Event enqueue happens from head */
87         uint16_t head;
88         /* New packets from rte_eth_rx_burst are enqueued at the tail */
89         uint16_t tail;
90         /* End index (exclusive) of valid events before rollover */
91         uint16_t last;
92         uint16_t last_mask;
93 };
94
95 struct rte_event_eth_rx_adapter {
96         /* RSS key */
97         uint8_t rss_key_be[RSS_KEY_SIZE];
98         /* Event device identifier */
99         uint8_t eventdev_id;
100         /* Per ethernet device structure */
101         struct eth_device_info *eth_devices;
102         /* Event port identifier */
103         uint8_t event_port_id;
104         /* Lock to serialize config updates with service function */
105         rte_spinlock_t rx_lock;
106         /* Max mbufs processed in any service function invocation */
107         uint32_t max_nb_rx;
108         /* Receive queues that need to be polled */
109         struct eth_rx_poll_entry *eth_rx_poll;
110         /* Size of the eth_rx_poll array */
111         uint16_t num_rx_polled;
112         /* Weighted round robin schedule */
113         uint32_t *wrr_sched;
114         /* wrr_sched[] size */
115         uint32_t wrr_len;
116         /* Next entry in wrr[] to begin polling */
117         uint32_t wrr_pos;
118         /* Event burst buffer */
119         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
120         /* Vector enable flag */
121         uint8_t ena_vector;
122         /* Timestamp of previous vector expiry list traversal */
123         uint64_t prev_expiry_ts;
124         /* Minimum ticks to wait before traversing expiry list */
125         uint64_t vector_tmo_ticks;
126         /* vector list */
127         struct eth_rx_vector_data_list vector_list;
128         /* Per adapter stats */
129         struct rte_event_eth_rx_adapter_stats stats;
130         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
131         uint16_t enq_block_count;
132         /* Block start ts */
133         uint64_t rx_enq_block_start_ts;
134         /* epoll fd used to wait for Rx interrupts */
135         int epd;
136         /* Number of interrupt driven Rx queues */
137         uint32_t num_rx_intr;
138         /* Used to send <dev id, queue id> of interrupting Rx queues from
139          * the interrupt thread to the Rx thread
140          */
141         struct rte_ring *intr_ring;
142         /* Rx Queue data (dev id, queue id) for the last non-empty
143          * queue polled
144          */
145         union queue_data qd;
146         /* queue_data is valid */
147         int qd_valid;
148         /* Interrupt ring lock, synchronizes Rx thread
149          * and interrupt thread
150          */
151         rte_spinlock_t intr_ring_lock;
152         /* event array passed to rte_epoll_wait */
153         struct rte_epoll_event *epoll_events;
154         /* Count of interrupt vectors in use */
155         uint32_t num_intr_vec;
156         /* Thread blocked on Rx interrupts */
157         pthread_t rx_intr_thread;
158         /* Configuration callback for rte_service configuration */
159         rte_event_eth_rx_adapter_conf_cb conf_cb;
160         /* Configuration callback argument */
161         void *conf_arg;
162         /* Set if the default conf callback is being used */
163         int default_cb_arg;
164         /* Service initialization state */
165         uint8_t service_inited;
166         /* Total count of Rx queues in adapter */
167         uint32_t nb_queues;
168         /* Memory allocation name */
169         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
170         /* Socket identifier cached from eventdev */
171         int socket_id;
172         /* Per adapter EAL service */
173         uint32_t service_id;
174         /* Adapter started flag */
175         uint8_t rxa_started;
176         /* Adapter ID */
177         uint8_t id;
178 } __rte_cache_aligned;
179
180 /* Per eth device */
181 struct eth_device_info {
182         struct rte_eth_dev *dev;
183         struct eth_rx_queue_info *rx_queue;
184         /* Rx callback */
185         rte_event_eth_rx_adapter_cb_fn cb_fn;
186         /* Rx callback argument */
187         void *cb_arg;
188         /* Set if ethdev->eventdev packet transfer uses a
189          * hardware mechanism
190          */
191         uint8_t internal_event_port;
192         /* Set if the adapter is processing rx queues for
193          * this eth device and packet processing has been
194          * started, allows for the code to know if the PMD
195          * rx_adapter_stop callback needs to be invoked
196          */
197         uint8_t dev_rx_started;
198         /* Number of queues added for this device */
199         uint16_t nb_dev_queues;
200         /* Number of poll based queues
201          * If nb_rx_poll > 0, the start callback will
202          * be invoked if not already invoked
203          */
204         uint16_t nb_rx_poll;
205         /* Number of interrupt based queues
206          * If nb_rx_intr > 0, the start callback will
207          * be invoked if not already invoked.
208          */
209         uint16_t nb_rx_intr;
210         /* Number of queues that use the shared interrupt */
211         uint16_t nb_shared_intr;
212         /* sum(wrr(q)) for all queues within the device;
213          * useful when deleting all device queues
214          */
215         uint32_t wrr_len;
216         /* Intr based queue index to start polling from; this is used
217          * if the number of shared interrupts is non-zero
218          */
219         uint16_t next_q_idx;
220         /* Intr based queue indices */
221         uint16_t *intr_queue;
222         /* device generates per Rx queue interrupts for
223          * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
224          */
225         int multi_intr_cap;
226         /* shared interrupt enabled */
227         int shared_intr_enabled;
228 };
229
230 /* Per Rx queue */
231 struct eth_rx_queue_info {
232         int queue_enabled;      /* True if added */
233         int intr_enabled;
234         uint8_t ena_vector;
235         uint16_t wt;            /* Polling weight */
236         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
237         uint64_t event;
238         struct eth_rx_vector_data vector_data;
239 };
240
241 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
242
243 static inline int
244 rxa_validate_id(uint8_t id)
245 {
246         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
247 }
248
249 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
250         if (!rxa_validate_id(id)) { \
251                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
252                 return retval; \
253         } \
254 } while (0)
255
256 static inline int
257 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
258 {
259         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
260 }
261
262 /* Greatest common divisor */
263 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
264 {
265         uint16_t r = a % b;
266
267         return r ? rxa_gcd_u16(b, r) : b;
268 }
269
270 /* Returns the next queue in the polling sequence
271  *
272  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
273  */
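/* Illustrative example: with two polled queues of weights 3 and 1
 * (max_wt = 3, gcd = 1), successive calls yield the index sequence
 * 0, 0, 0, 1, repeating; each queue is selected in proportion to its
 * weight.
 */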
274 static int
275 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
276          unsigned int n, int *cw,
277          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
278          uint16_t gcd, int prev)
279 {
280         int i = prev;
281         uint16_t w;
282
283         while (1) {
284                 uint16_t q;
285                 uint16_t d;
286
287                 i = (i + 1) % n;
288                 if (i == 0) {
289                         *cw = *cw - gcd;
290                         if (*cw <= 0)
291                                 *cw = max_wt;
292                 }
293
294                 q = eth_rx_poll[i].eth_rx_qid;
295                 d = eth_rx_poll[i].eth_dev_id;
296                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
297
298                 if ((int)w >= *cw)
299                         return i;
300         }
301 }
302
303 static inline int
304 rxa_shared_intr(struct eth_device_info *dev_info,
305         int rx_queue_id)
306 {
307         int multi_intr_cap;
308
309         if (dev_info->dev->intr_handle == NULL)
310                 return 0;
311
312         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
313         return !multi_intr_cap ||
314                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
315 }
316
317 static inline int
318 rxa_intr_queue(struct eth_device_info *dev_info,
319         int rx_queue_id)
320 {
321         struct eth_rx_queue_info *queue_info;
322
323         queue_info = &dev_info->rx_queue[rx_queue_id];
324         return dev_info->rx_queue &&
325                 !dev_info->internal_event_port &&
326                 queue_info->queue_enabled && queue_info->wt == 0;
327 }
328
329 static inline int
330 rxa_polled_queue(struct eth_device_info *dev_info,
331         int rx_queue_id)
332 {
333         struct eth_rx_queue_info *queue_info;
334
335         queue_info = &dev_info->rx_queue[rx_queue_id];
336         return !dev_info->internal_event_port &&
337                 dev_info->rx_queue &&
338                 queue_info->queue_enabled && queue_info->wt != 0;
339 }
340
341 /* Calculate the change in the number of interrupt vectors after an Rx queue is added/deleted */
342 static int
343 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
344 {
345         uint16_t i;
346         int n, s;
347         uint16_t nbq;
348
349         nbq = dev_info->dev->data->nb_rx_queues;
350         n = 0; /* non shared count */
351         s = 0; /* shared count */
352
353         if (rx_queue_id == -1) {
354                 for (i = 0; i < nbq; i++) {
355                         if (!rxa_shared_intr(dev_info, i))
356                                 n += add ? !rxa_intr_queue(dev_info, i) :
357                                         rxa_intr_queue(dev_info, i);
358                         else
359                                 s += add ? !rxa_intr_queue(dev_info, i) :
360                                         rxa_intr_queue(dev_info, i);
361                 }
362
363                 if (s > 0) {
364                         if ((add && dev_info->nb_shared_intr == 0) ||
365                                 (!add && dev_info->nb_shared_intr))
366                                 n += 1;
367                 }
368         } else {
369                 if (!rxa_shared_intr(dev_info, rx_queue_id))
370                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
371                                 rxa_intr_queue(dev_info, rx_queue_id);
372                 else
373                         n = add ? !dev_info->nb_shared_intr :
374                                 dev_info->nb_shared_intr == 1;
375         }
376
377         return add ? n : -n;
378 }
379
380 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
381  */
382 static void
383 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
384                         struct eth_device_info *dev_info,
385                         int rx_queue_id,
386                         uint32_t *nb_rx_intr)
387 {
388         uint32_t intr_diff;
389
390         if (rx_queue_id == -1)
391                 intr_diff = dev_info->nb_rx_intr;
392         else
393                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
394
395         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
396 }
397
398 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly
399  * added interrupt queues could currently be poll mode Rx queues
400  */
401 static void
402 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
403                         struct eth_device_info *dev_info,
404                         int rx_queue_id,
405                         uint32_t *nb_rx_poll,
406                         uint32_t *nb_rx_intr,
407                         uint32_t *nb_wrr)
408 {
409         uint32_t intr_diff;
410         uint32_t poll_diff;
411         uint32_t wrr_len_diff;
412
413         if (rx_queue_id == -1) {
414                 intr_diff = dev_info->dev->data->nb_rx_queues -
415                                                 dev_info->nb_rx_intr;
416                 poll_diff = dev_info->nb_rx_poll;
417                 wrr_len_diff = dev_info->wrr_len;
418         } else {
419                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
420                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
421                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
422                                         0;
423         }
424
425         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
426         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
427         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
428 }
429
430 /* Calculate size of the eth_rx_poll and wrr_sched arrays
431  * after deleting poll mode rx queues
432  */
433 static void
434 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
435                         struct eth_device_info *dev_info,
436                         int rx_queue_id,
437                         uint32_t *nb_rx_poll,
438                         uint32_t *nb_wrr)
439 {
440         uint32_t poll_diff;
441         uint32_t wrr_len_diff;
442
443         if (rx_queue_id == -1) {
444                 poll_diff = dev_info->nb_rx_poll;
445                 wrr_len_diff = dev_info->wrr_len;
446         } else {
447                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
448                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
449                                         0;
450         }
451
452         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
453         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
454 }
455
456 /* Calculate nb_rx_* after adding poll mode rx queues
457  */
458 static void
459 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
460                         struct eth_device_info *dev_info,
461                         int rx_queue_id,
462                         uint16_t wt,
463                         uint32_t *nb_rx_poll,
464                         uint32_t *nb_rx_intr,
465                         uint32_t *nb_wrr)
466 {
467         uint32_t intr_diff;
468         uint32_t poll_diff;
469         uint32_t wrr_len_diff;
470
471         if (rx_queue_id == -1) {
472                 intr_diff = dev_info->nb_rx_intr;
473                 poll_diff = dev_info->dev->data->nb_rx_queues -
474                                                 dev_info->nb_rx_poll;
475                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
476                                 - dev_info->wrr_len;
477         } else {
478                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
479                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
480                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
481                                 wt - dev_info->rx_queue[rx_queue_id].wt :
482                                 wt;
483         }
484
485         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
486         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
487         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
488 }
489
490 /* Calculate nb_rx_* after adding rx_queue_id */
491 static void
492 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
493                 struct eth_device_info *dev_info,
494                 int rx_queue_id,
495                 uint16_t wt,
496                 uint32_t *nb_rx_poll,
497                 uint32_t *nb_rx_intr,
498                 uint32_t *nb_wrr)
499 {
500         if (wt != 0)
501                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
502                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
503         else
504                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
505                                         nb_rx_poll, nb_rx_intr, nb_wrr);
506 }
507
508 /* Calculate nb_rx_* after deleting rx_queue_id */
509 static void
510 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
511                 struct eth_device_info *dev_info,
512                 int rx_queue_id,
513                 uint32_t *nb_rx_poll,
514                 uint32_t *nb_rx_intr,
515                 uint32_t *nb_wrr)
516 {
517         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
518                                 nb_wrr);
519         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
520                                 nb_rx_intr);
521 }
522
523 /*
524  * Allocate the rx_poll array
525  */
526 static struct eth_rx_poll_entry *
527 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
528         uint32_t num_rx_polled)
529 {
530         size_t len;
531
532         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
533                                                         RTE_CACHE_LINE_SIZE);
534         return  rte_zmalloc_socket(rx_adapter->mem_name,
535                                 len,
536                                 RTE_CACHE_LINE_SIZE,
537                                 rx_adapter->socket_id);
538 }
539
540 /*
541  * Allocate the WRR array
542  */
543 static uint32_t *
544 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
545 {
546         size_t len;
547
548         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
549                         RTE_CACHE_LINE_SIZE);
550         return  rte_zmalloc_socket(rx_adapter->mem_name,
551                                 len,
552                                 RTE_CACHE_LINE_SIZE,
553                                 rx_adapter->socket_id);
554 }
555
556 static int
557 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
558                 uint32_t nb_poll,
559                 uint32_t nb_wrr,
560                 struct eth_rx_poll_entry **rx_poll,
561                 uint32_t **wrr_sched)
562 {
563
564         if (nb_poll == 0) {
565                 *rx_poll = NULL;
566                 *wrr_sched = NULL;
567                 return 0;
568         }
569
570         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
571         if (*rx_poll == NULL) {
572                 *wrr_sched = NULL;
573                 return -ENOMEM;
574         }
575
576         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
577         if (*wrr_sched == NULL) {
578                 rte_free(*rx_poll);
579                 return -ENOMEM;
580         }
581         return 0;
582 }
583
584 /* Precalculate WRR polling sequence for all queues in rx_adapter */
585 static void
586 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
587                 struct eth_rx_poll_entry *rx_poll,
588                 uint32_t *rx_wrr)
589 {
590         uint16_t d;
591         uint16_t q;
592         unsigned int i;
593         int prev = -1;
594         int cw = -1;
595
596         /* Initialize variables for calculation of wrr schedule */
597         uint16_t max_wrr_pos = 0;
598         unsigned int poll_q = 0;
599         uint16_t max_wt = 0;
600         uint16_t gcd = 0;
601
602         if (rx_poll == NULL)
603                 return;
604
605         /* Generate the array of all queues to poll; poll_q is the
606          * size of this array
607          */
608         RTE_ETH_FOREACH_DEV(d) {
609                 uint16_t nb_rx_queues;
610                 struct eth_device_info *dev_info =
611                                 &rx_adapter->eth_devices[d];
612                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
613                 if (dev_info->rx_queue == NULL)
614                         continue;
615                 if (dev_info->internal_event_port)
616                         continue;
617                 dev_info->wrr_len = 0;
618                 for (q = 0; q < nb_rx_queues; q++) {
619                         struct eth_rx_queue_info *queue_info =
620                                 &dev_info->rx_queue[q];
621                         uint16_t wt;
622
623                         if (!rxa_polled_queue(dev_info, q))
624                                 continue;
625                         wt = queue_info->wt;
626                         rx_poll[poll_q].eth_dev_id = d;
627                         rx_poll[poll_q].eth_rx_qid = q;
628                         max_wrr_pos += wt;
629                         dev_info->wrr_len += wt;
630                         max_wt = RTE_MAX(max_wt, wt);
631                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
632                         poll_q++;
633                 }
634         }
635
636         /* Generate polling sequence based on weights */
637         prev = -1;
638         cw = -1;
639         for (i = 0; i < max_wrr_pos; i++) {
640                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
641                                      rx_poll, max_wt, gcd, prev);
642                 prev = rx_wrr[i];
643         }
644 }
645
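/* Set *ipv4_hdr or *ipv6_hdr to the packet's L3 header, handling an
 * optional single VLAN tag; the pointer for the other address family
 * (or both, for non-IP packets) is left NULL.
 */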
646 static inline void
647 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
648         struct rte_ipv6_hdr **ipv6_hdr)
649 {
650         struct rte_ether_hdr *eth_hdr =
651                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
652         struct rte_vlan_hdr *vlan_hdr;
653
654         *ipv4_hdr = NULL;
655         *ipv6_hdr = NULL;
656
657         switch (eth_hdr->ether_type) {
658         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
659                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
660                 break;
661
662         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
663                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
664                 break;
665
666         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
667                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
668                 switch (vlan_hdr->eth_proto) {
669                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
670                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
671                         break;
672                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
673                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
674                         break;
675                 default:
676                         break;
677                 }
678                 break;
679
680         default:
681                 break;
682         }
683 }
684
685 /* Calculate RSS hash for IPv4/6 */
686 static inline uint32_t
687 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
688 {
689         uint32_t input_len;
690         void *tuple;
691         struct rte_ipv4_tuple ipv4_tuple;
692         struct rte_ipv6_tuple ipv6_tuple;
693         struct rte_ipv4_hdr *ipv4_hdr;
694         struct rte_ipv6_hdr *ipv6_hdr;
695
696         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
697
698         if (ipv4_hdr) {
699                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
700                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
701                 tuple = &ipv4_tuple;
702                 input_len = RTE_THASH_V4_L3_LEN;
703         } else if (ipv6_hdr) {
704                 rte_thash_load_v6_addrs(ipv6_hdr,
705                                         (union rte_thash_tuple *)&ipv6_tuple);
706                 tuple = &ipv6_tuple;
707                 input_len = RTE_THASH_V6_L3_LEN;
708         } else
709                 return 0;
710
711         return rte_softrss_be(tuple, input_len, rss_key_be);
712 }
713
714 static inline int
715 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
716 {
717         return !!rx_adapter->enq_block_count;
718 }
719
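/* The two helpers below track how long event enqueues have been blocked,
 * i.e. the event device has accepted no events. A block period starts once
 * rxa_flush_event_buffer() makes no progress BLOCK_CNT_THRESHOLD times in
 * a row; its duration is accumulated in stats->rx_enq_block_cycles.
 */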
720 static inline void
721 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
722 {
723         if (rx_adapter->rx_enq_block_start_ts)
724                 return;
725
726         rx_adapter->enq_block_count++;
727         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
728                 return;
729
730         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
731 }
732
733 static inline void
734 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
735                     struct rte_event_eth_rx_adapter_stats *stats)
736 {
737         if (unlikely(!stats->rx_enq_start_ts))
738                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
739
740         if (likely(!rxa_enq_blocked(rx_adapter)))
741                 return;
742
743         rx_adapter->enq_block_count = 0;
744         if (rx_adapter->rx_enq_block_start_ts) {
745                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
746                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
747                     rx_adapter->rx_enq_block_start_ts;
748                 rx_adapter->rx_enq_block_start_ts = 0;
749         }
750 }
751
752 /* Enqueue buffered events to event device */
753 static inline uint16_t
754 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
755 {
756         struct rte_eth_event_enqueue_buffer *buf =
757             &rx_adapter->event_enqueue_buffer;
758         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
759         uint16_t count = buf->last ? buf->last - buf->head : buf->count;
760
761         if (!count)
762                 return 0;
763
764         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
765                                         rx_adapter->event_port_id,
766                                         &buf->events[buf->head],
767                                         count);
768         if (n != count)
769                 stats->rx_enq_retry++;
770
771         buf->head += n;
772
773         if (buf->last && n == count) {
774                 uint16_t n1;
775
776                 n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
777                                         rx_adapter->event_port_id,
778                                         &buf->events[0],
779                                         buf->tail);
780
781                 if (n1 != buf->tail)
782                         stats->rx_enq_retry++;
783
784                 buf->last = 0;
785                 buf->head = n1;
786                 buf->last_mask = 0;
787                 n += n1;
788         }
789
790         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
791                 rxa_enq_block_start_ts(rx_adapter);
792
793         buf->count -= n;
794         stats->rx_enq_count += n;
795
796         return n;
797 }
798
799 static inline void
800 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
801                 struct eth_rx_vector_data *vec)
802 {
803         vec->vector_ev->nb_elem = 0;
804         vec->vector_ev->port = vec->port;
805         vec->vector_ev->queue = vec->queue;
806         vec->vector_ev->attr_valid = true;
807         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
808 }
809
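/* Aggregate received mbufs into event vectors for the given Rx queue.
 * Completed vectors (those that reach max_vector_count mbufs) are written
 * into the event buffer; a partially filled vector stays on the adapter's
 * vector_list and is flushed later when its timeout expires.
 */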
810 static inline uint16_t
811 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
812                         struct eth_rx_queue_info *queue_info,
813                         struct rte_eth_event_enqueue_buffer *buf,
814                         struct rte_mbuf **mbufs, uint16_t num)
815 {
816         struct rte_event *ev = &buf->events[buf->count];
817         struct eth_rx_vector_data *vec;
818         uint16_t filled, space, sz;
819
820         filled = 0;
821         vec = &queue_info->vector_data;
822
823         if (vec->vector_ev == NULL) {
824                 if (rte_mempool_get(vec->vector_pool,
825                                     (void **)&vec->vector_ev) < 0) {
826                         rte_pktmbuf_free_bulk(mbufs, num);
827                         return 0;
828                 }
829                 rxa_init_vector(rx_adapter, vec);
830         }
831         while (num) {
832                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
833                         /* Event ready. */
834                         ev->event = vec->event;
835                         ev->vec = vec->vector_ev;
836                         ev++;
837                         filled++;
838                         vec->vector_ev = NULL;
839                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
840                         if (rte_mempool_get(vec->vector_pool,
841                                             (void **)&vec->vector_ev) < 0) {
842                                 rte_pktmbuf_free_bulk(mbufs, num);
843                                 return 0;
844                         }
845                         rxa_init_vector(rx_adapter, vec);
846                 }
847
848                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
849                 sz = num > space ? space : num;
850                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
851                        sizeof(void *) * sz);
852                 vec->vector_ev->nb_elem += sz;
853                 num -= sz;
854                 mbufs += sz;
855                 vec->ts = rte_rdtsc();
856         }
857
858         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
859                 ev->event = vec->event;
860                 ev->vec = vec->vector_ev;
861                 ev++;
862                 filled++;
863                 vec->vector_ev = NULL;
864                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
865         }
866
867         return filled;
868 }
869
870 static inline void
871 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
872                 uint16_t eth_dev_id,
873                 uint16_t rx_queue_id,
874                 struct rte_mbuf **mbufs,
875                 uint16_t num)
876 {
877         uint32_t i;
878         struct eth_device_info *dev_info =
879                                         &rx_adapter->eth_devices[eth_dev_id];
880         struct eth_rx_queue_info *eth_rx_queue_info =
881                                         &dev_info->rx_queue[rx_queue_id];
882         struct rte_eth_event_enqueue_buffer *buf =
883                                         &rx_adapter->event_enqueue_buffer;
884         uint16_t new_tail = buf->tail;
885         uint64_t event = eth_rx_queue_info->event;
886         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
887         struct rte_mbuf *m = mbufs[0];
888         uint32_t rss_mask;
889         uint32_t rss;
890         int do_rss;
891         uint16_t nb_cb;
892         uint16_t dropped;
893
894         if (!eth_rx_queue_info->ena_vector) {
895                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
896                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
897                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
898                 for (i = 0; i < num; i++) {
899                         struct rte_event *ev;
900
901                         m = mbufs[i];
902                         ev = &buf->events[new_tail];
903
904                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
905                                      : m->hash.rss;
906                         ev->event = event;
907                         ev->flow_id = (rss & ~flow_id_mask) |
908                                       (ev->flow_id & flow_id_mask);
909                         ev->mbuf = m;
910                         new_tail++;
911                 }
912         } else {
913                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
914                                               buf, mbufs, num);
915         }
916
917         if (num && dev_info->cb_fn) {
918
919                 dropped = 0;
920                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
921                                        buf->last |
922                                        (RTE_DIM(buf->events) & ~buf->last_mask),
923                                        buf->count >= BATCH_SIZE ?
924                                                 buf->count - BATCH_SIZE : 0,
925                                        &buf->events[buf->tail],
926                                        num,
927                                        dev_info->cb_arg,
928                                        &dropped);
929                 if (unlikely(nb_cb > num))
930                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
931                                 nb_cb, num);
932                 else
933                         num = nb_cb;
934                 if (dropped)
935                         rx_adapter->stats.rx_dropped += dropped;
936         }
937
938         buf->count += num;
939         buf->tail += num;
940 }
941
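/* Return true if the event buffer has room for another burst of BATCH_SIZE
 * events, wrapping the tail around to index 0 (and recording the wrap point
 * in 'last') when the linear space at the end of the buffer is exhausted
 * but at least BATCH_SIZE events have been drained from the head.
 */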
942 static inline bool
943 rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
944 {
945         uint32_t nb_req = buf->tail + BATCH_SIZE;
946
947         if (!buf->last) {
948                 if (nb_req <= RTE_DIM(buf->events))
949                         return true;
950
951                 if (buf->head >= BATCH_SIZE) {
952                         buf->last_mask = ~0;
953                         buf->last = buf->tail;
954                         buf->tail = 0;
955                         return true;
956                 }
957         }
958
959         return nb_req <= buf->head;
960 }
961
962 /* Enqueue packets from <port, q> to the event buffer */
963 static inline uint32_t
964 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
965         uint16_t port_id,
966         uint16_t queue_id,
967         uint32_t rx_count,
968         uint32_t max_rx,
969         int *rxq_empty)
970 {
971         struct rte_mbuf *mbufs[BATCH_SIZE];
972         struct rte_eth_event_enqueue_buffer *buf =
973                                         &rx_adapter->event_enqueue_buffer;
974         struct rte_event_eth_rx_adapter_stats *stats =
975                                         &rx_adapter->stats;
976         uint16_t n;
977         uint32_t nb_rx = 0;
978
979         if (rxq_empty)
980                 *rxq_empty = 0;
981         /* Don't do a batch dequeue from the rx queue if there isn't
982          * enough space in the enqueue buffer.
983          */
984         while (rxa_pkt_buf_available(buf)) {
985                 if (buf->count >= BATCH_SIZE)
986                         rxa_flush_event_buffer(rx_adapter);
987
988                 stats->rx_poll_count++;
989                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
990                 if (unlikely(!n)) {
991                         if (rxq_empty)
992                                 *rxq_empty = 1;
993                         break;
994                 }
995                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
996                 nb_rx += n;
997                 if (rx_count + nb_rx > max_rx)
998                         break;
999         }
1000
1001         if (buf->count > 0)
1002                 rxa_flush_event_buffer(rx_adapter);
1003
1004         return nb_rx;
1005 }
1006
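/* Called from the interrupt thread: queue the <port, queue> identity of an
 * interrupting Rx queue onto the adapter's interrupt ring and disable that
 * queue's Rx interrupt; the interrupt is re-enabled when the entry is
 * dequeued by the service function.
 */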
1007 static inline void
1008 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
1009                 void *data)
1010 {
1011         uint16_t port_id;
1012         uint16_t queue;
1013         int err;
1014         union queue_data qd;
1015         struct eth_device_info *dev_info;
1016         struct eth_rx_queue_info *queue_info;
1017         int *intr_enabled;
1018
1019         qd.ptr = data;
1020         port_id = qd.port;
1021         queue = qd.queue;
1022
1023         dev_info = &rx_adapter->eth_devices[port_id];
1024         queue_info = &dev_info->rx_queue[queue];
1025         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1026         if (rxa_shared_intr(dev_info, queue))
1027                 intr_enabled = &dev_info->shared_intr_enabled;
1028         else
1029                 intr_enabled = &queue_info->intr_enabled;
1030
1031         if (*intr_enabled) {
1032                 *intr_enabled = 0;
1033                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1034                 /* Entry should always be available.
1035                  * The ring size equals the maximum number of interrupt
1036                  * vectors supported (an interrupt vector is shared in
1037                  * case of shared interrupts)
1038                  */
1039                 if (err)
1040                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1041                                 " to ring: %s", strerror(-err));
1042                 else
1043                         rte_eth_dev_rx_intr_disable(port_id, queue);
1044         }
1045         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1046 }
1047
1048 static int
1049 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
1050                         uint32_t num_intr_vec)
1051 {
1052         if (rx_adapter->num_intr_vec + num_intr_vec >
1053                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1054                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1055                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1056                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1057                 return -ENOSPC;
1058         }
1059
1060         return 0;
1061 }
1062
1063 /* Delete entries for (dev, queue) from the interrupt ring */
1064 static void
1065 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1066                         struct eth_device_info *dev_info,
1067                         uint16_t rx_queue_id)
1068 {
1069         int i, n;
1070         union queue_data qd;
1071
1072         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1073
1074         n = rte_ring_count(rx_adapter->intr_ring);
1075         for (i = 0; i < n; i++) {
1076                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1077                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1078                         if (qd.port == dev_info->dev->data->port_id &&
1079                                 qd.queue == rx_queue_id)
1080                                 continue;
1081                 } else {
1082                         if (qd.port == dev_info->dev->data->port_id)
1083                                 continue;
1084                 }
1085                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1086         }
1087
1088         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1089 }
1090
1091 /* pthread callback handling interrupt mode receive queues
1092  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1093  * interrupting queue to the adapter's ring buffer for interrupt events.
1094  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1095  * the adapter service function.
1096  */
1097 static void *
1098 rxa_intr_thread(void *arg)
1099 {
1100         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1101         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1102         int n, i;
1103
1104         while (1) {
1105                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1106                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1107                 if (unlikely(n < 0))
1108                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1109                                         n);
1110                 for (i = 0; i < n; i++) {
1111                         rxa_intr_ring_enqueue(rx_adapter,
1112                                         epoll_events[i].epdata.data);
1113                 }
1114         }
1115
1116         return NULL;
1117 }
1118
1119 /* Dequeue <port, q> from interrupt ring and enqueue received
1120  * mbufs to eventdev
1121  */
1122 static inline uint32_t
1123 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1124 {
1125         uint32_t n;
1126         uint32_t nb_rx = 0;
1127         int rxq_empty;
1128         struct rte_eth_event_enqueue_buffer *buf;
1129         rte_spinlock_t *ring_lock;
1130         uint8_t max_done = 0;
1131
1132         if (rx_adapter->num_rx_intr == 0)
1133                 return 0;
1134
1135         if (rte_ring_count(rx_adapter->intr_ring) == 0
1136                 && !rx_adapter->qd_valid)
1137                 return 0;
1138
1139         buf = &rx_adapter->event_enqueue_buffer;
1140         ring_lock = &rx_adapter->intr_ring_lock;
1141
1142         if (buf->count >= BATCH_SIZE)
1143                 rxa_flush_event_buffer(rx_adapter);
1144
1145         while (rxa_pkt_buf_available(buf)) {
1146                 struct eth_device_info *dev_info;
1147                 uint16_t port;
1148                 uint16_t queue;
1149                 union queue_data qd  = rx_adapter->qd;
1150                 int err;
1151
1152                 if (!rx_adapter->qd_valid) {
1153                         struct eth_rx_queue_info *queue_info;
1154
1155                         rte_spinlock_lock(ring_lock);
1156                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1157                         if (err) {
1158                                 rte_spinlock_unlock(ring_lock);
1159                                 break;
1160                         }
1161
1162                         port = qd.port;
1163                         queue = qd.queue;
1164                         rx_adapter->qd = qd;
1165                         rx_adapter->qd_valid = 1;
1166                         dev_info = &rx_adapter->eth_devices[port];
1167                         if (rxa_shared_intr(dev_info, queue))
1168                                 dev_info->shared_intr_enabled = 1;
1169                         else {
1170                                 queue_info = &dev_info->rx_queue[queue];
1171                                 queue_info->intr_enabled = 1;
1172                         }
1173                         rte_eth_dev_rx_intr_enable(port, queue);
1174                         rte_spinlock_unlock(ring_lock);
1175                 } else {
1176                         port = qd.port;
1177                         queue = qd.queue;
1178
1179                         dev_info = &rx_adapter->eth_devices[port];
1180                 }
1181
1182                 if (rxa_shared_intr(dev_info, queue)) {
1183                         uint16_t i;
1184                         uint16_t nb_queues;
1185
1186                         nb_queues = dev_info->dev->data->nb_rx_queues;
1187                         n = 0;
1188                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1189                                 uint8_t enq_buffer_full;
1190
1191                                 if (!rxa_intr_queue(dev_info, i))
1192                                         continue;
1193                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1194                                         rx_adapter->max_nb_rx,
1195                                         &rxq_empty);
1196                                 nb_rx += n;
1197
1198                                 enq_buffer_full = !rxq_empty && n == 0;
1199                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1200
1201                                 if (enq_buffer_full || max_done) {
1202                                         dev_info->next_q_idx = i;
1203                                         goto done;
1204                                 }
1205                         }
1206
1207                         rx_adapter->qd_valid = 0;
1208
1209                         /* Reinitialize for next interrupt */
1210                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1211                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1212                                                 0;
1213                 } else {
1214                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1215                                 rx_adapter->max_nb_rx,
1216                                 &rxq_empty);
1217                         rx_adapter->qd_valid = !rxq_empty;
1218                         nb_rx += n;
1219                         if (nb_rx > rx_adapter->max_nb_rx)
1220                                 break;
1221                 }
1222         }
1223
1224 done:
1225         rx_adapter->stats.rx_intr_packets += nb_rx;
1226         return nb_rx;
1227 }
1228
1229 /*
1230  * Polls receive queues added to the event adapter and enqueues received
1231  * packets to the event device.
1232  *
1233  * The receive code enqueues initially to a temporary buffer; the
1234  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1235  *
1236  * If there isn't space available in the temporary buffer, packets from the
1237  * Rx queue aren't dequeued from the eth device; this back pressures the
1238  * eth device. In virtual device environments the back pressure is relayed
1239  * to the hypervisor's switching layer, where adjustments can be made to
1240  * deal with it.
1241  */
1242 static inline uint32_t
1243 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1244 {
1245         uint32_t num_queue;
1246         uint32_t nb_rx = 0;
1247         struct rte_eth_event_enqueue_buffer *buf;
1248         uint32_t wrr_pos;
1249         uint32_t max_nb_rx;
1250
1251         wrr_pos = rx_adapter->wrr_pos;
1252         max_nb_rx = rx_adapter->max_nb_rx;
1253         buf = &rx_adapter->event_enqueue_buffer;
1254
1255         /* Iterate through a WRR sequence */
1256         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1257                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1258                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1259                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1260
1261                 /* Don't do a batch dequeue from the rx queue if there isn't
1262                  * enough space in the enqueue buffer.
1263                  */
1264                 if (buf->count >= BATCH_SIZE)
1265                         rxa_flush_event_buffer(rx_adapter);
1266                 if (!rxa_pkt_buf_available(buf)) {
1267                         rx_adapter->wrr_pos = wrr_pos;
1268                         return nb_rx;
1269                 }
1270
1271                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1272                                 NULL);
1273                 if (nb_rx > max_nb_rx) {
1274                         rx_adapter->wrr_pos =
1275                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1276                         break;
1277                 }
1278
1279                 if (++wrr_pos == rx_adapter->wrr_len)
1280                         wrr_pos = 0;
1281         }
1282         return nb_rx;
1283 }
1284
1285 static void
1286 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1287 {
1288         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1289         struct rte_eth_event_enqueue_buffer *buf =
1290                 &rx_adapter->event_enqueue_buffer;
1291         struct rte_event *ev;
1292
1293         if (buf->count)
1294                 rxa_flush_event_buffer(rx_adapter);
1295
1296         if (vec->vector_ev->nb_elem == 0)
1297                 return;
1298         ev = &buf->events[buf->count];
1299
1300         /* Event ready. */
1301         ev->event = vec->event;
1302         ev->vec = vec->vector_ev;
1303         buf->count++;
1304
1305         vec->vector_ev = NULL;
1306         vec->ts = 0;
1307 }
1308
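/* Adapter service function: expires pending event vectors, drains the
 * interrupt ring and polls the Rx queues per the WRR schedule, enqueuing
 * received packets to the event device.
 */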
1309 static int
1310 rxa_service_func(void *args)
1311 {
1312         struct rte_event_eth_rx_adapter *rx_adapter = args;
1313         struct rte_event_eth_rx_adapter_stats *stats;
1314
1315         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1316                 return 0;
1317         if (!rx_adapter->rxa_started) {
1318                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1319                 return 0;
1320         }
1321
1322         if (rx_adapter->ena_vector) {
1323                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1324                     rx_adapter->vector_tmo_ticks) {
1325                         struct eth_rx_vector_data *vec;
1326
1327                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1328                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1329
1330                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1331                                         rxa_vector_expire(vec, rx_adapter);
1332                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1333                                                      vec, next);
1334                                 }
1335                         }
1336                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1337                 }
1338         }
1339
1340         stats = &rx_adapter->stats;
1341         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1342         stats->rx_packets += rxa_poll(rx_adapter);
1343         rte_spinlock_unlock(&rx_adapter->rx_lock);
1344         return 0;
1345 }
1346
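/* Reserve (or look up) the memzone holding the array of adapter pointers;
 * using a memzone makes the array visible to secondary processes as well.
 */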
1347 static int
1348 rte_event_eth_rx_adapter_init(void)
1349 {
1350         const char *name = "rte_event_eth_rx_adapter_array";
1351         const struct rte_memzone *mz;
1352         unsigned int sz;
1353
1354         sz = sizeof(*event_eth_rx_adapter) *
1355             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1356         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1357
1358         mz = rte_memzone_lookup(name);
1359         if (mz == NULL) {
1360                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1361                                                  RTE_CACHE_LINE_SIZE);
1362                 if (mz == NULL) {
1363                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1364                                         PRId32, rte_errno);
1365                         return -rte_errno;
1366                 }
1367         }
1368
1369         event_eth_rx_adapter = mz->addr;
1370         return 0;
1371 }
1372
1373 static inline struct rte_event_eth_rx_adapter *
1374 rxa_id_to_adapter(uint8_t id)
1375 {
1376         return event_eth_rx_adapter ?
1377                 event_eth_rx_adapter[id] : NULL;
1378 }
1379
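/* Default configuration callback: stop the event device if it is running,
 * reconfigure it with one additional event port for the adapter's use, set
 * up that port with the caller-supplied port config and restart the device.
 */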
1380 static int
1381 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1382                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1383 {
1384         int ret;
1385         struct rte_eventdev *dev;
1386         struct rte_event_dev_config dev_conf;
1387         int started;
1388         uint8_t port_id;
1389         struct rte_event_port_conf *port_conf = arg;
1390         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1391
1392         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1393         dev_conf = dev->data->dev_conf;
1394
1395         started = dev->data->dev_started;
1396         if (started)
1397                 rte_event_dev_stop(dev_id);
1398         port_id = dev_conf.nb_event_ports;
1399         dev_conf.nb_event_ports += 1;
1400         ret = rte_event_dev_configure(dev_id, &dev_conf);
1401         if (ret) {
1402                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1403                                                 dev_id);
1404                 if (started) {
1405                         if (rte_event_dev_start(dev_id))
1406                                 return -EIO;
1407                 }
1408                 return ret;
1409         }
1410
1411         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1412         if (ret) {
1413                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1414                                         port_id);
1415                 return ret;
1416         }
1417
1418         conf->event_port_id = port_id;
1419         conf->max_nb_rx = 128;
1420         if (started)
1421                 ret = rte_event_dev_start(dev_id);
1422         rx_adapter->default_cb_arg = 1;
1423         return ret;
1424 }
1425
1426 static int
1427 rxa_epoll_create1(void)
1428 {
1429 #if defined(LINUX)
1430         int fd;
1431         fd = epoll_create1(EPOLL_CLOEXEC);
1432         return fd < 0 ? -errno : fd;
1433 #elif defined(BSD)
1434         return -ENOTSUP;
1435 #endif
1436 }
1437
1438 static int
1439 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1440 {
1441         if (rx_adapter->epd != INIT_FD)
1442                 return 0;
1443
1444         rx_adapter->epd = rxa_epoll_create1();
1445         if (rx_adapter->epd < 0) {
1446                 int err = rx_adapter->epd;
1447                 rx_adapter->epd = INIT_FD;
1448                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1449                 return err;
1450         }
1451
1452         return 0;
1453 }
1454
1455 static int
1456 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1457 {
1458         int err;
1459         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1460
1461         if (rx_adapter->intr_ring)
1462                 return 0;
1463
1464         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1465                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1466                                         rte_socket_id(), 0);
1467         if (!rx_adapter->intr_ring)
1468                 return -ENOMEM;
1469
1470         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1471                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1472                                         sizeof(struct rte_epoll_event),
1473                                         RTE_CACHE_LINE_SIZE,
1474                                         rx_adapter->socket_id);
1475         if (!rx_adapter->epoll_events) {
1476                 err = -ENOMEM;
1477                 goto error;
1478         }
1479
1480         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1481
1482         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1483                         "rx-intr-thread-%d", rx_adapter->id);
1484
1485         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1486                                 NULL, rxa_intr_thread, rx_adapter);
1487         if (!err)
1488                 return 0;
1489
1490         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d", err);
1491         rte_free(rx_adapter->epoll_events);
1492 error:
1493         rte_ring_free(rx_adapter->intr_ring);
1494         rx_adapter->intr_ring = NULL;
1495         rx_adapter->epoll_events = NULL;
1496         return err;
1497 }
1498
1499 static int
1500 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1501 {
1502         int err;
1503
1504         err = pthread_cancel(rx_adapter->rx_intr_thread);
1505         if (err)
1506                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d",
1507                                 err);
1508
1509         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1510         if (err)
1511                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d", err);
1512
1513         rte_free(rx_adapter->epoll_events);
1514         rte_ring_free(rx_adapter->intr_ring);
1515         rx_adapter->intr_ring = NULL;
1516         rx_adapter->epoll_events = NULL;
1517         return 0;
1518 }
1519
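/*
 * Release interrupt mode resources: stop the interrupt thread and close the
 * adapter's epoll fd. A no-op when no interrupt mode queues were configured.
 */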
1520 static int
1521 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1522 {
1523         int ret;
1524
1525         if (rx_adapter->num_rx_intr == 0)
1526                 return 0;
1527
1528         ret = rxa_destroy_intr_thread(rx_adapter);
1529         if (ret)
1530                 return ret;
1531
1532         close(rx_adapter->epd);
1533         rx_adapter->epd = INIT_FD;
1534
1535         return ret;
1536 }
1537
1538 static int
1539 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1540         struct eth_device_info *dev_info,
1541         uint16_t rx_queue_id)
1542 {
1543         int err;
1544         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1545         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1546
1547         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1548         if (err) {
1549                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1550                         rx_queue_id);
1551                 return err;
1552         }
1553
1554         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1555                                         rx_adapter->epd,
1556                                         RTE_INTR_EVENT_DEL,
1557                                         0);
1558         if (err)
1559                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1560
1561         if (sintr)
1562                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1563         else
1564                 dev_info->shared_intr_enabled = 0;
1565         return err;
1566 }
1567
1568 static int
1569 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1570                 struct eth_device_info *dev_info,
1571                 int rx_queue_id)
1572 {
1573         int err;
1574         int i;
1575         int s;
1576
1577         if (dev_info->nb_rx_intr == 0)
1578                 return 0;
1579
1580         err = 0;
1581         if (rx_queue_id == -1) {
1582                 s = dev_info->nb_shared_intr;
1583                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1584                         int sintr;
1585                         uint16_t q;
1586
1587                         q = dev_info->intr_queue[i];
1588                         sintr = rxa_shared_intr(dev_info, q);
1589                         s -= sintr;
1590
1591                         if (!sintr || s == 0) {
1592
1593                                 err = rxa_disable_intr(rx_adapter, dev_info,
1594                                                 q);
1595                                 if (err)
1596                                         return err;
1597                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1598                                                         q);
1599                         }
1600                 }
1601         } else {
1602                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1603                         return 0;
1604                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1605                                 dev_info->nb_shared_intr == 1) {
1606                         err = rxa_disable_intr(rx_adapter, dev_info,
1607                                         rx_queue_id);
1608                         if (err)
1609                                 return err;
1610                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1611                                                 rx_queue_id);
1612                 }
1613
1614                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1615                         if (dev_info->intr_queue[i] == rx_queue_id) {
1616                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1617                                         dev_info->intr_queue[i] =
1618                                                 dev_info->intr_queue[i + 1];
1619                                 break;
1620                         }
1621                 }
1622         }
1623
1624         return err;
1625 }
1626
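/*
 * Put one Rx queue into interrupt mode: register the queue with the
 * adapter's epoll fd, enable the device Rx interrupt and make sure the
 * interrupt thread is running. The error paths undo each step in reverse
 * so a failed call leaves no interrupt state behind.
 */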
1627 static int
1628 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1629         struct eth_device_info *dev_info,
1630         uint16_t rx_queue_id)
1631 {
1632         int err, err1;
1633         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1634         union queue_data qd;
1635         int init_fd;
1636         uint16_t *intr_queue;
1637         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1638
1639         if (rxa_intr_queue(dev_info, rx_queue_id))
1640                 return 0;
1641
1642         intr_queue = dev_info->intr_queue;
1643         if (dev_info->intr_queue == NULL) {
1644                 size_t len =
1645                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1646                 dev_info->intr_queue =
1647                         rte_zmalloc_socket(
1648                                 rx_adapter->mem_name,
1649                                 len,
1650                                 0,
1651                                 rx_adapter->socket_id);
1652                 if (dev_info->intr_queue == NULL)
1653                         return -ENOMEM;
1654         }
1655
1656         init_fd = rx_adapter->epd;
1657         err = rxa_init_epd(rx_adapter);
1658         if (err)
1659                 goto err_free_queue;
1660
1661         qd.port = eth_dev_id;
1662         qd.queue = rx_queue_id;
1663
1664         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1665                                         rx_adapter->epd,
1666                                         RTE_INTR_EVENT_ADD,
1667                                         qd.ptr);
1668         if (err) {
1669                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1670                         " Rx Queue %u err %d", rx_queue_id, err);
1671                 goto err_del_fd;
1672         }
1673
1674         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1675         if (err) {
1676                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1677                                 " Rx Queue %u err %d", rx_queue_id, err);
1678
1679                 goto err_del_event;
1680         }
1681
1682         err = rxa_create_intr_thread(rx_adapter);
1683         if (!err)  {
1684                 if (sintr)
1685                         dev_info->shared_intr_enabled = 1;
1686                 else
1687                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1688                 return 0;
1689         }
1690
1691
1692         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1693         if (err)
1694                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1695                                 " Rx Queue %u err %d", rx_queue_id, err);
1696 err_del_event:
1697         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1698                                         rx_adapter->epd,
1699                                         RTE_INTR_EVENT_DEL,
1700                                         0);
1701         if (err1) {
1702                 RTE_EDEV_LOG_ERR("Could not delete event for"
1703                                 " Rx Queue %u err %d", rx_queue_id, err1);
1704         }
1705 err_del_fd:
1706         if (init_fd == INIT_FD) {
1707                 close(rx_adapter->epd);
1708                 rx_adapter->epd = INIT_FD;
1709         }
1710 err_free_queue:
1711         if (intr_queue == NULL)
1712                 rte_free(dev_info->intr_queue);
1713
1714         return err;
1715 }
1716
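/*
 * Configure interrupts for a single queue or, when rx_queue_id is -1, for
 * every Rx queue of the device. A shared interrupt is configured only once;
 * if any queue fails, the queues configured by this call are disabled again.
 */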
1717 static int
1718 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1719         struct eth_device_info *dev_info,
1720         int rx_queue_id)
1721
1722 {
1723         int i, j, err;
1724         int si = -1;
1725         int shared_done = (dev_info->nb_shared_intr > 0);
1726
1727         if (rx_queue_id != -1) {
1728                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1729                         return 0;
1730                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1731         }
1732
1733         err = 0;
1734         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1735
1736                 if (rxa_shared_intr(dev_info, i) && shared_done)
1737                         continue;
1738
1739                 err = rxa_config_intr(rx_adapter, dev_info, i);
1740
1741                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1742                 if (shared_done) {
1743                         si = i;
1744                         dev_info->shared_intr_enabled = 1;
1745                 }
1746                 if (err)
1747                         break;
1748         }
1749
1750         if (err == 0)
1751                 return 0;
1752
1753         shared_done = (dev_info->nb_shared_intr > 0);
1754         for (j = 0; j < i; j++) {
1755                 if (rxa_intr_queue(dev_info, j))
1756                         continue;
1757                 if (rxa_shared_intr(dev_info, j) && si != j)
1758                         continue;
1759                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1760                 if (err)
1761                         break;
1762
1763         }
1764
1765         return err;
1766 }
1767
1768
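/*
 * One-time service setup for adapters that do not use an internal event
 * port: register the adapter service function and run the application's
 * configuration callback to obtain the event port id and the Rx burst
 * limit used by the service function.
 */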
1769 static int
1770 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1771 {
1772         int ret;
1773         struct rte_service_spec service;
1774         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1775
1776         if (rx_adapter->service_inited)
1777                 return 0;
1778
1779         memset(&service, 0, sizeof(service));
1780         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1781                 "rte_event_eth_rx_adapter_%d", id);
1782         service.socket_id = rx_adapter->socket_id;
1783         service.callback = rxa_service_func;
1784         service.callback_userdata = rx_adapter;
1785         /* Service function handles locking for queue add/del updates */
1786         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1787         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1788         if (ret) {
1789                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1790                         service.name, ret);
1791                 return ret;
1792         }
1793
1794         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1795                 &rx_adapter_conf, rx_adapter->conf_arg);
1796         if (ret) {
1797                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1798                         ret);
1799                 goto err_done;
1800         }
1801         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1802         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1803         rx_adapter->service_inited = 1;
1804         rx_adapter->epd = INIT_FD;
1805         return 0;
1806
1807 err_done:
1808         rte_service_component_unregister(rx_adapter->service_id);
1809         return ret;
1810 }
1811
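/*
 * Track queue enable state: update the adapter and per-device queue counts
 * when a queue transitions between enabled and disabled. A rx_queue_id of
 * -1 applies the update to every Rx queue of the device.
 */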
1812 static void
1813 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1814                 struct eth_device_info *dev_info,
1815                 int32_t rx_queue_id,
1816                 uint8_t add)
1817 {
1818         struct eth_rx_queue_info *queue_info;
1819         int enabled;
1820         uint16_t i;
1821
1822         if (dev_info->rx_queue == NULL)
1823                 return;
1824
1825         if (rx_queue_id == -1) {
1826                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1827                         rxa_update_queue(rx_adapter, dev_info, i, add);
1828         } else {
1829                 queue_info = &dev_info->rx_queue[rx_queue_id];
1830                 enabled = queue_info->queue_enabled;
1831                 if (add) {
1832                         rx_adapter->nb_queues += !enabled;
1833                         dev_info->nb_dev_queues += !enabled;
1834                 } else {
1835                         rx_adapter->nb_queues -= enabled;
1836                         dev_info->nb_dev_queues -= enabled;
1837                 }
1838                 queue_info->queue_enabled = !!add;
1839         }
1840 }
1841
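/*
 * Record per-queue vectorization parameters. The timeout is converted from
 * nanoseconds to timer ticks, and when the queue's event template carries a
 * zero flow id, one is derived from the port and queue ids instead.
 */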
1842 static void
1843 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1844                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1845                     uint16_t port_id)
1846 {
1847 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1848         struct eth_rx_vector_data *vector_data;
1849         uint32_t flow_id;
1850
1851         vector_data = &queue_info->vector_data;
1852         vector_data->max_vector_count = vector_count;
1853         vector_data->port = port_id;
1854         vector_data->queue = qid;
1855         vector_data->vector_pool = mp;
1856         vector_data->vector_timeout_ticks =
1857                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1858         vector_data->ts = 0;
1859         flow_id = queue_info->event & 0xFFFFF;
1860         flow_id =
1861                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1862         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1863 }
1864
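/*
 * Software path for queue delete: flush any partially filled event vectors
 * belonging to the queue to the event device, then update the poll and
 * interrupt bookkeeping. A rx_queue_id of -1 removes every queue of the
 * device.
 */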
1865 static void
1866 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1867         struct eth_device_info *dev_info,
1868         int32_t rx_queue_id)
1869 {
1870         struct eth_rx_vector_data *vec;
1871         int pollq;
1872         int intrq;
1873         int sintrq;
1874
1875
1876         if (rx_adapter->nb_queues == 0)
1877                 return;
1878
1879         if (rx_queue_id == -1) {
1880                 uint16_t nb_rx_queues;
1881                 uint16_t i;
1882
1883                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1884                 for (i = 0; i < nb_rx_queues; i++)
1885                         rxa_sw_del(rx_adapter, dev_info, i);
1886                 return;
1887         }
1888
1889         /* Push all the partial event vectors to event device. */
1890         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1891                 if (vec->queue != rx_queue_id)
1892                         continue;
1893                 rxa_vector_expire(vec, rx_adapter);
1894                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1895         }
1896
1897         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1898         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1899         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1900         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1901         rx_adapter->num_rx_polled -= pollq;
1902         dev_info->nb_rx_poll -= pollq;
1903         rx_adapter->num_rx_intr -= intrq;
1904         dev_info->nb_rx_intr -= intrq;
1905         dev_info->nb_shared_intr -= intrq && sintrq;
1906 }
1907
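/*
 * Software path for queue add: record the servicing weight, build the
 * per-queue event template (RTE_EVENT_OP_NEW, Rx adapter event type),
 * honour the flow-id-valid and event-vector queue flags and adjust the
 * poll/interrupt counters to match the queue's new mode.
 */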
1908 static void
1909 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1910         struct eth_device_info *dev_info,
1911         int32_t rx_queue_id,
1912         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1913 {
1914         struct eth_rx_queue_info *queue_info;
1915         const struct rte_event *ev = &conf->ev;
1916         int pollq;
1917         int intrq;
1918         int sintrq;
1919         struct rte_event *qi_ev;
1920
1921         if (rx_queue_id == -1) {
1922                 uint16_t nb_rx_queues;
1923                 uint16_t i;
1924
1925                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1926                 for (i = 0; i < nb_rx_queues; i++)
1927                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1928                 return;
1929         }
1930
1931         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1932         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1933         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1934
1935         queue_info = &dev_info->rx_queue[rx_queue_id];
1936         queue_info->wt = conf->servicing_weight;
1937
1938         qi_ev = (struct rte_event *)&queue_info->event;
1939         qi_ev->event = ev->event;
1940         qi_ev->op = RTE_EVENT_OP_NEW;
1941         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1942         qi_ev->sub_event_type = 0;
1943
1944         if (conf->rx_queue_flags &
1945                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1946                 queue_info->flow_id_mask = ~0;
1947         } else
1948                 qi_ev->flow_id = 0;
1949
1950         if (conf->rx_queue_flags &
1951             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
1952                 queue_info->ena_vector = 1;
1953                 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1954                 rxa_set_vector_data(queue_info, conf->vector_sz,
1955                                     conf->vector_timeout_ns, conf->vector_mp,
1956                                     rx_queue_id, dev_info->dev->data->port_id);
1957                 rx_adapter->ena_vector = 1;
1958                 rx_adapter->vector_tmo_ticks =
1959                         rx_adapter->vector_tmo_ticks ?
1960                                       RTE_MIN(queue_info->vector_data
1961                                                         .vector_timeout_ticks >>
1962                                                 1,
1963                                         rx_adapter->vector_tmo_ticks) :
1964                                 queue_info->vector_data.vector_timeout_ticks >>
1965                                         1;
1966         }
1967
1968         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1969         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1970                 rx_adapter->num_rx_polled += !pollq;
1971                 dev_info->nb_rx_poll += !pollq;
1972                 rx_adapter->num_rx_intr -= intrq;
1973                 dev_info->nb_rx_intr -= intrq;
1974                 dev_info->nb_shared_intr -= intrq && sintrq;
1975         }
1976
1977         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1978                 rx_adapter->num_rx_polled -= pollq;
1979                 dev_info->nb_rx_poll -= pollq;
1980                 rx_adapter->num_rx_intr += !intrq;
1981                 dev_info->nb_rx_intr += !intrq;
1982                 dev_info->nb_shared_intr += !intrq && sintrq;
1983                 if (dev_info->nb_shared_intr == 1) {
1984                         if (dev_info->multi_intr_cap)
1985                                 dev_info->next_q_idx =
1986                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1987                         else
1988                                 dev_info->next_q_idx = 0;
1989                 }
1990         }
1991 }
1992
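/*
 * Add a queue (or all queues) to the service based adapter: recompute the
 * poll and WRR state for the new queue set, move queues between interrupt
 * and poll mode as dictated by the servicing weight (a zero weight selects
 * interrupt mode when the device has Rx interrupts enabled) and install
 * the recalculated poll and WRR arrays.
 */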
1993 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1994                 uint16_t eth_dev_id,
1995                 int rx_queue_id,
1996                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1997 {
1998         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1999         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2000         int ret;
2001         struct eth_rx_poll_entry *rx_poll;
2002         struct eth_rx_queue_info *rx_queue;
2003         uint32_t *rx_wrr;
2004         uint16_t nb_rx_queues;
2005         uint32_t nb_rx_poll, nb_wrr;
2006         uint32_t nb_rx_intr;
2007         int num_intr_vec;
2008         uint16_t wt;
2009
2010         if (queue_conf->servicing_weight == 0) {
2011                 struct rte_eth_dev_data *data = dev_info->dev->data;
2012
2013                 temp_conf = *queue_conf;
2014                 if (!data->dev_conf.intr_conf.rxq) {
2015                         /* If Rx interrupts are disabled set wt = 1 */
2016                         temp_conf.servicing_weight = 1;
2017                 }
2018                 queue_conf = &temp_conf;
2019         }
2020
2021         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2022         rx_queue = dev_info->rx_queue;
2023         wt = queue_conf->servicing_weight;
2024
2025         if (dev_info->rx_queue == NULL) {
2026                 dev_info->rx_queue =
2027                     rte_zmalloc_socket(rx_adapter->mem_name,
2028                                        nb_rx_queues *
2029                                        sizeof(struct eth_rx_queue_info), 0,
2030                                        rx_adapter->socket_id);
2031                 if (dev_info->rx_queue == NULL)
2032                         return -ENOMEM;
2033         }
2034         rx_wrr = NULL;
2035         rx_poll = NULL;
2036
2037         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2038                         queue_conf->servicing_weight,
2039                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2040
2041         if (dev_info->dev->intr_handle)
2042                 dev_info->multi_intr_cap =
2043                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2044
2045         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2046                                 &rx_poll, &rx_wrr);
2047         if (ret)
2048                 goto err_free_rxqueue;
2049
2050         if (wt == 0) {
2051                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2052
2053                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2054                 if (ret)
2055                         goto err_free_rxqueue;
2056
2057                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2058                 if (ret)
2059                         goto err_free_rxqueue;
2060         } else {
2061
2062                 num_intr_vec = 0;
2063                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2064                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2065                                                 rx_queue_id, 0);
2066                         /* interrupt based queues are being converted to
2067                          * poll mode queues, delete the interrupt configuration
2068                          * for those.
2069                          */
2070                         ret = rxa_del_intr_queue(rx_adapter,
2071                                                 dev_info, rx_queue_id);
2072                         if (ret)
2073                                 goto err_free_rxqueue;
2074                 }
2075         }
2076
2077         if (nb_rx_intr == 0) {
2078                 ret = rxa_free_intr_resources(rx_adapter);
2079                 if (ret)
2080                         goto err_free_rxqueue;
2081         }
2082
2083         if (wt == 0) {
2084                 uint16_t i;
2085
2086                 if (rx_queue_id  == -1) {
2087                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2088                                 dev_info->intr_queue[i] = i;
2089                 } else {
2090                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2091                                 dev_info->intr_queue[nb_rx_intr - 1] =
2092                                         rx_queue_id;
2093                 }
2094         }
2095
2096
2097
2098         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2099         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2100
2101         rte_free(rx_adapter->eth_rx_poll);
2102         rte_free(rx_adapter->wrr_sched);
2103
2104         rx_adapter->eth_rx_poll = rx_poll;
2105         rx_adapter->wrr_sched = rx_wrr;
2106         rx_adapter->wrr_len = nb_wrr;
2107         rx_adapter->num_intr_vec += num_intr_vec;
2108         return 0;
2109
2110 err_free_rxqueue:
2111         if (rx_queue == NULL) {
2112                 rte_free(dev_info->rx_queue);
2113                 dev_info->rx_queue = NULL;
2114         }
2115
2116         rte_free(rx_poll);
2117         rte_free(rx_wrr);
2118
2119                 return ret;
2120 }
2121
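/*
 * Common start/stop handler: devices with an internal event port are
 * started or stopped through their PMD callbacks, otherwise the adapter's
 * service run state is toggled under rx_lock.
 */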
2122 static int
2123 rxa_ctrl(uint8_t id, int start)
2124 {
2125         struct rte_event_eth_rx_adapter *rx_adapter;
2126         struct rte_eventdev *dev;
2127         struct eth_device_info *dev_info;
2128         uint32_t i;
2129         int use_service = 0;
2130         int stop = !start;
2131
2132         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2133         rx_adapter = rxa_id_to_adapter(id);
2134         if (rx_adapter == NULL)
2135                 return -EINVAL;
2136
2137         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2138
2139         RTE_ETH_FOREACH_DEV(i) {
2140                 dev_info = &rx_adapter->eth_devices[i];
2141                 /* if start  check for num dev queues */
2142                 if (start && !dev_info->nb_dev_queues)
2143                         continue;
2144                 /* if stop check if dev has been started */
2145                 if (stop && !dev_info->dev_rx_started)
2146                         continue;
2147                 use_service |= !dev_info->internal_event_port;
2148                 dev_info->dev_rx_started = start;
2149                 if (dev_info->internal_event_port == 0)
2150                         continue;
2151                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2152                                                 &rte_eth_devices[i]) :
2153                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2154                                                 &rte_eth_devices[i]);
2155         }
2156
2157         if (use_service) {
2158                 rte_spinlock_lock(&rx_adapter->rx_lock);
2159                 rx_adapter->rxa_started = start;
2160                 rte_service_runstate_set(rx_adapter->service_id, start);
2161                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2162         }
2163
2164         return 0;
2165 }
2166
2167 int
2168 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2169                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2170                                 void *conf_arg)
2171 {
2172         struct rte_event_eth_rx_adapter *rx_adapter;
2173         int ret;
2174         int socket_id;
2175         uint16_t i;
2176         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
2177         const uint8_t default_rss_key[] = {
2178                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2179                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2180                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2181                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2182                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2183         };
2184
2185         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2186         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2187         if (conf_cb == NULL)
2188                 return -EINVAL;
2189
2190         if (event_eth_rx_adapter == NULL) {
2191                 ret = rte_event_eth_rx_adapter_init();
2192                 if (ret)
2193                         return ret;
2194         }
2195
2196         rx_adapter = rxa_id_to_adapter(id);
2197         if (rx_adapter != NULL) {
2198                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2199                 return -EEXIST;
2200         }
2201
2202         socket_id = rte_event_dev_socket_id(dev_id);
2203         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2204                 "rte_event_eth_rx_adapter_%d",
2205                 id);
2206
2207         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2208                         RTE_CACHE_LINE_SIZE, socket_id);
2209         if (rx_adapter == NULL) {
2210                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2211                 return -ENOMEM;
2212         }
2213
2214         rx_adapter->eventdev_id = dev_id;
2215         rx_adapter->socket_id = socket_id;
2216         rx_adapter->conf_cb = conf_cb;
2217         rx_adapter->conf_arg = conf_arg;
2218         rx_adapter->id = id;
2219         TAILQ_INIT(&rx_adapter->vector_list);
2220         strcpy(rx_adapter->mem_name, mem_name);
2221         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2222                                         RTE_MAX_ETHPORTS *
2223                                         sizeof(struct eth_device_info), 0,
2224                                         socket_id);
2225         rte_convert_rss_key((const uint32_t *)default_rss_key,
2226                         (uint32_t *)rx_adapter->rss_key_be,
2227                             RTE_DIM(default_rss_key));
2228
2229         if (rx_adapter->eth_devices == NULL) {
2230                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2231                 rte_free(rx_adapter);
2232                 return -ENOMEM;
2233         }
2234         rte_spinlock_init(&rx_adapter->rx_lock);
2235         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2236                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2237
2238         event_eth_rx_adapter[id] = rx_adapter;
2239         if (conf_cb == rxa_default_conf_cb)
2240                 rx_adapter->default_cb_arg = 1;
2241         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2242                 conf_arg);
2243         return 0;
2244 }
2245
2246 int
2247 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2248                 struct rte_event_port_conf *port_config)
2249 {
2250         struct rte_event_port_conf *pc;
2251         int ret;
2252
2253         if (port_config == NULL)
2254                 return -EINVAL;
2255         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2256
2257         pc = rte_malloc(NULL, sizeof(*pc), 0);
2258         if (pc == NULL)
2259                 return -ENOMEM;
2260         *pc = *port_config;
2261         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2262                                         rxa_default_conf_cb,
2263                                         pc);
2264         if (ret)
2265                 rte_free(pc);
2266         return ret;
2267 }
2268
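/*
 * Minimal usage sketch (illustrative only, not part of this file): create an
 * adapter with the default configuration callback, connect all Rx queues of
 * one port and start it. adapter_id, evdev_id, eth_port_id and ev_queue_id
 * are application supplied placeholders and error handling is elided.
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	qconf.ev.queue_id = ev_queue_id;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *
 *	if (rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &pconf) ||
 *	    rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port_id, -1,
 *						&qconf) ||
 *	    rte_event_eth_rx_adapter_start(adapter_id))
 *		rte_panic("Rx adapter setup failed\n");
 */
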
2269 int
2270 rte_event_eth_rx_adapter_free(uint8_t id)
2271 {
2272         struct rte_event_eth_rx_adapter *rx_adapter;
2273
2274         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2275
2276         rx_adapter = rxa_id_to_adapter(id);
2277         if (rx_adapter == NULL)
2278                 return -EINVAL;
2279
2280         if (rx_adapter->nb_queues) {
2281                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2282                                 rx_adapter->nb_queues);
2283                 return -EBUSY;
2284         }
2285
2286         if (rx_adapter->default_cb_arg)
2287                 rte_free(rx_adapter->conf_arg);
2288         rte_free(rx_adapter->eth_devices);
2289         rte_free(rx_adapter);
2290         event_eth_rx_adapter[id] = NULL;
2291
2292         rte_eventdev_trace_eth_rx_adapter_free(id);
2293         return 0;
2294 }
2295
2296 int
2297 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2298                 uint16_t eth_dev_id,
2299                 int32_t rx_queue_id,
2300                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2301 {
2302         int ret;
2303         uint32_t cap;
2304         struct rte_event_eth_rx_adapter *rx_adapter;
2305         struct rte_eventdev *dev;
2306         struct eth_device_info *dev_info;
2307         struct rte_event_eth_rx_adapter_vector_limits limits;
2308
2309         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2310         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2311
2312         rx_adapter = rxa_id_to_adapter(id);
2313         if ((rx_adapter == NULL) || (queue_conf == NULL))
2314                 return -EINVAL;
2315
2316         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2317         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2318                                                 eth_dev_id,
2319                                                 &cap);
2320         if (ret) {
2321                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2322                         " eth port %" PRIu16, id, eth_dev_id);
2323                 return ret;
2324         }
2325
2326         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2327                 && (queue_conf->rx_queue_flags &
2328                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2329                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2330                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2331                                 eth_dev_id, id);
2332                 return -EINVAL;
2333         }
2334
2335         if (queue_conf->rx_queue_flags &
2336             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2337
2338                 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2339                         RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2340                                          " eth port: %" PRIu16
2341                                          " adapter id: %" PRIu8,
2342                                          eth_dev_id, id);
2343                         return -EINVAL;
2344                 }
2345
2346                 ret = rte_event_eth_rx_adapter_vector_limits_get(
2347                         rx_adapter->eventdev_id, eth_dev_id, &limits);
2348                 if (ret < 0) {
2349                         RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2350                                          " eth port: %" PRIu16
2351                                          " adapter id: %" PRIu8,
2352                                          eth_dev_id, id);
2353                         return -EINVAL;
2354                 }
2355                 if (queue_conf->vector_sz < limits.min_sz ||
2356                     queue_conf->vector_sz > limits.max_sz ||
2357                     queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2358                     queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2359                     queue_conf->vector_mp == NULL) {
2360                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2361                                          " eth port: %" PRIu16
2362                                          " adapter id: %" PRIu8,
2363                                          eth_dev_id, id);
2364                         return -EINVAL;
2365                 }
2366                 if (queue_conf->vector_mp->elt_size <
2367                     (sizeof(struct rte_event_vector) +
2368                      (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2369                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2370                                          " eth port: %" PRIu16
2371                                          " adapter id: %" PRIu8,
2372                                          eth_dev_id, id);
2373                         return -EINVAL;
2374                 }
2375         }
2376
2377         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2378                 (rx_queue_id != -1)) {
2379                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2380                         "event queue, eth port: %" PRIu16 " adapter id: %"
2381                         PRIu8, eth_dev_id, id);
2382                 return -EINVAL;
2383         }
2384
2385         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2386                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2387                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2388                          (uint16_t)rx_queue_id);
2389                 return -EINVAL;
2390         }
2391
2392         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2393
2394         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2395                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2396                                         -ENOTSUP);
2397                 if (dev_info->rx_queue == NULL) {
2398                         dev_info->rx_queue =
2399                             rte_zmalloc_socket(rx_adapter->mem_name,
2400                                         dev_info->dev->data->nb_rx_queues *
2401                                         sizeof(struct eth_rx_queue_info), 0,
2402                                         rx_adapter->socket_id);
2403                         if (dev_info->rx_queue == NULL)
2404                                 return -ENOMEM;
2405                 }
2406
2407                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2408                                 &rte_eth_devices[eth_dev_id],
2409                                 rx_queue_id, queue_conf);
2410                 if (ret == 0) {
2411                         dev_info->internal_event_port = 1;
2412                         rxa_update_queue(rx_adapter,
2413                                         &rx_adapter->eth_devices[eth_dev_id],
2414                                         rx_queue_id,
2415                                         1);
2416                 }
2417         } else {
2418                 rte_spinlock_lock(&rx_adapter->rx_lock);
2419                 dev_info->internal_event_port = 0;
2420                 ret = rxa_init_service(rx_adapter, id);
2421                 if (ret == 0) {
2422                         uint32_t service_id = rx_adapter->service_id;
2423                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2424                                         queue_conf);
2425                         rte_service_component_runstate_set(service_id,
2426                                 rxa_sw_adapter_queue_count(rx_adapter));
2427                 }
2428                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2429         }
2430
2431         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2432                 rx_queue_id, queue_conf, ret);
2433         if (ret)
2434                 return ret;
2435
2436         return 0;
2437 }
2438
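/* Vector limits advertised for the software (service function) path */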
2439 static int
2440 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2441 {
2442         limits->max_sz = MAX_VECTOR_SIZE;
2443         limits->min_sz = MIN_VECTOR_SIZE;
2444         limits->max_timeout_ns = MAX_VECTOR_NS;
2445         limits->min_timeout_ns = MIN_VECTOR_NS;
2446
2447         return 0;
2448 }
2449
2450 int
2451 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2452                                 int32_t rx_queue_id)
2453 {
2454         int ret = 0;
2455         struct rte_eventdev *dev;
2456         struct rte_event_eth_rx_adapter *rx_adapter;
2457         struct eth_device_info *dev_info;
2458         uint32_t cap;
2459         uint32_t nb_rx_poll = 0;
2460         uint32_t nb_wrr = 0;
2461         uint32_t nb_rx_intr;
2462         struct eth_rx_poll_entry *rx_poll = NULL;
2463         uint32_t *rx_wrr = NULL;
2464         int num_intr_vec;
2465
2466         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2467         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2468
2469         rx_adapter = rxa_id_to_adapter(id);
2470         if (rx_adapter == NULL)
2471                 return -EINVAL;
2472
2473         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2474         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2475                                                 eth_dev_id,
2476                                                 &cap);
2477         if (ret)
2478                 return ret;
2479
2480         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2481                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2482                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2483                          (uint16_t)rx_queue_id);
2484                 return -EINVAL;
2485         }
2486
2487         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2488
2489         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2490                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2491                                  -ENOTSUP);
2492                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2493                                                 &rte_eth_devices[eth_dev_id],
2494                                                 rx_queue_id);
2495                 if (ret == 0) {
2496                         rxa_update_queue(rx_adapter,
2497                                         &rx_adapter->eth_devices[eth_dev_id],
2498                                         rx_queue_id,
2499                                         0);
2500                         if (dev_info->nb_dev_queues == 0) {
2501                                 rte_free(dev_info->rx_queue);
2502                                 dev_info->rx_queue = NULL;
2503                         }
2504                 }
2505         } else {
2506                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2507                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2508
2509                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2510                         &rx_poll, &rx_wrr);
2511                 if (ret)
2512                         return ret;
2513
2514                 rte_spinlock_lock(&rx_adapter->rx_lock);
2515
2516                 num_intr_vec = 0;
2517                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2518
2519                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2520                                                 rx_queue_id, 0);
2521                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2522                                         rx_queue_id);
2523                         if (ret)
2524                                 goto unlock_ret;
2525                 }
2526
2527                 if (nb_rx_intr == 0) {
2528                         ret = rxa_free_intr_resources(rx_adapter);
2529                         if (ret)
2530                                 goto unlock_ret;
2531                 }
2532
2533                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2534                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2535
2536                 rte_free(rx_adapter->eth_rx_poll);
2537                 rte_free(rx_adapter->wrr_sched);
2538
2539                 if (nb_rx_intr == 0) {
2540                         rte_free(dev_info->intr_queue);
2541                         dev_info->intr_queue = NULL;
2542                 }
2543
2544                 rx_adapter->eth_rx_poll = rx_poll;
2545                 rx_adapter->wrr_sched = rx_wrr;
2546                 rx_adapter->wrr_len = nb_wrr;
2547                 rx_adapter->num_intr_vec += num_intr_vec;
2548
2549                 if (dev_info->nb_dev_queues == 0) {
2550                         rte_free(dev_info->rx_queue);
2551                         dev_info->rx_queue = NULL;
2552                 }
2553 unlock_ret:
2554                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2555                 if (ret) {
2556                         rte_free(rx_poll);
2557                         rte_free(rx_wrr);
2558                         return ret;
2559                 }
2560
2561                 rte_service_component_runstate_set(rx_adapter->service_id,
2562                                 rxa_sw_adapter_queue_count(rx_adapter));
2563         }
2564
2565         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2566                 rx_queue_id, ret);
2567         return ret;
2568 }
2569
2570 int
2571 rte_event_eth_rx_adapter_vector_limits_get(
2572         uint8_t dev_id, uint16_t eth_port_id,
2573         struct rte_event_eth_rx_adapter_vector_limits *limits)
2574 {
2575         struct rte_eventdev *dev;
2576         uint32_t cap;
2577         int ret;
2578
2579         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2580         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2581
2582         if (limits == NULL)
2583                 return -EINVAL;
2584
2585         dev = &rte_eventdevs[dev_id];
2586
2587         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2588         if (ret) {
2589                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2590                                  " eth port %" PRIu16,
2591                                  dev_id, eth_port_id);
2592                 return ret;
2593         }
2594
2595         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2596                 RTE_FUNC_PTR_OR_ERR_RET(
2597                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2598                         -ENOTSUP);
2599                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2600                         dev, &rte_eth_devices[eth_port_id], limits);
2601         } else {
2602                 ret = rxa_sw_vector_limits(limits);
2603         }
2604
2605         return ret;
2606 }
2607
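/*
 * Illustrative sketch (not part of this file): query the vector limits and
 * fill in the vector related fields of the queue configuration before the
 * queue add call. Only those fields are shown; the event fields are set up
 * as for a non-vectorized queue, and vec_pool plus the ids are application
 * supplied placeholders.
 *
 *	struct rte_event_eth_rx_adapter_vector_limits limits;
 *
 *	rte_event_eth_rx_adapter_vector_limits_get(evdev_id, eth_port_id,
 *						    &limits);
 *	qconf.rx_queue_flags |= RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *	qconf.vector_sz = limits.min_sz;
 *	qconf.vector_timeout_ns = limits.min_timeout_ns;
 *	qconf.vector_mp = vec_pool;
 *	rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port_id, -1, &qconf);
 */
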
2608 int
2609 rte_event_eth_rx_adapter_start(uint8_t id)
2610 {
2611         rte_eventdev_trace_eth_rx_adapter_start(id);
2612         return rxa_ctrl(id, 1);
2613 }
2614
2615 int
2616 rte_event_eth_rx_adapter_stop(uint8_t id)
2617 {
2618         rte_eventdev_trace_eth_rx_adapter_stop(id);
2619         return rxa_ctrl(id, 0);
2620 }
2621
2622 int
2623 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2624                                struct rte_event_eth_rx_adapter_stats *stats)
2625 {
2626         struct rte_event_eth_rx_adapter *rx_adapter;
2627         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2628         struct rte_event_eth_rx_adapter_stats dev_stats;
2629         struct rte_eventdev *dev;
2630         struct eth_device_info *dev_info;
2631         uint32_t i;
2632         int ret;
2633
2634         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2635
2636         rx_adapter = rxa_id_to_adapter(id);
2637         if (rx_adapter  == NULL || stats == NULL)
2638                 return -EINVAL;
2639
2640         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2641         memset(stats, 0, sizeof(*stats));
2642         RTE_ETH_FOREACH_DEV(i) {
2643                 dev_info = &rx_adapter->eth_devices[i];
2644                 if (dev_info->internal_event_port == 0 ||
2645                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2646                         continue;
2647                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2648                                                 &rte_eth_devices[i],
2649                                                 &dev_stats);
2650                 if (ret)
2651                         continue;
2652                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2653                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2654         }
2655
2656         if (rx_adapter->service_inited)
2657                 *stats = rx_adapter->stats;
2658
2659         stats->rx_packets += dev_stats_sum.rx_packets;
2660         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2661         return 0;
2662 }
2663
2664 int
2665 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2666 {
2667         struct rte_event_eth_rx_adapter *rx_adapter;
2668         struct rte_eventdev *dev;
2669         struct eth_device_info *dev_info;
2670         uint32_t i;
2671
2672         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2673
2674         rx_adapter = rxa_id_to_adapter(id);
2675         if (rx_adapter == NULL)
2676                 return -EINVAL;
2677
2678         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2679         RTE_ETH_FOREACH_DEV(i) {
2680                 dev_info = &rx_adapter->eth_devices[i];
2681                 if (dev_info->internal_event_port == 0 ||
2682                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2683                         continue;
2684                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2685                                                         &rte_eth_devices[i]);
2686         }
2687
2688         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2689         return 0;
2690 }
2691
2692 int
2693 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2694 {
2695         struct rte_event_eth_rx_adapter *rx_adapter;
2696
2697         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2698
2699         rx_adapter = rxa_id_to_adapter(id);
2700         if (rx_adapter == NULL || service_id == NULL)
2701                 return -EINVAL;
2702
2703         if (rx_adapter->service_inited)
2704                 *service_id = rx_adapter->service_id;
2705
2706         return rx_adapter->service_inited ? 0 : -ESRCH;
2707 }
2708
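/*
 * Illustrative sketch (not part of this file): when the adapter runs over a
 * service function, the application must map the returned service id to a
 * service lcore before packets are delivered. lcore_id and adapter_id are
 * placeholders.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
 *						    &service_id) == 0) {
 *		rte_service_lcore_add(lcore_id);
 *		rte_service_map_lcore_set(service_id, lcore_id, 1);
 *		rte_service_lcore_start(lcore_id);
 *	}
 */
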
2709 int
2710 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2711                                         uint16_t eth_dev_id,
2712                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2713                                         void *cb_arg)
2714 {
2715         struct rte_event_eth_rx_adapter *rx_adapter;
2716         struct eth_device_info *dev_info;
2717         uint32_t cap;
2718         int ret;
2719
2720         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2721         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2722
2723         rx_adapter = rxa_id_to_adapter(id);
2724         if (rx_adapter == NULL)
2725                 return -EINVAL;
2726
2727         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2728         if (dev_info->rx_queue == NULL)
2729                 return -EINVAL;
2730
2731         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2732                                                 eth_dev_id,
2733                                                 &cap);
2734         if (ret) {
2735                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2736                         " eth port %" PRIu16, id, eth_dev_id);
2737                 return ret;
2738         }
2739
2740         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2741                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2742                                 PRIu16, eth_dev_id);
2743                 return -EINVAL;
2744         }
2745
2746         rte_spinlock_lock(&rx_adapter->rx_lock);
2747         dev_info->cb_fn = cb_fn;
2748         dev_info->cb_arg = cb_arg;
2749         rte_spinlock_unlock(&rx_adapter->rx_lock);
2750
2751         return 0;
2752 }