dpdk.git: lib/eventdev/rte_event_eth_rx_adapter.c (commit 6ab34bbd2d6bbc6109fde9fd7d62a9352f17d533)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21
22 #include "rte_eventdev.h"
23 #include "eventdev_pmd.h"
24 #include "rte_eventdev_trace.h"
25 #include "rte_event_eth_rx_adapter.h"
26
27 #define BATCH_SIZE              32
28 #define BLOCK_CNT_THRESHOLD     10
29 #define ETH_EVENT_BUFFER_SIZE   (6*BATCH_SIZE)
30 #define MAX_VECTOR_SIZE         1024
31 #define MIN_VECTOR_SIZE         4
32 #define MAX_VECTOR_NS           1E9
33 #define MIN_VECTOR_NS           1E5
34
35 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
36 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
37
38 #define RSS_KEY_SIZE    40
39 /* value written to intr thread pipe to signal thread exit */
40 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
42 /* Sentinel value to detect an uninitialized file handle */
42 #define INIT_FD         -1
43
44 #define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"
45
46 /*
47  * Used to store port and queue ID of interrupting Rx queue
48  */
49 union queue_data {
50         RTE_STD_C11
51         void *ptr;
52         struct {
53                 uint16_t port;
54                 uint16_t queue;
55         };
56 };
57
58 /*
59  * There is an instance of this struct per polled Rx queue added to the
60  * adapter
61  */
62 struct eth_rx_poll_entry {
63         /* Eth port to poll */
64         uint16_t eth_dev_id;
65         /* Eth rx queue to poll */
66         uint16_t eth_rx_qid;
67 };
68
69 struct eth_rx_vector_data {
70         TAILQ_ENTRY(eth_rx_vector_data) next;
71         uint16_t port;
72         uint16_t queue;
73         uint16_t max_vector_count;
74         uint64_t event;
75         uint64_t ts;
76         uint64_t vector_timeout_ticks;
77         struct rte_mempool *vector_pool;
78         struct rte_event_vector *vector_ev;
79 } __rte_cache_aligned;
80
81 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
82
83 /* Event enqueue buffer, one instance per adapter or per Rx queue (if use_queue_event_buf is set) */
84 struct rte_eth_event_enqueue_buffer {
85         /* Count of events in this buffer */
86         uint16_t count;
87         /* Array of events in this buffer */
88         struct rte_event *events;
89         /* size of event buffer */
90         uint16_t events_size;
91         /* Event enqueue happens from head */
92         uint16_t head;
93         /* New packets from rte_eth_rx_burst are enqueued at the tail */
94         uint16_t tail;
95         /* End (exclusive) of valid events before the buffer rolls over */
96         uint16_t last;
97         uint16_t last_mask; /* ~0 once the buffer has rolled over, else 0 */
98 };
99
100 struct rte_event_eth_rx_adapter {
101         /* RSS key */
102         uint8_t rss_key_be[RSS_KEY_SIZE];
103         /* Event device identifier */
104         uint8_t eventdev_id;
105         /* Event port identifier */
106         uint8_t event_port_id;
107         /* Flag indicating per rxq event buffer */
108         bool use_queue_event_buf;
109         /* Per ethernet device structure */
110         struct eth_device_info *eth_devices;
111         /* Lock to serialize config updates with service function */
112         rte_spinlock_t rx_lock;
113         /* Max mbufs processed in any service function invocation */
114         uint32_t max_nb_rx;
115         /* Receive queues that need to be polled */
116         struct eth_rx_poll_entry *eth_rx_poll;
117         /* Size of the eth_rx_poll array */
118         uint16_t num_rx_polled;
119         /* Weighted round robin schedule */
120         uint32_t *wrr_sched;
121         /* wrr_sched[] size */
122         uint32_t wrr_len;
123         /* Next entry in wrr[] to begin polling */
124         uint32_t wrr_pos;
125         /* Event burst buffer */
126         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
127         /* Vector enable flag */
128         uint8_t ena_vector;
129         /* Timestamp of previous vector expiry list traversal */
130         uint64_t prev_expiry_ts;
131         /* Minimum ticks to wait before traversing expiry list */
132         uint64_t vector_tmo_ticks;
133         /* vector list */
134         struct eth_rx_vector_data_list vector_list;
135         /* Per adapter stats */
136         struct rte_event_eth_rx_adapter_stats stats;
137         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
138         uint16_t enq_block_count;
139         /* Block start ts */
140         uint64_t rx_enq_block_start_ts;
141         /* epoll fd used to wait for Rx interrupts */
142         int epd;
143         /* Num of interrupt driven Rx queues */
144         uint32_t num_rx_intr;
145         /* Used to send <dev id, queue id> of interrupting Rx queues from
146          * the interrupt thread to the Rx thread
147          */
148         struct rte_ring *intr_ring;
149         /* Rx Queue data (dev id, queue id) for the last non-empty
150          * queue polled
151          */
152         union queue_data qd;
153         /* queue_data is valid */
154         int qd_valid;
155         /* Interrupt ring lock, synchronizes Rx thread
156          * and interrupt thread
157          */
158         rte_spinlock_t intr_ring_lock;
159         /* event array passed to rte_epoll_wait */
160         struct rte_epoll_event *epoll_events;
161         /* Count of interrupt vectors in use */
162         uint32_t num_intr_vec;
163         /* Thread blocked on Rx interrupts */
164         pthread_t rx_intr_thread;
165         /* Configuration callback for rte_service configuration */
166         rte_event_eth_rx_adapter_conf_cb conf_cb;
167         /* Configuration callback argument */
168         void *conf_arg;
169         /* Set if the default conf callback and its argument are in use */
170         int default_cb_arg;
171         /* Service initialization state */
172         uint8_t service_inited;
173         /* Total count of Rx queues in adapter */
174         uint32_t nb_queues;
175         /* Memory allocation name */
176         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
177         /* Socket identifier cached from eventdev */
178         int socket_id;
179         /* Per adapter EAL service */
180         uint32_t service_id;
181         /* Adapter started flag */
182         uint8_t rxa_started;
183         /* Adapter ID */
184         uint8_t id;
185 } __rte_cache_aligned;
186
187 /* Per eth device */
188 struct eth_device_info {
189         struct rte_eth_dev *dev;
190         struct eth_rx_queue_info *rx_queue;
191         /* Rx callback */
192         rte_event_eth_rx_adapter_cb_fn cb_fn;
193         /* Rx callback argument */
194         void *cb_arg;
195         /* Set if ethdev->eventdev packet transfer uses a
196          * hardware mechanism
197          */
198         uint8_t internal_event_port;
199         /* Set if the adapter is processing rx queues for
200          * this eth device and packet processing has been
201          * started, allows for the code to know if the PMD
202          * rx_adapter_stop callback needs to be invoked
203          */
204         uint8_t dev_rx_started;
205         /* Number of queues added for this device */
206         uint16_t nb_dev_queues;
207         /* Number of poll based queues
208          * If nb_rx_poll > 0, the start callback will
209          * be invoked if not already invoked
210          */
211         uint16_t nb_rx_poll;
212         /* Number of interrupt based queues
213          * If nb_rx_intr > 0, the start callback will
214          * be invoked if not already invoked.
215          */
216         uint16_t nb_rx_intr;
217         /* Number of queues that use the shared interrupt */
218         uint16_t nb_shared_intr;
219         /* sum(wrr(q)) for all queues within the device
220          * useful when deleting all device queues
221          */
222         uint32_t wrr_len;
223         /* Intr based queue index to start polling from, this is used
224          * if the number of shared interrupts is non-zero
225          */
226         uint16_t next_q_idx;
227         /* Intr based queue indices */
228         uint16_t *intr_queue;
229         /* Set if the device generates a per Rx queue interrupt
230          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
231          */
232         int multi_intr_cap;
233         /* shared interrupt enabled */
234         int shared_intr_enabled;
235 };
236
237 /* Per Rx queue */
238 struct eth_rx_queue_info {
239         int queue_enabled;      /* True if added */
240         int intr_enabled;
241         uint8_t ena_vector;
242         uint16_t wt;            /* Polling weight */
243         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
244         uint64_t event;
245         struct eth_rx_vector_data vector_data;
246         struct rte_eth_event_enqueue_buffer *event_buf;
247 };
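
/*
 * Illustrative note (added commentary, not upstream code): flow_id_mask
 * drives the flow id selection in rxa_buffer_mbufs(), where
 * ev->flow_id = (rss & ~flow_id_mask) | (ev->flow_id & flow_id_mask).
 * With flow_id_mask == 0 the flow id is taken from the RSS hash
 * (hardware provided or computed by rxa_do_softrss()); with
 * flow_id_mask == ~0, typically set when the application marks the
 * queue's flow id as valid, the application supplied flow id is kept.
 */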
248
249 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
250
251 /* Enable dynamic timestamp field in mbuf */
252 static uint64_t event_eth_rx_timestamp_dynflag;
253 static int event_eth_rx_timestamp_dynfield_offset = -1;
254
255 static inline rte_mbuf_timestamp_t *
256 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
257 {
258         return RTE_MBUF_DYNFIELD(mbuf,
259                 event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
260 }
261
262 static inline int
263 rxa_validate_id(uint8_t id)
264 {
265         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
266 }
267
268 static inline struct rte_eth_event_enqueue_buffer *
269 rxa_event_buf_get(struct rte_event_eth_rx_adapter *rx_adapter,
270                   uint16_t eth_dev_id, uint16_t rx_queue_id)
271 {
272         if (rx_adapter->use_queue_event_buf) {
273                 struct eth_device_info *dev_info =
274                         &rx_adapter->eth_devices[eth_dev_id];
275                 return dev_info->rx_queue[rx_queue_id].event_buf;
276         } else
277                 return &rx_adapter->event_enqueue_buffer;
278 }
279
280 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
281         if (!rxa_validate_id(id)) { \
282                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
283                 return retval; \
284         } \
285 } while (0)
286
287 static inline int
288 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
289 {
290         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
291 }
292
293 /* Greatest common divisor */
294 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
295 {
296         uint16_t r = a % b;
297
298         return r ? rxa_gcd_u16(b, r) : b;
299 }
300
301 /* Returns the next queue in the polling sequence
302  *
303  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
304  */
305 static int
306 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
307          unsigned int n, int *cw,
308          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
309          uint16_t gcd, int prev)
310 {
311         int i = prev;
312         uint16_t w;
313
314         while (1) {
315                 uint16_t q;
316                 uint16_t d;
317
318                 i = (i + 1) % n;
319                 if (i == 0) {
320                         *cw = *cw - gcd;
321                         if (*cw <= 0)
322                                 *cw = max_wt;
323                 }
324
325                 q = eth_rx_poll[i].eth_rx_qid;
326                 d = eth_rx_poll[i].eth_dev_id;
327                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
328
329                 if ((int)w >= *cw)
330                         return i;
331         }
332 }
333
334 static inline int
335 rxa_shared_intr(struct eth_device_info *dev_info,
336         int rx_queue_id)
337 {
338         int multi_intr_cap;
339
340         if (dev_info->dev->intr_handle == NULL)
341                 return 0;
342
343         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
344         return !multi_intr_cap ||
345                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
346 }
347
348 static inline int
349 rxa_intr_queue(struct eth_device_info *dev_info,
350         int rx_queue_id)
351 {
352         struct eth_rx_queue_info *queue_info;
353
354         queue_info = &dev_info->rx_queue[rx_queue_id];
355         return dev_info->rx_queue &&
356                 !dev_info->internal_event_port &&
357                 queue_info->queue_enabled && queue_info->wt == 0;
358 }
359
360 static inline int
361 rxa_polled_queue(struct eth_device_info *dev_info,
362         int rx_queue_id)
363 {
364         struct eth_rx_queue_info *queue_info;
365
366         queue_info = &dev_info->rx_queue[rx_queue_id];
367         return !dev_info->internal_event_port &&
368                 dev_info->rx_queue &&
369                 queue_info->queue_enabled && queue_info->wt != 0;
370 }
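
/*
 * Usage sketch (an assumption based on the public adapter API, not code
 * from this file): a queue becomes interrupt driven (wt == 0) or polled
 * (wt != 0) according to the servicing weight supplied when it is added.
 * Identifiers ev_queue_id, id, eth_dev_id and rx_queue_id below are
 * placeholders.
 *
 *   struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *   memset(&qconf, 0, sizeof(qconf));
 *   qconf.ev.queue_id = ev_queue_id;
 *   qconf.servicing_weight = 0;    // weight 0 selects interrupt mode
 *   rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, rx_queue_id, &qconf);
 *
 * A non-zero servicing_weight makes the queue a polled queue with that
 * WRR weight.
 */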
371
372 /* Calculate change in number of interrupt vectors after Rx queue ID is added/deleted */
373 static int
374 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
375 {
376         uint16_t i;
377         int n, s;
378         uint16_t nbq;
379
380         nbq = dev_info->dev->data->nb_rx_queues;
381         n = 0; /* non shared count */
382         s = 0; /* shared count */
383
384         if (rx_queue_id == -1) {
385                 for (i = 0; i < nbq; i++) {
386                         if (!rxa_shared_intr(dev_info, i))
387                                 n += add ? !rxa_intr_queue(dev_info, i) :
388                                         rxa_intr_queue(dev_info, i);
389                         else
390                                 s += add ? !rxa_intr_queue(dev_info, i) :
391                                         rxa_intr_queue(dev_info, i);
392                 }
393
394                 if (s > 0) {
395                         if ((add && dev_info->nb_shared_intr == 0) ||
396                                 (!add && dev_info->nb_shared_intr))
397                                 n += 1;
398                 }
399         } else {
400                 if (!rxa_shared_intr(dev_info, rx_queue_id))
401                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
402                                 rxa_intr_queue(dev_info, rx_queue_id);
403                 else
404                         n = add ? !dev_info->nb_shared_intr :
405                                 dev_info->nb_shared_intr == 1;
406         }
407
408         return add ? n : -n;
409 }
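
/*
 * Worked example (illustrative only): on a device with per queue
 * interrupt capability, adding an Rx queue in interrupt mode that is not
 * already an interrupt queue returns +1 (one more vector is needed) and
 * deleting it returns -1. For queues sharing a single interrupt vector,
 * the count changes only when the first shared queue is added or the
 * last one is deleted.
 */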
410
411 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
412  */
413 static void
414 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
415                         struct eth_device_info *dev_info,
416                         int rx_queue_id,
417                         uint32_t *nb_rx_intr)
418 {
419         uint32_t intr_diff;
420
421         if (rx_queue_id == -1)
422                 intr_diff = dev_info->nb_rx_intr;
423         else
424                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
425
426         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
427 }
428
429 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
430  * interrupt queues could currently be poll mode Rx queues
431  */
432 static void
433 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
434                         struct eth_device_info *dev_info,
435                         int rx_queue_id,
436                         uint32_t *nb_rx_poll,
437                         uint32_t *nb_rx_intr,
438                         uint32_t *nb_wrr)
439 {
440         uint32_t intr_diff;
441         uint32_t poll_diff;
442         uint32_t wrr_len_diff;
443
444         if (rx_queue_id == -1) {
445                 intr_diff = dev_info->dev->data->nb_rx_queues -
446                                                 dev_info->nb_rx_intr;
447                 poll_diff = dev_info->nb_rx_poll;
448                 wrr_len_diff = dev_info->wrr_len;
449         } else {
450                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
451                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
452                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
453                                         0;
454         }
455
456         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
457         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
458         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
459 }
460
461 /* Calculate size of the eth_rx_poll and wrr_sched arrays
462  * after deleting poll mode rx queues
463  */
464 static void
465 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
466                         struct eth_device_info *dev_info,
467                         int rx_queue_id,
468                         uint32_t *nb_rx_poll,
469                         uint32_t *nb_wrr)
470 {
471         uint32_t poll_diff;
472         uint32_t wrr_len_diff;
473
474         if (rx_queue_id == -1) {
475                 poll_diff = dev_info->nb_rx_poll;
476                 wrr_len_diff = dev_info->wrr_len;
477         } else {
478                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
479                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
480                                         0;
481         }
482
483         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
484         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
485 }
486
487 /* Calculate nb_rx_* after adding poll mode rx queues
488  */
489 static void
490 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
491                         struct eth_device_info *dev_info,
492                         int rx_queue_id,
493                         uint16_t wt,
494                         uint32_t *nb_rx_poll,
495                         uint32_t *nb_rx_intr,
496                         uint32_t *nb_wrr)
497 {
498         uint32_t intr_diff;
499         uint32_t poll_diff;
500         uint32_t wrr_len_diff;
501
502         if (rx_queue_id == -1) {
503                 intr_diff = dev_info->nb_rx_intr;
504                 poll_diff = dev_info->dev->data->nb_rx_queues -
505                                                 dev_info->nb_rx_poll;
506                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
507                                 - dev_info->wrr_len;
508         } else {
509                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
510                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
511                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
512                                 wt - dev_info->rx_queue[rx_queue_id].wt :
513                                 wt;
514         }
515
516         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
517         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
518         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
519 }
520
521 /* Calculate nb_rx_* after adding rx_queue_id */
522 static void
523 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
524                 struct eth_device_info *dev_info,
525                 int rx_queue_id,
526                 uint16_t wt,
527                 uint32_t *nb_rx_poll,
528                 uint32_t *nb_rx_intr,
529                 uint32_t *nb_wrr)
530 {
531         if (wt != 0)
532                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
533                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
534         else
535                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
536                                         nb_rx_poll, nb_rx_intr, nb_wrr);
537 }
538
539 /* Calculate nb_rx_* after deleting rx_queue_id */
540 static void
541 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
542                 struct eth_device_info *dev_info,
543                 int rx_queue_id,
544                 uint32_t *nb_rx_poll,
545                 uint32_t *nb_rx_intr,
546                 uint32_t *nb_wrr)
547 {
548         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
549                                 nb_wrr);
550         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
551                                 nb_rx_intr);
552 }
553
554 /*
555  * Allocate the rx_poll array
556  */
557 static struct eth_rx_poll_entry *
558 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
559         uint32_t num_rx_polled)
560 {
561         size_t len;
562
563         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
564                                                         RTE_CACHE_LINE_SIZE);
565         return  rte_zmalloc_socket(rx_adapter->mem_name,
566                                 len,
567                                 RTE_CACHE_LINE_SIZE,
568                                 rx_adapter->socket_id);
569 }
570
571 /*
572  * Allocate the WRR array
573  */
574 static uint32_t *
575 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
576 {
577         size_t len;
578
579         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
580                         RTE_CACHE_LINE_SIZE);
581         return  rte_zmalloc_socket(rx_adapter->mem_name,
582                                 len,
583                                 RTE_CACHE_LINE_SIZE,
584                                 rx_adapter->socket_id);
585 }
586
587 static int
588 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
589                 uint32_t nb_poll,
590                 uint32_t nb_wrr,
591                 struct eth_rx_poll_entry **rx_poll,
592                 uint32_t **wrr_sched)
593 {
594
595         if (nb_poll == 0) {
596                 *rx_poll = NULL;
597                 *wrr_sched = NULL;
598                 return 0;
599         }
600
601         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
602         if (*rx_poll == NULL) {
603                 *wrr_sched = NULL;
604                 return -ENOMEM;
605         }
606
607         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
608         if (*wrr_sched == NULL) {
609                 rte_free(*rx_poll);
610                 return -ENOMEM;
611         }
612         return 0;
613 }
614
615 /* Precalculate WRR polling sequence for all queues in rx_adapter */
616 static void
617 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
618                 struct eth_rx_poll_entry *rx_poll,
619                 uint32_t *rx_wrr)
620 {
621         uint16_t d;
622         uint16_t q;
623         unsigned int i;
624         int prev = -1;
625         int cw = -1;
626
627         /* Initialize variables for calculation of wrr schedule */
628         uint16_t max_wrr_pos = 0;
629         unsigned int poll_q = 0;
630         uint16_t max_wt = 0;
631         uint16_t gcd = 0;
632
633         if (rx_poll == NULL)
634                 return;
635
636         /* Generate the array of all queues to poll; the size of this
637          * array is poll_q
638          */
639         RTE_ETH_FOREACH_DEV(d) {
640                 uint16_t nb_rx_queues;
641                 struct eth_device_info *dev_info =
642                                 &rx_adapter->eth_devices[d];
643                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
644                 if (dev_info->rx_queue == NULL)
645                         continue;
646                 if (dev_info->internal_event_port)
647                         continue;
648                 dev_info->wrr_len = 0;
649                 for (q = 0; q < nb_rx_queues; q++) {
650                         struct eth_rx_queue_info *queue_info =
651                                 &dev_info->rx_queue[q];
652                         uint16_t wt;
653
654                         if (!rxa_polled_queue(dev_info, q))
655                                 continue;
656                         wt = queue_info->wt;
657                         rx_poll[poll_q].eth_dev_id = d;
658                         rx_poll[poll_q].eth_rx_qid = q;
659                         max_wrr_pos += wt;
660                         dev_info->wrr_len += wt;
661                         max_wt = RTE_MAX(max_wt, wt);
662                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
663                         poll_q++;
664                 }
665         }
666
667         /* Generate polling sequence based on weights */
668         prev = -1;
669         cw = -1;
670         for (i = 0; i < max_wrr_pos; i++) {
671                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
672                                      rx_poll, max_wt, gcd, prev);
673                 prev = rx_wrr[i];
674         }
675 }
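
/*
 * Worked example (illustrative only): with two polled queues of weights
 * 2 and 1, max_wt = 2, gcd = 1 and max_wrr_pos = 3; the generated
 * sequence is rx_wrr = {0, 0, 1}, i.e. the weight 2 queue is polled
 * twice for every poll of the weight 1 queue.
 */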
676
677 static inline void
678 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
679         struct rte_ipv6_hdr **ipv6_hdr)
680 {
681         struct rte_ether_hdr *eth_hdr =
682                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
683         struct rte_vlan_hdr *vlan_hdr;
684
685         *ipv4_hdr = NULL;
686         *ipv6_hdr = NULL;
687
688         switch (eth_hdr->ether_type) {
689         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
690                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
691                 break;
692
693         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
694                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
695                 break;
696
697         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
698                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
699                 switch (vlan_hdr->eth_proto) {
700                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
701                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
702                         break;
703                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
704                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
705                         break;
706                 default:
707                         break;
708                 }
709                 break;
710
711         default:
712                 break;
713         }
714 }
715
716 /* Calculate RSS hash for IPv4/6 */
717 static inline uint32_t
718 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
719 {
720         uint32_t input_len;
721         void *tuple;
722         struct rte_ipv4_tuple ipv4_tuple;
723         struct rte_ipv6_tuple ipv6_tuple;
724         struct rte_ipv4_hdr *ipv4_hdr;
725         struct rte_ipv6_hdr *ipv6_hdr;
726
727         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
728
729         if (ipv4_hdr) {
730                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
731                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
732                 tuple = &ipv4_tuple;
733                 input_len = RTE_THASH_V4_L3_LEN;
734         } else if (ipv6_hdr) {
735                 rte_thash_load_v6_addrs(ipv6_hdr,
736                                         (union rte_thash_tuple *)&ipv6_tuple);
737                 tuple = &ipv6_tuple;
738                 input_len = RTE_THASH_V6_L3_LEN;
739         } else
740                 return 0;
741
742         return rte_softrss_be(tuple, input_len, rss_key_be);
743 }
744
745 static inline int
746 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
747 {
748         return !!rx_adapter->enq_block_count;
749 }
750
751 static inline void
752 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
753 {
754         if (rx_adapter->rx_enq_block_start_ts)
755                 return;
756
757         rx_adapter->enq_block_count++;
758         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
759                 return;
760
761         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
762 }
763
764 static inline void
765 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
766                     struct rte_event_eth_rx_adapter_stats *stats)
767 {
768         if (unlikely(!stats->rx_enq_start_ts))
769                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
770
771         if (likely(!rxa_enq_blocked(rx_adapter)))
772                 return;
773
774         rx_adapter->enq_block_count = 0;
775         if (rx_adapter->rx_enq_block_start_ts) {
776                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
777                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
778                     rx_adapter->rx_enq_block_start_ts;
779                 rx_adapter->rx_enq_block_start_ts = 0;
780         }
781 }
782
783 /* Enqueue buffered events to event device */
784 static inline uint16_t
785 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
786                        struct rte_eth_event_enqueue_buffer *buf)
787 {
788         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
789         uint16_t count = buf->last ? buf->last - buf->head : buf->count;
790
791         if (!count)
792                 return 0;
793
794         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
795                                         rx_adapter->event_port_id,
796                                         &buf->events[buf->head],
797                                         count);
798         if (n != count)
799                 stats->rx_enq_retry++;
800
801         buf->head += n;
802
803         if (buf->last && n == count) {
804                 uint16_t n1;
805
806                 n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
807                                         rx_adapter->event_port_id,
808                                         &buf->events[0],
809                                         buf->tail);
810
811                 if (n1 != buf->tail)
812                         stats->rx_enq_retry++;
813
814                 buf->last = 0;
815                 buf->head = n1;
816                 buf->last_mask = 0;
817                 n += n1;
818         }
819
820         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
821                 rxa_enq_block_start_ts(rx_adapter);
822
823         buf->count -= n;
824         stats->rx_enq_count += n;
825
826         return n;
827 }
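
/*
 * Illustrative example of a flush after the buffer has wrapped: with
 * events_size = 192, head = 180, last = 190 and tail = 10, the first
 * burst enqueues the 10 events at [180, 190) and, if all of them are
 * accepted, a second burst enqueues the 10 events at [0, 10); head then
 * restarts in the new segment.
 */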
828
829 static inline void
830 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
831                 struct eth_rx_vector_data *vec)
832 {
833         vec->vector_ev->nb_elem = 0;
834         vec->vector_ev->port = vec->port;
835         vec->vector_ev->queue = vec->queue;
836         vec->vector_ev->attr_valid = true;
837         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
838 }
839
840 static inline uint16_t
841 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
842                         struct eth_rx_queue_info *queue_info,
843                         struct rte_eth_event_enqueue_buffer *buf,
844                         struct rte_mbuf **mbufs, uint16_t num)
845 {
846         struct rte_event *ev = &buf->events[buf->count];
847         struct eth_rx_vector_data *vec;
848         uint16_t filled, space, sz;
849
850         filled = 0;
851         vec = &queue_info->vector_data;
852
853         if (vec->vector_ev == NULL) {
854                 if (rte_mempool_get(vec->vector_pool,
855                                     (void **)&vec->vector_ev) < 0) {
856                         rte_pktmbuf_free_bulk(mbufs, num);
857                         return 0;
858                 }
859                 rxa_init_vector(rx_adapter, vec);
860         }
861         while (num) {
862                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
863                         /* Event ready. */
864                         ev->event = vec->event;
865                         ev->vec = vec->vector_ev;
866                         ev++;
867                         filled++;
868                         vec->vector_ev = NULL;
869                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
870                         if (rte_mempool_get(vec->vector_pool,
871                                             (void **)&vec->vector_ev) < 0) {
872                                 rte_pktmbuf_free_bulk(mbufs, num);
873                                 return 0;
874                         }
875                         rxa_init_vector(rx_adapter, vec);
876                 }
877
878                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
879                 sz = num > space ? space : num;
880                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
881                        sizeof(void *) * sz);
882                 vec->vector_ev->nb_elem += sz;
883                 num -= sz;
884                 mbufs += sz;
885                 vec->ts = rte_rdtsc();
886         }
887
888         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
889                 ev->event = vec->event;
890                 ev->vec = vec->vector_ev;
891                 ev++;
892                 filled++;
893                 vec->vector_ev = NULL;
894                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
895         }
896
897         return filled;
898 }
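
/*
 * Illustrative example: starting from an empty vector with
 * max_vector_count = 4 and num = 6 mbufs, the first 4 mbufs fill the
 * vector, which is emitted as one event (filled = 1); the remaining 2
 * mbufs start a new vector that stays on the adapter's vector_list
 * until it fills up or its timeout expires (see rxa_vector_expire()).
 */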
899
900 static inline void
901 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
902                 uint16_t eth_dev_id,
903                 uint16_t rx_queue_id,
904                 struct rte_mbuf **mbufs,
905                 uint16_t num,
906                 struct rte_eth_event_enqueue_buffer *buf)
907 {
908         uint32_t i;
909         struct eth_device_info *dev_info =
910                                         &rx_adapter->eth_devices[eth_dev_id];
911         struct eth_rx_queue_info *eth_rx_queue_info =
912                                         &dev_info->rx_queue[rx_queue_id];
913         uint16_t new_tail = buf->tail;
914         uint64_t event = eth_rx_queue_info->event;
915         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
916         struct rte_mbuf *m = mbufs[0];
917         uint32_t rss_mask;
918         uint32_t rss;
919         int do_rss;
920         uint16_t nb_cb;
921         uint16_t dropped;
922         uint64_t ts, ts_mask;
923
924         if (!eth_rx_queue_info->ena_vector) {
925                 ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
926                                                 0 : rte_get_tsc_cycles();
927
928                 /* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
929                  * otherwise 0
930                  */
931                 ts_mask = (uint64_t)(!(m->ol_flags &
932                                        event_eth_rx_timestamp_dynflag)) - 1ULL;
933
934                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
935                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
936                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
937                 for (i = 0; i < num; i++) {
938                         struct rte_event *ev;
939
940                         m = mbufs[i];
941                         *rxa_timestamp_dynfield(m) = ts |
942                                         (*rxa_timestamp_dynfield(m) & ts_mask);
943
944                         ev = &buf->events[new_tail];
945
946                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
947                                      : m->hash.rss;
948                         ev->event = event;
949                         ev->flow_id = (rss & ~flow_id_mask) |
950                                       (ev->flow_id & flow_id_mask);
951                         ev->mbuf = m;
952                         new_tail++;
953                 }
954         } else {
955                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
956                                               buf, mbufs, num);
957         }
958
959         if (num && dev_info->cb_fn) {
960
961                 dropped = 0;
962                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
963                                        buf->last |
964                                        (buf->events_size & ~buf->last_mask),
965                                        buf->count >= BATCH_SIZE ?
966                                                 buf->count - BATCH_SIZE : 0,
967                                        &buf->events[buf->tail],
968                                        num,
969                                        dev_info->cb_arg,
970                                        &dropped);
971                 if (unlikely(nb_cb > num))
972                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
973                                 nb_cb, num);
974                 else
975                         num = nb_cb;
976                 if (dropped)
977                         rx_adapter->stats.rx_dropped += dropped;
978         }
979
980         buf->count += num;
981         buf->tail += num;
982 }
983
984 static inline bool
985 rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
986 {
987         uint32_t nb_req = buf->tail + BATCH_SIZE;
988
989         if (!buf->last) {
990                 if (nb_req <= buf->events_size)
991                         return true;
992
993                 if (buf->head >= BATCH_SIZE) {
994                         buf->last_mask = ~0;
995                         buf->last = buf->tail;
996                         buf->tail = 0;
997                         return true;
998                 }
999         }
1000
1001         return nb_req <= buf->head;
1002 }
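
/*
 * Illustrative example: with events_size = 192, tail = 170 and
 * head = 100, another BATCH_SIZE events would not fit at the tail
 * (170 + 32 > 192), but since head >= BATCH_SIZE the buffer wraps
 * instead: last = 170, tail = 0, and the next burst is written at the
 * start of the array, behind the consumer's head.
 */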
1003
1004 /* Enqueue packets from <port, q> to the event buffer */
1005 static inline uint32_t
1006 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
1007         uint16_t port_id,
1008         uint16_t queue_id,
1009         uint32_t rx_count,
1010         uint32_t max_rx,
1011         int *rxq_empty,
1012         struct rte_eth_event_enqueue_buffer *buf)
1013 {
1014         struct rte_mbuf *mbufs[BATCH_SIZE];
1015         struct rte_event_eth_rx_adapter_stats *stats =
1016                                         &rx_adapter->stats;
1017         uint16_t n;
1018         uint32_t nb_rx = 0;
1019
1020         if (rxq_empty)
1021                 *rxq_empty = 0;
1022         /* Don't do a batch dequeue from the rx queue if there isn't
1023          * enough space in the enqueue buffer.
1024          */
1025         while (rxa_pkt_buf_available(buf)) {
1026                 if (buf->count >= BATCH_SIZE)
1027                         rxa_flush_event_buffer(rx_adapter, buf);
1028
1029                 stats->rx_poll_count++;
1030                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1031                 if (unlikely(!n)) {
1032                         if (rxq_empty)
1033                                 *rxq_empty = 1;
1034                         break;
1035                 }
1036                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n, buf);
1037                 nb_rx += n;
1038                 if (rx_count + nb_rx > max_rx)
1039                         break;
1040         }
1041
1042         if (buf->count > 0)
1043                 rxa_flush_event_buffer(rx_adapter, buf);
1044
1045         return nb_rx;
1046 }
1047
1048 static inline void
1049 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
1050                 void *data)
1051 {
1052         uint16_t port_id;
1053         uint16_t queue;
1054         int err;
1055         union queue_data qd;
1056         struct eth_device_info *dev_info;
1057         struct eth_rx_queue_info *queue_info;
1058         int *intr_enabled;
1059
1060         qd.ptr = data;
1061         port_id = qd.port;
1062         queue = qd.queue;
1063
1064         dev_info = &rx_adapter->eth_devices[port_id];
1065         queue_info = &dev_info->rx_queue[queue];
1066         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1067         if (rxa_shared_intr(dev_info, queue))
1068                 intr_enabled = &dev_info->shared_intr_enabled;
1069         else
1070                 intr_enabled = &queue_info->intr_enabled;
1071
1072         if (*intr_enabled) {
1073                 *intr_enabled = 0;
1074                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1075                 /* Entry should always be available.
1076                  * The ring size equals the maximum number of interrupt
1077                  * vectors supported (an interrupt vector is shared in
1078                  * case of shared interrupts)
1079                  */
1080                 if (err)
1081                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1082                                 " to ring: %s", strerror(-err));
1083                 else
1084                         rte_eth_dev_rx_intr_disable(port_id, queue);
1085         }
1086         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1087 }
1088
1089 static int
1090 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
1091                         uint32_t num_intr_vec)
1092 {
1093         if (rx_adapter->num_intr_vec + num_intr_vec >
1094                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1095                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1096                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1097                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1098                 return -ENOSPC;
1099         }
1100
1101         return 0;
1102 }
1103
1104 /* Delete entries for (dev, queue) from the interrupt ring */
1105 static void
1106 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1107                         struct eth_device_info *dev_info,
1108                         uint16_t rx_queue_id)
1109 {
1110         int i, n;
1111         union queue_data qd;
1112
1113         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1114
1115         n = rte_ring_count(rx_adapter->intr_ring);
1116         for (i = 0; i < n; i++) {
1117                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1118                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1119                         if (qd.port == dev_info->dev->data->port_id &&
1120                                 qd.queue == rx_queue_id)
1121                                 continue;
1122                 } else {
1123                         if (qd.port == dev_info->dev->data->port_id)
1124                                 continue;
1125                 }
1126                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1127         }
1128
1129         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1130 }
1131
1132 /* pthread entry point handling interrupt mode receive queues
1133  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1134  * interrupting queue to the adapter's ring buffer for interrupt events.
1135  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1136  * the adapter service function.
1137  */
1138 static void *
1139 rxa_intr_thread(void *arg)
1140 {
1141         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1142         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1143         int n, i;
1144
1145         while (1) {
1146                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1147                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1148                 if (unlikely(n < 0))
1149                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1150                                         n);
1151                 for (i = 0; i < n; i++) {
1152                         rxa_intr_ring_enqueue(rx_adapter,
1153                                         epoll_events[i].epdata.data);
1154                 }
1155         }
1156
1157         return NULL;
1158 }
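
/*
 * Illustrative note: the epdata.data pointer returned by rte_epoll_wait()
 * carries a packed <port, queue> pair (see union queue_data above), which
 * rxa_intr_ring_enqueue() unpacks and queues for rxa_intr_ring_dequeue()
 * to service. The registration of the Rx queue interrupt fds with
 * rx_adapter->epd happens outside this excerpt, when interrupt mode
 * queues are configured.
 */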
1159
1160 /* Dequeue <port, q> from interrupt ring and enqueue received
1161  * mbufs to eventdev
1162  */
1163 static inline uint32_t
1164 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1165 {
1166         uint32_t n;
1167         uint32_t nb_rx = 0;
1168         int rxq_empty;
1169         struct rte_eth_event_enqueue_buffer *buf;
1170         rte_spinlock_t *ring_lock;
1171         uint8_t max_done = 0;
1172
1173         if (rx_adapter->num_rx_intr == 0)
1174                 return 0;
1175
1176         if (rte_ring_count(rx_adapter->intr_ring) == 0
1177                 && !rx_adapter->qd_valid)
1178                 return 0;
1179
1180         buf = &rx_adapter->event_enqueue_buffer;
1181         ring_lock = &rx_adapter->intr_ring_lock;
1182
1183         if (buf->count >= BATCH_SIZE)
1184                 rxa_flush_event_buffer(rx_adapter, buf);
1185
1186         while (rxa_pkt_buf_available(buf)) {
1187                 struct eth_device_info *dev_info;
1188                 uint16_t port;
1189                 uint16_t queue;
1190                 union queue_data qd  = rx_adapter->qd;
1191                 int err;
1192
1193                 if (!rx_adapter->qd_valid) {
1194                         struct eth_rx_queue_info *queue_info;
1195
1196                         rte_spinlock_lock(ring_lock);
1197                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1198                         if (err) {
1199                                 rte_spinlock_unlock(ring_lock);
1200                                 break;
1201                         }
1202
1203                         port = qd.port;
1204                         queue = qd.queue;
1205                         rx_adapter->qd = qd;
1206                         rx_adapter->qd_valid = 1;
1207                         dev_info = &rx_adapter->eth_devices[port];
1208                         if (rxa_shared_intr(dev_info, queue))
1209                                 dev_info->shared_intr_enabled = 1;
1210                         else {
1211                                 queue_info = &dev_info->rx_queue[queue];
1212                                 queue_info->intr_enabled = 1;
1213                         }
1214                         rte_eth_dev_rx_intr_enable(port, queue);
1215                         rte_spinlock_unlock(ring_lock);
1216                 } else {
1217                         port = qd.port;
1218                         queue = qd.queue;
1219
1220                         dev_info = &rx_adapter->eth_devices[port];
1221                 }
1222
1223                 if (rxa_shared_intr(dev_info, queue)) {
1224                         uint16_t i;
1225                         uint16_t nb_queues;
1226
1227                         nb_queues = dev_info->dev->data->nb_rx_queues;
1228                         n = 0;
1229                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1230                                 uint8_t enq_buffer_full;
1231
1232                                 if (!rxa_intr_queue(dev_info, i))
1233                                         continue;
1234                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1235                                         rx_adapter->max_nb_rx,
1236                                         &rxq_empty, buf);
1237                                 nb_rx += n;
1238
1239                                 enq_buffer_full = !rxq_empty && n == 0;
1240                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1241
1242                                 if (enq_buffer_full || max_done) {
1243                                         dev_info->next_q_idx = i;
1244                                         goto done;
1245                                 }
1246                         }
1247
1248                         rx_adapter->qd_valid = 0;
1249
1250                         /* Reinitialize for next interrupt */
1251                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1252                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1253                                                 0;
1254                 } else {
1255                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1256                                 rx_adapter->max_nb_rx,
1257                                 &rxq_empty, buf);
1258                         rx_adapter->qd_valid = !rxq_empty;
1259                         nb_rx += n;
1260                         if (nb_rx > rx_adapter->max_nb_rx)
1261                                 break;
1262                 }
1263         }
1264
1265 done:
1266         rx_adapter->stats.rx_intr_packets += nb_rx;
1267         return nb_rx;
1268 }
1269
1270 /*
1271  * Polls receive queues added to the event adapter and enqueues received
1272  * packets to the event device.
1273  *
1274  * The receive code enqueues initially to a temporary buffer, which is
1275  * drained whenever it holds >= BATCH_SIZE packets.
1276  *
1277  * If there isn't space available in the temporary buffer, packets aren't
1278  * dequeued from the eth device's Rx queue; this back pressures the eth
1279  * device. In virtual device environments, the back pressure is relayed to
1280  * the hypervisor's switching layer, where adjustments can be made to deal
1281  * with it.
1282  */
1283 static inline uint32_t
1284 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1285 {
1286         uint32_t num_queue;
1287         uint32_t nb_rx = 0;
1288         struct rte_eth_event_enqueue_buffer *buf = NULL;
1289         uint32_t wrr_pos;
1290         uint32_t max_nb_rx;
1291
1292         wrr_pos = rx_adapter->wrr_pos;
1293         max_nb_rx = rx_adapter->max_nb_rx;
1294
1295         /* Iterate through a WRR sequence */
1296         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1297                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1298                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1299                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1300
1301                 buf = rxa_event_buf_get(rx_adapter, d, qid);
1302
1303                 /* Don't do a batch dequeue from the rx queue if there isn't
1304                  * enough space in the enqueue buffer.
1305                  */
1306                 if (buf->count >= BATCH_SIZE)
1307                         rxa_flush_event_buffer(rx_adapter, buf);
1308                 if (!rxa_pkt_buf_available(buf)) {
1309                         if (rx_adapter->use_queue_event_buf)
1310                                 goto poll_next_entry;
1311                         else {
1312                                 rx_adapter->wrr_pos = wrr_pos;
1313                                 return nb_rx;
1314                         }
1315                 }
1316
1317                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1318                                 NULL, buf);
1319                 if (nb_rx > max_nb_rx) {
1320                         rx_adapter->wrr_pos =
1321                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1322                         break;
1323                 }
1324
1325 poll_next_entry:
1326                 if (++wrr_pos == rx_adapter->wrr_len)
1327                         wrr_pos = 0;
1328         }
1329         return nb_rx;
1330 }
1331
1332 static void
1333 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1334 {
1335         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1336         struct rte_eth_event_enqueue_buffer *buf = NULL;
1337         struct rte_event *ev;
1338
1339         buf = rxa_event_buf_get(rx_adapter, vec->port, vec->queue);
1340
1341         if (buf->count)
1342                 rxa_flush_event_buffer(rx_adapter, buf);
1343
1344         if (vec->vector_ev->nb_elem == 0)
1345                 return;
1346         ev = &buf->events[buf->count];
1347
1348         /* Event ready. */
1349         ev->event = vec->event;
1350         ev->vec = vec->vector_ev;
1351         buf->count++;
1352
1353         vec->vector_ev = NULL;
1354         vec->ts = 0;
1355 }
1356
1357 static int
1358 rxa_service_func(void *args)
1359 {
1360         struct rte_event_eth_rx_adapter *rx_adapter = args;
1361         struct rte_event_eth_rx_adapter_stats *stats;
1362
1363         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1364                 return 0;
1365         if (!rx_adapter->rxa_started) {
1366                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1367                 return 0;
1368         }
1369
1370         if (rx_adapter->ena_vector) {
1371                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1372                     rx_adapter->vector_tmo_ticks) {
1373                         struct eth_rx_vector_data *vec;
1374
1375                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1376                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1377
1378                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1379                                         rxa_vector_expire(vec, rx_adapter);
1380                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1381                                                      vec, next);
1382                                 }
1383                         }
1384                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1385                 }
1386         }
1387
1388         stats = &rx_adapter->stats;
1389         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1390         stats->rx_packets += rxa_poll(rx_adapter);
1391         rte_spinlock_unlock(&rx_adapter->rx_lock);
1392         return 0;
1393 }
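
/*
 * Usage sketch (illustrative, not part of this file): when the adapter
 * relies on this service function (i.e. the transfer is not done by an
 * internal event port), the application typically maps the adapter's
 * service to a service core so that rxa_service_func() gets run.
 * service_lcore below is a placeholder for an lcore previously added
 * with rte_service_lcore_add().
 *
 *   uint32_t service_id;
 *
 *   if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *           rte_service_map_lcore_set(service_id, service_lcore, 1);
 *           rte_service_runstate_set(service_id, 1);
 *   }
 */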
1394
1395 static int
1396 rte_event_eth_rx_adapter_init(void)
1397 {
1398         const char *name = RXA_ADAPTER_ARRAY;
1399         const struct rte_memzone *mz;
1400         unsigned int sz;
1401
1402         sz = sizeof(*event_eth_rx_adapter) *
1403             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1404         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1405
1406         mz = rte_memzone_lookup(name);
1407         if (mz == NULL) {
1408                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1409                                                  RTE_CACHE_LINE_SIZE);
1410                 if (mz == NULL) {
1411                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1412                                         PRId32, rte_errno);
1413                         return -rte_errno;
1414                 }
1415         }
1416
1417         event_eth_rx_adapter = mz->addr;
1418         return 0;
1419 }
1420
1421 static int
1422 rxa_memzone_lookup(void)
1423 {
1424         const struct rte_memzone *mz;
1425
1426         if (event_eth_rx_adapter == NULL) {
1427                 mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
1428                 if (mz == NULL)
1429                         return -ENOMEM;
1430                 event_eth_rx_adapter = mz->addr;
1431         }
1432
1433         return 0;
1434 }
1435
1436 static inline struct rte_event_eth_rx_adapter *
1437 rxa_id_to_adapter(uint8_t id)
1438 {
1439         return event_eth_rx_adapter ?
1440                 event_eth_rx_adapter[id] : NULL;
1441 }
1442
1443 static int
1444 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1445                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1446 {
1447         int ret;
1448         struct rte_eventdev *dev;
1449         struct rte_event_dev_config dev_conf;
1450         int started;
1451         uint8_t port_id;
1452         struct rte_event_port_conf *port_conf = arg;
1453         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1454
1455         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1456         dev_conf = dev->data->dev_conf;
1457
1458         started = dev->data->dev_started;
1459         if (started)
1460                 rte_event_dev_stop(dev_id);
1461         port_id = dev_conf.nb_event_ports;
1462         dev_conf.nb_event_ports += 1;
1463         ret = rte_event_dev_configure(dev_id, &dev_conf);
1464         if (ret) {
1465                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1466                                                 dev_id);
1467                 if (started) {
1468                         if (rte_event_dev_start(dev_id))
1469                                 return -EIO;
1470                 }
1471                 return ret;
1472         }
1473
1474         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1475         if (ret) {
1476                 RTE_EDEV_LOG_ERR("failed to setup event port %u",
1477                                         port_id);
                 if (started) {
                         if (rte_event_dev_start(dev_id))
                                 return -EIO;
                 }
1478                 return ret;
1479         }
1480
1481         conf->event_port_id = port_id;
1482         conf->max_nb_rx = 128;
1483         if (started)
1484                 ret = rte_event_dev_start(dev_id);
1485         rx_adapter->default_cb_arg = 1;
1486         return ret;
1487 }
1488
1489 static int
1490 rxa_epoll_create1(void)
1491 {
1492 #if defined(LINUX)
1493         int fd;
1494         fd = epoll_create1(EPOLL_CLOEXEC);
1495         return fd < 0 ? -errno : fd;
1496 #elif defined(BSD)
1497         return -ENOTSUP;
1498 #endif
1499 }
1500
1501 static int
1502 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1503 {
1504         if (rx_adapter->epd != INIT_FD)
1505                 return 0;
1506
1507         rx_adapter->epd = rxa_epoll_create1();
1508         if (rx_adapter->epd < 0) {
1509                 int err = rx_adapter->epd;
1510                 rx_adapter->epd = INIT_FD;
1511                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1512                 return err;
1513         }
1514
1515         return 0;
1516 }
1517
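/*
 * Create, on first use, the ring and epoll event array shared with the
 * Rx interrupt thread, then spawn the control thread that waits on the
 * adapter's epoll fd and feeds (port, queue) pairs to the service
 * function through the interrupt ring.
 */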
1518 static int
1519 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1520 {
1521         int err;
1522         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1523
1524         if (rx_adapter->intr_ring)
1525                 return 0;
1526
1527         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1528                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1529                                         rte_socket_id(), 0);
1530         if (!rx_adapter->intr_ring)
1531                 return -ENOMEM;
1532
1533         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1534                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1535                                         sizeof(struct rte_epoll_event),
1536                                         RTE_CACHE_LINE_SIZE,
1537                                         rx_adapter->socket_id);
1538         if (!rx_adapter->epoll_events) {
1539                 err = -ENOMEM;
1540                 goto error;
1541         }
1542
1543         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1544
1545         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1546                         "rx-intr-thread-%d", rx_adapter->id);
1547
1548         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1549                                 NULL, rxa_intr_thread, rx_adapter);
1550         if (!err)
1551                 return 0;
1552
1553         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d", err);
1554         rte_free(rx_adapter->epoll_events);
1555 error:
1556         rte_ring_free(rx_adapter->intr_ring);
1557         rx_adapter->intr_ring = NULL;
1558         rx_adapter->epoll_events = NULL;
1559         return err;
1560 }
1561
1562 static int
1563 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1564 {
1565         int err;
1566
1567         err = pthread_cancel(rx_adapter->rx_intr_thread);
1568         if (err)
1569                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d",
1570                                 err);
1571
1572         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1573         if (err)
1574                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d", err);
1575
1576         rte_free(rx_adapter->epoll_events);
1577         rte_ring_free(rx_adapter->intr_ring);
1578         rx_adapter->intr_ring = NULL;
1579         rx_adapter->epoll_events = NULL;
1580         return 0;
1581 }
1582
1583 static int
1584 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1585 {
1586         int ret;
1587
1588         if (rx_adapter->num_rx_intr == 0)
1589                 return 0;
1590
1591         ret = rxa_destroy_intr_thread(rx_adapter);
1592         if (ret)
1593                 return ret;
1594
1595         close(rx_adapter->epd);
1596         rx_adapter->epd = INIT_FD;
1597
1598         return ret;
1599 }
1600
1601 static int
1602 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1603         struct eth_device_info *dev_info,
1604         uint16_t rx_queue_id)
1605 {
1606         int err;
1607         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1608         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1609
1610         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1611         if (err) {
1612                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1613                         rx_queue_id);
1614                 return err;
1615         }
1616
1617         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1618                                         rx_adapter->epd,
1619                                         RTE_INTR_EVENT_DEL,
1620                                         0);
1621         if (err)
1622                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1623
1624         if (sintr)
1625                 dev_info->shared_intr_enabled = 0;
1626         else
1627                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1628         return err;
1629 }
1630
1631 static int
1632 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1633                 struct eth_device_info *dev_info,
1634                 int rx_queue_id)
1635 {
1636         int err;
1637         int i;
1638         int s;
1639
1640         if (dev_info->nb_rx_intr == 0)
1641                 return 0;
1642
1643         err = 0;
1644         if (rx_queue_id == -1) {
1645                 s = dev_info->nb_shared_intr;
1646                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1647                         int sintr;
1648                         uint16_t q;
1649
1650                         q = dev_info->intr_queue[i];
1651                         sintr = rxa_shared_intr(dev_info, q);
1652                         s -= sintr;
1653
1654                         if (!sintr || s == 0) {
1655
1656                                 err = rxa_disable_intr(rx_adapter, dev_info,
1657                                                 q);
1658                                 if (err)
1659                                         return err;
1660                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1661                                                         q);
1662                         }
1663                 }
1664         } else {
1665                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1666                         return 0;
1667                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1668                                 dev_info->nb_shared_intr == 1) {
1669                         err = rxa_disable_intr(rx_adapter, dev_info,
1670                                         rx_queue_id);
1671                         if (err)
1672                                 return err;
1673                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1674                                                 rx_queue_id);
1675                 }
1676
1677                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1678                         if (dev_info->intr_queue[i] == rx_queue_id) {
1679                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1680                                         dev_info->intr_queue[i] =
1681                                                 dev_info->intr_queue[i + 1];
1682                                 break;
1683                         }
1684                 }
1685         }
1686
1687         return err;
1688 }
1689
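/*
 * Put one Rx queue into interrupt mode: allocate the per-device
 * intr_queue array on first use, create the epoll fd if needed, register
 * the queue with it (the port/queue ids travel in the epoll user data),
 * enable the Rx interrupt and start the interrupt thread. Every step is
 * unwound if a later one fails.
 */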
1690 static int
1691 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1692         struct eth_device_info *dev_info,
1693         uint16_t rx_queue_id)
1694 {
1695         int err, err1;
1696         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1697         union queue_data qd;
1698         int init_fd;
1699         uint16_t *intr_queue;
1700         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1701
1702         if (rxa_intr_queue(dev_info, rx_queue_id))
1703                 return 0;
1704
1705         intr_queue = dev_info->intr_queue;
1706         if (dev_info->intr_queue == NULL) {
1707                 size_t len =
1708                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1709                 dev_info->intr_queue =
1710                         rte_zmalloc_socket(
1711                                 rx_adapter->mem_name,
1712                                 len,
1713                                 0,
1714                                 rx_adapter->socket_id);
1715                 if (dev_info->intr_queue == NULL)
1716                         return -ENOMEM;
1717         }
1718
1719         init_fd = rx_adapter->epd;
1720         err = rxa_init_epd(rx_adapter);
1721         if (err)
1722                 goto err_free_queue;
1723
1724         qd.port = eth_dev_id;
1725         qd.queue = rx_queue_id;
1726
1727         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1728                                         rx_adapter->epd,
1729                                         RTE_INTR_EVENT_ADD,
1730                                         qd.ptr);
1731         if (err) {
1732                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1733                         " Rx Queue %u err %d", rx_queue_id, err);
1734                 goto err_del_fd;
1735         }
1736
1737         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1738         if (err) {
1739                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1740                                 " Rx Queue %u err %d", rx_queue_id, err);
1741
1742                 goto err_del_event;
1743         }
1744
1745         err = rxa_create_intr_thread(rx_adapter);
1746         if (!err)  {
1747                 if (sintr)
1748                         dev_info->shared_intr_enabled = 1;
1749                 else
1750                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1751                 return 0;
1752         }
1753
1754
1755         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1756         if (err)
1757                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1758                                 " Rx Queue %u err %d", rx_queue_id, err);
1759 err_del_event:
1760         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1761                                         rx_adapter->epd,
1762                                         RTE_INTR_EVENT_DEL,
1763                                         0);
1764         if (err1) {
1765                 RTE_EDEV_LOG_ERR("Could not delete event for"
1766                                 " Rx Queue %u err %d", rx_queue_id, err1);
1767         }
1768 err_del_fd:
1769         if (init_fd == INIT_FD) {
1770                 close(rx_adapter->epd);
1771                 rx_adapter->epd = INIT_FD;
1772         }
1773 err_free_queue:
1774         if (intr_queue == NULL)
1775                 rte_free(dev_info->intr_queue);
1776
1777         return err;
1778 }
1779
1780 static int
1781 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1782         struct eth_device_info *dev_info,
1783         int rx_queue_id)
1784
1785 {
1786         int i, j, err;
1787         int si = -1;
1788         int shared_done = (dev_info->nb_shared_intr > 0);
1789
1790         if (rx_queue_id != -1) {
1791                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1792                         return 0;
1793                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1794         }
1795
1796         err = 0;
1797         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1798
1799                 if (rxa_shared_intr(dev_info, i) && shared_done)
1800                         continue;
1801
1802                 err = rxa_config_intr(rx_adapter, dev_info, i);
1803
1804                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1805                 if (shared_done) {
1806                         si = i;
1807                         dev_info->shared_intr_enabled = 1;
1808                 }
1809                 if (err)
1810                         break;
1811         }
1812
1813         if (err == 0)
1814                 return 0;
1815
1816         shared_done = (dev_info->nb_shared_intr > 0);
1817         for (j = 0; j < i; j++) {
1818                 if (rxa_intr_queue(dev_info, j))
1819                         continue;
1820                 if (rxa_shared_intr(dev_info, j) && si != j)
1821                         continue;
1822                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1823                 if (err)
1824                         break;
1825
1826         }
1827
1828         return err;
1829 }
1830
1831
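/*
 * One time service setup for the SW data path: register the adapter's
 * service component, then run the configuration callback to obtain the
 * event port to enqueue to and the max_nb_rx budget per service call.
 */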
1832 static int
1833 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1834 {
1835         int ret;
1836         struct rte_service_spec service;
1837         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1838
1839         if (rx_adapter->service_inited)
1840                 return 0;
1841
1842         memset(&service, 0, sizeof(service));
1843         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1844                 "rte_event_eth_rx_adapter_%d", id);
1845         service.socket_id = rx_adapter->socket_id;
1846         service.callback = rxa_service_func;
1847         service.callback_userdata = rx_adapter;
1848         /* Service function handles locking for queue add/del updates */
1849         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1850         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1851         if (ret) {
1852                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1853                         service.name, ret);
1854                 return ret;
1855         }
1856
1857         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1858                 &rx_adapter_conf, rx_adapter->conf_arg);
1859         if (ret) {
1860                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1861                         ret);
1862                 goto err_done;
1863         }
1864         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1865         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1866         rx_adapter->service_inited = 1;
1867         rx_adapter->epd = INIT_FD;
1868         return 0;
1869
1870 err_done:
1871         rte_service_component_unregister(rx_adapter->service_id);
1872         return ret;
1873 }
1874
1875 static void
1876 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1877                 struct eth_device_info *dev_info,
1878                 int32_t rx_queue_id,
1879                 uint8_t add)
1880 {
1881         struct eth_rx_queue_info *queue_info;
1882         int enabled;
1883         uint16_t i;
1884
1885         if (dev_info->rx_queue == NULL)
1886                 return;
1887
1888         if (rx_queue_id == -1) {
1889                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1890                         rxa_update_queue(rx_adapter, dev_info, i, add);
1891         } else {
1892                 queue_info = &dev_info->rx_queue[rx_queue_id];
1893                 enabled = queue_info->queue_enabled;
1894                 if (add) {
1895                         rx_adapter->nb_queues += !enabled;
1896                         dev_info->nb_dev_queues += !enabled;
1897                 } else {
1898                         rx_adapter->nb_queues -= enabled;
1899                         dev_info->nb_dev_queues -= enabled;
1900                 }
1901                 queue_info->queue_enabled = !!add;
1902         }
1903 }
1904
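/*
 * Store the per-queue vectorization parameters. The timeout is converted
 * from nanoseconds to TSC ticks and, when the caller left the event flow
 * id at zero, a flow id is synthesized from the port and queue ids so
 * vectors from different queues do not share one.
 */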
1905 static void
1906 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1907                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1908                     uint16_t port_id)
1909 {
1910 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1911         struct eth_rx_vector_data *vector_data;
1912         uint32_t flow_id;
1913
1914         vector_data = &queue_info->vector_data;
1915         vector_data->max_vector_count = vector_count;
1916         vector_data->port = port_id;
1917         vector_data->queue = qid;
1918         vector_data->vector_pool = mp;
1919         vector_data->vector_timeout_ticks =
1920                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1921         vector_data->ts = 0;
1922         flow_id = queue_info->event & 0xFFFFF;
1923         flow_id =
1924                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1925         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1926 }
1927
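/*
 * Undo the SW bookkeeping for one Rx queue, or all of them when
 * rx_queue_id is -1: flush partially filled event vectors, adjust the
 * poll/interrupt counters and release the per-queue event buffer if one
 * was allocated.
 */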
1928 static void
1929 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1930         struct eth_device_info *dev_info,
1931         int32_t rx_queue_id)
1932 {
1933         struct eth_rx_vector_data *vec;
1934         int pollq;
1935         int intrq;
1936         int sintrq;
1937
1938
1939         if (rx_adapter->nb_queues == 0)
1940                 return;
1941
1942         if (rx_queue_id == -1) {
1943                 uint16_t nb_rx_queues;
1944                 uint16_t i;
1945
1946                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1947                 for (i = 0; i < nb_rx_queues; i++)
1948                         rxa_sw_del(rx_adapter, dev_info, i);
1949                 return;
1950         }
1951
1952         /* Push all the partial event vectors to event device. */
1953         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1954                 if (vec->queue != rx_queue_id)
1955                         continue;
1956                 rxa_vector_expire(vec, rx_adapter);
1957                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1958         }
1959
1960         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1961         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1962         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1963         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1964         rx_adapter->num_rx_polled -= pollq;
1965         dev_info->nb_rx_poll -= pollq;
1966         rx_adapter->num_rx_intr -= intrq;
1967         dev_info->nb_rx_intr -= intrq;
1968         dev_info->nb_shared_intr -= intrq && sintrq;
1969         if (rx_adapter->use_queue_event_buf) {
1970                 struct rte_eth_event_enqueue_buffer *event_buf =
1971                         dev_info->rx_queue[rx_queue_id].event_buf;
1972                 rte_free(event_buf->events);
1973                 rte_free(event_buf);
1974                 dev_info->rx_queue[rx_queue_id].event_buf = NULL;
1975         }
1976 }
1977
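/*
 * Fill in the per-queue state for an added Rx queue: servicing weight,
 * the event template used when enqueuing packets, optional vectorization
 * parameters and, if per-queue buffering is enabled, a dedicated event
 * buffer sized from the queue configuration plus 2 * BATCH_SIZE of
 * headroom.
 */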
1978 static int
1979 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1980         struct eth_device_info *dev_info,
1981         int32_t rx_queue_id,
1982         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1983 {
1984         struct eth_rx_queue_info *queue_info;
1985         const struct rte_event *ev = &conf->ev;
1986         int pollq;
1987         int intrq;
1988         int sintrq;
1989         struct rte_event *qi_ev;
1990         struct rte_eth_event_enqueue_buffer *new_rx_buf = NULL;
1991         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1992         int ret;
1993
1994         if (rx_queue_id == -1) {
1995                 uint16_t nb_rx_queues;
1996                 uint16_t i;
1997
1998                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1999                 for (i = 0; i < nb_rx_queues; i++) {
2000                         ret = rxa_add_queue(rx_adapter, dev_info, i, conf);
2001                         if (ret)
2002                                 return ret;
2003                 }
2004                 return 0;
2005         }
2006
2007         pollq = rxa_polled_queue(dev_info, rx_queue_id);
2008         intrq = rxa_intr_queue(dev_info, rx_queue_id);
2009         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
2010
2011         queue_info = &dev_info->rx_queue[rx_queue_id];
2012         queue_info->wt = conf->servicing_weight;
2013
2014         qi_ev = (struct rte_event *)&queue_info->event;
2015         qi_ev->event = ev->event;
2016         qi_ev->op = RTE_EVENT_OP_NEW;
2017         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
2018         qi_ev->sub_event_type = 0;
2019
2020         if (conf->rx_queue_flags &
2021                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
2022                 queue_info->flow_id_mask = ~0;
2023         } else
2024                 qi_ev->flow_id = 0;
2025
2026         if (conf->rx_queue_flags &
2027             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2028                 queue_info->ena_vector = 1;
2029                 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
2030                 rxa_set_vector_data(queue_info, conf->vector_sz,
2031                                     conf->vector_timeout_ns, conf->vector_mp,
2032                                     rx_queue_id, dev_info->dev->data->port_id);
2033                 rx_adapter->ena_vector = 1;
2034                 rx_adapter->vector_tmo_ticks =
2035                         rx_adapter->vector_tmo_ticks ?
2036                                       RTE_MIN(queue_info->vector_data
2037                                                         .vector_timeout_ticks >>
2038                                                 1,
2039                                         rx_adapter->vector_tmo_ticks) :
2040                                 queue_info->vector_data.vector_timeout_ticks >>
2041                                         1;
2042         }
2043
2044         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
2045         if (rxa_polled_queue(dev_info, rx_queue_id)) {
2046                 rx_adapter->num_rx_polled += !pollq;
2047                 dev_info->nb_rx_poll += !pollq;
2048                 rx_adapter->num_rx_intr -= intrq;
2049                 dev_info->nb_rx_intr -= intrq;
2050                 dev_info->nb_shared_intr -= intrq && sintrq;
2051         }
2052
2053         if (rxa_intr_queue(dev_info, rx_queue_id)) {
2054                 rx_adapter->num_rx_polled -= pollq;
2055                 dev_info->nb_rx_poll -= pollq;
2056                 rx_adapter->num_rx_intr += !intrq;
2057                 dev_info->nb_rx_intr += !intrq;
2058                 dev_info->nb_shared_intr += !intrq && sintrq;
2059                 if (dev_info->nb_shared_intr == 1) {
2060                         if (dev_info->multi_intr_cap)
2061                                 dev_info->next_q_idx =
2062                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
2063                         else
2064                                 dev_info->next_q_idx = 0;
2065                 }
2066         }
2067
2068         if (!rx_adapter->use_queue_event_buf)
2069                 return 0;
2070
2071         new_rx_buf = rte_zmalloc_socket("rx_buffer_meta",
2072                                 sizeof(*new_rx_buf), 0,
2073                                 rte_eth_dev_socket_id(eth_dev_id));
2074         if (new_rx_buf == NULL) {
2075                 RTE_EDEV_LOG_ERR("Failed to allocate event buffer meta for "
2076                                  "dev_id: %d queue_id: %d",
2077                                  eth_dev_id, rx_queue_id);
2078                 return -ENOMEM;
2079         }
2080
2081         new_rx_buf->events_size = RTE_ALIGN(conf->event_buf_size, BATCH_SIZE);
2082         new_rx_buf->events_size += (2 * BATCH_SIZE);
2083         new_rx_buf->events = rte_zmalloc_socket("rx_buffer",
2084                                 sizeof(struct rte_event) *
2085                                 new_rx_buf->events_size, 0,
2086                                 rte_eth_dev_socket_id(eth_dev_id));
2087         if (new_rx_buf->events == NULL) {
2088                 rte_free(new_rx_buf);
2089                 RTE_EDEV_LOG_ERR("Failed to allocate event buffer for "
2090                                  "dev_id: %d queue_id: %d",
2091                                  eth_dev_id, rx_queue_id);
2092                 return -ENOMEM;
2093         }
2094
2095         queue_info->event_buf = new_rx_buf;
2096
2097         return 0;
2098 }
2099
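/*
 * SW path of queue add: recompute the poll and WRR arrays for the new
 * queue set, move queues between interrupt and poll mode according to
 * the servicing weight, then install the new schedule on the adapter.
 * Intermediate allocations are released on failure.
 */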
2100 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
2101                 uint16_t eth_dev_id,
2102                 int rx_queue_id,
2103                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2104 {
2105         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2106         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2107         int ret;
2108         struct eth_rx_poll_entry *rx_poll;
2109         struct eth_rx_queue_info *rx_queue;
2110         uint32_t *rx_wrr;
2111         uint16_t nb_rx_queues;
2112         uint32_t nb_rx_poll, nb_wrr;
2113         uint32_t nb_rx_intr;
2114         int num_intr_vec;
2115         uint16_t wt;
2116
2117         if (queue_conf->servicing_weight == 0) {
2118                 struct rte_eth_dev_data *data = dev_info->dev->data;
2119
2120                 temp_conf = *queue_conf;
2121                 if (!data->dev_conf.intr_conf.rxq) {
2122                         /* If Rx interrupts are disabled set wt = 1 */
2123                         temp_conf.servicing_weight = 1;
2124                 }
2125                 queue_conf = &temp_conf;
2126
2127                 if (queue_conf->servicing_weight == 0 &&
2128                     rx_adapter->use_queue_event_buf) {
2129
2130                         RTE_EDEV_LOG_ERR("Use of queue level event buffer "
2131                                          "not supported for interrupt queues "
2132                                          "dev_id: %d queue_id: %d",
2133                                          eth_dev_id, rx_queue_id);
2134                         return -EINVAL;
2135                 }
2136         }
2137
2138         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2139         rx_queue = dev_info->rx_queue;
2140         wt = queue_conf->servicing_weight;
2141
2142         if (dev_info->rx_queue == NULL) {
2143                 dev_info->rx_queue =
2144                     rte_zmalloc_socket(rx_adapter->mem_name,
2145                                        nb_rx_queues *
2146                                        sizeof(struct eth_rx_queue_info), 0,
2147                                        rx_adapter->socket_id);
2148                 if (dev_info->rx_queue == NULL)
2149                         return -ENOMEM;
2150         }
2151         rx_wrr = NULL;
2152         rx_poll = NULL;
2153
2154         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2155                         queue_conf->servicing_weight,
2156                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2157
2158         if (dev_info->dev->intr_handle)
2159                 dev_info->multi_intr_cap =
2160                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2161
2162         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2163                                 &rx_poll, &rx_wrr);
2164         if (ret)
2165                 goto err_free_rxqueue;
2166
2167         if (wt == 0) {
2168                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2169
2170                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2171                 if (ret)
2172                         goto err_free_rxqueue;
2173
2174                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2175                 if (ret)
2176                         goto err_free_rxqueue;
2177         } else {
2178
2179                 num_intr_vec = 0;
2180                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2181                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2182                                                 rx_queue_id, 0);
2183                         /* interrupt based queues are being converted to
2184                          * poll mode queues, delete the interrupt configuration
2185                          * for those.
2186                          */
2187                         ret = rxa_del_intr_queue(rx_adapter,
2188                                                 dev_info, rx_queue_id);
2189                         if (ret)
2190                                 goto err_free_rxqueue;
2191                 }
2192         }
2193
2194         if (nb_rx_intr == 0) {
2195                 ret = rxa_free_intr_resources(rx_adapter);
2196                 if (ret)
2197                         goto err_free_rxqueue;
2198         }
2199
2200         if (wt == 0) {
2201                 uint16_t i;
2202
2203                 if (rx_queue_id  == -1) {
2204                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2205                                 dev_info->intr_queue[i] = i;
2206                 } else {
2207                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2208                                 dev_info->intr_queue[nb_rx_intr - 1] =
2209                                         rx_queue_id;
2210                 }
2211         }
2212
2213
2214
2215         ret = rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2216         if (ret)
2217                 goto err_free_rxqueue;
2218         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2219
2220         rte_free(rx_adapter->eth_rx_poll);
2221         rte_free(rx_adapter->wrr_sched);
2222
2223         rx_adapter->eth_rx_poll = rx_poll;
2224         rx_adapter->wrr_sched = rx_wrr;
2225         rx_adapter->wrr_len = nb_wrr;
2226         rx_adapter->num_intr_vec += num_intr_vec;
2227         return 0;
2228
2229 err_free_rxqueue:
2230         if (rx_queue == NULL) {
2231                 rte_free(dev_info->rx_queue);
2232                 dev_info->rx_queue = NULL;
2233         }
2234
2235         rte_free(rx_poll);
2236         rte_free(rx_wrr);
2237
2238         return ret;
2239 }
2240
2241 static int
2242 rxa_ctrl(uint8_t id, int start)
2243 {
2244         struct rte_event_eth_rx_adapter *rx_adapter;
2245         struct rte_eventdev *dev;
2246         struct eth_device_info *dev_info;
2247         uint32_t i;
2248         int use_service = 0;
2249         int stop = !start;
2250
2251         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2252         rx_adapter = rxa_id_to_adapter(id);
2253         if (rx_adapter == NULL)
2254                 return -EINVAL;
2255
2256         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2257
2258         RTE_ETH_FOREACH_DEV(i) {
2259                 dev_info = &rx_adapter->eth_devices[i];
2260                 /* if start, check for num dev queues */
2261                 if (start && !dev_info->nb_dev_queues)
2262                         continue;
2263                 /* if stop check if dev has been started */
2264                 if (stop && !dev_info->dev_rx_started)
2265                         continue;
2266                 use_service |= !dev_info->internal_event_port;
2267                 dev_info->dev_rx_started = start;
2268                 if (dev_info->internal_event_port == 0)
2269                         continue;
2270                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2271                                                 &rte_eth_devices[i]) :
2272                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2273                                                 &rte_eth_devices[i]);
2274         }
2275
2276         if (use_service) {
2277                 rte_spinlock_lock(&rx_adapter->rx_lock);
2278                 rx_adapter->rxa_started = start;
2279                 rte_service_runstate_set(rx_adapter->service_id, start);
2280                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2281         }
2282
2283         return 0;
2284 }
2285
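/*
 * Common create path shared by rte_event_eth_rx_adapter_create(),
 * _create_ext() and _create_with_params(): reserve the adapter array if
 * needed, allocate the adapter and its per-ethdev state on the event
 * device's socket, convert the default RSS key to big endian form and,
 * unless per-queue buffers were requested, allocate the global event
 * enqueue buffer.
 */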
2286 static int
2287 rxa_create(uint8_t id, uint8_t dev_id,
2288            struct rte_event_eth_rx_adapter_params *rxa_params,
2289            rte_event_eth_rx_adapter_conf_cb conf_cb,
2290            void *conf_arg)
2291 {
2292         struct rte_event_eth_rx_adapter *rx_adapter;
2293         struct rte_eth_event_enqueue_buffer *buf;
2294         struct rte_event *events;
2295         int ret;
2296         int socket_id;
2297         uint16_t i;
2298         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
2299         const uint8_t default_rss_key[] = {
2300                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2301                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2302                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2303                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2304                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2305         };
2306
2307         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2308         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2309
2310         if (conf_cb == NULL)
2311                 return -EINVAL;
2312
2313         if (event_eth_rx_adapter == NULL) {
2314                 ret = rte_event_eth_rx_adapter_init();
2315                 if (ret)
2316                         return ret;
2317         }
2318
2319         rx_adapter = rxa_id_to_adapter(id);
2320         if (rx_adapter != NULL) {
2321                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2322                 return -EEXIST;
2323         }
2324
2325         socket_id = rte_event_dev_socket_id(dev_id);
2326         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2327                 "rte_event_eth_rx_adapter_%d",
2328                 id);
2329
2330         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2331                         RTE_CACHE_LINE_SIZE, socket_id);
2332         if (rx_adapter == NULL) {
2333                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2334                 return -ENOMEM;
2335         }
2336
2337         rx_adapter->eventdev_id = dev_id;
2338         rx_adapter->socket_id = socket_id;
2339         rx_adapter->conf_cb = conf_cb;
2340         rx_adapter->conf_arg = conf_arg;
2341         rx_adapter->id = id;
2342         TAILQ_INIT(&rx_adapter->vector_list);
2343         strcpy(rx_adapter->mem_name, mem_name);
2344         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2345                                         RTE_MAX_ETHPORTS *
2346                                         sizeof(struct eth_device_info), 0,
2347                                         socket_id);
2348         rte_convert_rss_key((const uint32_t *)default_rss_key,
2349                         (uint32_t *)rx_adapter->rss_key_be,
2350                             RTE_DIM(default_rss_key));
2351
2352         if (rx_adapter->eth_devices == NULL) {
2353                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2354                 rte_free(rx_adapter);
2355                 return -ENOMEM;
2356         }
2357
2358         rte_spinlock_init(&rx_adapter->rx_lock);
2359
2360         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2361                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2362
2363         /* Rx adapter event buffer allocation */
2364         rx_adapter->use_queue_event_buf = rxa_params->use_queue_event_buf;
2365
2366         if (!rx_adapter->use_queue_event_buf) {
2367                 buf = &rx_adapter->event_enqueue_buffer;
2368                 buf->events_size = rxa_params->event_buf_size;
2369
2370                 events = rte_zmalloc_socket(rx_adapter->mem_name,
2371                                             buf->events_size * sizeof(*events),
2372                                             0, socket_id);
2373                 if (events == NULL) {
2374                         RTE_EDEV_LOG_ERR("Failed to allocate memory "
2375                                          "for adapter event buffer");
2376                         rte_free(rx_adapter->eth_devices);
2377                         rte_free(rx_adapter);
2378                         return -ENOMEM;
2379                 }
2380
2381                 rx_adapter->event_enqueue_buffer.events = events;
2382         }
2383
2384         event_eth_rx_adapter[id] = rx_adapter;
2385
2386         if (conf_cb == rxa_default_conf_cb)
2387                 rx_adapter->default_cb_arg = 1;
2388
2389         if (rte_mbuf_dyn_rx_timestamp_register(
2390                         &event_eth_rx_timestamp_dynfield_offset,
2391                         &event_eth_rx_timestamp_dynflag) != 0) {
2392                 RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
                 ret = -rte_errno;
                 event_eth_rx_adapter[id] = NULL;
                 if (!rx_adapter->use_queue_event_buf)
                         rte_free(rx_adapter->event_enqueue_buffer.events);
                 rte_free(rx_adapter->eth_devices);
                 rte_free(rx_adapter);
2393                 return ret;
2394         }
2395
2396         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2397                 conf_arg);
2398         return 0;
2399 }
2400
2401 int
2402 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2403                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2404                                 void *conf_arg)
2405 {
2406         struct rte_event_eth_rx_adapter_params rxa_params = {0};
2407
2408         /* use default values for adapter params */
2409         rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;
2410         rxa_params.use_queue_event_buf = false;
2411
2412         return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
2413 }
2414
2415 int
2416 rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
2417                         struct rte_event_port_conf *port_config,
2418                         struct rte_event_eth_rx_adapter_params *rxa_params)
2419 {
2420         struct rte_event_port_conf *pc;
2421         int ret;
2422         struct rte_event_eth_rx_adapter_params temp_params = {0};
2423
2424         if (port_config == NULL)
2425                 return -EINVAL;
2426
2427         if (rxa_params == NULL) {
2428                 /* use default values if rxa_params is NULL */
2429                 rxa_params = &temp_params;
2430                 rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
2431                 rxa_params->use_queue_event_buf = false;
2432         } else if ((!rxa_params->use_queue_event_buf &&
2433                     rxa_params->event_buf_size == 0) ||
2434                    (rxa_params->use_queue_event_buf &&
2435                     rxa_params->event_buf_size != 0)) {
2436                 RTE_EDEV_LOG_ERR("Invalid adapter params");
2437                 return -EINVAL;
2438         } else if (!rxa_params->use_queue_event_buf) {
2439                 /* adjust event buff size with BATCH_SIZE used for fetching
2440                  * packets from NIC rx queues to get full buffer utilization
2441                  * and prevent unnecessary rollovers.
2442                  */
2443
2444                 rxa_params->event_buf_size =
2445                         RTE_ALIGN(rxa_params->event_buf_size, BATCH_SIZE);
2446                 rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);
2447         }
2448
2449         pc = rte_malloc(NULL, sizeof(*pc), 0);
2450         if (pc == NULL)
2451                 return -ENOMEM;
2452
2453         *pc = *port_config;
2454
2455         ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
2456         if (ret)
2457                 rte_free(pc);
2458
2459         return ret;
2460 }
2461
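/*
 * Illustrative application usage (not part of this file, identifiers are
 * placeholders):
 *
 *        struct rte_event_port_conf pconf = {
 *                .new_event_threshold = 4096,
 *                .dequeue_depth = 16,
 *                .enqueue_depth = 16,
 *        };
 *        ret = rte_event_eth_rx_adapter_create(adapter_id, eventdev_id,
 *                                              &pconf);
 *
 * The port configuration is copied here; the copy is released by
 * rte_event_eth_rx_adapter_free() because the default callback is used.
 */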
2462 int
2463 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2464                 struct rte_event_port_conf *port_config)
2465 {
2466         struct rte_event_port_conf *pc;
2467         int ret;
2468
2469         if (port_config == NULL)
2470                 return -EINVAL;
2471
2472         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2473
2474         pc = rte_malloc(NULL, sizeof(*pc), 0);
2475         if (pc == NULL)
2476                 return -ENOMEM;
2477         *pc = *port_config;
2478
2479         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2480                                         rxa_default_conf_cb,
2481                                         pc);
2482         if (ret)
2483                 rte_free(pc);
2484         return ret;
2485 }
2486
2487 int
2488 rte_event_eth_rx_adapter_free(uint8_t id)
2489 {
2490         struct rte_event_eth_rx_adapter *rx_adapter;
2491
2492         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2493
2494         rx_adapter = rxa_id_to_adapter(id);
2495         if (rx_adapter == NULL)
2496                 return -EINVAL;
2497
2498         if (rx_adapter->nb_queues) {
2499                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2500                                 rx_adapter->nb_queues);
2501                 return -EBUSY;
2502         }
2503
2504         if (rx_adapter->default_cb_arg)
2505                 rte_free(rx_adapter->conf_arg);
2506         rte_free(rx_adapter->eth_devices);
2507         if (!rx_adapter->use_queue_event_buf)
2508                 rte_free(rx_adapter->event_enqueue_buffer.events);
2509         rte_free(rx_adapter);
2510         event_eth_rx_adapter[id] = NULL;
2511
2512         rte_eventdev_trace_eth_rx_adapter_free(id);
2513         return 0;
2514 }
2515
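/*
 * Illustrative application usage (identifiers are placeholders):
 *
 *        struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *                .ev.queue_id = ev_queue_id,
 *                .ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *                .ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *                .servicing_weight = 1,
 *        };
 *        ret = rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port_id,
 *                                                 -1, &qconf);
 *
 * An rx_queue_id of -1 adds every Rx queue of the port.
 */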
2516 int
2517 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2518                 uint16_t eth_dev_id,
2519                 int32_t rx_queue_id,
2520                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2521 {
2522         int ret;
2523         uint32_t cap;
2524         struct rte_event_eth_rx_adapter *rx_adapter;
2525         struct rte_eventdev *dev;
2526         struct eth_device_info *dev_info;
2527         struct rte_event_eth_rx_adapter_vector_limits limits;
2528
2529         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2530         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2531
2532         rx_adapter = rxa_id_to_adapter(id);
2533         if ((rx_adapter == NULL) || (queue_conf == NULL))
2534                 return -EINVAL;
2535
2536         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2537         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2538                                                 eth_dev_id,
2539                                                 &cap);
2540         if (ret) {
2541                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2542                         " eth port %" PRIu16, id, eth_dev_id);
2543                 return ret;
2544         }
2545
2546         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2547                 && (queue_conf->rx_queue_flags &
2548                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2549                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2550                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2551                                 eth_dev_id, id);
2552                 return -EINVAL;
2553         }
2554
2555         if (queue_conf->rx_queue_flags &
2556             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2557
2558                 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2559                         RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2560                                          " eth port: %" PRIu16
2561                                          " adapter id: %" PRIu8,
2562                                          eth_dev_id, id);
2563                         return -EINVAL;
2564                 }
2565
2566                 ret = rte_event_eth_rx_adapter_vector_limits_get(
2567                         rx_adapter->eventdev_id, eth_dev_id, &limits);
2568                 if (ret < 0) {
2569                         RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2570                                          " eth port: %" PRIu16
2571                                          " adapter id: %" PRIu8,
2572                                          eth_dev_id, id);
2573                         return -EINVAL;
2574                 }
2575                 if (queue_conf->vector_sz < limits.min_sz ||
2576                     queue_conf->vector_sz > limits.max_sz ||
2577                     queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2578                     queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2579                     queue_conf->vector_mp == NULL) {
2580                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2581                                          " eth port: %" PRIu16
2582                                          " adapter id: %" PRIu8,
2583                                          eth_dev_id, id);
2584                         return -EINVAL;
2585                 }
2586                 if (queue_conf->vector_mp->elt_size <
2587                     (sizeof(struct rte_event_vector) +
2588                      (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2589                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2590                                          " eth port: %" PRIu16
2591                                          " adapter id: %" PRIu8,
2592                                          eth_dev_id, id);
2593                         return -EINVAL;
2594                 }
2595         }
2596
2597         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2598                 (rx_queue_id != -1)) {
2599                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2600                         "event queue, eth port: %" PRIu16 " adapter id: %"
2601                         PRIu8, eth_dev_id, id);
2602                 return -EINVAL;
2603         }
2604
2605         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2606                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2607                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2608                          (uint16_t)rx_queue_id);
2609                 return -EINVAL;
2610         }
2611
2612         if ((rx_adapter->use_queue_event_buf &&
2613              queue_conf->event_buf_size == 0) ||
2614             (!rx_adapter->use_queue_event_buf &&
2615              queue_conf->event_buf_size != 0)) {
2616                 RTE_EDEV_LOG_ERR("Invalid Event buffer size for the queue");
2617                 return -EINVAL;
2618         }
2619
2620         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2621
2622         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2623                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2624                                         -ENOTSUP);
2625                 if (dev_info->rx_queue == NULL) {
2626                         dev_info->rx_queue =
2627                             rte_zmalloc_socket(rx_adapter->mem_name,
2628                                         dev_info->dev->data->nb_rx_queues *
2629                                         sizeof(struct eth_rx_queue_info), 0,
2630                                         rx_adapter->socket_id);
2631                         if (dev_info->rx_queue == NULL)
2632                                 return -ENOMEM;
2633                 }
2634
2635                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2636                                 &rte_eth_devices[eth_dev_id],
2637                                 rx_queue_id, queue_conf);
2638                 if (ret == 0) {
2639                         dev_info->internal_event_port = 1;
2640                         rxa_update_queue(rx_adapter,
2641                                         &rx_adapter->eth_devices[eth_dev_id],
2642                                         rx_queue_id,
2643                                         1);
2644                 }
2645         } else {
2646                 rte_spinlock_lock(&rx_adapter->rx_lock);
2647                 dev_info->internal_event_port = 0;
2648                 ret = rxa_init_service(rx_adapter, id);
2649                 if (ret == 0) {
2650                         uint32_t service_id = rx_adapter->service_id;
2651                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2652                                         queue_conf);
2653                         rte_service_component_runstate_set(service_id,
2654                                 rxa_sw_adapter_queue_count(rx_adapter));
2655                 }
2656                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2657         }
2658
2659         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2660                 rx_queue_id, queue_conf, ret);
2661         if (ret)
2662                 return ret;
2663
2664         return 0;
2665 }
2666
2667 static int
2668 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2669 {
2670         limits->max_sz = MAX_VECTOR_SIZE;
2671         limits->min_sz = MIN_VECTOR_SIZE;
2672         limits->max_timeout_ns = MAX_VECTOR_NS;
2673         limits->min_timeout_ns = MIN_VECTOR_NS;
2674
2675         return 0;
2676 }
2677
2678 int
2679 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2680                                 int32_t rx_queue_id)
2681 {
2682         int ret = 0;
2683         struct rte_eventdev *dev;
2684         struct rte_event_eth_rx_adapter *rx_adapter;
2685         struct eth_device_info *dev_info;
2686         uint32_t cap;
2687         uint32_t nb_rx_poll = 0;
2688         uint32_t nb_wrr = 0;
2689         uint32_t nb_rx_intr;
2690         struct eth_rx_poll_entry *rx_poll = NULL;
2691         uint32_t *rx_wrr = NULL;
2692         int num_intr_vec;
2693
2694         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2695         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2696
2697         rx_adapter = rxa_id_to_adapter(id);
2698         if (rx_adapter == NULL)
2699                 return -EINVAL;
2700
2701         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2702         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2703                                                 eth_dev_id,
2704                                                 &cap);
2705         if (ret)
2706                 return ret;
2707
2708         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2709                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2710                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2711                          (uint16_t)rx_queue_id);
2712                 return -EINVAL;
2713         }
2714
2715         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2716
2717         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2718                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2719                                  -ENOTSUP);
2720                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2721                                                 &rte_eth_devices[eth_dev_id],
2722                                                 rx_queue_id);
2723                 if (ret == 0) {
2724                         rxa_update_queue(rx_adapter,
2725                                         &rx_adapter->eth_devices[eth_dev_id],
2726                                         rx_queue_id,
2727                                         0);
2728                         if (dev_info->nb_dev_queues == 0) {
2729                                 rte_free(dev_info->rx_queue);
2730                                 dev_info->rx_queue = NULL;
2731                         }
2732                 }
2733         } else {
2734                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2735                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2736
2737                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2738                         &rx_poll, &rx_wrr);
2739                 if (ret)
2740                         return ret;
2741
2742                 rte_spinlock_lock(&rx_adapter->rx_lock);
2743
2744                 num_intr_vec = 0;
2745                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2746
2747                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2748                                                 rx_queue_id, 0);
2749                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2750                                         rx_queue_id);
2751                         if (ret)
2752                                 goto unlock_ret;
2753                 }
2754
2755                 if (nb_rx_intr == 0) {
2756                         ret = rxa_free_intr_resources(rx_adapter);
2757                         if (ret)
2758                                 goto unlock_ret;
2759                 }
2760
2761                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2762                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2763
2764                 rte_free(rx_adapter->eth_rx_poll);
2765                 rte_free(rx_adapter->wrr_sched);
2766
2767                 if (nb_rx_intr == 0) {
2768                         rte_free(dev_info->intr_queue);
2769                         dev_info->intr_queue = NULL;
2770                 }
2771
2772                 rx_adapter->eth_rx_poll = rx_poll;
2773                 rx_adapter->wrr_sched = rx_wrr;
2774                 rx_adapter->wrr_len = nb_wrr;
2775                 rx_adapter->num_intr_vec += num_intr_vec;
2776
2777                 if (dev_info->nb_dev_queues == 0) {
2778                         rte_free(dev_info->rx_queue);
2779                         dev_info->rx_queue = NULL;
2780                 }
2781 unlock_ret:
2782                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2783                 if (ret) {
2784                         rte_free(rx_poll);
2785                         rte_free(rx_wrr);
2786                         return ret;
2787                 }
2788
2789                 rte_service_component_runstate_set(rx_adapter->service_id,
2790                                 rxa_sw_adapter_queue_count(rx_adapter));
2791         }
2792
2793         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2794                 rx_queue_id, ret);
2795         return ret;
2796 }
2797
2798 int
2799 rte_event_eth_rx_adapter_vector_limits_get(
2800         uint8_t dev_id, uint16_t eth_port_id,
2801         struct rte_event_eth_rx_adapter_vector_limits *limits)
2802 {
2803         struct rte_eventdev *dev;
2804         uint32_t cap;
2805         int ret;
2806
2807         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2808         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2809
2810         if (limits == NULL)
2811                 return -EINVAL;
2812
2813         dev = &rte_eventdevs[dev_id];
2814
2815         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2816         if (ret) {
2817                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2818                                  " eth port %" PRIu16,
2819                                  dev_id, eth_port_id);
2820                 return ret;
2821         }
2822
2823         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2824                 RTE_FUNC_PTR_OR_ERR_RET(
2825                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2826                         -ENOTSUP);
2827                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2828                         dev, &rte_eth_devices[eth_port_id], limits);
2829         } else {
2830                 ret = rxa_sw_vector_limits(limits);
2831         }
2832
2833         return ret;
2834 }
2835
2836 int
2837 rte_event_eth_rx_adapter_start(uint8_t id)
2838 {
2839         rte_eventdev_trace_eth_rx_adapter_start(id);
2840         return rxa_ctrl(id, 1);
2841 }
2842
2843 int
2844 rte_event_eth_rx_adapter_stop(uint8_t id)
2845 {
2846         rte_eventdev_trace_eth_rx_adapter_stop(id);
2847         return rxa_ctrl(id, 0);
2848 }
2849
2850 int
2851 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2852                                struct rte_event_eth_rx_adapter_stats *stats)
2853 {
2854         struct rte_event_eth_rx_adapter *rx_adapter;
2855         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2856         struct rte_event_eth_rx_adapter_stats dev_stats;
2857         struct rte_eventdev *dev;
2858         struct eth_device_info *dev_info;
2859         uint32_t i;
2860         int ret;
2861
2862         if (rxa_memzone_lookup())
2863                 return -ENOMEM;
2864
2865         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2866
2867         rx_adapter = rxa_id_to_adapter(id);
2868         if (rx_adapter == NULL || stats == NULL)
2869                 return -EINVAL;
2870
2871         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2872         memset(stats, 0, sizeof(*stats));
2873         RTE_ETH_FOREACH_DEV(i) {
2874                 dev_info = &rx_adapter->eth_devices[i];
2875                 if (dev_info->internal_event_port == 0 ||
2876                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2877                         continue;
2878                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2879                                                 &rte_eth_devices[i],
2880                                                 &dev_stats);
2881                 if (ret)
2882                         continue;
2883                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2884                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2885         }
2886
2887         if (rx_adapter->service_inited)
2888                 *stats = rx_adapter->stats;
2889
2890         stats->rx_packets += dev_stats_sum.rx_packets;
2891         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2892
2893         return 0;
2894 }
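
/*
 * Usage sketch (illustrative only): read the aggregate statistics (SW
 * service counters plus internal-port PMD counters, combined above) and
 * reset them afterwards.
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(id, &stats) == 0) {
 *		printf("rx_packets=%" PRIu64 " rx_enq_count=%" PRIu64 "\n",
 *		       stats.rx_packets, stats.rx_enq_count);
 *		rte_event_eth_rx_adapter_stats_reset(id);
 *	}
 */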
2895
2896 int
2897 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2898 {
2899         struct rte_event_eth_rx_adapter *rx_adapter;
2900         struct rte_eventdev *dev;
2901         struct eth_device_info *dev_info;
2902         uint32_t i;
2903
2904         if (rxa_memzone_lookup())
2905                 return -ENOMEM;
2906
2907         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2908
2909         rx_adapter = rxa_id_to_adapter(id);
2910         if (rx_adapter == NULL)
2911                 return -EINVAL;
2912
2913         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2914         RTE_ETH_FOREACH_DEV(i) {
2915                 dev_info = &rx_adapter->eth_devices[i];
2916                 if (dev_info->internal_event_port == 0 ||
2917                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2918                         continue;
2919                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2920                                                         &rte_eth_devices[i]);
2921         }
2922
2923         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2924         return 0;
2925 }
2926
2927 int
2928 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2929 {
2930         struct rte_event_eth_rx_adapter *rx_adapter;
2931
2932         if (rxa_memzone_lookup())
2933                 return -ENOMEM;
2934
2935         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2936
2937         rx_adapter = rxa_id_to_adapter(id);
2938         if (rx_adapter == NULL || service_id == NULL)
2939                 return -EINVAL;
2940
2941         if (rx_adapter->service_inited)
2942                 *service_id = rx_adapter->service_id;
2943
2944         return rx_adapter->service_inited ? 0 : -ESRCH;
2945 }
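
/*
 * Usage sketch (illustrative only): when the adapter is backed by a SW
 * service (no INTERNAL_PORT capability), the application must map and
 * run that service, e.g. on a service lcore. "SERVICE_LCORE" is a
 * hypothetical core id chosen by the application.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(SERVICE_LCORE);
 *		rte_service_map_lcore_set(service_id, SERVICE_LCORE, 1);
 *		rte_service_runstate_set(service_id, 1);
 *		rte_service_lcore_start(SERVICE_LCORE);
 *	}
 */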
2946
2947 int
2948 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2949                                         uint16_t eth_dev_id,
2950                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2951                                         void *cb_arg)
2952 {
2953         struct rte_event_eth_rx_adapter *rx_adapter;
2954         struct eth_device_info *dev_info;
2955         uint32_t cap;
2956         int ret;
2957
2958         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2959         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2960
2961         rx_adapter = rxa_id_to_adapter(id);
2962         if (rx_adapter == NULL)
2963                 return -EINVAL;
2964
2965         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2966         if (dev_info->rx_queue == NULL)
2967                 return -EINVAL;
2968
2969         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2970                                                 eth_dev_id,
2971                                                 &cap);
2972         if (ret) {
2973                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2974                         " eth port %" PRIu16, rx_adapter->eventdev_id, eth_dev_id);
2975                 return ret;
2976         }
2977
2978         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2979                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2980                                 PRIu16, eth_dev_id);
2981                 return -EINVAL;
2982         }
2983
2984         rte_spinlock_lock(&rx_adapter->rx_lock);
2985         dev_info->cb_fn = cb_fn;
2986         dev_info->cb_arg = cb_arg;
2987         rte_spinlock_unlock(&rx_adapter->rx_lock);
2988
2989         return 0;
2990 }
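
/*
 * Usage sketch (illustrative only): register a per-port Rx callback for a
 * port serviced by the SW adapter (not supported with INTERNAL_PORT, as
 * checked above). "my_rx_cb" is a hypothetical function assumed to match
 * the rte_event_eth_rx_adapter_cb_fn prototype declared in
 * rte_event_eth_rx_adapter.h; "my_ctx" is an application context object.
 *
 *	ret = rte_event_eth_rx_adapter_cb_register(id, eth_dev_id,
 *						   my_rx_cb, &my_ctx);
 *	if (ret < 0)
 *		return ret;
 */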
2991
2992 int
2993 rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
2994                         uint16_t eth_dev_id,
2995                         uint16_t rx_queue_id,
2996                         struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2997 {
2998         struct rte_eventdev *dev;
2999         struct rte_event_eth_rx_adapter *rx_adapter;
3000         struct eth_device_info *dev_info;
3001         struct eth_rx_queue_info *queue_info;
3002         struct rte_event *qi_ev;
3003         int ret;
3004
3005         if (rxa_memzone_lookup())
3006                 return -ENOMEM;
3007
3008         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
3009         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
3010
3011         if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
3012                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
3013                 return -EINVAL;
3014         }
3015
3016         if (queue_conf == NULL) {
3017                 RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
3018                 return -EINVAL;
3019         }
3020
3021         rx_adapter = rxa_id_to_adapter(id);
3022         if (rx_adapter == NULL)
3023                 return -EINVAL;
3024
3025         dev_info = &rx_adapter->eth_devices[eth_dev_id];
3026         if (dev_info->rx_queue == NULL ||
3027             !dev_info->rx_queue[rx_queue_id].queue_enabled) {
3028                 RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
3029                 return -EINVAL;
3030         }
3031
3032         queue_info = &dev_info->rx_queue[rx_queue_id];
3033         qi_ev = (struct rte_event *)&queue_info->event;
3034
3035         memset(queue_conf, 0, sizeof(*queue_conf));
3036         queue_conf->rx_queue_flags = 0;
3037         if (queue_info->flow_id_mask != 0)
3038                 queue_conf->rx_queue_flags |=
3039                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
3040         queue_conf->servicing_weight = queue_info->wt;
3041
3042         memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));
3043
3044         dev = &rte_eventdevs[rx_adapter->eventdev_id];
3045         if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
3046                 ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
3047                                                 &rte_eth_devices[eth_dev_id],
3048                                                 rx_queue_id,
3049                                                 queue_conf);
3050                 return ret;
3051         }
3052
3053         return 0;
3054 }
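
/*
 * Usage sketch (illustrative only): read back the configuration of a
 * previously added Rx queue, e.g. to inspect the event fields or the
 * servicing weight assigned to it.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	if (rte_event_eth_rx_adapter_queue_conf_get(id, eth_dev_id,
 *						    rx_queue_id, &qconf) == 0)
 *		printf("queue %u: weight %u sched_type %u\n", rx_queue_id,
 *		       qconf.servicing_weight, qconf.ev.sched_type);
 */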