13dfb284015e3c8c2d9278b2c634f7cef2ed90f3
[dpdk.git] / lib / eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
29 #define MAX_VECTOR_SIZE         1024
30 #define MIN_VECTOR_SIZE         4
31 #define MAX_VECTOR_NS           1E9
32 #define MIN_VECTOR_NS           1E5
33
34 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
35 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
36
37 #define RSS_KEY_SIZE    40
38 /* value written to intr thread pipe to signal thread exit */
39 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
40 /* Sentinel value to detect initialized file handle */
41 #define INIT_FD         -1
42
43 /*
44  * Used to store port and queue ID of interrupting Rx queue
45  */
46 union queue_data {
47         RTE_STD_C11
48         void *ptr;
49         struct {
50                 uint16_t port;
51                 uint16_t queue;
52         };
53 };
54
55 /*
56  * There is an instance of this struct per polled Rx queue added to the
57  * adapter
58  */
59 struct eth_rx_poll_entry {
60         /* Eth port to poll */
61         uint16_t eth_dev_id;
62         /* Eth rx queue to poll */
63         uint16_t eth_rx_qid;
64 };
65
66 struct eth_rx_vector_data {
67         TAILQ_ENTRY(eth_rx_vector_data) next;
68         uint16_t port;
69         uint16_t queue;
70         uint16_t max_vector_count;
71         uint64_t event;
72         uint64_t ts;
73         uint64_t vector_timeout_ticks;
74         struct rte_mempool *vector_pool;
75         struct rte_event_vector *vector_ev;
76 } __rte_cache_aligned;
77
78 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
79
80 /* Instance per adapter */
81 struct rte_eth_event_enqueue_buffer {
82         /* Count of events in this buffer */
83         uint16_t count;
84         /* Array of events in this buffer */
85         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
86 };
87
88 struct rte_event_eth_rx_adapter {
89         /* RSS key */
90         uint8_t rss_key_be[RSS_KEY_SIZE];
91         /* Event device identifier */
92         uint8_t eventdev_id;
93         /* Per ethernet device structure */
94         struct eth_device_info *eth_devices;
95         /* Event port identifier */
96         uint8_t event_port_id;
97         /* Lock to serialize config updates with service function */
98         rte_spinlock_t rx_lock;
99         /* Max mbufs processed in any service function invocation */
100         uint32_t max_nb_rx;
101         /* Receive queues that need to be polled */
102         struct eth_rx_poll_entry *eth_rx_poll;
103         /* Size of the eth_rx_poll array */
104         uint16_t num_rx_polled;
105         /* Weighted round robin schedule */
106         uint32_t *wrr_sched;
107         /* wrr_sched[] size */
108         uint32_t wrr_len;
109         /* Next entry in wrr[] to begin polling */
110         uint32_t wrr_pos;
111         /* Event burst buffer */
112         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
113         /* Vector enable flag */
114         uint8_t ena_vector;
115         /* Timestamp of previous vector expiry list traversal */
116         uint64_t prev_expiry_ts;
117         /* Minimum ticks to wait before traversing expiry list */
118         uint64_t vector_tmo_ticks;
119         /* vector list */
120         struct eth_rx_vector_data_list vector_list;
121         /* Per adapter stats */
122         struct rte_event_eth_rx_adapter_stats stats;
123         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
124         uint16_t enq_block_count;
125         /* Block start ts */
126         uint64_t rx_enq_block_start_ts;
127         /* epoll fd used to wait for Rx interrupts */
128         int epd;
129         /* Num of interrupt driven interrupt queues */
130         uint32_t num_rx_intr;
131         /* Used to send <dev id, queue id> of interrupting Rx queues from
132          * the interrupt thread to the Rx thread
133          */
134         struct rte_ring *intr_ring;
135         /* Rx Queue data (dev id, queue id) for the last non-empty
136          * queue polled
137          */
138         union queue_data qd;
139         /* queue_data is valid */
140         int qd_valid;
141         /* Interrupt ring lock, synchronizes Rx thread
142          * and interrupt thread
143          */
144         rte_spinlock_t intr_ring_lock;
145         /* event array passed to rte_poll_wait */
146         struct rte_epoll_event *epoll_events;
147         /* Count of interrupt vectors in use */
148         uint32_t num_intr_vec;
149         /* Thread blocked on Rx interrupts */
150         pthread_t rx_intr_thread;
151         /* Configuration callback for rte_service configuration */
152         rte_event_eth_rx_adapter_conf_cb conf_cb;
153         /* Configuration callback argument */
154         void *conf_arg;
155         /* Set if  default_cb is being used */
156         int default_cb_arg;
157         /* Service initialization state */
158         uint8_t service_inited;
159         /* Total count of Rx queues in adapter */
160         uint32_t nb_queues;
161         /* Memory allocation name */
162         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
163         /* Socket identifier cached from eventdev */
164         int socket_id;
165         /* Per adapter EAL service */
166         uint32_t service_id;
167         /* Adapter started flag */
168         uint8_t rxa_started;
169         /* Adapter ID */
170         uint8_t id;
171 } __rte_cache_aligned;
172
173 /* Per eth device */
174 struct eth_device_info {
175         struct rte_eth_dev *dev;
176         struct eth_rx_queue_info *rx_queue;
177         /* Rx callback */
178         rte_event_eth_rx_adapter_cb_fn cb_fn;
179         /* Rx callback argument */
180         void *cb_arg;
181         /* Set if ethdev->eventdev packet transfer uses a
182          * hardware mechanism
183          */
184         uint8_t internal_event_port;
185         /* Set if the adapter is processing rx queues for
186          * this eth device and packet processing has been
187          * started, allows for the code to know if the PMD
188          * rx_adapter_stop callback needs to be invoked
189          */
190         uint8_t dev_rx_started;
191         /* Number of queues added for this device */
192         uint16_t nb_dev_queues;
193         /* Number of poll based queues
194          * If nb_rx_poll > 0, the start callback will
195          * be invoked if not already invoked
196          */
197         uint16_t nb_rx_poll;
198         /* Number of interrupt based queues
199          * If nb_rx_intr > 0, the start callback will
200          * be invoked if not already invoked.
201          */
202         uint16_t nb_rx_intr;
203         /* Number of queues that use the shared interrupt */
204         uint16_t nb_shared_intr;
205         /* sum(wrr(q)) for all queues within the device
206          * useful when deleting all device queues
207          */
208         uint32_t wrr_len;
209         /* Intr based queue index to start polling from, this is used
210          * if the number of shared interrupts is non-zero
211          */
212         uint16_t next_q_idx;
213         /* Intr based queue indices */
214         uint16_t *intr_queue;
215         /* device generates per Rx queue interrupt for queue index
216          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
217          */
218         int multi_intr_cap;
219         /* shared interrupt enabled */
220         int shared_intr_enabled;
221 };
222
223 /* Per Rx queue */
224 struct eth_rx_queue_info {
225         int queue_enabled;      /* True if added */
226         int intr_enabled;
227         uint8_t ena_vector;
228         uint16_t wt;            /* Polling weight */
229         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
230         uint64_t event;
231         struct eth_rx_vector_data vector_data;
232 };
233
234 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
235
236 static inline int
237 rxa_validate_id(uint8_t id)
238 {
239         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
240 }
241
242 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
243         if (!rxa_validate_id(id)) { \
244                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
245                 return retval; \
246         } \
247 } while (0)
248
249 static inline int
250 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
251 {
252         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
253 }
254
255 /* Greatest common divisor */
256 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
257 {
258         uint16_t r = a % b;
259
260         return r ? rxa_gcd_u16(b, r) : b;
261 }
262
263 /* Returns the next queue in the polling sequence
264  *
265  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
266  */
267 static int
268 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
269          unsigned int n, int *cw,
270          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
271          uint16_t gcd, int prev)
272 {
273         int i = prev;
274         uint16_t w;
275
276         while (1) {
277                 uint16_t q;
278                 uint16_t d;
279
280                 i = (i + 1) % n;
281                 if (i == 0) {
282                         *cw = *cw - gcd;
283                         if (*cw <= 0)
284                                 *cw = max_wt;
285                 }
286
287                 q = eth_rx_poll[i].eth_rx_qid;
288                 d = eth_rx_poll[i].eth_dev_id;
289                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
290
291                 if ((int)w >= *cw)
292                         return i;
293         }
294 }
295
296 static inline int
297 rxa_shared_intr(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         int multi_intr_cap;
301
302         if (dev_info->dev->intr_handle == NULL)
303                 return 0;
304
305         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
306         return !multi_intr_cap ||
307                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
308 }
309
310 static inline int
311 rxa_intr_queue(struct eth_device_info *dev_info,
312         int rx_queue_id)
313 {
314         struct eth_rx_queue_info *queue_info;
315
316         queue_info = &dev_info->rx_queue[rx_queue_id];
317         return dev_info->rx_queue &&
318                 !dev_info->internal_event_port &&
319                 queue_info->queue_enabled && queue_info->wt == 0;
320 }
321
322 static inline int
323 rxa_polled_queue(struct eth_device_info *dev_info,
324         int rx_queue_id)
325 {
326         struct eth_rx_queue_info *queue_info;
327
328         queue_info = &dev_info->rx_queue[rx_queue_id];
329         return !dev_info->internal_event_port &&
330                 dev_info->rx_queue &&
331                 queue_info->queue_enabled && queue_info->wt != 0;
332 }
333
334 /* Calculate change in number of vectors after Rx queue ID is add/deleted */
335 static int
336 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
337 {
338         uint16_t i;
339         int n, s;
340         uint16_t nbq;
341
342         nbq = dev_info->dev->data->nb_rx_queues;
343         n = 0; /* non shared count */
344         s = 0; /* shared count */
345
346         if (rx_queue_id == -1) {
347                 for (i = 0; i < nbq; i++) {
348                         if (!rxa_shared_intr(dev_info, i))
349                                 n += add ? !rxa_intr_queue(dev_info, i) :
350                                         rxa_intr_queue(dev_info, i);
351                         else
352                                 s += add ? !rxa_intr_queue(dev_info, i) :
353                                         rxa_intr_queue(dev_info, i);
354                 }
355
356                 if (s > 0) {
357                         if ((add && dev_info->nb_shared_intr == 0) ||
358                                 (!add && dev_info->nb_shared_intr))
359                                 n += 1;
360                 }
361         } else {
362                 if (!rxa_shared_intr(dev_info, rx_queue_id))
363                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
364                                 rxa_intr_queue(dev_info, rx_queue_id);
365                 else
366                         n = add ? !dev_info->nb_shared_intr :
367                                 dev_info->nb_shared_intr == 1;
368         }
369
370         return add ? n : -n;
371 }
372
373 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
374  */
375 static void
376 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
377                         struct eth_device_info *dev_info,
378                         int rx_queue_id,
379                         uint32_t *nb_rx_intr)
380 {
381         uint32_t intr_diff;
382
383         if (rx_queue_id == -1)
384                 intr_diff = dev_info->nb_rx_intr;
385         else
386                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
387
388         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
389 }
390
391 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
392  * interrupt queues could currently be poll mode Rx queues
393  */
394 static void
395 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
396                         struct eth_device_info *dev_info,
397                         int rx_queue_id,
398                         uint32_t *nb_rx_poll,
399                         uint32_t *nb_rx_intr,
400                         uint32_t *nb_wrr)
401 {
402         uint32_t intr_diff;
403         uint32_t poll_diff;
404         uint32_t wrr_len_diff;
405
406         if (rx_queue_id == -1) {
407                 intr_diff = dev_info->dev->data->nb_rx_queues -
408                                                 dev_info->nb_rx_intr;
409                 poll_diff = dev_info->nb_rx_poll;
410                 wrr_len_diff = dev_info->wrr_len;
411         } else {
412                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
413                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
414                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
415                                         0;
416         }
417
418         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate size of the eth_rx_poll and wrr_sched arrays
424  * after deleting poll mode rx queues
425  */
426 static void
427 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
428                         struct eth_device_info *dev_info,
429                         int rx_queue_id,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_wrr)
432 {
433         uint32_t poll_diff;
434         uint32_t wrr_len_diff;
435
436         if (rx_queue_id == -1) {
437                 poll_diff = dev_info->nb_rx_poll;
438                 wrr_len_diff = dev_info->wrr_len;
439         } else {
440                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
441                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
442                                         0;
443         }
444
445         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
446         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
447 }
448
449 /* Calculate nb_rx_* after adding poll mode rx queues
450  */
451 static void
452 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
453                         struct eth_device_info *dev_info,
454                         int rx_queue_id,
455                         uint16_t wt,
456                         uint32_t *nb_rx_poll,
457                         uint32_t *nb_rx_intr,
458                         uint32_t *nb_wrr)
459 {
460         uint32_t intr_diff;
461         uint32_t poll_diff;
462         uint32_t wrr_len_diff;
463
464         if (rx_queue_id == -1) {
465                 intr_diff = dev_info->nb_rx_intr;
466                 poll_diff = dev_info->dev->data->nb_rx_queues -
467                                                 dev_info->nb_rx_poll;
468                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
469                                 - dev_info->wrr_len;
470         } else {
471                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
472                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
473                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
474                                 wt - dev_info->rx_queue[rx_queue_id].wt :
475                                 wt;
476         }
477
478         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
479         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
480         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
481 }
482
483 /* Calculate nb_rx_* after adding rx_queue_id */
484 static void
485 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
486                 struct eth_device_info *dev_info,
487                 int rx_queue_id,
488                 uint16_t wt,
489                 uint32_t *nb_rx_poll,
490                 uint32_t *nb_rx_intr,
491                 uint32_t *nb_wrr)
492 {
493         if (wt != 0)
494                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
495                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
496         else
497                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
498                                         nb_rx_poll, nb_rx_intr, nb_wrr);
499 }
500
501 /* Calculate nb_rx_* after deleting rx_queue_id */
502 static void
503 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
504                 struct eth_device_info *dev_info,
505                 int rx_queue_id,
506                 uint32_t *nb_rx_poll,
507                 uint32_t *nb_rx_intr,
508                 uint32_t *nb_wrr)
509 {
510         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
511                                 nb_wrr);
512         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
513                                 nb_rx_intr);
514 }
515
516 /*
517  * Allocate the rx_poll array
518  */
519 static struct eth_rx_poll_entry *
520 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
521         uint32_t num_rx_polled)
522 {
523         size_t len;
524
525         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
526                                                         RTE_CACHE_LINE_SIZE);
527         return  rte_zmalloc_socket(rx_adapter->mem_name,
528                                 len,
529                                 RTE_CACHE_LINE_SIZE,
530                                 rx_adapter->socket_id);
531 }
532
533 /*
534  * Allocate the WRR array
535  */
536 static uint32_t *
537 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
538 {
539         size_t len;
540
541         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
542                         RTE_CACHE_LINE_SIZE);
543         return  rte_zmalloc_socket(rx_adapter->mem_name,
544                                 len,
545                                 RTE_CACHE_LINE_SIZE,
546                                 rx_adapter->socket_id);
547 }
548
549 static int
550 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
551                 uint32_t nb_poll,
552                 uint32_t nb_wrr,
553                 struct eth_rx_poll_entry **rx_poll,
554                 uint32_t **wrr_sched)
555 {
556
557         if (nb_poll == 0) {
558                 *rx_poll = NULL;
559                 *wrr_sched = NULL;
560                 return 0;
561         }
562
563         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
564         if (*rx_poll == NULL) {
565                 *wrr_sched = NULL;
566                 return -ENOMEM;
567         }
568
569         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
570         if (*wrr_sched == NULL) {
571                 rte_free(*rx_poll);
572                 return -ENOMEM;
573         }
574         return 0;
575 }
576
577 /* Precalculate WRR polling sequence for all queues in rx_adapter */
578 static void
579 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
580                 struct eth_rx_poll_entry *rx_poll,
581                 uint32_t *rx_wrr)
582 {
583         uint16_t d;
584         uint16_t q;
585         unsigned int i;
586         int prev = -1;
587         int cw = -1;
588
589         /* Initialize variables for calculation of wrr schedule */
590         uint16_t max_wrr_pos = 0;
591         unsigned int poll_q = 0;
592         uint16_t max_wt = 0;
593         uint16_t gcd = 0;
594
595         if (rx_poll == NULL)
596                 return;
597
598         /* Generate array of all queues to poll, the size of this
599          * array is poll_q
600          */
601         RTE_ETH_FOREACH_DEV(d) {
602                 uint16_t nb_rx_queues;
603                 struct eth_device_info *dev_info =
604                                 &rx_adapter->eth_devices[d];
605                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
606                 if (dev_info->rx_queue == NULL)
607                         continue;
608                 if (dev_info->internal_event_port)
609                         continue;
610                 dev_info->wrr_len = 0;
611                 for (q = 0; q < nb_rx_queues; q++) {
612                         struct eth_rx_queue_info *queue_info =
613                                 &dev_info->rx_queue[q];
614                         uint16_t wt;
615
616                         if (!rxa_polled_queue(dev_info, q))
617                                 continue;
618                         wt = queue_info->wt;
619                         rx_poll[poll_q].eth_dev_id = d;
620                         rx_poll[poll_q].eth_rx_qid = q;
621                         max_wrr_pos += wt;
622                         dev_info->wrr_len += wt;
623                         max_wt = RTE_MAX(max_wt, wt);
624                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
625                         poll_q++;
626                 }
627         }
628
629         /* Generate polling sequence based on weights */
630         prev = -1;
631         cw = -1;
632         for (i = 0; i < max_wrr_pos; i++) {
633                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
634                                      rx_poll, max_wt, gcd, prev);
635                 prev = rx_wrr[i];
636         }
637 }
638
639 static inline void
640 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
641         struct rte_ipv6_hdr **ipv6_hdr)
642 {
643         struct rte_ether_hdr *eth_hdr =
644                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
645         struct rte_vlan_hdr *vlan_hdr;
646
647         *ipv4_hdr = NULL;
648         *ipv6_hdr = NULL;
649
650         switch (eth_hdr->ether_type) {
651         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
652                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
653                 break;
654
655         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
656                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
657                 break;
658
659         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
660                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
661                 switch (vlan_hdr->eth_proto) {
662                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
663                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
664                         break;
665                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
666                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
667                         break;
668                 default:
669                         break;
670                 }
671                 break;
672
673         default:
674                 break;
675         }
676 }
677
678 /* Calculate RSS hash for IPv4/6 */
679 static inline uint32_t
680 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
681 {
682         uint32_t input_len;
683         void *tuple;
684         struct rte_ipv4_tuple ipv4_tuple;
685         struct rte_ipv6_tuple ipv6_tuple;
686         struct rte_ipv4_hdr *ipv4_hdr;
687         struct rte_ipv6_hdr *ipv6_hdr;
688
689         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
690
691         if (ipv4_hdr) {
692                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
693                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
694                 tuple = &ipv4_tuple;
695                 input_len = RTE_THASH_V4_L3_LEN;
696         } else if (ipv6_hdr) {
697                 rte_thash_load_v6_addrs(ipv6_hdr,
698                                         (union rte_thash_tuple *)&ipv6_tuple);
699                 tuple = &ipv6_tuple;
700                 input_len = RTE_THASH_V6_L3_LEN;
701         } else
702                 return 0;
703
704         return rte_softrss_be(tuple, input_len, rss_key_be);
705 }
706
707 static inline int
708 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
709 {
710         return !!rx_adapter->enq_block_count;
711 }
712
713 static inline void
714 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
715 {
716         if (rx_adapter->rx_enq_block_start_ts)
717                 return;
718
719         rx_adapter->enq_block_count++;
720         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
721                 return;
722
723         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
724 }
725
726 static inline void
727 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
728                     struct rte_event_eth_rx_adapter_stats *stats)
729 {
730         if (unlikely(!stats->rx_enq_start_ts))
731                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
732
733         if (likely(!rxa_enq_blocked(rx_adapter)))
734                 return;
735
736         rx_adapter->enq_block_count = 0;
737         if (rx_adapter->rx_enq_block_start_ts) {
738                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
739                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
740                     rx_adapter->rx_enq_block_start_ts;
741                 rx_adapter->rx_enq_block_start_ts = 0;
742         }
743 }
744
745 /* Enqueue buffered events to event device */
746 static inline uint16_t
747 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
748 {
749         struct rte_eth_event_enqueue_buffer *buf =
750             &rx_adapter->event_enqueue_buffer;
751         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
752
753         if (!buf->count)
754                 return 0;
755
756         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
757                                         rx_adapter->event_port_id,
758                                         buf->events,
759                                         buf->count);
760         if (n != buf->count) {
761                 memmove(buf->events,
762                         &buf->events[n],
763                         (buf->count - n) * sizeof(struct rte_event));
764                 stats->rx_enq_retry++;
765         }
766
767         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
768                 rxa_enq_block_start_ts(rx_adapter);
769
770         buf->count -= n;
771         stats->rx_enq_count += n;
772
773         return n;
774 }
775
776 static inline void
777 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
778                 struct eth_rx_vector_data *vec)
779 {
780         vec->vector_ev->nb_elem = 0;
781         vec->vector_ev->port = vec->port;
782         vec->vector_ev->queue = vec->queue;
783         vec->vector_ev->attr_valid = true;
784         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
785 }
786
787 static inline uint16_t
788 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
789                         struct eth_rx_queue_info *queue_info,
790                         struct rte_eth_event_enqueue_buffer *buf,
791                         struct rte_mbuf **mbufs, uint16_t num)
792 {
793         struct rte_event *ev = &buf->events[buf->count];
794         struct eth_rx_vector_data *vec;
795         uint16_t filled, space, sz;
796
797         filled = 0;
798         vec = &queue_info->vector_data;
799
800         if (vec->vector_ev == NULL) {
801                 if (rte_mempool_get(vec->vector_pool,
802                                     (void **)&vec->vector_ev) < 0) {
803                         rte_pktmbuf_free_bulk(mbufs, num);
804                         return 0;
805                 }
806                 rxa_init_vector(rx_adapter, vec);
807         }
808         while (num) {
809                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
810                         /* Event ready. */
811                         ev->event = vec->event;
812                         ev->vec = vec->vector_ev;
813                         ev++;
814                         filled++;
815                         vec->vector_ev = NULL;
816                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
817                         if (rte_mempool_get(vec->vector_pool,
818                                             (void **)&vec->vector_ev) < 0) {
819                                 rte_pktmbuf_free_bulk(mbufs, num);
820                                 return 0;
821                         }
822                         rxa_init_vector(rx_adapter, vec);
823                 }
824
825                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
826                 sz = num > space ? space : num;
827                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
828                        sizeof(void *) * sz);
829                 vec->vector_ev->nb_elem += sz;
830                 num -= sz;
831                 mbufs += sz;
832                 vec->ts = rte_rdtsc();
833         }
834
835         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
836                 ev->event = vec->event;
837                 ev->vec = vec->vector_ev;
838                 ev++;
839                 filled++;
840                 vec->vector_ev = NULL;
841                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
842         }
843
844         return filled;
845 }
846
847 static inline void
848 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
849                 uint16_t eth_dev_id,
850                 uint16_t rx_queue_id,
851                 struct rte_mbuf **mbufs,
852                 uint16_t num)
853 {
854         uint32_t i;
855         struct eth_device_info *dev_info =
856                                         &rx_adapter->eth_devices[eth_dev_id];
857         struct eth_rx_queue_info *eth_rx_queue_info =
858                                         &dev_info->rx_queue[rx_queue_id];
859         struct rte_eth_event_enqueue_buffer *buf =
860                                         &rx_adapter->event_enqueue_buffer;
861         struct rte_event *ev = &buf->events[buf->count];
862         uint64_t event = eth_rx_queue_info->event;
863         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
864         struct rte_mbuf *m = mbufs[0];
865         uint32_t rss_mask;
866         uint32_t rss;
867         int do_rss;
868         uint16_t nb_cb;
869         uint16_t dropped;
870
871         if (!eth_rx_queue_info->ena_vector) {
872                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
873                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
874                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
875                 for (i = 0; i < num; i++) {
876                         m = mbufs[i];
877
878                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
879                                      : m->hash.rss;
880                         ev->event = event;
881                         ev->flow_id = (rss & ~flow_id_mask) |
882                                       (ev->flow_id & flow_id_mask);
883                         ev->mbuf = m;
884                         ev++;
885                 }
886         } else {
887                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
888                                               buf, mbufs, num);
889         }
890
891         if (num && dev_info->cb_fn) {
892
893                 dropped = 0;
894                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
895                                         ETH_EVENT_BUFFER_SIZE, buf->count,
896                                         &buf->events[buf->count], num,
897                                         dev_info->cb_arg, &dropped);
898                 if (unlikely(nb_cb > num))
899                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
900                                 nb_cb, num);
901                 else
902                         num = nb_cb;
903                 if (dropped)
904                         rx_adapter->stats.rx_dropped += dropped;
905         }
906
907         buf->count += num;
908 }
909
910 /* Enqueue packets from  <port, q>  to event buffer */
911 static inline uint32_t
912 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
913         uint16_t port_id,
914         uint16_t queue_id,
915         uint32_t rx_count,
916         uint32_t max_rx,
917         int *rxq_empty)
918 {
919         struct rte_mbuf *mbufs[BATCH_SIZE];
920         struct rte_eth_event_enqueue_buffer *buf =
921                                         &rx_adapter->event_enqueue_buffer;
922         struct rte_event_eth_rx_adapter_stats *stats =
923                                         &rx_adapter->stats;
924         uint16_t n;
925         uint32_t nb_rx = 0;
926
927         if (rxq_empty)
928                 *rxq_empty = 0;
929         /* Don't do a batch dequeue from the rx queue if there isn't
930          * enough space in the enqueue buffer.
931          */
932         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
933                 if (buf->count >= BATCH_SIZE)
934                         rxa_flush_event_buffer(rx_adapter);
935
936                 stats->rx_poll_count++;
937                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
938                 if (unlikely(!n)) {
939                         if (rxq_empty)
940                                 *rxq_empty = 1;
941                         break;
942                 }
943                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
944                 nb_rx += n;
945                 if (rx_count + nb_rx > max_rx)
946                         break;
947         }
948
949         if (buf->count > 0)
950                 rxa_flush_event_buffer(rx_adapter);
951
952         return nb_rx;
953 }
954
955 static inline void
956 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
957                 void *data)
958 {
959         uint16_t port_id;
960         uint16_t queue;
961         int err;
962         union queue_data qd;
963         struct eth_device_info *dev_info;
964         struct eth_rx_queue_info *queue_info;
965         int *intr_enabled;
966
967         qd.ptr = data;
968         port_id = qd.port;
969         queue = qd.queue;
970
971         dev_info = &rx_adapter->eth_devices[port_id];
972         queue_info = &dev_info->rx_queue[queue];
973         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
974         if (rxa_shared_intr(dev_info, queue))
975                 intr_enabled = &dev_info->shared_intr_enabled;
976         else
977                 intr_enabled = &queue_info->intr_enabled;
978
979         if (*intr_enabled) {
980                 *intr_enabled = 0;
981                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
982                 /* Entry should always be available.
983                  * The ring size equals the maximum number of interrupt
984                  * vectors supported (an interrupt vector is shared in
985                  * case of shared interrupts)
986                  */
987                 if (err)
988                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
989                                 " to ring: %s", strerror(-err));
990                 else
991                         rte_eth_dev_rx_intr_disable(port_id, queue);
992         }
993         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
994 }
995
996 static int
997 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
998                         uint32_t num_intr_vec)
999 {
1000         if (rx_adapter->num_intr_vec + num_intr_vec >
1001                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1002                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1003                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1004                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1005                 return -ENOSPC;
1006         }
1007
1008         return 0;
1009 }
1010
1011 /* Delete entries for (dev, queue) from the interrupt ring */
1012 static void
1013 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1014                         struct eth_device_info *dev_info,
1015                         uint16_t rx_queue_id)
1016 {
1017         int i, n;
1018         union queue_data qd;
1019
1020         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1021
1022         n = rte_ring_count(rx_adapter->intr_ring);
1023         for (i = 0; i < n; i++) {
1024                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1025                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1026                         if (qd.port == dev_info->dev->data->port_id &&
1027                                 qd.queue == rx_queue_id)
1028                                 continue;
1029                 } else {
1030                         if (qd.port == dev_info->dev->data->port_id)
1031                                 continue;
1032                 }
1033                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1034         }
1035
1036         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1037 }
1038
1039 /* pthread callback handling interrupt mode receive queues
1040  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1041  * interrupting queue to the adapter's ring buffer for interrupt events.
1042  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1043  * the adapter service function.
1044  */
1045 static void *
1046 rxa_intr_thread(void *arg)
1047 {
1048         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1049         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1050         int n, i;
1051
1052         while (1) {
1053                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1054                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1055                 if (unlikely(n < 0))
1056                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1057                                         n);
1058                 for (i = 0; i < n; i++) {
1059                         rxa_intr_ring_enqueue(rx_adapter,
1060                                         epoll_events[i].epdata.data);
1061                 }
1062         }
1063
1064         return NULL;
1065 }
1066
1067 /* Dequeue <port, q> from interrupt ring and enqueue received
1068  * mbufs to eventdev
1069  */
1070 static inline uint32_t
1071 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1072 {
1073         uint32_t n;
1074         uint32_t nb_rx = 0;
1075         int rxq_empty;
1076         struct rte_eth_event_enqueue_buffer *buf;
1077         rte_spinlock_t *ring_lock;
1078         uint8_t max_done = 0;
1079
1080         if (rx_adapter->num_rx_intr == 0)
1081                 return 0;
1082
1083         if (rte_ring_count(rx_adapter->intr_ring) == 0
1084                 && !rx_adapter->qd_valid)
1085                 return 0;
1086
1087         buf = &rx_adapter->event_enqueue_buffer;
1088         ring_lock = &rx_adapter->intr_ring_lock;
1089
1090         if (buf->count >= BATCH_SIZE)
1091                 rxa_flush_event_buffer(rx_adapter);
1092
1093         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1094                 struct eth_device_info *dev_info;
1095                 uint16_t port;
1096                 uint16_t queue;
1097                 union queue_data qd  = rx_adapter->qd;
1098                 int err;
1099
1100                 if (!rx_adapter->qd_valid) {
1101                         struct eth_rx_queue_info *queue_info;
1102
1103                         rte_spinlock_lock(ring_lock);
1104                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1105                         if (err) {
1106                                 rte_spinlock_unlock(ring_lock);
1107                                 break;
1108                         }
1109
1110                         port = qd.port;
1111                         queue = qd.queue;
1112                         rx_adapter->qd = qd;
1113                         rx_adapter->qd_valid = 1;
1114                         dev_info = &rx_adapter->eth_devices[port];
1115                         if (rxa_shared_intr(dev_info, queue))
1116                                 dev_info->shared_intr_enabled = 1;
1117                         else {
1118                                 queue_info = &dev_info->rx_queue[queue];
1119                                 queue_info->intr_enabled = 1;
1120                         }
1121                         rte_eth_dev_rx_intr_enable(port, queue);
1122                         rte_spinlock_unlock(ring_lock);
1123                 } else {
1124                         port = qd.port;
1125                         queue = qd.queue;
1126
1127                         dev_info = &rx_adapter->eth_devices[port];
1128                 }
1129
1130                 if (rxa_shared_intr(dev_info, queue)) {
1131                         uint16_t i;
1132                         uint16_t nb_queues;
1133
1134                         nb_queues = dev_info->dev->data->nb_rx_queues;
1135                         n = 0;
1136                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1137                                 uint8_t enq_buffer_full;
1138
1139                                 if (!rxa_intr_queue(dev_info, i))
1140                                         continue;
1141                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1142                                         rx_adapter->max_nb_rx,
1143                                         &rxq_empty);
1144                                 nb_rx += n;
1145
1146                                 enq_buffer_full = !rxq_empty && n == 0;
1147                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1148
1149                                 if (enq_buffer_full || max_done) {
1150                                         dev_info->next_q_idx = i;
1151                                         goto done;
1152                                 }
1153                         }
1154
1155                         rx_adapter->qd_valid = 0;
1156
1157                         /* Reinitialize for next interrupt */
1158                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1159                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1160                                                 0;
1161                 } else {
1162                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1163                                 rx_adapter->max_nb_rx,
1164                                 &rxq_empty);
1165                         rx_adapter->qd_valid = !rxq_empty;
1166                         nb_rx += n;
1167                         if (nb_rx > rx_adapter->max_nb_rx)
1168                                 break;
1169                 }
1170         }
1171
1172 done:
1173         rx_adapter->stats.rx_intr_packets += nb_rx;
1174         return nb_rx;
1175 }
1176
1177 /*
1178  * Polls receive queues added to the event adapter and enqueues received
1179  * packets to the event device.
1180  *
1181  * The receive code enqueues initially to a temporary buffer, the
1182  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1183  *
1184  * If there isn't space available in the temporary buffer, packets from the
1185  * Rx queue aren't dequeued from the eth device, this back pressures the
1186  * eth device, in virtual device environments this back pressure is relayed to
1187  * the hypervisor's switching layer where adjustments can be made to deal with
1188  * it.
1189  */
1190 static inline uint32_t
1191 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1192 {
1193         uint32_t num_queue;
1194         uint32_t nb_rx = 0;
1195         struct rte_eth_event_enqueue_buffer *buf;
1196         uint32_t wrr_pos;
1197         uint32_t max_nb_rx;
1198
1199         wrr_pos = rx_adapter->wrr_pos;
1200         max_nb_rx = rx_adapter->max_nb_rx;
1201         buf = &rx_adapter->event_enqueue_buffer;
1202
1203         /* Iterate through a WRR sequence */
1204         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1205                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1206                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1207                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1208
1209                 /* Don't do a batch dequeue from the rx queue if there isn't
1210                  * enough space in the enqueue buffer.
1211                  */
1212                 if (buf->count >= BATCH_SIZE)
1213                         rxa_flush_event_buffer(rx_adapter);
1214                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1215                         rx_adapter->wrr_pos = wrr_pos;
1216                         return nb_rx;
1217                 }
1218
1219                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1220                                 NULL);
1221                 if (nb_rx > max_nb_rx) {
1222                         rx_adapter->wrr_pos =
1223                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1224                         break;
1225                 }
1226
1227                 if (++wrr_pos == rx_adapter->wrr_len)
1228                         wrr_pos = 0;
1229         }
1230         return nb_rx;
1231 }
1232
1233 static void
1234 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1235 {
1236         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1237         struct rte_eth_event_enqueue_buffer *buf =
1238                 &rx_adapter->event_enqueue_buffer;
1239         struct rte_event *ev;
1240
1241         if (buf->count)
1242                 rxa_flush_event_buffer(rx_adapter);
1243
1244         if (vec->vector_ev->nb_elem == 0)
1245                 return;
1246         ev = &buf->events[buf->count];
1247
1248         /* Event ready. */
1249         ev->event = vec->event;
1250         ev->vec = vec->vector_ev;
1251         buf->count++;
1252
1253         vec->vector_ev = NULL;
1254         vec->ts = 0;
1255 }
1256
1257 static int
1258 rxa_service_func(void *args)
1259 {
1260         struct rte_event_eth_rx_adapter *rx_adapter = args;
1261         struct rte_event_eth_rx_adapter_stats *stats;
1262
1263         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1264                 return 0;
1265         if (!rx_adapter->rxa_started) {
1266                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1267                 return 0;
1268         }
1269
1270         if (rx_adapter->ena_vector) {
1271                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1272                     rx_adapter->vector_tmo_ticks) {
1273                         struct eth_rx_vector_data *vec;
1274
1275                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1276                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1277
1278                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1279                                         rxa_vector_expire(vec, rx_adapter);
1280                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1281                                                      vec, next);
1282                                 }
1283                         }
1284                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1285                 }
1286         }
1287
1288         stats = &rx_adapter->stats;
1289         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1290         stats->rx_packets += rxa_poll(rx_adapter);
1291         rte_spinlock_unlock(&rx_adapter->rx_lock);
1292         return 0;
1293 }
1294
1295 static int
1296 rte_event_eth_rx_adapter_init(void)
1297 {
1298         const char *name = "rte_event_eth_rx_adapter_array";
1299         const struct rte_memzone *mz;
1300         unsigned int sz;
1301
1302         sz = sizeof(*event_eth_rx_adapter) *
1303             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1304         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1305
1306         mz = rte_memzone_lookup(name);
1307         if (mz == NULL) {
1308                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1309                                                  RTE_CACHE_LINE_SIZE);
1310                 if (mz == NULL) {
1311                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1312                                         PRId32, rte_errno);
1313                         return -rte_errno;
1314                 }
1315         }
1316
1317         event_eth_rx_adapter = mz->addr;
1318         return 0;
1319 }
1320
1321 static inline struct rte_event_eth_rx_adapter *
1322 rxa_id_to_adapter(uint8_t id)
1323 {
1324         return event_eth_rx_adapter ?
1325                 event_eth_rx_adapter[id] : NULL;
1326 }
1327
1328 static int
1329 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1330                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1331 {
1332         int ret;
1333         struct rte_eventdev *dev;
1334         struct rte_event_dev_config dev_conf;
1335         int started;
1336         uint8_t port_id;
1337         struct rte_event_port_conf *port_conf = arg;
1338         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1339
1340         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1341         dev_conf = dev->data->dev_conf;
1342
1343         started = dev->data->dev_started;
1344         if (started)
1345                 rte_event_dev_stop(dev_id);
1346         port_id = dev_conf.nb_event_ports;
1347         dev_conf.nb_event_ports += 1;
1348         ret = rte_event_dev_configure(dev_id, &dev_conf);
1349         if (ret) {
1350                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1351                                                 dev_id);
1352                 if (started) {
1353                         if (rte_event_dev_start(dev_id))
1354                                 return -EIO;
1355                 }
1356                 return ret;
1357         }
1358
1359         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1360         if (ret) {
1361                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1362                                         port_id);
1363                 return ret;
1364         }
1365
1366         conf->event_port_id = port_id;
1367         conf->max_nb_rx = 128;
1368         if (started)
1369                 ret = rte_event_dev_start(dev_id);
1370         rx_adapter->default_cb_arg = 1;
1371         return ret;
1372 }
1373
1374 static int
1375 rxa_epoll_create1(void)
1376 {
1377 #if defined(LINUX)
1378         int fd;
1379         fd = epoll_create1(EPOLL_CLOEXEC);
1380         return fd < 0 ? -errno : fd;
1381 #elif defined(BSD)
1382         return -ENOTSUP;
1383 #endif
1384 }
1385
1386 static int
1387 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1388 {
1389         if (rx_adapter->epd != INIT_FD)
1390                 return 0;
1391
1392         rx_adapter->epd = rxa_epoll_create1();
1393         if (rx_adapter->epd < 0) {
1394                 int err = rx_adapter->epd;
1395                 rx_adapter->epd = INIT_FD;
1396                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1397                 return err;
1398         }
1399
1400         return 0;
1401 }
1402
1403 static int
1404 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1405 {
1406         int err;
1407         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1408
1409         if (rx_adapter->intr_ring)
1410                 return 0;
1411
1412         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1413                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1414                                         rte_socket_id(), 0);
1415         if (!rx_adapter->intr_ring)
1416                 return -ENOMEM;
1417
1418         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1419                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1420                                         sizeof(struct rte_epoll_event),
1421                                         RTE_CACHE_LINE_SIZE,
1422                                         rx_adapter->socket_id);
1423         if (!rx_adapter->epoll_events) {
1424                 err = -ENOMEM;
1425                 goto error;
1426         }
1427
1428         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1429
1430         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1431                         "rx-intr-thread-%d", rx_adapter->id);
1432
1433         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1434                                 NULL, rxa_intr_thread, rx_adapter);
1435         if (!err)
1436                 return 0;
1437
1438         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1439         rte_free(rx_adapter->epoll_events);
1440 error:
1441         rte_ring_free(rx_adapter->intr_ring);
1442         rx_adapter->intr_ring = NULL;
1443         rx_adapter->epoll_events = NULL;
1444         return err;
1445 }
1446
1447 static int
1448 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1449 {
1450         int err;
1451
1452         err = pthread_cancel(rx_adapter->rx_intr_thread);
1453         if (err)
1454                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1455                                 err);
1456
1457         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1458         if (err)
1459                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1460
1461         rte_free(rx_adapter->epoll_events);
1462         rte_ring_free(rx_adapter->intr_ring);
1463         rx_adapter->intr_ring = NULL;
1464         rx_adapter->epoll_events = NULL;
1465         return 0;
1466 }
1467
1468 static int
1469 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1470 {
1471         int ret;
1472
1473         if (rx_adapter->num_rx_intr == 0)
1474                 return 0;
1475
1476         ret = rxa_destroy_intr_thread(rx_adapter);
1477         if (ret)
1478                 return ret;
1479
1480         close(rx_adapter->epd);
1481         rx_adapter->epd = INIT_FD;
1482
1483         return ret;
1484 }
1485
1486 static int
1487 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1488         struct eth_device_info *dev_info,
1489         uint16_t rx_queue_id)
1490 {
1491         int err;
1492         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1493         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1494
1495         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1496         if (err) {
1497                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1498                         rx_queue_id);
1499                 return err;
1500         }
1501
1502         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1503                                         rx_adapter->epd,
1504                                         RTE_INTR_EVENT_DEL,
1505                                         0);
1506         if (err)
1507                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1508
1509         if (sintr)
1510                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1511         else
1512                 dev_info->shared_intr_enabled = 0;
1513         return err;
1514 }
1515
1516 static int
1517 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1518                 struct eth_device_info *dev_info,
1519                 int rx_queue_id)
1520 {
1521         int err;
1522         int i;
1523         int s;
1524
1525         if (dev_info->nb_rx_intr == 0)
1526                 return 0;
1527
1528         err = 0;
1529         if (rx_queue_id == -1) {
1530                 s = dev_info->nb_shared_intr;
1531                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1532                         int sintr;
1533                         uint16_t q;
1534
1535                         q = dev_info->intr_queue[i];
1536                         sintr = rxa_shared_intr(dev_info, q);
1537                         s -= sintr;
1538
1539                         if (!sintr || s == 0) {
1540
1541                                 err = rxa_disable_intr(rx_adapter, dev_info,
1542                                                 q);
1543                                 if (err)
1544                                         return err;
1545                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1546                                                         q);
1547                         }
1548                 }
1549         } else {
1550                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1551                         return 0;
1552                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1553                                 dev_info->nb_shared_intr == 1) {
1554                         err = rxa_disable_intr(rx_adapter, dev_info,
1555                                         rx_queue_id);
1556                         if (err)
1557                                 return err;
1558                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1559                                                 rx_queue_id);
1560                 }
1561
1562                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1563                         if (dev_info->intr_queue[i] == rx_queue_id) {
1564                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1565                                         dev_info->intr_queue[i] =
1566                                                 dev_info->intr_queue[i + 1];
1567                                 break;
1568                         }
1569                 }
1570         }
1571
1572         return err;
1573 }
1574
1575 static int
1576 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1577         struct eth_device_info *dev_info,
1578         uint16_t rx_queue_id)
1579 {
1580         int err, err1;
1581         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1582         union queue_data qd;
1583         int init_fd;
1584         uint16_t *intr_queue;
1585         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1586
1587         if (rxa_intr_queue(dev_info, rx_queue_id))
1588                 return 0;
1589
1590         intr_queue = dev_info->intr_queue;
1591         if (dev_info->intr_queue == NULL) {
1592                 size_t len =
1593                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1594                 dev_info->intr_queue =
1595                         rte_zmalloc_socket(
1596                                 rx_adapter->mem_name,
1597                                 len,
1598                                 0,
1599                                 rx_adapter->socket_id);
1600                 if (dev_info->intr_queue == NULL)
1601                         return -ENOMEM;
1602         }
1603
1604         init_fd = rx_adapter->epd;
1605         err = rxa_init_epd(rx_adapter);
1606         if (err)
1607                 goto err_free_queue;
1608
1609         qd.port = eth_dev_id;
1610         qd.queue = rx_queue_id;
1611
1612         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1613                                         rx_adapter->epd,
1614                                         RTE_INTR_EVENT_ADD,
1615                                         qd.ptr);
1616         if (err) {
1617                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1618                         " Rx Queue %u err %d", rx_queue_id, err);
1619                 goto err_del_fd;
1620         }
1621
1622         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1623         if (err) {
1624                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1625                                 " Rx Queue %u err %d", rx_queue_id, err);
1626
1627                 goto err_del_event;
1628         }
1629
1630         err = rxa_create_intr_thread(rx_adapter);
1631         if (!err)  {
1632                 if (sintr)
1633                         dev_info->shared_intr_enabled = 1;
1634                 else
1635                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1636                 return 0;
1637         }
1638
1639
1640         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1641         if (err)
1642                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1643                                 " Rx Queue %u err %d", rx_queue_id, err);
1644 err_del_event:
1645         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1646                                         rx_adapter->epd,
1647                                         RTE_INTR_EVENT_DEL,
1648                                         0);
1649         if (err1) {
1650                 RTE_EDEV_LOG_ERR("Could not delete event for"
1651                                 " Rx Queue %u err %d", rx_queue_id, err1);
1652         }
1653 err_del_fd:
1654         if (init_fd == INIT_FD) {
1655                 close(rx_adapter->epd);
1656                 rx_adapter->epd = -1;
1657         }
1658 err_free_queue:
1659         if (intr_queue == NULL)
1660                 rte_free(dev_info->intr_queue);
1661
1662         return err;
1663 }
1664
1665 static int
1666 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1667         struct eth_device_info *dev_info,
1668         int rx_queue_id)
1669
1670 {
1671         int i, j, err;
1672         int si = -1;
1673         int shared_done = (dev_info->nb_shared_intr > 0);
1674
1675         if (rx_queue_id != -1) {
1676                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1677                         return 0;
1678                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1679         }
1680
1681         err = 0;
1682         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1683
1684                 if (rxa_shared_intr(dev_info, i) && shared_done)
1685                         continue;
1686
1687                 err = rxa_config_intr(rx_adapter, dev_info, i);
1688
1689                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1690                 if (shared_done) {
1691                         si = i;
1692                         dev_info->shared_intr_enabled = 1;
1693                 }
1694                 if (err)
1695                         break;
1696         }
1697
1698         if (err == 0)
1699                 return 0;
1700
1701         shared_done = (dev_info->nb_shared_intr > 0);
1702         for (j = 0; j < i; j++) {
1703                 if (rxa_intr_queue(dev_info, j))
1704                         continue;
1705                 if (rxa_shared_intr(dev_info, j) && si != j)
1706                         continue;
1707                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1708                 if (err)
1709                         break;
1710
1711         }
1712
1713         return err;
1714 }
1715
1716
1717 static int
1718 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1719 {
1720         int ret;
1721         struct rte_service_spec service;
1722         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1723
1724         if (rx_adapter->service_inited)
1725                 return 0;
1726
1727         memset(&service, 0, sizeof(service));
1728         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1729                 "rte_event_eth_rx_adapter_%d", id);
1730         service.socket_id = rx_adapter->socket_id;
1731         service.callback = rxa_service_func;
1732         service.callback_userdata = rx_adapter;
1733         /* Service function handles locking for queue add/del updates */
1734         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1735         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1736         if (ret) {
1737                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1738                         service.name, ret);
1739                 return ret;
1740         }
1741
1742         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1743                 &rx_adapter_conf, rx_adapter->conf_arg);
1744         if (ret) {
1745                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1746                         ret);
1747                 goto err_done;
1748         }
1749         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1750         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1751         rx_adapter->service_inited = 1;
1752         rx_adapter->epd = INIT_FD;
1753         return 0;
1754
1755 err_done:
1756         rte_service_component_unregister(rx_adapter->service_id);
1757         return ret;
1758 }
1759
1760 static void
1761 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1762                 struct eth_device_info *dev_info,
1763                 int32_t rx_queue_id,
1764                 uint8_t add)
1765 {
1766         struct eth_rx_queue_info *queue_info;
1767         int enabled;
1768         uint16_t i;
1769
1770         if (dev_info->rx_queue == NULL)
1771                 return;
1772
1773         if (rx_queue_id == -1) {
1774                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1775                         rxa_update_queue(rx_adapter, dev_info, i, add);
1776         } else {
1777                 queue_info = &dev_info->rx_queue[rx_queue_id];
1778                 enabled = queue_info->queue_enabled;
1779                 if (add) {
1780                         rx_adapter->nb_queues += !enabled;
1781                         dev_info->nb_dev_queues += !enabled;
1782                 } else {
1783                         rx_adapter->nb_queues -= enabled;
1784                         dev_info->nb_dev_queues -= enabled;
1785                 }
1786                 queue_info->queue_enabled = !!add;
1787         }
1788 }
1789
1790 static void
1791 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1792                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1793                     uint16_t port_id)
1794 {
1795 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1796         struct eth_rx_vector_data *vector_data;
1797         uint32_t flow_id;
1798
1799         vector_data = &queue_info->vector_data;
1800         vector_data->max_vector_count = vector_count;
1801         vector_data->port = port_id;
1802         vector_data->queue = qid;
1803         vector_data->vector_pool = mp;
1804         vector_data->vector_timeout_ticks =
1805                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1806         vector_data->ts = 0;
1807         flow_id = queue_info->event & 0xFFFFF;
1808         flow_id =
1809                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1810         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1811 }
1812
1813 static void
1814 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1815         struct eth_device_info *dev_info,
1816         int32_t rx_queue_id)
1817 {
1818         struct eth_rx_vector_data *vec;
1819         int pollq;
1820         int intrq;
1821         int sintrq;
1822
1823
1824         if (rx_adapter->nb_queues == 0)
1825                 return;
1826
1827         if (rx_queue_id == -1) {
1828                 uint16_t nb_rx_queues;
1829                 uint16_t i;
1830
1831                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1832                 for (i = 0; i < nb_rx_queues; i++)
1833                         rxa_sw_del(rx_adapter, dev_info, i);
1834                 return;
1835         }
1836
1837         /* Push all the partial event vectors to event device. */
1838         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1839                 if (vec->queue != rx_queue_id)
1840                         continue;
1841                 rxa_vector_expire(vec, rx_adapter);
1842                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1843         }
1844
1845         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1846         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1847         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1848         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1849         rx_adapter->num_rx_polled -= pollq;
1850         dev_info->nb_rx_poll -= pollq;
1851         rx_adapter->num_rx_intr -= intrq;
1852         dev_info->nb_rx_intr -= intrq;
1853         dev_info->nb_shared_intr -= intrq && sintrq;
1854 }
1855
1856 static void
1857 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1858         struct eth_device_info *dev_info,
1859         int32_t rx_queue_id,
1860         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1861 {
1862         struct eth_rx_queue_info *queue_info;
1863         const struct rte_event *ev = &conf->ev;
1864         int pollq;
1865         int intrq;
1866         int sintrq;
1867         struct rte_event *qi_ev;
1868
1869         if (rx_queue_id == -1) {
1870                 uint16_t nb_rx_queues;
1871                 uint16_t i;
1872
1873                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1874                 for (i = 0; i < nb_rx_queues; i++)
1875                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1876                 return;
1877         }
1878
1879         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1880         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1881         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1882
1883         queue_info = &dev_info->rx_queue[rx_queue_id];
1884         queue_info->wt = conf->servicing_weight;
1885
1886         qi_ev = (struct rte_event *)&queue_info->event;
1887         qi_ev->event = ev->event;
1888         qi_ev->op = RTE_EVENT_OP_NEW;
1889         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1890         qi_ev->sub_event_type = 0;
1891
1892         if (conf->rx_queue_flags &
1893                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1894                 queue_info->flow_id_mask = ~0;
1895         } else
1896                 qi_ev->flow_id = 0;
1897
1898         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1899         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1900                 rx_adapter->num_rx_polled += !pollq;
1901                 dev_info->nb_rx_poll += !pollq;
1902                 rx_adapter->num_rx_intr -= intrq;
1903                 dev_info->nb_rx_intr -= intrq;
1904                 dev_info->nb_shared_intr -= intrq && sintrq;
1905         }
1906
1907         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1908                 rx_adapter->num_rx_polled -= pollq;
1909                 dev_info->nb_rx_poll -= pollq;
1910                 rx_adapter->num_rx_intr += !intrq;
1911                 dev_info->nb_rx_intr += !intrq;
1912                 dev_info->nb_shared_intr += !intrq && sintrq;
1913                 if (dev_info->nb_shared_intr == 1) {
1914                         if (dev_info->multi_intr_cap)
1915                                 dev_info->next_q_idx =
1916                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1917                         else
1918                                 dev_info->next_q_idx = 0;
1919                 }
1920         }
1921 }
1922
1923 static void
1924 rxa_sw_event_vector_configure(
1925         struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
1926         int rx_queue_id,
1927         const struct rte_event_eth_rx_adapter_event_vector_config *config)
1928 {
1929         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1930         struct eth_rx_queue_info *queue_info;
1931         struct rte_event *qi_ev;
1932
1933         if (rx_queue_id == -1) {
1934                 uint16_t nb_rx_queues;
1935                 uint16_t i;
1936
1937                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1938                 for (i = 0; i < nb_rx_queues; i++)
1939                         rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
1940                                                       config);
1941                 return;
1942         }
1943
1944         queue_info = &dev_info->rx_queue[rx_queue_id];
1945         qi_ev = (struct rte_event *)&queue_info->event;
1946         queue_info->ena_vector = 1;
1947         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1948         rxa_set_vector_data(queue_info, config->vector_sz,
1949                             config->vector_timeout_ns, config->vector_mp,
1950                             rx_queue_id, dev_info->dev->data->port_id);
1951         rx_adapter->ena_vector = 1;
1952         rx_adapter->vector_tmo_ticks =
1953                 rx_adapter->vector_tmo_ticks ?
1954                               RTE_MIN(config->vector_timeout_ns >> 1,
1955                                 rx_adapter->vector_tmo_ticks) :
1956                               config->vector_timeout_ns >> 1;
1957 }
1958
1959 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1960                 uint16_t eth_dev_id,
1961                 int rx_queue_id,
1962                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1963 {
1964         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1965         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1966         int ret;
1967         struct eth_rx_poll_entry *rx_poll;
1968         struct eth_rx_queue_info *rx_queue;
1969         uint32_t *rx_wrr;
1970         uint16_t nb_rx_queues;
1971         uint32_t nb_rx_poll, nb_wrr;
1972         uint32_t nb_rx_intr;
1973         int num_intr_vec;
1974         uint16_t wt;
1975
1976         if (queue_conf->servicing_weight == 0) {
1977                 struct rte_eth_dev_data *data = dev_info->dev->data;
1978
1979                 temp_conf = *queue_conf;
1980                 if (!data->dev_conf.intr_conf.rxq) {
1981                         /* If Rx interrupts are disabled set wt = 1 */
1982                         temp_conf.servicing_weight = 1;
1983                 }
1984                 queue_conf = &temp_conf;
1985         }
1986
1987         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1988         rx_queue = dev_info->rx_queue;
1989         wt = queue_conf->servicing_weight;
1990
1991         if (dev_info->rx_queue == NULL) {
1992                 dev_info->rx_queue =
1993                     rte_zmalloc_socket(rx_adapter->mem_name,
1994                                        nb_rx_queues *
1995                                        sizeof(struct eth_rx_queue_info), 0,
1996                                        rx_adapter->socket_id);
1997                 if (dev_info->rx_queue == NULL)
1998                         return -ENOMEM;
1999         }
2000         rx_wrr = NULL;
2001         rx_poll = NULL;
2002
2003         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2004                         queue_conf->servicing_weight,
2005                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2006
2007         if (dev_info->dev->intr_handle)
2008                 dev_info->multi_intr_cap =
2009                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2010
2011         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2012                                 &rx_poll, &rx_wrr);
2013         if (ret)
2014                 goto err_free_rxqueue;
2015
2016         if (wt == 0) {
2017                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2018
2019                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2020                 if (ret)
2021                         goto err_free_rxqueue;
2022
2023                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2024                 if (ret)
2025                         goto err_free_rxqueue;
2026         } else {
2027
2028                 num_intr_vec = 0;
2029                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2030                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2031                                                 rx_queue_id, 0);
2032                         /* interrupt based queues are being converted to
2033                          * poll mode queues, delete the interrupt configuration
2034                          * for those.
2035                          */
2036                         ret = rxa_del_intr_queue(rx_adapter,
2037                                                 dev_info, rx_queue_id);
2038                         if (ret)
2039                                 goto err_free_rxqueue;
2040                 }
2041         }
2042
2043         if (nb_rx_intr == 0) {
2044                 ret = rxa_free_intr_resources(rx_adapter);
2045                 if (ret)
2046                         goto err_free_rxqueue;
2047         }
2048
2049         if (wt == 0) {
2050                 uint16_t i;
2051
2052                 if (rx_queue_id  == -1) {
2053                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2054                                 dev_info->intr_queue[i] = i;
2055                 } else {
2056                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2057                                 dev_info->intr_queue[nb_rx_intr - 1] =
2058                                         rx_queue_id;
2059                 }
2060         }
2061
2062
2063
2064         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2065         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2066
2067         rte_free(rx_adapter->eth_rx_poll);
2068         rte_free(rx_adapter->wrr_sched);
2069
2070         rx_adapter->eth_rx_poll = rx_poll;
2071         rx_adapter->wrr_sched = rx_wrr;
2072         rx_adapter->wrr_len = nb_wrr;
2073         rx_adapter->num_intr_vec += num_intr_vec;
2074         return 0;
2075
2076 err_free_rxqueue:
2077         if (rx_queue == NULL) {
2078                 rte_free(dev_info->rx_queue);
2079                 dev_info->rx_queue = NULL;
2080         }
2081
2082         rte_free(rx_poll);
2083         rte_free(rx_wrr);
2084
2085         return 0;
2086 }
2087
2088 static int
2089 rxa_ctrl(uint8_t id, int start)
2090 {
2091         struct rte_event_eth_rx_adapter *rx_adapter;
2092         struct rte_eventdev *dev;
2093         struct eth_device_info *dev_info;
2094         uint32_t i;
2095         int use_service = 0;
2096         int stop = !start;
2097
2098         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2099         rx_adapter = rxa_id_to_adapter(id);
2100         if (rx_adapter == NULL)
2101                 return -EINVAL;
2102
2103         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2104
2105         RTE_ETH_FOREACH_DEV(i) {
2106                 dev_info = &rx_adapter->eth_devices[i];
2107                 /* if start  check for num dev queues */
2108                 if (start && !dev_info->nb_dev_queues)
2109                         continue;
2110                 /* if stop check if dev has been started */
2111                 if (stop && !dev_info->dev_rx_started)
2112                         continue;
2113                 use_service |= !dev_info->internal_event_port;
2114                 dev_info->dev_rx_started = start;
2115                 if (dev_info->internal_event_port == 0)
2116                         continue;
2117                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2118                                                 &rte_eth_devices[i]) :
2119                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2120                                                 &rte_eth_devices[i]);
2121         }
2122
2123         if (use_service) {
2124                 rte_spinlock_lock(&rx_adapter->rx_lock);
2125                 rx_adapter->rxa_started = start;
2126                 rte_service_runstate_set(rx_adapter->service_id, start);
2127                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2128         }
2129
2130         return 0;
2131 }
2132
2133 int
2134 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2135                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2136                                 void *conf_arg)
2137 {
2138         struct rte_event_eth_rx_adapter *rx_adapter;
2139         int ret;
2140         int socket_id;
2141         uint16_t i;
2142         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2143         const uint8_t default_rss_key[] = {
2144                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2145                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2146                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2147                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2148                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2149         };
2150
2151         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2152         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2153         if (conf_cb == NULL)
2154                 return -EINVAL;
2155
2156         if (event_eth_rx_adapter == NULL) {
2157                 ret = rte_event_eth_rx_adapter_init();
2158                 if (ret)
2159                         return ret;
2160         }
2161
2162         rx_adapter = rxa_id_to_adapter(id);
2163         if (rx_adapter != NULL) {
2164                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2165                 return -EEXIST;
2166         }
2167
2168         socket_id = rte_event_dev_socket_id(dev_id);
2169         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2170                 "rte_event_eth_rx_adapter_%d",
2171                 id);
2172
2173         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2174                         RTE_CACHE_LINE_SIZE, socket_id);
2175         if (rx_adapter == NULL) {
2176                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2177                 return -ENOMEM;
2178         }
2179
2180         rx_adapter->eventdev_id = dev_id;
2181         rx_adapter->socket_id = socket_id;
2182         rx_adapter->conf_cb = conf_cb;
2183         rx_adapter->conf_arg = conf_arg;
2184         rx_adapter->id = id;
2185         TAILQ_INIT(&rx_adapter->vector_list);
2186         strcpy(rx_adapter->mem_name, mem_name);
2187         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2188                                         RTE_MAX_ETHPORTS *
2189                                         sizeof(struct eth_device_info), 0,
2190                                         socket_id);
2191         rte_convert_rss_key((const uint32_t *)default_rss_key,
2192                         (uint32_t *)rx_adapter->rss_key_be,
2193                             RTE_DIM(default_rss_key));
2194
2195         if (rx_adapter->eth_devices == NULL) {
2196                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2197                 rte_free(rx_adapter);
2198                 return -ENOMEM;
2199         }
2200         rte_spinlock_init(&rx_adapter->rx_lock);
2201         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2202                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2203
2204         event_eth_rx_adapter[id] = rx_adapter;
2205         if (conf_cb == rxa_default_conf_cb)
2206                 rx_adapter->default_cb_arg = 1;
2207         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2208                 conf_arg);
2209         return 0;
2210 }
2211
2212 int
2213 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2214                 struct rte_event_port_conf *port_config)
2215 {
2216         struct rte_event_port_conf *pc;
2217         int ret;
2218
2219         if (port_config == NULL)
2220                 return -EINVAL;
2221         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2222
2223         pc = rte_malloc(NULL, sizeof(*pc), 0);
2224         if (pc == NULL)
2225                 return -ENOMEM;
2226         *pc = *port_config;
2227         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2228                                         rxa_default_conf_cb,
2229                                         pc);
2230         if (ret)
2231                 rte_free(pc);
2232         return ret;
2233 }
2234
2235 int
2236 rte_event_eth_rx_adapter_free(uint8_t id)
2237 {
2238         struct rte_event_eth_rx_adapter *rx_adapter;
2239
2240         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2241
2242         rx_adapter = rxa_id_to_adapter(id);
2243         if (rx_adapter == NULL)
2244                 return -EINVAL;
2245
2246         if (rx_adapter->nb_queues) {
2247                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2248                                 rx_adapter->nb_queues);
2249                 return -EBUSY;
2250         }
2251
2252         if (rx_adapter->default_cb_arg)
2253                 rte_free(rx_adapter->conf_arg);
2254         rte_free(rx_adapter->eth_devices);
2255         rte_free(rx_adapter);
2256         event_eth_rx_adapter[id] = NULL;
2257
2258         rte_eventdev_trace_eth_rx_adapter_free(id);
2259         return 0;
2260 }
2261
2262 int
2263 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2264                 uint16_t eth_dev_id,
2265                 int32_t rx_queue_id,
2266                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2267 {
2268         int ret;
2269         uint32_t cap;
2270         struct rte_event_eth_rx_adapter *rx_adapter;
2271         struct rte_eventdev *dev;
2272         struct eth_device_info *dev_info;
2273
2274         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2275         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2276
2277         rx_adapter = rxa_id_to_adapter(id);
2278         if ((rx_adapter == NULL) || (queue_conf == NULL))
2279                 return -EINVAL;
2280
2281         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2282         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2283                                                 eth_dev_id,
2284                                                 &cap);
2285         if (ret) {
2286                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2287                         "eth port %" PRIu16, id, eth_dev_id);
2288                 return ret;
2289         }
2290
2291         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2292                 && (queue_conf->rx_queue_flags &
2293                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2294                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2295                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2296                                 eth_dev_id, id);
2297                 return -EINVAL;
2298         }
2299
2300         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
2301             (queue_conf->rx_queue_flags &
2302              RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
2303                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2304                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2305                                  eth_dev_id, id);
2306                 return -EINVAL;
2307         }
2308
2309         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2310                 (rx_queue_id != -1)) {
2311                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2312                         "event queue, eth port: %" PRIu16 " adapter id: %"
2313                         PRIu8, eth_dev_id, id);
2314                 return -EINVAL;
2315         }
2316
2317         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2318                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2319                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2320                          (uint16_t)rx_queue_id);
2321                 return -EINVAL;
2322         }
2323
2324         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2325
2326         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2327                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2328                                         -ENOTSUP);
2329                 if (dev_info->rx_queue == NULL) {
2330                         dev_info->rx_queue =
2331                             rte_zmalloc_socket(rx_adapter->mem_name,
2332                                         dev_info->dev->data->nb_rx_queues *
2333                                         sizeof(struct eth_rx_queue_info), 0,
2334                                         rx_adapter->socket_id);
2335                         if (dev_info->rx_queue == NULL)
2336                                 return -ENOMEM;
2337                 }
2338
2339                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2340                                 &rte_eth_devices[eth_dev_id],
2341                                 rx_queue_id, queue_conf);
2342                 if (ret == 0) {
2343                         dev_info->internal_event_port = 1;
2344                         rxa_update_queue(rx_adapter,
2345                                         &rx_adapter->eth_devices[eth_dev_id],
2346                                         rx_queue_id,
2347                                         1);
2348                 }
2349         } else {
2350                 rte_spinlock_lock(&rx_adapter->rx_lock);
2351                 dev_info->internal_event_port = 0;
2352                 ret = rxa_init_service(rx_adapter, id);
2353                 if (ret == 0) {
2354                         uint32_t service_id = rx_adapter->service_id;
2355                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2356                                         queue_conf);
2357                         rte_service_component_runstate_set(service_id,
2358                                 rxa_sw_adapter_queue_count(rx_adapter));
2359                 }
2360                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2361         }
2362
2363         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2364                 rx_queue_id, queue_conf, ret);
2365         if (ret)
2366                 return ret;
2367
2368         return 0;
2369 }
2370
2371 static int
2372 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2373 {
2374         limits->max_sz = MAX_VECTOR_SIZE;
2375         limits->min_sz = MIN_VECTOR_SIZE;
2376         limits->max_timeout_ns = MAX_VECTOR_NS;
2377         limits->min_timeout_ns = MIN_VECTOR_NS;
2378
2379         return 0;
2380 }
2381
2382 int
2383 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2384                                 int32_t rx_queue_id)
2385 {
2386         int ret = 0;
2387         struct rte_eventdev *dev;
2388         struct rte_event_eth_rx_adapter *rx_adapter;
2389         struct eth_device_info *dev_info;
2390         uint32_t cap;
2391         uint32_t nb_rx_poll = 0;
2392         uint32_t nb_wrr = 0;
2393         uint32_t nb_rx_intr;
2394         struct eth_rx_poll_entry *rx_poll = NULL;
2395         uint32_t *rx_wrr = NULL;
2396         int num_intr_vec;
2397
2398         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2399         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2400
2401         rx_adapter = rxa_id_to_adapter(id);
2402         if (rx_adapter == NULL)
2403                 return -EINVAL;
2404
2405         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2406         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2407                                                 eth_dev_id,
2408                                                 &cap);
2409         if (ret)
2410                 return ret;
2411
2412         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2413                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2414                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2415                          (uint16_t)rx_queue_id);
2416                 return -EINVAL;
2417         }
2418
2419         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2420
2421         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2422                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2423                                  -ENOTSUP);
2424                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2425                                                 &rte_eth_devices[eth_dev_id],
2426                                                 rx_queue_id);
2427                 if (ret == 0) {
2428                         rxa_update_queue(rx_adapter,
2429                                         &rx_adapter->eth_devices[eth_dev_id],
2430                                         rx_queue_id,
2431                                         0);
2432                         if (dev_info->nb_dev_queues == 0) {
2433                                 rte_free(dev_info->rx_queue);
2434                                 dev_info->rx_queue = NULL;
2435                         }
2436                 }
2437         } else {
2438                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2439                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2440
2441                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2442                         &rx_poll, &rx_wrr);
2443                 if (ret)
2444                         return ret;
2445
2446                 rte_spinlock_lock(&rx_adapter->rx_lock);
2447
2448                 num_intr_vec = 0;
2449                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2450
2451                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2452                                                 rx_queue_id, 0);
2453                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2454                                         rx_queue_id);
2455                         if (ret)
2456                                 goto unlock_ret;
2457                 }
2458
2459                 if (nb_rx_intr == 0) {
2460                         ret = rxa_free_intr_resources(rx_adapter);
2461                         if (ret)
2462                                 goto unlock_ret;
2463                 }
2464
2465                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2466                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2467
2468                 rte_free(rx_adapter->eth_rx_poll);
2469                 rte_free(rx_adapter->wrr_sched);
2470
2471                 if (nb_rx_intr == 0) {
2472                         rte_free(dev_info->intr_queue);
2473                         dev_info->intr_queue = NULL;
2474                 }
2475
2476                 rx_adapter->eth_rx_poll = rx_poll;
2477                 rx_adapter->wrr_sched = rx_wrr;
2478                 rx_adapter->wrr_len = nb_wrr;
2479                 rx_adapter->num_intr_vec += num_intr_vec;
2480
2481                 if (dev_info->nb_dev_queues == 0) {
2482                         rte_free(dev_info->rx_queue);
2483                         dev_info->rx_queue = NULL;
2484                 }
2485 unlock_ret:
2486                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2487                 if (ret) {
2488                         rte_free(rx_poll);
2489                         rte_free(rx_wrr);
2490                         return ret;
2491                 }
2492
2493                 rte_service_component_runstate_set(rx_adapter->service_id,
2494                                 rxa_sw_adapter_queue_count(rx_adapter));
2495         }
2496
2497         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2498                 rx_queue_id, ret);
2499         return ret;
2500 }
2501
2502 int
2503 rte_event_eth_rx_adapter_queue_event_vector_config(
2504         uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
2505         struct rte_event_eth_rx_adapter_event_vector_config *config)
2506 {
2507         struct rte_event_eth_rx_adapter_vector_limits limits;
2508         struct rte_event_eth_rx_adapter *rx_adapter;
2509         struct rte_eventdev *dev;
2510         uint32_t cap;
2511         int ret;
2512
2513         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2514         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2515
2516         rx_adapter = rxa_id_to_adapter(id);
2517         if ((rx_adapter == NULL) || (config == NULL))
2518                 return -EINVAL;
2519
2520         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2521         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2522                                                 eth_dev_id, &cap);
2523         if (ret) {
2524                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2525                                  "eth port %" PRIu16,
2526                                  id, eth_dev_id);
2527                 return ret;
2528         }
2529
2530         if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
2531                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2532                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2533                                  eth_dev_id, id);
2534                 return -EINVAL;
2535         }
2536
2537         ret = rte_event_eth_rx_adapter_vector_limits_get(
2538                 rx_adapter->eventdev_id, eth_dev_id, &limits);
2539         if (ret) {
2540                 RTE_EDEV_LOG_ERR("Failed to get vector limits edev %" PRIu8
2541                                  "eth port %" PRIu16,
2542                                  rx_adapter->eventdev_id, eth_dev_id);
2543                 return ret;
2544         }
2545
2546         if (config->vector_sz < limits.min_sz ||
2547             config->vector_sz > limits.max_sz ||
2548             config->vector_timeout_ns < limits.min_timeout_ns ||
2549             config->vector_timeout_ns > limits.max_timeout_ns ||
2550             config->vector_mp == NULL) {
2551                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2552                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2553                                  eth_dev_id, id);
2554                 return -EINVAL;
2555         }
2556         if (config->vector_mp->elt_size <
2557             (sizeof(struct rte_event_vector) +
2558              (sizeof(uintptr_t) * config->vector_sz))) {
2559                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2560                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2561                                  eth_dev_id, id);
2562                 return -EINVAL;
2563         }
2564
2565         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2566                 RTE_FUNC_PTR_OR_ERR_RET(
2567                         *dev->dev_ops->eth_rx_adapter_event_vector_config,
2568                         -ENOTSUP);
2569                 ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
2570                         dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
2571         } else {
2572                 rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
2573                                               rx_queue_id, config);
2574         }
2575
2576         return ret;
2577 }
2578
2579 int
2580 rte_event_eth_rx_adapter_vector_limits_get(
2581         uint8_t dev_id, uint16_t eth_port_id,
2582         struct rte_event_eth_rx_adapter_vector_limits *limits)
2583 {
2584         struct rte_eventdev *dev;
2585         uint32_t cap;
2586         int ret;
2587
2588         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2589         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2590
2591         if (limits == NULL)
2592                 return -EINVAL;
2593
2594         dev = &rte_eventdevs[dev_id];
2595
2596         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2597         if (ret) {
2598                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2599                                  "eth port %" PRIu16,
2600                                  dev_id, eth_port_id);
2601                 return ret;
2602         }
2603
2604         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2605                 RTE_FUNC_PTR_OR_ERR_RET(
2606                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2607                         -ENOTSUP);
2608                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2609                         dev, &rte_eth_devices[eth_port_id], limits);
2610         } else {
2611                 ret = rxa_sw_vector_limits(limits);
2612         }
2613
2614         return ret;
2615 }
2616
2617 int
2618 rte_event_eth_rx_adapter_start(uint8_t id)
2619 {
2620         rte_eventdev_trace_eth_rx_adapter_start(id);
2621         return rxa_ctrl(id, 1);
2622 }
2623
2624 int
2625 rte_event_eth_rx_adapter_stop(uint8_t id)
2626 {
2627         rte_eventdev_trace_eth_rx_adapter_stop(id);
2628         return rxa_ctrl(id, 0);
2629 }
2630
2631 int
2632 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2633                                struct rte_event_eth_rx_adapter_stats *stats)
2634 {
2635         struct rte_event_eth_rx_adapter *rx_adapter;
2636         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2637         struct rte_event_eth_rx_adapter_stats dev_stats;
2638         struct rte_eventdev *dev;
2639         struct eth_device_info *dev_info;
2640         uint32_t i;
2641         int ret;
2642
2643         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2644
2645         rx_adapter = rxa_id_to_adapter(id);
2646         if (rx_adapter  == NULL || stats == NULL)
2647                 return -EINVAL;
2648
2649         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2650         memset(stats, 0, sizeof(*stats));
2651         RTE_ETH_FOREACH_DEV(i) {
2652                 dev_info = &rx_adapter->eth_devices[i];
2653                 if (dev_info->internal_event_port == 0 ||
2654                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2655                         continue;
2656                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2657                                                 &rte_eth_devices[i],
2658                                                 &dev_stats);
2659                 if (ret)
2660                         continue;
2661                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2662                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2663         }
2664
2665         if (rx_adapter->service_inited)
2666                 *stats = rx_adapter->stats;
2667
2668         stats->rx_packets += dev_stats_sum.rx_packets;
2669         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2670         return 0;
2671 }
2672
2673 int
2674 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2675 {
2676         struct rte_event_eth_rx_adapter *rx_adapter;
2677         struct rte_eventdev *dev;
2678         struct eth_device_info *dev_info;
2679         uint32_t i;
2680
2681         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2682
2683         rx_adapter = rxa_id_to_adapter(id);
2684         if (rx_adapter == NULL)
2685                 return -EINVAL;
2686
2687         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2688         RTE_ETH_FOREACH_DEV(i) {
2689                 dev_info = &rx_adapter->eth_devices[i];
2690                 if (dev_info->internal_event_port == 0 ||
2691                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2692                         continue;
2693                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2694                                                         &rte_eth_devices[i]);
2695         }
2696
2697         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2698         return 0;
2699 }
2700
2701 int
2702 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2703 {
2704         struct rte_event_eth_rx_adapter *rx_adapter;
2705
2706         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2707
2708         rx_adapter = rxa_id_to_adapter(id);
2709         if (rx_adapter == NULL || service_id == NULL)
2710                 return -EINVAL;
2711
2712         if (rx_adapter->service_inited)
2713                 *service_id = rx_adapter->service_id;
2714
2715         return rx_adapter->service_inited ? 0 : -ESRCH;
2716 }
2717
2718 int
2719 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2720                                         uint16_t eth_dev_id,
2721                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2722                                         void *cb_arg)
2723 {
2724         struct rte_event_eth_rx_adapter *rx_adapter;
2725         struct eth_device_info *dev_info;
2726         uint32_t cap;
2727         int ret;
2728
2729         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2730         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2731
2732         rx_adapter = rxa_id_to_adapter(id);
2733         if (rx_adapter == NULL)
2734                 return -EINVAL;
2735
2736         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2737         if (dev_info->rx_queue == NULL)
2738                 return -EINVAL;
2739
2740         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2741                                                 eth_dev_id,
2742                                                 &cap);
2743         if (ret) {
2744                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2745                         "eth port %" PRIu16, id, eth_dev_id);
2746                 return ret;
2747         }
2748
2749         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2750                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2751                                 PRIu16, eth_dev_id);
2752                 return -EINVAL;
2753         }
2754
2755         rte_spinlock_lock(&rx_adapter->rx_lock);
2756         dev_info->cb_fn = cb_fn;
2757         dev_info->cb_arg = cb_arg;
2758         rte_spinlock_unlock(&rx_adapter->rx_lock);
2759
2760         return 0;
2761 }