dpdk.git: lib/eventdev/rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
29 #define MAX_VECTOR_SIZE         1024
30 #define MIN_VECTOR_SIZE         4
31 #define MAX_VECTOR_NS           1E9
32 #define MIN_VECTOR_NS           1E5
33
34 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
35 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
36
37 #define RSS_KEY_SIZE    40
38 /* value written to intr thread pipe to signal thread exit */
39 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
40 /* Sentinel value marking an uninitialized file descriptor */
41 #define INIT_FD         -1
42
43 /*
44  * Used to store port and queue ID of interrupting Rx queue
45  */
46 union queue_data {
47         RTE_STD_C11
48         void *ptr;
49         struct {
50                 uint16_t port;
51                 uint16_t queue;
52         };
53 };
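/* A minimal sketch of how the union is used (illustrative values only):
 * the <port, queue> pair is packed into a single pointer-sized value so
 * it can travel through the epoll data field and the interrupt ring.
 *
 *     union queue_data qd;
 *
 *     qd.port = 3;
 *     qd.queue = 5;
 *     // qd.ptr now carries both IDs (0x00050003 in its low 32 bits on a
 *     // little-endian target); the receiving side unpacks them by
 *     // assigning the delivered pointer back to qd.ptr.
 */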
54
55 /*
56  * There is an instance of this struct per polled Rx queue added to the
57  * adapter
58  */
59 struct eth_rx_poll_entry {
60         /* Eth port to poll */
61         uint16_t eth_dev_id;
62         /* Eth rx queue to poll */
63         uint16_t eth_rx_qid;
64 };
65
66 struct eth_rx_vector_data {
67         TAILQ_ENTRY(eth_rx_vector_data) next;
68         uint16_t port;
69         uint16_t queue;
70         uint16_t max_vector_count;
71         uint64_t event;
72         uint64_t ts;
73         uint64_t vector_timeout_ticks;
74         struct rte_mempool *vector_pool;
75         struct rte_event_vector *vector_ev;
76 } __rte_cache_aligned;
77
78 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
79
80 /* Instance per adapter */
81 struct rte_eth_event_enqueue_buffer {
82         /* Count of events in this buffer */
83         uint16_t count;
84         /* Array of events in this buffer */
85         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
86 };
87
88 struct rte_event_eth_rx_adapter {
89         /* RSS key */
90         uint8_t rss_key_be[RSS_KEY_SIZE];
91         /* Event device identifier */
92         uint8_t eventdev_id;
93         /* Per ethernet device structure */
94         struct eth_device_info *eth_devices;
95         /* Event port identifier */
96         uint8_t event_port_id;
97         /* Lock to serialize config updates with service function */
98         rte_spinlock_t rx_lock;
99         /* Max mbufs processed in any service function invocation */
100         uint32_t max_nb_rx;
101         /* Receive queues that need to be polled */
102         struct eth_rx_poll_entry *eth_rx_poll;
103         /* Size of the eth_rx_poll array */
104         uint16_t num_rx_polled;
105         /* Weighted round robin schedule */
106         uint32_t *wrr_sched;
107         /* wrr_sched[] size */
108         uint32_t wrr_len;
109         /* Next entry in wrr[] to begin polling */
110         uint32_t wrr_pos;
111         /* Event burst buffer */
112         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
113         /* Vector enable flag */
114         uint8_t ena_vector;
115         /* Timestamp of previous vector expiry list traversal */
116         uint64_t prev_expiry_ts;
117         /* Minimum ticks to wait before traversing expiry list */
118         uint64_t vector_tmo_ticks;
119         /* vector list */
120         struct eth_rx_vector_data_list vector_list;
121         /* Per adapter stats */
122         struct rte_event_eth_rx_adapter_stats stats;
123         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
124         uint16_t enq_block_count;
125         /* Block start ts */
126         uint64_t rx_enq_block_start_ts;
127         /* epoll fd used to wait for Rx interrupts */
128         int epd;
130         /* Number of interrupt driven Rx queues */
130         uint32_t num_rx_intr;
131         /* Used to send <dev id, queue id> of interrupting Rx queues from
132          * the interrupt thread to the Rx thread
133          */
134         struct rte_ring *intr_ring;
135         /* Rx Queue data (dev id, queue id) for the last non-empty
136          * queue polled
137          */
138         union queue_data qd;
139         /* queue_data is valid */
140         int qd_valid;
141         /* Interrupt ring lock, synchronizes Rx thread
142          * and interrupt thread
143          */
144         rte_spinlock_t intr_ring_lock;
145         /* event array passed to rte_epoll_wait */
146         struct rte_epoll_event *epoll_events;
147         /* Count of interrupt vectors in use */
148         uint32_t num_intr_vec;
149         /* Thread blocked on Rx interrupts */
150         pthread_t rx_intr_thread;
151         /* Configuration callback for rte_service configuration */
152         rte_event_eth_rx_adapter_conf_cb conf_cb;
153         /* Configuration callback argument */
154         void *conf_arg;
155         /* Set if the default conf callback is being used */
156         int default_cb_arg;
157         /* Service initialization state */
158         uint8_t service_inited;
159         /* Total count of Rx queues in adapter */
160         uint32_t nb_queues;
161         /* Memory allocation name */
162         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
163         /* Socket identifier cached from eventdev */
164         int socket_id;
165         /* Per adapter EAL service */
166         uint32_t service_id;
167         /* Adapter started flag */
168         uint8_t rxa_started;
169         /* Adapter ID */
170         uint8_t id;
171 } __rte_cache_aligned;
172
173 /* Per eth device */
174 struct eth_device_info {
175         struct rte_eth_dev *dev;
176         struct eth_rx_queue_info *rx_queue;
177         /* Rx callback */
178         rte_event_eth_rx_adapter_cb_fn cb_fn;
179         /* Rx callback argument */
180         void *cb_arg;
181         /* Set if ethdev->eventdev packet transfer uses a
182          * hardware mechanism
183          */
184         uint8_t internal_event_port;
185         /* Set if the adapter is processing rx queues for
186          * this eth device and packet processing has been
187          * started, allows for the code to know if the PMD
188          * rx_adapter_stop callback needs to be invoked
189          */
190         uint8_t dev_rx_started;
191         /* Number of queues added for this device */
192         uint16_t nb_dev_queues;
193         /* Number of poll based queues
194          * If nb_rx_poll > 0, the start callback will
195          * be invoked if not already invoked
196          */
197         uint16_t nb_rx_poll;
198         /* Number of interrupt based queues
199          * If nb_rx_intr > 0, the start callback will
200          * be invoked if not already invoked.
201          */
202         uint16_t nb_rx_intr;
203         /* Number of queues that use the shared interrupt */
204         uint16_t nb_shared_intr;
205         /* sum(wrr(q)) for all queues within the device
206          * useful when deleting all device queues
207          */
208         uint32_t wrr_len;
209         /* Intr based queue index to start polling from; this is used
210          * if the number of shared interrupts is non-zero
211          */
212         uint16_t next_q_idx;
213         /* Intr based queue indices */
214         uint16_t *intr_queue;
215         /* device generates a per Rx queue interrupt for queue
216          * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
217          */
218         int multi_intr_cap;
219         /* shared interrupt enabled */
220         int shared_intr_enabled;
221 };
222
223 /* Per Rx queue */
224 struct eth_rx_queue_info {
225         int queue_enabled;      /* True if added */
226         int intr_enabled;
227         uint8_t ena_vector;
228         uint16_t wt;            /* Polling weight */
229         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
230         uint64_t event;
231         struct eth_rx_vector_data vector_data;
232 };
233
234 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
235
236 static inline int
237 rxa_validate_id(uint8_t id)
238 {
239         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
240 }
241
242 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
243         if (!rxa_validate_id(id)) { \
244                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
245                 return retval; \
246         } \
247 } while (0)
248
249 static inline int
250 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
251 {
252         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
253 }
254
255 /* Greatest common divisor */
256 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
257 {
258         uint16_t r = a % b;
259
260         return r ? rxa_gcd_u16(b, r) : b;
261 }
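/* This is Euclid's algorithm, e.g. rxa_gcd_u16(4, 6) == 2; it is used
 * below to compute the GCD of all polling weights for the WRR schedule.
 */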
262
263 /* Returns the next queue in the polling sequence
264  *
265  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
266  */
267 static int
268 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
269          unsigned int n, int *cw,
270          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
271          uint16_t gcd, int prev)
272 {
273         int i = prev;
274         uint16_t w;
275
276         while (1) {
277                 uint16_t q;
278                 uint16_t d;
279
280                 i = (i + 1) % n;
281                 if (i == 0) {
282                         *cw = *cw - gcd;
283                         if (*cw <= 0)
284                                 *cw = max_wt;
285                 }
286
287                 q = eth_rx_poll[i].eth_rx_qid;
288                 d = eth_rx_poll[i].eth_dev_id;
289                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
290
291                 if ((int)w >= *cw)
292                         return i;
293         }
294 }
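/* Worked example (illustrative weights): for three polled queues with
 * weights {4, 2, 1}, max_wt = 4 and gcd = 1, successive calls to
 * rxa_wrr_next() yield the repeating sequence
 *
 *     q0, q0, q0, q1, q0, q1, q2
 *
 * i.e. per cycle of wrr_len = 4 + 2 + 1 = 7 slots, queue 0 is polled
 * four times, queue 1 twice and queue 2 once.
 */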
295
296 static inline int
297 rxa_shared_intr(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         int multi_intr_cap;
301
302         if (dev_info->dev->intr_handle == NULL)
303                 return 0;
304
305         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
306         return !multi_intr_cap ||
307                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
308 }
309
310 static inline int
311 rxa_intr_queue(struct eth_device_info *dev_info,
312         int rx_queue_id)
313 {
314         struct eth_rx_queue_info *queue_info;
315
316         queue_info = &dev_info->rx_queue[rx_queue_id];
317         return dev_info->rx_queue &&
318                 !dev_info->internal_event_port &&
319                 queue_info->queue_enabled && queue_info->wt == 0;
320 }
321
322 static inline int
323 rxa_polled_queue(struct eth_device_info *dev_info,
324         int rx_queue_id)
325 {
326         struct eth_rx_queue_info *queue_info;
327
328         queue_info = &dev_info->rx_queue[rx_queue_id];
329         return !dev_info->internal_event_port &&
330                 dev_info->rx_queue &&
331                 queue_info->queue_enabled && queue_info->wt != 0;
332 }
333
334 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
335 static int
336 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
337 {
338         uint16_t i;
339         int n, s;
340         uint16_t nbq;
341
342         nbq = dev_info->dev->data->nb_rx_queues;
343         n = 0; /* non shared count */
344         s = 0; /* shared count */
345
346         if (rx_queue_id == -1) {
347                 for (i = 0; i < nbq; i++) {
348                         if (!rxa_shared_intr(dev_info, i))
349                                 n += add ? !rxa_intr_queue(dev_info, i) :
350                                         rxa_intr_queue(dev_info, i);
351                         else
352                                 s += add ? !rxa_intr_queue(dev_info, i) :
353                                         rxa_intr_queue(dev_info, i);
354                 }
355
356                 if (s > 0) {
357                         if ((add && dev_info->nb_shared_intr == 0) ||
358                                 (!add && dev_info->nb_shared_intr))
359                                 n += 1;
360                 }
361         } else {
362                 if (!rxa_shared_intr(dev_info, rx_queue_id))
363                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
364                                 rxa_intr_queue(dev_info, rx_queue_id);
365                 else
366                         n = add ? !dev_info->nb_shared_intr :
367                                 dev_info->nb_shared_intr == 1;
368         }
369
370         return add ? n : -n;
371 }
372
373 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
374  */
375 static void
376 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
377                         struct eth_device_info *dev_info,
378                         int rx_queue_id,
379                         uint32_t *nb_rx_intr)
380 {
381         uint32_t intr_diff;
382
383         if (rx_queue_id == -1)
384                 intr_diff = dev_info->nb_rx_intr;
385         else
386                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
387
388         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
389 }
390
391 /* Calculate nb_rx_* after adding interrupt mode rx queues; newly added
392  * interrupt queues could currently be poll mode Rx queues
393  */
394 static void
395 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
396                         struct eth_device_info *dev_info,
397                         int rx_queue_id,
398                         uint32_t *nb_rx_poll,
399                         uint32_t *nb_rx_intr,
400                         uint32_t *nb_wrr)
401 {
402         uint32_t intr_diff;
403         uint32_t poll_diff;
404         uint32_t wrr_len_diff;
405
406         if (rx_queue_id == -1) {
407                 intr_diff = dev_info->dev->data->nb_rx_queues -
408                                                 dev_info->nb_rx_intr;
409                 poll_diff = dev_info->nb_rx_poll;
410                 wrr_len_diff = dev_info->wrr_len;
411         } else {
412                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
413                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
414                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
415                                         0;
416         }
417
418         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate size of the eth_rx_poll and wrr_sched arrays
424  * after deleting poll mode rx queues
425  */
426 static void
427 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
428                         struct eth_device_info *dev_info,
429                         int rx_queue_id,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_wrr)
432 {
433         uint32_t poll_diff;
434         uint32_t wrr_len_diff;
435
436         if (rx_queue_id == -1) {
437                 poll_diff = dev_info->nb_rx_poll;
438                 wrr_len_diff = dev_info->wrr_len;
439         } else {
440                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
441                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
442                                         0;
443         }
444
445         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
446         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
447 }
448
449 /* Calculate nb_rx_* after adding poll mode rx queues
450  */
451 static void
452 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
453                         struct eth_device_info *dev_info,
454                         int rx_queue_id,
455                         uint16_t wt,
456                         uint32_t *nb_rx_poll,
457                         uint32_t *nb_rx_intr,
458                         uint32_t *nb_wrr)
459 {
460         uint32_t intr_diff;
461         uint32_t poll_diff;
462         uint32_t wrr_len_diff;
463
464         if (rx_queue_id == -1) {
465                 intr_diff = dev_info->nb_rx_intr;
466                 poll_diff = dev_info->dev->data->nb_rx_queues -
467                                                 dev_info->nb_rx_poll;
468                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
469                                 - dev_info->wrr_len;
470         } else {
471                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
472                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
473                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
474                                 wt - dev_info->rx_queue[rx_queue_id].wt :
475                                 wt;
476         }
477
478         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
479         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
480         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
481 }
482
483 /* Calculate nb_rx_* after adding rx_queue_id */
484 static void
485 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
486                 struct eth_device_info *dev_info,
487                 int rx_queue_id,
488                 uint16_t wt,
489                 uint32_t *nb_rx_poll,
490                 uint32_t *nb_rx_intr,
491                 uint32_t *nb_wrr)
492 {
493         if (wt != 0)
494                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
495                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
496         else
497                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
498                                         nb_rx_poll, nb_rx_intr, nb_wrr);
499 }
500
501 /* Calculate nb_rx_* after deleting rx_queue_id */
502 static void
503 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
504                 struct eth_device_info *dev_info,
505                 int rx_queue_id,
506                 uint32_t *nb_rx_poll,
507                 uint32_t *nb_rx_intr,
508                 uint32_t *nb_wrr)
509 {
510         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
511                                 nb_wrr);
512         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
513                                 nb_rx_intr);
514 }
515
516 /*
517  * Allocate the rx_poll array
518  */
519 static struct eth_rx_poll_entry *
520 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
521         uint32_t num_rx_polled)
522 {
523         size_t len;
524
525         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
526                                                         RTE_CACHE_LINE_SIZE);
527         return  rte_zmalloc_socket(rx_adapter->mem_name,
528                                 len,
529                                 RTE_CACHE_LINE_SIZE,
530                                 rx_adapter->socket_id);
531 }
532
533 /*
534  * Allocate the WRR array
535  */
536 static uint32_t *
537 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
538 {
539         size_t len;
540
541         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
542                         RTE_CACHE_LINE_SIZE);
543         return  rte_zmalloc_socket(rx_adapter->mem_name,
544                                 len,
545                                 RTE_CACHE_LINE_SIZE,
546                                 rx_adapter->socket_id);
547 }
548
549 static int
550 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
551                 uint32_t nb_poll,
552                 uint32_t nb_wrr,
553                 struct eth_rx_poll_entry **rx_poll,
554                 uint32_t **wrr_sched)
555 {
556
557         if (nb_poll == 0) {
558                 *rx_poll = NULL;
559                 *wrr_sched = NULL;
560                 return 0;
561         }
562
563         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
564         if (*rx_poll == NULL) {
565                 *wrr_sched = NULL;
566                 return -ENOMEM;
567         }
568
569         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
570         if (*wrr_sched == NULL) {
571                 rte_free(*rx_poll);
572                 return -ENOMEM;
573         }
574         return 0;
575 }
576
577 /* Precalculate WRR polling sequence for all queues in rx_adapter */
578 static void
579 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
580                 struct eth_rx_poll_entry *rx_poll,
581                 uint32_t *rx_wrr)
582 {
583         uint16_t d;
584         uint16_t q;
585         unsigned int i;
586         int prev = -1;
587         int cw = -1;
588
589         /* Initialize variables for calculation of wrr schedule */
590         uint16_t max_wrr_pos = 0;
591         unsigned int poll_q = 0;
592         uint16_t max_wt = 0;
593         uint16_t gcd = 0;
594
595         if (rx_poll == NULL)
596                 return;
597
598         /* Generate array of all queues to poll, the size of this
599          * array is poll_q
600          */
601         RTE_ETH_FOREACH_DEV(d) {
602                 uint16_t nb_rx_queues;
603                 struct eth_device_info *dev_info =
604                                 &rx_adapter->eth_devices[d];
605                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
606                 if (dev_info->rx_queue == NULL)
607                         continue;
608                 if (dev_info->internal_event_port)
609                         continue;
610                 dev_info->wrr_len = 0;
611                 for (q = 0; q < nb_rx_queues; q++) {
612                         struct eth_rx_queue_info *queue_info =
613                                 &dev_info->rx_queue[q];
614                         uint16_t wt;
615
616                         if (!rxa_polled_queue(dev_info, q))
617                                 continue;
618                         wt = queue_info->wt;
619                         rx_poll[poll_q].eth_dev_id = d;
620                         rx_poll[poll_q].eth_rx_qid = q;
621                         max_wrr_pos += wt;
622                         dev_info->wrr_len += wt;
623                         max_wt = RTE_MAX(max_wt, wt);
624                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
625                         poll_q++;
626                 }
627         }
628
629         /* Generate polling sequence based on weights */
630         prev = -1;
631         cw = -1;
632         for (i = 0; i < max_wrr_pos; i++) {
633                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
634                                      rx_poll, max_wt, gcd, prev);
635                 prev = rx_wrr[i];
636         }
637 }
638
639 static inline void
640 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
641         struct rte_ipv6_hdr **ipv6_hdr)
642 {
643         struct rte_ether_hdr *eth_hdr =
644                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
645         struct rte_vlan_hdr *vlan_hdr;
646
647         *ipv4_hdr = NULL;
648         *ipv6_hdr = NULL;
649
650         switch (eth_hdr->ether_type) {
651         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
652                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
653                 break;
654
655         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
656                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
657                 break;
658
659         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
660                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
661                 switch (vlan_hdr->eth_proto) {
662                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
663                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
664                         break;
665                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
666                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
667                         break;
668                 default:
669                         break;
670                 }
671                 break;
672
673         default:
674                 break;
675         }
676 }
677
678 /* Calculate RSS hash for IPv4/6 */
679 static inline uint32_t
680 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
681 {
682         uint32_t input_len;
683         void *tuple;
684         struct rte_ipv4_tuple ipv4_tuple;
685         struct rte_ipv6_tuple ipv6_tuple;
686         struct rte_ipv4_hdr *ipv4_hdr;
687         struct rte_ipv6_hdr *ipv6_hdr;
688
689         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
690
691         if (ipv4_hdr) {
692                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
693                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
694                 tuple = &ipv4_tuple;
695                 input_len = RTE_THASH_V4_L3_LEN;
696         } else if (ipv6_hdr) {
697                 rte_thash_load_v6_addrs(ipv6_hdr,
698                                         (union rte_thash_tuple *)&ipv6_tuple);
699                 tuple = &ipv6_tuple;
700                 input_len = RTE_THASH_V6_L3_LEN;
701         } else
702                 return 0;
703
704         return rte_softrss_be(tuple, input_len, rss_key_be);
705 }
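/* Note: rss_key_be[] is the RSS key in the byte-swapped layout expected by
 * rte_softrss_be(). It is typically prepared once at adapter creation,
 * along the lines of the sketch below (rss_key here stands for an
 * application- or default-provided key; this is not the exact upstream
 * call site):
 *
 *     rte_convert_rss_key((const uint32_t *)rss_key,
 *                         (uint32_t *)rx_adapter->rss_key_be,
 *                         RSS_KEY_SIZE);
 */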
706
707 static inline int
708 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
709 {
710         return !!rx_adapter->enq_block_count;
711 }
712
713 static inline void
714 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
715 {
716         if (rx_adapter->rx_enq_block_start_ts)
717                 return;
718
719         rx_adapter->enq_block_count++;
720         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
721                 return;
722
723         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
724 }
725
726 static inline void
727 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
728                     struct rte_event_eth_rx_adapter_stats *stats)
729 {
730         if (unlikely(!stats->rx_enq_start_ts))
731                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
732
733         if (likely(!rxa_enq_blocked(rx_adapter)))
734                 return;
735
736         rx_adapter->enq_block_count = 0;
737         if (rx_adapter->rx_enq_block_start_ts) {
738                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
739                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
740                     rx_adapter->rx_enq_block_start_ts;
741                 rx_adapter->rx_enq_block_start_ts = 0;
742         }
743 }
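/* Blocked-enqueue accounting (descriptive summary): a "block" starts after
 * BLOCK_CNT_THRESHOLD consecutive zero-enqueue flushes, at which point
 * rx_enq_block_start_ts is latched; the first subsequent successful
 * enqueue ends the block and adds the elapsed TSC cycles to
 * stats->rx_enq_block_cycles.
 */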
744
745 /* Enqueue buffered events to event device */
746 static inline uint16_t
747 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
748 {
749         struct rte_eth_event_enqueue_buffer *buf =
750             &rx_adapter->event_enqueue_buffer;
751         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
752
753         if (!buf->count)
754                 return 0;
755
756         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
757                                         rx_adapter->event_port_id,
758                                         buf->events,
759                                         buf->count);
760         if (n != buf->count) {
761                 memmove(buf->events,
762                         &buf->events[n],
763                         (buf->count - n) * sizeof(struct rte_event));
764                 stats->rx_enq_retry++;
765         }
766
767         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
768                 rxa_enq_block_start_ts(rx_adapter);
769
770         buf->count -= n;
771         stats->rx_enq_count += n;
772
773         return n;
774 }
775
776 static inline void
777 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
778                 struct eth_rx_vector_data *vec)
779 {
780         vec->vector_ev->nb_elem = 0;
781         vec->vector_ev->port = vec->port;
782         vec->vector_ev->queue = vec->queue;
783         vec->vector_ev->attr_valid = true;
784         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
785 }
786
787 static inline uint16_t
788 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
789                         struct eth_rx_queue_info *queue_info,
790                         struct rte_eth_event_enqueue_buffer *buf,
791                         struct rte_mbuf **mbufs, uint16_t num)
792 {
793         struct rte_event *ev = &buf->events[buf->count];
794         struct eth_rx_vector_data *vec;
795         uint16_t filled, space, sz;
796
797         filled = 0;
798         vec = &queue_info->vector_data;
799
800         if (vec->vector_ev == NULL) {
801                 if (rte_mempool_get(vec->vector_pool,
802                                     (void **)&vec->vector_ev) < 0) {
803                         rte_pktmbuf_free_bulk(mbufs, num);
804                         return 0;
805                 }
806                 rxa_init_vector(rx_adapter, vec);
807         }
808         while (num) {
809                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
810                         /* Event ready. */
811                         ev->event = vec->event;
812                         ev->vec = vec->vector_ev;
813                         ev++;
814                         filled++;
815                         vec->vector_ev = NULL;
816                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
817                         if (rte_mempool_get(vec->vector_pool,
818                                             (void **)&vec->vector_ev) < 0) {
819                                 rte_pktmbuf_free_bulk(mbufs, num);
820                                 return 0;
821                         }
822                         rxa_init_vector(rx_adapter, vec);
823                 }
824
825                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
826                 sz = num > space ? space : num;
827                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
828                        sizeof(void *) * sz);
829                 vec->vector_ev->nb_elem += sz;
830                 num -= sz;
831                 mbufs += sz;
832                 vec->ts = rte_rdtsc();
833         }
834
835         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
836                 ev->event = vec->event;
837                 ev->vec = vec->vector_ev;
838                 ev++;
839                 filled++;
840                 vec->vector_ev = NULL;
841                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
842         }
843
844         return filled;
845 }
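/* Aggregation behaviour (descriptive summary): mbufs are appended to the
 * queue's current rte_event_vector; a vector event is pushed into the
 * enqueue buffer here whenever the vector fills up to max_vector_count,
 * while partially filled vectors are flushed later by rxa_vector_expire()
 * once vector_timeout_ticks have elapsed (see the service function).
 */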
846
847 static inline void
848 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
849                 uint16_t eth_dev_id,
850                 uint16_t rx_queue_id,
851                 struct rte_mbuf **mbufs,
852                 uint16_t num)
853 {
854         uint32_t i;
855         struct eth_device_info *dev_info =
856                                         &rx_adapter->eth_devices[eth_dev_id];
857         struct eth_rx_queue_info *eth_rx_queue_info =
858                                         &dev_info->rx_queue[rx_queue_id];
859         struct rte_eth_event_enqueue_buffer *buf =
860                                         &rx_adapter->event_enqueue_buffer;
861         struct rte_event *ev = &buf->events[buf->count];
862         uint64_t event = eth_rx_queue_info->event;
863         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
864         struct rte_mbuf *m = mbufs[0];
865         uint32_t rss_mask;
866         uint32_t rss;
867         int do_rss;
868         uint16_t nb_cb;
869         uint16_t dropped;
870
871         if (!eth_rx_queue_info->ena_vector) {
872                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
873                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
874                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
875                 for (i = 0; i < num; i++) {
876                         m = mbufs[i];
877
878                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
879                                      : m->hash.rss;
880                         ev->event = event;
881                         ev->flow_id = (rss & ~flow_id_mask) |
882                                       (ev->flow_id & flow_id_mask);
883                         ev->mbuf = m;
884                         ev++;
885                 }
886         } else {
887                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
888                                               buf, mbufs, num);
889         }
890
891         if (num && dev_info->cb_fn) {
892
893                 dropped = 0;
894                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
895                                         ETH_EVENT_BUFFER_SIZE, buf->count,
896                                         &buf->events[buf->count], num,
897                                         dev_info->cb_arg, &dropped);
898                 if (unlikely(nb_cb > num))
899                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
900                                 nb_cb, num);
901                 else
902                         num = nb_cb;
903                 if (dropped)
904                         rx_adapter->stats.rx_dropped += dropped;
905         }
906
907         buf->count += num;
908 }
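/* Flow ID selection (worked example): ev->event is first loaded from the
 * queue's preconfigured event word, then
 *
 *     ev->flow_id = (rss & ~flow_id_mask) | (ev->flow_id & flow_id_mask);
 *
 * keeps the application-supplied flow ID when flow_id_mask is ~0 (flow ID
 * given at queue add time) and substitutes the RSS hash (hardware hash or
 * rxa_do_softrss()) when the mask is 0.
 */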
909
910 /* Enqueue packets from <port, q> to event buffer */
911 static inline uint32_t
912 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
913         uint16_t port_id,
914         uint16_t queue_id,
915         uint32_t rx_count,
916         uint32_t max_rx,
917         int *rxq_empty)
918 {
919         struct rte_mbuf *mbufs[BATCH_SIZE];
920         struct rte_eth_event_enqueue_buffer *buf =
921                                         &rx_adapter->event_enqueue_buffer;
922         struct rte_event_eth_rx_adapter_stats *stats =
923                                         &rx_adapter->stats;
924         uint16_t n;
925         uint32_t nb_rx = 0;
926
927         if (rxq_empty)
928                 *rxq_empty = 0;
929         /* Don't do a batch dequeue from the rx queue if there isn't
930          * enough space in the enqueue buffer.
931          */
932         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
933                 if (buf->count >= BATCH_SIZE)
934                         rxa_flush_event_buffer(rx_adapter);
935
936                 stats->rx_poll_count++;
937                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
938                 if (unlikely(!n)) {
939                         if (rxq_empty)
940                                 *rxq_empty = 1;
941                         break;
942                 }
943                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
944                 nb_rx += n;
945                 if (rx_count + nb_rx > max_rx)
946                         break;
947         }
948
949         if (buf->count > 0)
950                 rxa_flush_event_buffer(rx_adapter);
951
952         return nb_rx;
953 }
954
955 static inline void
956 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
957                 void *data)
958 {
959         uint16_t port_id;
960         uint16_t queue;
961         int err;
962         union queue_data qd;
963         struct eth_device_info *dev_info;
964         struct eth_rx_queue_info *queue_info;
965         int *intr_enabled;
966
967         qd.ptr = data;
968         port_id = qd.port;
969         queue = qd.queue;
970
971         dev_info = &rx_adapter->eth_devices[port_id];
972         queue_info = &dev_info->rx_queue[queue];
973         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
974         if (rxa_shared_intr(dev_info, queue))
975                 intr_enabled = &dev_info->shared_intr_enabled;
976         else
977                 intr_enabled = &queue_info->intr_enabled;
978
979         if (*intr_enabled) {
980                 *intr_enabled = 0;
981                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
982                 /* Entry should always be available.
983                  * The ring size equals the maximum number of interrupt
984                  * vectors supported (an interrupt vector is shared in
985                  * case of shared interrupts)
986                  */
987                 if (err)
988                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
989                                 " to ring: %s", strerror(-err));
990                 else
991                         rte_eth_dev_rx_intr_disable(port_id, queue);
992         }
993         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
994 }
995
996 static int
997 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
998                         uint32_t num_intr_vec)
999 {
1000         if (rx_adapter->num_intr_vec + num_intr_vec >
1001                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1002                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1003                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1004                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1005                 return -ENOSPC;
1006         }
1007
1008         return 0;
1009 }
1010
1011 /* Delete entries for (dev, queue) from the interrupt ring */
1012 static void
1013 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1014                         struct eth_device_info *dev_info,
1015                         uint16_t rx_queue_id)
1016 {
1017         int i, n;
1018         union queue_data qd;
1019
1020         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1021
1022         n = rte_ring_count(rx_adapter->intr_ring);
1023         for (i = 0; i < n; i++) {
1024                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1025                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1026                         if (qd.port == dev_info->dev->data->port_id &&
1027                                 qd.queue == rx_queue_id)
1028                                 continue;
1029                 } else {
1030                         if (qd.port == dev_info->dev->data->port_id)
1031                                 continue;
1032                 }
1033                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1034         }
1035
1036         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1037 }
1038
1039 /* pthread callback handling interrupt mode receive queues
1040  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1041  * interrupting queue to the adapter's ring buffer for interrupt events.
1042  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1043  * the adapter service function.
1044  */
1045 static void *
1046 rxa_intr_thread(void *arg)
1047 {
1048         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1049         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1050         int n, i;
1051
1052         while (1) {
1053                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1054                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1055                 if (unlikely(n < 0))
1056                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1057                                         n);
1058                 for (i = 0; i < n; i++) {
1059                         rxa_intr_ring_enqueue(rx_adapter,
1060                                         epoll_events[i].epdata.data);
1061                 }
1062         }
1063
1064         return NULL;
1065 }
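/* The epdata.data value delivered here is the queue_data union that was
 * registered for each interrupt mode Rx queue, roughly as in the sketch
 * below (the actual registration happens in the interrupt configuration
 * path, not shown in this section):
 *
 *     union queue_data qd;
 *
 *     qd.port = eth_dev_id;
 *     qd.queue = rx_queue_id;
 *     rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, rx_adapter->epd,
 *                               RTE_INTR_EVENT_ADD, qd.ptr);
 */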
1066
1067 /* Dequeue <port, q> from interrupt ring and enqueue received
1068  * mbufs to eventdev
1069  */
1070 static inline uint32_t
1071 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1072 {
1073         uint32_t n;
1074         uint32_t nb_rx = 0;
1075         int rxq_empty;
1076         struct rte_eth_event_enqueue_buffer *buf;
1077         rte_spinlock_t *ring_lock;
1078         uint8_t max_done = 0;
1079
1080         if (rx_adapter->num_rx_intr == 0)
1081                 return 0;
1082
1083         if (rte_ring_count(rx_adapter->intr_ring) == 0
1084                 && !rx_adapter->qd_valid)
1085                 return 0;
1086
1087         buf = &rx_adapter->event_enqueue_buffer;
1088         ring_lock = &rx_adapter->intr_ring_lock;
1089
1090         if (buf->count >= BATCH_SIZE)
1091                 rxa_flush_event_buffer(rx_adapter);
1092
1093         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1094                 struct eth_device_info *dev_info;
1095                 uint16_t port;
1096                 uint16_t queue;
1097                 union queue_data qd  = rx_adapter->qd;
1098                 int err;
1099
1100                 if (!rx_adapter->qd_valid) {
1101                         struct eth_rx_queue_info *queue_info;
1102
1103                         rte_spinlock_lock(ring_lock);
1104                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1105                         if (err) {
1106                                 rte_spinlock_unlock(ring_lock);
1107                                 break;
1108                         }
1109
1110                         port = qd.port;
1111                         queue = qd.queue;
1112                         rx_adapter->qd = qd;
1113                         rx_adapter->qd_valid = 1;
1114                         dev_info = &rx_adapter->eth_devices[port];
1115                         if (rxa_shared_intr(dev_info, queue))
1116                                 dev_info->shared_intr_enabled = 1;
1117                         else {
1118                                 queue_info = &dev_info->rx_queue[queue];
1119                                 queue_info->intr_enabled = 1;
1120                         }
1121                         rte_eth_dev_rx_intr_enable(port, queue);
1122                         rte_spinlock_unlock(ring_lock);
1123                 } else {
1124                         port = qd.port;
1125                         queue = qd.queue;
1126
1127                         dev_info = &rx_adapter->eth_devices[port];
1128                 }
1129
1130                 if (rxa_shared_intr(dev_info, queue)) {
1131                         uint16_t i;
1132                         uint16_t nb_queues;
1133
1134                         nb_queues = dev_info->dev->data->nb_rx_queues;
1135                         n = 0;
1136                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1137                                 uint8_t enq_buffer_full;
1138
1139                                 if (!rxa_intr_queue(dev_info, i))
1140                                         continue;
1141                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1142                                         rx_adapter->max_nb_rx,
1143                                         &rxq_empty);
1144                                 nb_rx += n;
1145
1146                                 enq_buffer_full = !rxq_empty && n == 0;
1147                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1148
1149                                 if (enq_buffer_full || max_done) {
1150                                         dev_info->next_q_idx = i;
1151                                         goto done;
1152                                 }
1153                         }
1154
1155                         rx_adapter->qd_valid = 0;
1156
1157                         /* Reinitialize for next interrupt */
1158                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1159                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1160                                                 0;
1161                 } else {
1162                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1163                                 rx_adapter->max_nb_rx,
1164                                 &rxq_empty);
1165                         rx_adapter->qd_valid = !rxq_empty;
1166                         nb_rx += n;
1167                         if (nb_rx > rx_adapter->max_nb_rx)
1168                                 break;
1169                 }
1170         }
1171
1172 done:
1173         rx_adapter->stats.rx_intr_packets += nb_rx;
1174         return nb_rx;
1175 }
1176
1177 /*
1178  * Polls receive queues added to the event adapter and enqueues received
1179  * packets to the event device.
1180  *
1181  * The receive code enqueues initially to a temporary buffer; the
1182  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1183  *
1184  * If there isn't space available in the temporary buffer, packets from the
1185  * Rx queue aren't dequeued from the eth device; this back pressures the
1186  * eth device. In virtual device environments, this back pressure is relayed
1187  * to the hypervisor's switching layer, where adjustments can be made to
1188  * deal with it.
1189  */
1190 static inline uint32_t
1191 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1192 {
1193         uint32_t num_queue;
1194         uint32_t nb_rx = 0;
1195         struct rte_eth_event_enqueue_buffer *buf;
1196         uint32_t wrr_pos;
1197         uint32_t max_nb_rx;
1198
1199         wrr_pos = rx_adapter->wrr_pos;
1200         max_nb_rx = rx_adapter->max_nb_rx;
1201         buf = &rx_adapter->event_enqueue_buffer;
1202
1203         /* Iterate through a WRR sequence */
1204         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1205                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1206                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1207                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1208
1209                 /* Don't do a batch dequeue from the rx queue if there isn't
1210                  * enough space in the enqueue buffer.
1211                  */
1212                 if (buf->count >= BATCH_SIZE)
1213                         rxa_flush_event_buffer(rx_adapter);
1214                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1215                         rx_adapter->wrr_pos = wrr_pos;
1216                         return nb_rx;
1217                 }
1218
1219                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1220                                 NULL);
1221                 if (nb_rx > max_nb_rx) {
1222                         rx_adapter->wrr_pos =
1223                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1224                         break;
1225                 }
1226
1227                 if (++wrr_pos == rx_adapter->wrr_len)
1228                         wrr_pos = 0;
1229         }
1230         return nb_rx;
1231 }
1232
1233 static void
1234 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1235 {
1236         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1237         struct rte_eth_event_enqueue_buffer *buf =
1238                 &rx_adapter->event_enqueue_buffer;
1239         struct rte_event *ev;
1240
1241         if (buf->count)
1242                 rxa_flush_event_buffer(rx_adapter);
1243
1244         if (vec->vector_ev->nb_elem == 0)
1245                 return;
1246         ev = &buf->events[buf->count];
1247
1248         /* Event ready. */
1249         ev->event = vec->event;
1250         ev->vec = vec->vector_ev;
1251         buf->count++;
1252
1253         vec->vector_ev = NULL;
1254         vec->ts = 0;
1255 }
1256
1257 static int
1258 rxa_service_func(void *args)
1259 {
1260         struct rte_event_eth_rx_adapter *rx_adapter = args;
1261         struct rte_event_eth_rx_adapter_stats *stats;
1262
1263         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1264                 return 0;
1265         if (!rx_adapter->rxa_started) {
1266                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1267                 return 0;
1268         }
1269
1270         if (rx_adapter->ena_vector) {
1271                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1272                     rx_adapter->vector_tmo_ticks) {
1273                         struct eth_rx_vector_data *vec;
1274
1275                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1276                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1277
1278                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1279                                         rxa_vector_expire(vec, rx_adapter);
1280                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1281                                                      vec, next);
1282                                 }
1283                         }
1284                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1285                 }
1286         }
1287
1288         stats = &rx_adapter->stats;
1289         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1290         stats->rx_packets += rxa_poll(rx_adapter);
1291         rte_spinlock_unlock(&rx_adapter->rx_lock);
1292         return 0;
1293 }
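/* Usage sketch (application side; the id and lcore variables are
 * illustrative): when the adapter uses this service function, the service
 * must be mapped to a service lcore for it to run, e.g.
 *
 *     uint32_t service_id;
 *
 *     if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *             rte_service_lcore_add(lcore);
 *             rte_service_map_lcore_set(service_id, lcore, 1);
 *             rte_service_runstate_set(service_id, 1);
 *             rte_service_lcore_start(lcore);
 *     }
 */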
1294
1295 static int
1296 rte_event_eth_rx_adapter_init(void)
1297 {
1298         const char *name = "rte_event_eth_rx_adapter_array";
1299         const struct rte_memzone *mz;
1300         unsigned int sz;
1301
1302         sz = sizeof(*event_eth_rx_adapter) *
1303             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1304         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1305
1306         mz = rte_memzone_lookup(name);
1307         if (mz == NULL) {
1308                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1309                                                  RTE_CACHE_LINE_SIZE);
1310                 if (mz == NULL) {
1311                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1312                                         PRId32, rte_errno);
1313                         return -rte_errno;
1314                 }
1315         }
1316
1317         event_eth_rx_adapter = mz->addr;
1318         return 0;
1319 }
1320
1321 static inline struct rte_event_eth_rx_adapter *
1322 rxa_id_to_adapter(uint8_t id)
1323 {
1324         return event_eth_rx_adapter ?
1325                 event_eth_rx_adapter[id] : NULL;
1326 }
1327
1328 static int
1329 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1330                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1331 {
1332         int ret;
1333         struct rte_eventdev *dev;
1334         struct rte_event_dev_config dev_conf;
1335         int started;
1336         uint8_t port_id;
1337         struct rte_event_port_conf *port_conf = arg;
1338         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1339
1340         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1341         dev_conf = dev->data->dev_conf;
1342
1343         started = dev->data->dev_started;
1344         if (started)
1345                 rte_event_dev_stop(dev_id);
1346         port_id = dev_conf.nb_event_ports;
1347         dev_conf.nb_event_ports += 1;
1348         ret = rte_event_dev_configure(dev_id, &dev_conf);
1349         if (ret) {
1350                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1351                                                 dev_id);
1352                 if (started) {
1353                         if (rte_event_dev_start(dev_id))
1354                                 return -EIO;
1355                 }
1356                 return ret;
1357         }
1358
1359         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1360         if (ret) {
1361                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1362                                         port_id);
1363                 return ret;
1364         }
1365
1366         conf->event_port_id = port_id;
1367         conf->max_nb_rx = 128;
1368         if (started)
1369                 ret = rte_event_dev_start(dev_id);
1370         rx_adapter->default_cb_arg = 1;
1371         return ret;
1372 }
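/* Usage sketch (application side; the values and the queue_conf variable
 * are illustrative): rte_event_eth_rx_adapter_create() registers this
 * default callback, which adds one extra event port for the adapter by
 * reconfiguring the event device:
 *
 *     struct rte_event_port_conf port_conf = {
 *             .new_event_threshold = 4096,
 *             .dequeue_depth = 16,
 *             .enqueue_depth = 16,
 *     };
 *
 *     rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 *     rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &queue_conf);
 *     rte_event_eth_rx_adapter_start(id);
 */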
1373
1374 static int
1375 rxa_epoll_create1(void)
1376 {
1377 #if defined(LINUX)
1378         int fd;
1379         fd = epoll_create1(EPOLL_CLOEXEC);
1380         return fd < 0 ? -errno : fd;
1381 #elif defined(BSD)
1382         return -ENOTSUP;
1383 #endif
1384 }
1385
1386 static int
1387 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1388 {
1389         if (rx_adapter->epd != INIT_FD)
1390                 return 0;
1391
1392         rx_adapter->epd = rxa_epoll_create1();
1393         if (rx_adapter->epd < 0) {
1394                 int err = rx_adapter->epd;
1395                 rx_adapter->epd = INIT_FD;
1396                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1397                 return err;
1398         }
1399
1400         return 0;
1401 }
1402
1403 static int
1404 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1405 {
1406         int err;
1407         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1408
1409         if (rx_adapter->intr_ring)
1410                 return 0;
1411
1412         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1413                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1414                                         rte_socket_id(), 0);
1415         if (!rx_adapter->intr_ring)
1416                 return -ENOMEM;
1417
1418         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1419                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1420                                         sizeof(struct rte_epoll_event),
1421                                         RTE_CACHE_LINE_SIZE,
1422                                         rx_adapter->socket_id);
1423         if (!rx_adapter->epoll_events) {
1424                 err = -ENOMEM;
1425                 goto error;
1426         }
1427
1428         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1429
1430         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1431                         "rx-intr-thread-%d", rx_adapter->id);
1432
1433         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1434                                 NULL, rxa_intr_thread, rx_adapter);
1435         if (!err) {
1436                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1437                 return 0;
1438         }
1439
1440         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1441 error:
1442         rte_ring_free(rx_adapter->intr_ring);
1443         rx_adapter->intr_ring = NULL;
1444         rx_adapter->epoll_events = NULL;
1445         return err;
1446 }
1447
1448 static int
1449 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1450 {
1451         int err;
1452
1453         err = pthread_cancel(rx_adapter->rx_intr_thread);
1454         if (err)
1455                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1456                                 err);
1457
1458         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1459         if (err)
1460                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1461
1462         rte_free(rx_adapter->epoll_events);
1463         rte_ring_free(rx_adapter->intr_ring);
1464         rx_adapter->intr_ring = NULL;
1465         rx_adapter->epoll_events = NULL;
1466         return 0;
1467 }
1468
1469 static int
1470 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1471 {
1472         int ret;
1473
1474         if (rx_adapter->num_rx_intr == 0)
1475                 return 0;
1476
1477         ret = rxa_destroy_intr_thread(rx_adapter);
1478         if (ret)
1479                 return ret;
1480
1481         close(rx_adapter->epd);
1482         rx_adapter->epd = INIT_FD;
1483
1484         return ret;
1485 }
1486
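/*
 * Disable the Rx interrupt for a queue and remove its registration from
 * the adapter's epoll fd, clearing the corresponding interrupt enable flag.
 */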
1487 static int
1488 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1489         struct eth_device_info *dev_info,
1490         uint16_t rx_queue_id)
1491 {
1492         int err;
1493         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1494         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1495
1496         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1497         if (err) {
1498                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1499                         rx_queue_id);
1500                 return err;
1501         }
1502
1503         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1504                                         rx_adapter->epd,
1505                                         RTE_INTR_EVENT_DEL,
1506                                         0);
1507         if (err)
1508                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1509
1510         if (sintr)
1511                 dev_info->shared_intr_enabled = 0;
1512         else
1513                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1514         return err;
1515 }
1516
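/*
 * Remove a queue, or all queues of the device when rx_queue_id is -1, from
 * interrupt mode. A shared interrupt is disabled only when its last queue
 * goes away; a single queue delete also drops the entry from
 * dev_info->intr_queue[].
 */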
1517 static int
1518 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1519                 struct eth_device_info *dev_info,
1520                 int rx_queue_id)
1521 {
1522         int err;
1523         int i;
1524         int s;
1525
1526         if (dev_info->nb_rx_intr == 0)
1527                 return 0;
1528
1529         err = 0;
1530         if (rx_queue_id == -1) {
1531                 s = dev_info->nb_shared_intr;
1532                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1533                         int sintr;
1534                         uint16_t q;
1535
1536                         q = dev_info->intr_queue[i];
1537                         sintr = rxa_shared_intr(dev_info, q);
1538                         s -= sintr;
1539
1540                         if (!sintr || s == 0) {
1542                                 err = rxa_disable_intr(rx_adapter, dev_info,
1543                                                 q);
1544                                 if (err)
1545                                         return err;
1546                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1547                                                         q);
1548                         }
1549                 }
1550         } else {
1551                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1552                         return 0;
1553                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1554                                 dev_info->nb_shared_intr == 1) {
1555                         err = rxa_disable_intr(rx_adapter, dev_info,
1556                                         rx_queue_id);
1557                         if (err)
1558                                 return err;
1559                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1560                                                 rx_queue_id);
1561                 }
1562
1563                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1564                         if (dev_info->intr_queue[i] == rx_queue_id) {
1565                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1566                                         dev_info->intr_queue[i] =
1567                                                 dev_info->intr_queue[i + 1];
1568                                 break;
1569                         }
1570                 }
1571         }
1572
1573         return err;
1574 }
1575
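/*
 * Put a queue into interrupt mode: allocate dev_info->intr_queue[] on first
 * use, register the (port, queue) pair with the adapter's epoll fd, enable
 * the Rx interrupt and start the interrupt thread. Every step is undone on
 * failure.
 */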
1576 static int
1577 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1578         struct eth_device_info *dev_info,
1579         uint16_t rx_queue_id)
1580 {
1581         int err, err1;
1582         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1583         union queue_data qd;
1584         int init_fd;
1585         uint16_t *intr_queue;
1586         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1587
1588         if (rxa_intr_queue(dev_info, rx_queue_id))
1589                 return 0;
1590
1591         intr_queue = dev_info->intr_queue;
1592         if (dev_info->intr_queue == NULL) {
1593                 size_t len =
1594                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1595                 dev_info->intr_queue =
1596                         rte_zmalloc_socket(
1597                                 rx_adapter->mem_name,
1598                                 len,
1599                                 0,
1600                                 rx_adapter->socket_id);
1601                 if (dev_info->intr_queue == NULL)
1602                         return -ENOMEM;
1603         }
1604
1605         init_fd = rx_adapter->epd;
1606         err = rxa_init_epd(rx_adapter);
1607         if (err)
1608                 goto err_free_queue;
1609
1610         qd.port = eth_dev_id;
1611         qd.queue = rx_queue_id;
1612
1613         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1614                                         rx_adapter->epd,
1615                                         RTE_INTR_EVENT_ADD,
1616                                         qd.ptr);
1617         if (err) {
1618                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1619                         " Rx Queue %u err %d", rx_queue_id, err);
1620                 goto err_del_fd;
1621         }
1622
1623         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1624         if (err) {
1625                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1626                                 " Rx Queue %u err %d", rx_queue_id, err);
1627
1628                 goto err_del_event;
1629         }
1630
1631         err = rxa_create_intr_thread(rx_adapter);
1632         if (!err)  {
1633                 if (sintr)
1634                         dev_info->shared_intr_enabled = 1;
1635                 else
1636                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1637                 return 0;
1638         }
1639
1641         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1642         if (err)
1643                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1644                                 " Rx Queue %u err %d", rx_queue_id, err);
1645 err_del_event:
1646         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1647                                         rx_adapter->epd,
1648                                         RTE_INTR_EVENT_DEL,
1649                                         0);
1650         if (err1) {
1651                 RTE_EDEV_LOG_ERR("Could not delete event for"
1652                                 " Rx Queue %u err %d", rx_queue_id, err1);
1653         }
1654 err_del_fd:
1655         if (init_fd == INIT_FD) {
1656                 close(rx_adapter->epd);
1657                 rx_adapter->epd = INIT_FD;
1658         }
1659 err_free_queue:
1660         if (intr_queue == NULL) {
1661                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1662
1663         return err;
1664 }
1665
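/*
 * Add a queue, or all queues of the device when rx_queue_id is -1, to
 * interrupt mode. Queues sharing an interrupt are configured only once; on
 * failure, the queues configured by this call are rolled back.
 */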
1666 static int
1667 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1668         struct eth_device_info *dev_info,
1669         int rx_queue_id)
1671 {
1672         int i, j, err;
1673         int si = -1;
1674         int shared_done = (dev_info->nb_shared_intr > 0);
1675
1676         if (rx_queue_id != -1) {
1677                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1678                         return 0;
1679                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1680         }
1681
1682         err = 0;
1683         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1685                 if (rxa_shared_intr(dev_info, i) && shared_done)
1686                         continue;
1687
1688                 err = rxa_config_intr(rx_adapter, dev_info, i);
1689
1690                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1691                 if (shared_done) {
1692                         si = i;
1693                         dev_info->shared_intr_enabled = 1;
1694                 }
1695                 if (err)
1696                         break;
1697         }
1698
1699         if (err == 0)
1700                 return 0;
1701
1702         shared_done = (dev_info->nb_shared_intr > 0);
1703         for (j = 0; j < i; j++) {
1704                 if (rxa_intr_queue(dev_info, j))
1705                         continue;
1706                 if (rxa_shared_intr(dev_info, j) && si != j)
1707                         continue;
1708                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1709                 if (err)
1710                         break;
1711
1713
1714         return err;
1715 }
1716
1717
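/*
 * One time setup of the adapter service: register the service function and
 * run the configuration callback that supplies the event port used to
 * enqueue Rx events.
 */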
1718 static int
1719 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1720 {
1721         int ret;
1722         struct rte_service_spec service;
1723         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1724
1725         if (rx_adapter->service_inited)
1726                 return 0;
1727
1728         memset(&service, 0, sizeof(service));
1729         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1730                 "rte_event_eth_rx_adapter_%d", id);
1731         service.socket_id = rx_adapter->socket_id;
1732         service.callback = rxa_service_func;
1733         service.callback_userdata = rx_adapter;
1734         /* Service function handles locking for queue add/del updates */
1735         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1736         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1737         if (ret) {
1738                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1739                         service.name, ret);
1740                 return ret;
1741         }
1742
1743         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1744                 &rx_adapter_conf, rx_adapter->conf_arg);
1745         if (ret) {
1746                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1747                         ret);
1748                 goto err_done;
1749         }
1750         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1751         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1752         rx_adapter->service_inited = 1;
1753         rx_adapter->epd = INIT_FD;
1754         return 0;
1755
1756 err_done:
1757         rte_service_component_unregister(rx_adapter->service_id);
1758         return ret;
1759 }
1760
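/*
 * Track a queue add (add != 0) or delete: update the queue_enabled flag and
 * the per device and per adapter queue counts.
 */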
1761 static void
1762 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1763                 struct eth_device_info *dev_info,
1764                 int32_t rx_queue_id,
1765                 uint8_t add)
1766 {
1767         struct eth_rx_queue_info *queue_info;
1768         int enabled;
1769         uint16_t i;
1770
1771         if (dev_info->rx_queue == NULL)
1772                 return;
1773
1774         if (rx_queue_id == -1) {
1775                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1776                         rxa_update_queue(rx_adapter, dev_info, i, add);
1777         } else {
1778                 queue_info = &dev_info->rx_queue[rx_queue_id];
1779                 enabled = queue_info->queue_enabled;
1780                 if (add) {
1781                         rx_adapter->nb_queues += !enabled;
1782                         dev_info->nb_dev_queues += !enabled;
1783                 } else {
1784                         rx_adapter->nb_queues -= enabled;
1785                         dev_info->nb_dev_queues -= enabled;
1786                 }
1787                 queue_info->queue_enabled = !!add;
1788         }
1789 }
1790
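/*
 * Store the per queue vectorization parameters and derive the flow id used
 * for vector events; when the queue's event has a zero flow id, one is
 * synthesized from the queue and port ids.
 */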
1791 static void
1792 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1793                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1794                     uint16_t port_id)
1795 {
1796 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1797         struct eth_rx_vector_data *vector_data;
1798         uint32_t flow_id;
1799
1800         vector_data = &queue_info->vector_data;
1801         vector_data->max_vector_count = vector_count;
1802         vector_data->port = port_id;
1803         vector_data->queue = qid;
1804         vector_data->vector_pool = mp;
1805         vector_data->vector_timeout_ticks =
1806                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1807         vector_data->ts = 0;
1808         flow_id = queue_info->event & 0xFFFFF;
1809         flow_id =
1810                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1811         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1812 }
1813
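/*
 * Delete a queue, or all queues of the device when rx_queue_id is -1, from
 * the service based adapter: flush any partially filled event vectors for
 * the queue and update the poll/interrupt accounting.
 */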
1814 static void
1815 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1816         struct eth_device_info *dev_info,
1817         int32_t rx_queue_id)
1818 {
1819         struct eth_rx_vector_data *vec;
1820         int pollq;
1821         int intrq;
1822         int sintrq;
1823
1825         if (rx_adapter->nb_queues == 0)
1826                 return;
1827
1828         if (rx_queue_id == -1) {
1829                 uint16_t nb_rx_queues;
1830                 uint16_t i;
1831
1832                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1833                 for (i = 0; i < nb_rx_queues; i++)
1834                         rxa_sw_del(rx_adapter, dev_info, i);
1835                 return;
1836         }
1837
1838         /* Push all the partial event vectors to event device. */
1839         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1840                 if (vec->queue != rx_queue_id || vec->port != dev_info->dev->data->port_id)
1841                         continue;
1842                 rxa_vector_expire(vec, rx_adapter);
1843                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1844         }
1845
1846         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1847         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1848         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1849         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1850         rx_adapter->num_rx_polled -= pollq;
1851         dev_info->nb_rx_poll -= pollq;
1852         rx_adapter->num_rx_intr -= intrq;
1853         dev_info->nb_rx_intr -= intrq;
1854         dev_info->nb_shared_intr -= intrq && sintrq;
1855 }
1856
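/*
 * Initialize per queue state (servicing weight, event template, flow id
 * handling) for a queue, or all queues when rx_queue_id is -1, and adjust
 * the poll/interrupt counters for the queue's new mode.
 */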
1857 static void
1858 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1859         struct eth_device_info *dev_info,
1860         int32_t rx_queue_id,
1861         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1862 {
1863         struct eth_rx_queue_info *queue_info;
1864         const struct rte_event *ev = &conf->ev;
1865         int pollq;
1866         int intrq;
1867         int sintrq;
1868         struct rte_event *qi_ev;
1869
1870         if (rx_queue_id == -1) {
1871                 uint16_t nb_rx_queues;
1872                 uint16_t i;
1873
1874                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1875                 for (i = 0; i < nb_rx_queues; i++)
1876                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1877                 return;
1878         }
1879
1880         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1881         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1882         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1883
1884         queue_info = &dev_info->rx_queue[rx_queue_id];
1885         queue_info->wt = conf->servicing_weight;
1886
1887         qi_ev = (struct rte_event *)&queue_info->event;
1888         qi_ev->event = ev->event;
1889         qi_ev->op = RTE_EVENT_OP_NEW;
1890         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1891         qi_ev->sub_event_type = 0;
1892
1893         if (conf->rx_queue_flags &
1894                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)
1895                 queue_info->flow_id_mask = ~0;
1896         else
1897                 qi_ev->flow_id = 0;
1898
1899         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1900         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1901                 rx_adapter->num_rx_polled += !pollq;
1902                 dev_info->nb_rx_poll += !pollq;
1903                 rx_adapter->num_rx_intr -= intrq;
1904                 dev_info->nb_rx_intr -= intrq;
1905                 dev_info->nb_shared_intr -= intrq && sintrq;
1906         }
1907
1908         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1909                 rx_adapter->num_rx_polled -= pollq;
1910                 dev_info->nb_rx_poll -= pollq;
1911                 rx_adapter->num_rx_intr += !intrq;
1912                 dev_info->nb_rx_intr += !intrq;
1913                 dev_info->nb_shared_intr += !intrq && sintrq;
1914                 if (dev_info->nb_shared_intr == 1) {
1915                         if (dev_info->multi_intr_cap)
1916                                 dev_info->next_q_idx =
1917                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1918                         else
1919                                 dev_info->next_q_idx = 0;
1920                 }
1921         }
1922 }
1923
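/*
 * Enable event vectorization for a queue, or all queues of the port when
 * rx_queue_id is -1, in the service based adapter and track the smallest
 * vector timeout configured across queues.
 */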
1924 static void
1925 rxa_sw_event_vector_configure(
1926         struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
1927         int rx_queue_id,
1928         const struct rte_event_eth_rx_adapter_event_vector_config *config)
1929 {
1930         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1931         struct eth_rx_queue_info *queue_info;
1932         struct rte_event *qi_ev;
1933
1934         if (rx_queue_id == -1) {
1935                 uint16_t nb_rx_queues;
1936                 uint16_t i;
1937
1938                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1939                 for (i = 0; i < nb_rx_queues; i++)
1940                         rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
1941                                                       config);
1942                 return;
1943         }
1944
1945         queue_info = &dev_info->rx_queue[rx_queue_id];
1946         qi_ev = (struct rte_event *)&queue_info->event;
1947         queue_info->ena_vector = 1;
1948         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1949         rxa_set_vector_data(queue_info, config->vector_sz,
1950                             config->vector_timeout_ns, config->vector_mp,
1951                             rx_queue_id, dev_info->dev->data->port_id);
1952         rx_adapter->ena_vector = 1;
1953         rx_adapter->vector_tmo_ticks =
1954                 rx_adapter->vector_tmo_ticks ?
1955                               RTE_MIN(config->vector_timeout_ns >> 1,
1956                                 rx_adapter->vector_tmo_ticks) :
1957                               config->vector_timeout_ns >> 1;
1958 }
1959
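/*
 * Add a queue, or all queues of a port, to the service based adapter: size
 * the poll and WRR arrays for the post-add state, move queues between poll
 * and interrupt mode according to the servicing weight, and install the new
 * WRR schedule.
 */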
1960 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1961                 uint16_t eth_dev_id,
1962                 int rx_queue_id,
1963                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1964 {
1965         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1966         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1967         int ret;
1968         struct eth_rx_poll_entry *rx_poll;
1969         struct eth_rx_queue_info *rx_queue;
1970         uint32_t *rx_wrr;
1971         uint16_t nb_rx_queues;
1972         uint32_t nb_rx_poll, nb_wrr;
1973         uint32_t nb_rx_intr;
1974         int num_intr_vec;
1975         uint16_t wt;
1976
1977         if (queue_conf->servicing_weight == 0) {
1978                 struct rte_eth_dev_data *data = dev_info->dev->data;
1979
1980                 temp_conf = *queue_conf;
1981                 if (!data->dev_conf.intr_conf.rxq) {
1982                         /* If Rx interrupts are disabled set wt = 1 */
1983                         temp_conf.servicing_weight = 1;
1984                 }
1985                 queue_conf = &temp_conf;
1986         }
1987
1988         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1989         rx_queue = dev_info->rx_queue;
1990         wt = queue_conf->servicing_weight;
1991
1992         if (dev_info->rx_queue == NULL) {
1993                 dev_info->rx_queue =
1994                     rte_zmalloc_socket(rx_adapter->mem_name,
1995                                        nb_rx_queues *
1996                                        sizeof(struct eth_rx_queue_info), 0,
1997                                        rx_adapter->socket_id);
1998                 if (dev_info->rx_queue == NULL)
1999                         return -ENOMEM;
2000         }
2001         rx_wrr = NULL;
2002         rx_poll = NULL;
2003
2004         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2005                         queue_conf->servicing_weight,
2006                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2007
2008         if (dev_info->dev->intr_handle)
2009                 dev_info->multi_intr_cap =
2010                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2011
2012         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2013                                 &rx_poll, &rx_wrr);
2014         if (ret)
2015                 goto err_free_rxqueue;
2016
2017         if (wt == 0) {
2018                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2019
2020                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2021                 if (ret)
2022                         goto err_free_rxqueue;
2023
2024                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2025                 if (ret)
2026                         goto err_free_rxqueue;
2027         } else {
2029                 num_intr_vec = 0;
2030                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2031                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2032                                                 rx_queue_id, 0);
2033                         /* interrupt based queues are being converted to
2034                          * poll mode queues, delete the interrupt configuration
2035                          * for those.
2036                          */
2037                         ret = rxa_del_intr_queue(rx_adapter,
2038                                                 dev_info, rx_queue_id);
2039                         if (ret)
2040                                 goto err_free_rxqueue;
2041                 }
2042         }
2043
2044         if (nb_rx_intr == 0) {
2045                 ret = rxa_free_intr_resources(rx_adapter);
2046                 if (ret)
2047                         goto err_free_rxqueue;
2048         }
2049
2050         if (wt == 0) {
2051                 uint16_t i;
2052
2053                 if (rx_queue_id == -1) {
2054                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2055                                 dev_info->intr_queue[i] = i;
2056                 } else {
2057                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2058                                 dev_info->intr_queue[nb_rx_intr - 1] =
2059                                         rx_queue_id;
2060                 }
2061         }
2062
2065         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2066         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2067
2068         rte_free(rx_adapter->eth_rx_poll);
2069         rte_free(rx_adapter->wrr_sched);
2070
2071         rx_adapter->eth_rx_poll = rx_poll;
2072         rx_adapter->wrr_sched = rx_wrr;
2073         rx_adapter->wrr_len = nb_wrr;
2074         rx_adapter->num_intr_vec += num_intr_vec;
2075         return 0;
2076
2077 err_free_rxqueue:
2078         if (rx_queue == NULL) {
2079                 rte_free(dev_info->rx_queue);
2080                 dev_info->rx_queue = NULL;
2081         }
2082
2083         rte_free(rx_poll);
2084         rte_free(rx_wrr);
2085
2086         return ret;
2087 }
2088
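/*
 * Common start/stop handler: invoke the PMD start/stop callback for devices
 * with an internal event port and set the service run state for the
 * software data path.
 */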
2089 static int
2090 rxa_ctrl(uint8_t id, int start)
2091 {
2092         struct rte_event_eth_rx_adapter *rx_adapter;
2093         struct rte_eventdev *dev;
2094         struct eth_device_info *dev_info;
2095         uint32_t i;
2096         int use_service = 0;
2097         int stop = !start;
2098
2099         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2100         rx_adapter = rxa_id_to_adapter(id);
2101         if (rx_adapter == NULL)
2102                 return -EINVAL;
2103
2104         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2105
2106         RTE_ETH_FOREACH_DEV(i) {
2107                 dev_info = &rx_adapter->eth_devices[i];
2108                 /* if start  check for num dev queues */
2109                 if (start && !dev_info->nb_dev_queues)
2110                         continue;
2111                 /* if stop check if dev has been started */
2112                 if (stop && !dev_info->dev_rx_started)
2113                         continue;
2114                 use_service |= !dev_info->internal_event_port;
2115                 dev_info->dev_rx_started = start;
2116                 if (dev_info->internal_event_port == 0)
2117                         continue;
2118                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2119                                                 &rte_eth_devices[i]) :
2120                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2121                                                 &rte_eth_devices[i]);
2122         }
2123
2124         if (use_service) {
2125                 rte_spinlock_lock(&rx_adapter->rx_lock);
2126                 rx_adapter->rxa_started = start;
2127                 rte_service_runstate_set(rx_adapter->service_id, start);
2128                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2129         }
2130
2131         return 0;
2132 }
2133
2134 int
2135 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2136                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2137                                 void *conf_arg)
2138 {
2139         struct rte_event_eth_rx_adapter *rx_adapter;
2140         int ret;
2141         int socket_id;
2142         uint16_t i;
2143         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
2144         const uint8_t default_rss_key[] = {
2145                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2146                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2147                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2148                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2149                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2150         };
2151
2152         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2153         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2154         if (conf_cb == NULL)
2155                 return -EINVAL;
2156
2157         if (event_eth_rx_adapter == NULL) {
2158                 ret = rte_event_eth_rx_adapter_init();
2159                 if (ret)
2160                         return ret;
2161         }
2162
2163         rx_adapter = rxa_id_to_adapter(id);
2164         if (rx_adapter != NULL) {
2165                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2166                 return -EEXIST;
2167         }
2168
2169         socket_id = rte_event_dev_socket_id(dev_id);
2170         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2171                 "rte_event_eth_rx_adapter_%d",
2172                 id);
2173
2174         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2175                         RTE_CACHE_LINE_SIZE, socket_id);
2176         if (rx_adapter == NULL) {
2177                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2178                 return -ENOMEM;
2179         }
2180
2181         rx_adapter->eventdev_id = dev_id;
2182         rx_adapter->socket_id = socket_id;
2183         rx_adapter->conf_cb = conf_cb;
2184         rx_adapter->conf_arg = conf_arg;
2185         rx_adapter->id = id;
2186         TAILQ_INIT(&rx_adapter->vector_list);
2187         strcpy(rx_adapter->mem_name, mem_name);
2188         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2189                                         RTE_MAX_ETHPORTS *
2190                                         sizeof(struct eth_device_info), 0,
2191                                         socket_id);
2192         rte_convert_rss_key((const uint32_t *)default_rss_key,
2193                         (uint32_t *)rx_adapter->rss_key_be,
2194                             RTE_DIM(default_rss_key));
2195
2196         if (rx_adapter->eth_devices == NULL) {
2197                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2198                 rte_free(rx_adapter);
2199                 return -ENOMEM;
2200         }
2201         rte_spinlock_init(&rx_adapter->rx_lock);
2202         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2203                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2204
2205         event_eth_rx_adapter[id] = rx_adapter;
2206         if (conf_cb == rxa_default_conf_cb)
2207                 rx_adapter->default_cb_arg = 1;
2208         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2209                 conf_arg);
2210         return 0;
2211 }
2212
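/*
 * Illustrative usage sketch (not part of this file): typical setup of a
 * service based Rx adapter, assuming an already configured eventdev
 * "dev_id", an ethdev "eth_port", an event queue "ev_qid" and a service
 * lcore "service_lcore" chosen by the application. ADAPTER_ID and the
 * other names are placeholders; error handling is omitted.
 *
 *	struct rte_event_port_conf pconf;
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {0};
 *	uint32_t service_id;
 *
 *	rte_event_port_default_conf_get(dev_id, 0, &pconf);
 *	rte_event_eth_rx_adapter_create(ADAPTER_ID, dev_id, &pconf);
 *
 *	qconf.ev.queue_id = ev_qid;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *	rte_event_eth_rx_adapter_queue_add(ADAPTER_ID, eth_port, -1, &qconf);
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(ADAPTER_ID,
 *						    &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *	rte_event_eth_rx_adapter_start(ADAPTER_ID);
 */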
2213 int
2214 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2215                 struct rte_event_port_conf *port_config)
2216 {
2217         struct rte_event_port_conf *pc;
2218         int ret;
2219
2220         if (port_config == NULL)
2221                 return -EINVAL;
2222         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2223
2224         pc = rte_malloc(NULL, sizeof(*pc), 0);
2225         if (pc == NULL)
2226                 return -ENOMEM;
2227         *pc = *port_config;
2228         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2229                                         rxa_default_conf_cb,
2230                                         pc);
2231         if (ret)
2232                 rte_free(pc);
2233         return ret;
2234 }
2235
2236 int
2237 rte_event_eth_rx_adapter_free(uint8_t id)
2238 {
2239         struct rte_event_eth_rx_adapter *rx_adapter;
2240
2241         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2242
2243         rx_adapter = rxa_id_to_adapter(id);
2244         if (rx_adapter == NULL)
2245                 return -EINVAL;
2246
2247         if (rx_adapter->nb_queues) {
2248                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2249                                 rx_adapter->nb_queues);
2250                 return -EBUSY;
2251         }
2252
2253         if (rx_adapter->default_cb_arg)
2254                 rte_free(rx_adapter->conf_arg);
2255         rte_free(rx_adapter->eth_devices);
2256         rte_free(rx_adapter);
2257         event_eth_rx_adapter[id] = NULL;
2258
2259         rte_eventdev_trace_eth_rx_adapter_free(id);
2260         return 0;
2261 }
2262
2263 int
2264 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2265                 uint16_t eth_dev_id,
2266                 int32_t rx_queue_id,
2267                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2268 {
2269         int ret;
2270         uint32_t cap;
2271         struct rte_event_eth_rx_adapter *rx_adapter;
2272         struct rte_eventdev *dev;
2273         struct eth_device_info *dev_info;
2274
2275         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2276         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2277
2278         rx_adapter = rxa_id_to_adapter(id);
2279         if ((rx_adapter == NULL) || (queue_conf == NULL))
2280                 return -EINVAL;
2281
2282         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2283         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2284                                                 eth_dev_id,
2285                                                 &cap);
2286         if (ret) {
2287                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2288                         " eth port %" PRIu16, id, eth_dev_id);
2289                 return ret;
2290         }
2291
2292         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2293                 && (queue_conf->rx_queue_flags &
2294                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2295                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2296                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2297                                 eth_dev_id, id);
2298                 return -EINVAL;
2299         }
2300
2301         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
2302             (queue_conf->rx_queue_flags &
2303              RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
2304                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2305                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2306                                  eth_dev_id, id);
2307                 return -EINVAL;
2308         }
2309
2310         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2311                 (rx_queue_id != -1)) {
2312                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2313                         "event queue, eth port: %" PRIu16 " adapter id: %"
2314                         PRIu8, eth_dev_id, id);
2315                 return -EINVAL;
2316         }
2317
2318         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2319                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2320                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2321                          (uint16_t)rx_queue_id);
2322                 return -EINVAL;
2323         }
2324
2325         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2326
2327         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2328                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2329                                         -ENOTSUP);
2330                 if (dev_info->rx_queue == NULL) {
2331                         dev_info->rx_queue =
2332                             rte_zmalloc_socket(rx_adapter->mem_name,
2333                                         dev_info->dev->data->nb_rx_queues *
2334                                         sizeof(struct eth_rx_queue_info), 0,
2335                                         rx_adapter->socket_id);
2336                         if (dev_info->rx_queue == NULL)
2337                                 return -ENOMEM;
2338                 }
2339
2340                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2341                                 &rte_eth_devices[eth_dev_id],
2342                                 rx_queue_id, queue_conf);
2343                 if (ret == 0) {
2344                         dev_info->internal_event_port = 1;
2345                         rxa_update_queue(rx_adapter,
2346                                         &rx_adapter->eth_devices[eth_dev_id],
2347                                         rx_queue_id,
2348                                         1);
2349                 }
2350         } else {
2351                 rte_spinlock_lock(&rx_adapter->rx_lock);
2352                 dev_info->internal_event_port = 0;
2353                 ret = rxa_init_service(rx_adapter, id);
2354                 if (ret == 0) {
2355                         uint32_t service_id = rx_adapter->service_id;
2356                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2357                                         queue_conf);
2358                         rte_service_component_runstate_set(service_id,
2359                                 rxa_sw_adapter_queue_count(rx_adapter));
2360                 }
2361                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2362         }
2363
2364         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2365                 rx_queue_id, queue_conf, ret);
2366         if (ret)
2367                 return ret;
2368
2369         return 0;
2370 }
2371
2372 static int
2373 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2374 {
2375         limits->max_sz = MAX_VECTOR_SIZE;
2376         limits->min_sz = MIN_VECTOR_SIZE;
2377         limits->max_timeout_ns = MAX_VECTOR_NS;
2378         limits->min_timeout_ns = MIN_VECTOR_NS;
2379
2380         return 0;
2381 }
2382
2383 int
2384 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2385                                 int32_t rx_queue_id)
2386 {
2387         int ret = 0;
2388         struct rte_eventdev *dev;
2389         struct rte_event_eth_rx_adapter *rx_adapter;
2390         struct eth_device_info *dev_info;
2391         uint32_t cap;
2392         uint32_t nb_rx_poll = 0;
2393         uint32_t nb_wrr = 0;
2394         uint32_t nb_rx_intr;
2395         struct eth_rx_poll_entry *rx_poll = NULL;
2396         uint32_t *rx_wrr = NULL;
2397         int num_intr_vec;
2398
2399         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2400         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2401
2402         rx_adapter = rxa_id_to_adapter(id);
2403         if (rx_adapter == NULL)
2404                 return -EINVAL;
2405
2406         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2407         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2408                                                 eth_dev_id,
2409                                                 &cap);
2410         if (ret)
2411                 return ret;
2412
2413         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2414                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2415                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2416                          (uint16_t)rx_queue_id);
2417                 return -EINVAL;
2418         }
2419
2420         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2421
2422         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2423                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2424                                  -ENOTSUP);
2425                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2426                                                 &rte_eth_devices[eth_dev_id],
2427                                                 rx_queue_id);
2428                 if (ret == 0) {
2429                         rxa_update_queue(rx_adapter,
2430                                         &rx_adapter->eth_devices[eth_dev_id],
2431                                         rx_queue_id,
2432                                         0);
2433                         if (dev_info->nb_dev_queues == 0) {
2434                                 rte_free(dev_info->rx_queue);
2435                                 dev_info->rx_queue = NULL;
2436                         }
2437                 }
2438         } else {
2439                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2440                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2441
2442                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2443                         &rx_poll, &rx_wrr);
2444                 if (ret)
2445                         return ret;
2446
2447                 rte_spinlock_lock(&rx_adapter->rx_lock);
2448
2449                 num_intr_vec = 0;
2450                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2452                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2453                                                 rx_queue_id, 0);
2454                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2455                                         rx_queue_id);
2456                         if (ret)
2457                                 goto unlock_ret;
2458                 }
2459
2460                 if (nb_rx_intr == 0) {
2461                         ret = rxa_free_intr_resources(rx_adapter);
2462                         if (ret)
2463                                 goto unlock_ret;
2464                 }
2465
2466                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2467                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2468
2469                 rte_free(rx_adapter->eth_rx_poll);
2470                 rte_free(rx_adapter->wrr_sched);
2471
2472                 if (nb_rx_intr == 0) {
2473                         rte_free(dev_info->intr_queue);
2474                         dev_info->intr_queue = NULL;
2475                 }
2476
2477                 rx_adapter->eth_rx_poll = rx_poll;
2478                 rx_adapter->wrr_sched = rx_wrr;
2479                 rx_adapter->wrr_len = nb_wrr;
2480                 rx_adapter->num_intr_vec += num_intr_vec;
2481
2482                 if (dev_info->nb_dev_queues == 0) {
2483                         rte_free(dev_info->rx_queue);
2484                         dev_info->rx_queue = NULL;
2485                 }
2486 unlock_ret:
2487                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2488                 if (ret) {
2489                         rte_free(rx_poll);
2490                         rte_free(rx_wrr);
2491                         return ret;
2492                 }
2493
2494                 rte_service_component_runstate_set(rx_adapter->service_id,
2495                                 rxa_sw_adapter_queue_count(rx_adapter));
2496         }
2497
2498         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2499                 rx_queue_id, ret);
2500         return ret;
2501 }
2502
2503 int
2504 rte_event_eth_rx_adapter_queue_event_vector_config(
2505         uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
2506         struct rte_event_eth_rx_adapter_event_vector_config *config)
2507 {
2508         struct rte_event_eth_rx_adapter_vector_limits limits;
2509         struct rte_event_eth_rx_adapter *rx_adapter;
2510         struct rte_eventdev *dev;
2511         uint32_t cap;
2512         int ret;
2513
2514         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2515         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2516
2517         rx_adapter = rxa_id_to_adapter(id);
2518         if ((rx_adapter == NULL) || (config == NULL))
2519                 return -EINVAL;
2520
2521         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2522         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2523                                                 eth_dev_id, &cap);
2524         if (ret) {
2525                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2526                                  " eth port %" PRIu16,
2527                                  id, eth_dev_id);
2528                 return ret;
2529         }
2530
2531         if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
2532                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2533                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2534                                  eth_dev_id, id);
2535                 return -EINVAL;
2536         }
2537
2538         ret = rte_event_eth_rx_adapter_vector_limits_get(
2539                 rx_adapter->eventdev_id, eth_dev_id, &limits);
2540         if (ret) {
2541                 RTE_EDEV_LOG_ERR("Failed to get vector limits edev %" PRIu8
2542                                  " eth port %" PRIu16,
2543                                  rx_adapter->eventdev_id, eth_dev_id);
2544                 return ret;
2545         }
2546
2547         if (config->vector_sz < limits.min_sz ||
2548             config->vector_sz > limits.max_sz ||
2549             config->vector_timeout_ns < limits.min_timeout_ns ||
2550             config->vector_timeout_ns > limits.max_timeout_ns ||
2551             config->vector_mp == NULL) {
2552                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2553                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2554                                  eth_dev_id, id);
2555                 return -EINVAL;
2556         }
2557         if (config->vector_mp->elt_size <
2558             (sizeof(struct rte_event_vector) +
2559              (sizeof(uintptr_t) * config->vector_sz))) {
2560                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2561                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2562                                  eth_dev_id, id);
2563                 return -EINVAL;
2564         }
2565
2566         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2567                 RTE_FUNC_PTR_OR_ERR_RET(
2568                         *dev->dev_ops->eth_rx_adapter_event_vector_config,
2569                         -ENOTSUP);
2570                 ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
2571                         dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
2572         } else {
2573                 rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
2574                                               rx_queue_id, config);
2575         }
2576
2577         return ret;
2578 }
2579
2580 int
2581 rte_event_eth_rx_adapter_vector_limits_get(
2582         uint8_t dev_id, uint16_t eth_port_id,
2583         struct rte_event_eth_rx_adapter_vector_limits *limits)
2584 {
2585         struct rte_eventdev *dev;
2586         uint32_t cap;
2587         int ret;
2588
2589         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2590         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2591
2592         if (limits == NULL)
2593                 return -EINVAL;
2594
2595         dev = &rte_eventdevs[dev_id];
2596
2597         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2598         if (ret) {
2599                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2600                                  " eth port %" PRIu16,
2601                                  dev_id, eth_port_id);
2602                 return ret;
2603         }
2604
2605         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2606                 RTE_FUNC_PTR_OR_ERR_RET(
2607                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2608                         -ENOTSUP);
2609                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2610                         dev, &rte_eth_devices[eth_port_id], limits);
2611         } else {
2612                 ret = rxa_sw_vector_limits(limits);
2613         }
2614
2615         return ret;
2616 }
2617
2618 int
2619 rte_event_eth_rx_adapter_start(uint8_t id)
2620 {
2621         rte_eventdev_trace_eth_rx_adapter_start(id);
2622         return rxa_ctrl(id, 1);
2623 }
2624
2625 int
2626 rte_event_eth_rx_adapter_stop(uint8_t id)
2627 {
2628         rte_eventdev_trace_eth_rx_adapter_stop(id);
2629         return rxa_ctrl(id, 0);
2630 }
2631
2632 int
2633 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2634                                struct rte_event_eth_rx_adapter_stats *stats)
2635 {
2636         struct rte_event_eth_rx_adapter *rx_adapter;
2637         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2638         struct rte_event_eth_rx_adapter_stats dev_stats;
2639         struct rte_eventdev *dev;
2640         struct eth_device_info *dev_info;
2641         uint32_t i;
2642         int ret;
2643
2644         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2645
2646         rx_adapter = rxa_id_to_adapter(id);
2647         if (rx_adapter == NULL || stats == NULL)
2648                 return -EINVAL;
2649
2650         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2651         memset(stats, 0, sizeof(*stats));
2652         RTE_ETH_FOREACH_DEV(i) {
2653                 dev_info = &rx_adapter->eth_devices[i];
2654                 if (dev_info->internal_event_port == 0 ||
2655                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2656                         continue;
2657                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2658                                                 &rte_eth_devices[i],
2659                                                 &dev_stats);
2660                 if (ret)
2661                         continue;
2662                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2663                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2664         }
2665
2666         if (rx_adapter->service_inited)
2667                 *stats = rx_adapter->stats;
2668
2669         stats->rx_packets += dev_stats_sum.rx_packets;
2670         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2671         return 0;
2672 }
2673
2674 int
2675 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2676 {
2677         struct rte_event_eth_rx_adapter *rx_adapter;
2678         struct rte_eventdev *dev;
2679         struct eth_device_info *dev_info;
2680         uint32_t i;
2681
2682         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2683
2684         rx_adapter = rxa_id_to_adapter(id);
2685         if (rx_adapter == NULL)
2686                 return -EINVAL;
2687
2688         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2689         RTE_ETH_FOREACH_DEV(i) {
2690                 dev_info = &rx_adapter->eth_devices[i];
2691                 if (dev_info->internal_event_port == 0 ||
2692                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2693                         continue;
2694                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2695                                                         &rte_eth_devices[i]);
2696         }
2697
2698         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2699         return 0;
2700 }
2701
2702 int
2703 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2704 {
2705         struct rte_event_eth_rx_adapter *rx_adapter;
2706
2707         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2708
2709         rx_adapter = rxa_id_to_adapter(id);
2710         if (rx_adapter == NULL || service_id == NULL)
2711                 return -EINVAL;
2712
2713         if (rx_adapter->service_inited)
2714                 *service_id = rx_adapter->service_id;
2715
2716         return rx_adapter->service_inited ? 0 : -ESRCH;
2717 }
2718
2719 int
2720 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2721                                         uint16_t eth_dev_id,
2722                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2723                                         void *cb_arg)
2724 {
2725         struct rte_event_eth_rx_adapter *rx_adapter;
2726         struct eth_device_info *dev_info;
2727         uint32_t cap;
2728         int ret;
2729
2730         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2731         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2732
2733         rx_adapter = rxa_id_to_adapter(id);
2734         if (rx_adapter == NULL)
2735                 return -EINVAL;
2736
2737         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2738         if (dev_info->rx_queue == NULL)
2739                 return -EINVAL;
2740
2741         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2742                                                 eth_dev_id,
2743                                                 &cap);
2744         if (ret) {
2745                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2746                         " eth port %" PRIu16, id, eth_dev_id);
2747                 return ret;
2748         }
2749
2750         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2751                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2752                                 PRIu16, eth_dev_id);
2753                 return -EINVAL;
2754         }
2755
2756         rte_spinlock_lock(&rx_adapter->rx_lock);
2757         dev_info->cb_fn = cb_fn;
2758         dev_info->cb_arg = cb_arg;
2759         rte_spinlock_unlock(&rx_adapter->rx_lock);
2760
2761         return 0;
2762 }