[dpdk.git] lib/eventdev/rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
29 #define MAX_VECTOR_SIZE         1024
30 #define MIN_VECTOR_SIZE         4
31 #define MAX_VECTOR_NS           1E9
32 #define MIN_VECTOR_NS           1E5
33
34 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
35 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
36
37 #define RSS_KEY_SIZE    40
38 /* value written to intr thread pipe to signal thread exit */
39 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
40 /* Sentinel value to detect an uninitialized file descriptor */
41 #define INIT_FD         -1
42
43 /*
44  * Used to store port and queue ID of interrupting Rx queue
45  */
46 union queue_data {
47         RTE_STD_C11
48         void *ptr;
49         struct {
50                 uint16_t port;
51                 uint16_t queue;
52         };
53 };
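/*
 * Illustrative sketch: the two 16-bit IDs are packed into a void * so that
 * they can pass through the adapter's rte_ring, which stores pointers,
 * without any allocation. Names such as eth_dev_id, rx_queue_id and
 * intr_ring below are placeholders:
 *
 *	union queue_data qd;
 *
 *	qd.port = eth_dev_id;
 *	qd.queue = rx_queue_id;
 *	rte_ring_enqueue(intr_ring, qd.ptr);
 *
 *	rte_ring_dequeue(intr_ring, &qd.ptr);
 *
 * On dequeue, qd.port and qd.queue are recovered from the same pointer.
 */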
54
55 /*
56  * There is an instance of this struct per polled Rx queue added to the
57  * adapter
58  */
59 struct eth_rx_poll_entry {
60         /* Eth port to poll */
61         uint16_t eth_dev_id;
62         /* Eth rx queue to poll */
63         uint16_t eth_rx_qid;
64 };
65
66 struct eth_rx_vector_data {
67         TAILQ_ENTRY(eth_rx_vector_data) next;
68         uint16_t port;
69         uint16_t queue;
70         uint16_t max_vector_count;
71         uint64_t event;
72         uint64_t ts;
73         uint64_t vector_timeout_ticks;
74         struct rte_mempool *vector_pool;
75         struct rte_event_vector *vector_ev;
76 } __rte_cache_aligned;
77
78 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
79
80 /* Instance per adapter */
81 struct rte_eth_event_enqueue_buffer {
82         /* Count of events in this buffer */
83         uint16_t count;
84         /* Array of events in this buffer */
85         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
86 };
87
88 struct rte_event_eth_rx_adapter {
89         /* RSS key */
90         uint8_t rss_key_be[RSS_KEY_SIZE];
91         /* Event device identifier */
92         uint8_t eventdev_id;
93         /* Per ethernet device structure */
94         struct eth_device_info *eth_devices;
95         /* Event port identifier */
96         uint8_t event_port_id;
97         /* Lock to serialize config updates with service function */
98         rte_spinlock_t rx_lock;
99         /* Max mbufs processed in any service function invocation */
100         uint32_t max_nb_rx;
101         /* Receive queues that need to be polled */
102         struct eth_rx_poll_entry *eth_rx_poll;
103         /* Size of the eth_rx_poll array */
104         uint16_t num_rx_polled;
105         /* Weighted round robin schedule */
106         uint32_t *wrr_sched;
107         /* wrr_sched[] size */
108         uint32_t wrr_len;
109         /* Next entry in wrr[] to begin polling */
110         uint32_t wrr_pos;
111         /* Event burst buffer */
112         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
113         /* Vector enable flag */
114         uint8_t ena_vector;
115         /* Timestamp of previous vector expiry list traversal */
116         uint64_t prev_expiry_ts;
117         /* Minimum ticks to wait before traversing expiry list */
118         uint64_t vector_tmo_ticks;
119         /* vector list */
120         struct eth_rx_vector_data_list vector_list;
121         /* Per adapter stats */
122         struct rte_event_eth_rx_adapter_stats stats;
123         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
124         uint16_t enq_block_count;
125         /* Block start ts */
126         uint64_t rx_enq_block_start_ts;
127         /* epoll fd used to wait for Rx interrupts */
128         int epd;
129         /* Number of interrupt driven Rx queues */
130         uint32_t num_rx_intr;
131         /* Used to send <dev id, queue id> of interrupting Rx queues from
132          * the interrupt thread to the Rx thread
133          */
134         struct rte_ring *intr_ring;
135         /* Rx Queue data (dev id, queue id) for the last non-empty
136          * queue polled
137          */
138         union queue_data qd;
139         /* queue_data is valid */
140         int qd_valid;
141         /* Interrupt ring lock, synchronizes Rx thread
142          * and interrupt thread
143          */
144         rte_spinlock_t intr_ring_lock;
145         /* event array passed to rte_poll_wait */
146         struct rte_epoll_event *epoll_events;
147         /* Count of interrupt vectors in use */
148         uint32_t num_intr_vec;
149         /* Thread blocked on Rx interrupts */
150         pthread_t rx_intr_thread;
151         /* Configuration callback for rte_service configuration */
152         rte_event_eth_rx_adapter_conf_cb conf_cb;
153         /* Configuration callback argument */
154         void *conf_arg;
155         /* Set if default_cb is being used */
156         int default_cb_arg;
157         /* Service initialization state */
158         uint8_t service_inited;
159         /* Total count of Rx queues in adapter */
160         uint32_t nb_queues;
161         /* Memory allocation name */
162         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
163         /* Socket identifier cached from eventdev */
164         int socket_id;
165         /* Per adapter EAL service */
166         uint32_t service_id;
167         /* Adapter started flag */
168         uint8_t rxa_started;
169         /* Adapter ID */
170         uint8_t id;
171 } __rte_cache_aligned;
172
173 /* Per eth device */
174 struct eth_device_info {
175         struct rte_eth_dev *dev;
176         struct eth_rx_queue_info *rx_queue;
177         /* Rx callback */
178         rte_event_eth_rx_adapter_cb_fn cb_fn;
179         /* Rx callback argument */
180         void *cb_arg;
181         /* Set if ethdev->eventdev packet transfer uses a
182          * hardware mechanism
183          */
184         uint8_t internal_event_port;
185         /* Set if the adapter is processing rx queues for
186          * this eth device and packet processing has been
187          * started, allows for the code to know if the PMD
188          * rx_adapter_stop callback needs to be invoked
189          */
190         uint8_t dev_rx_started;
191         /* Number of queues added for this device */
192         uint16_t nb_dev_queues;
193         /* Number of poll based queues
194          * If nb_rx_poll > 0, the start callback will
195          * be invoked if not already invoked
196          */
197         uint16_t nb_rx_poll;
198         /* Number of interrupt based queues
199          * If nb_rx_intr > 0, the start callback will
200          * be invoked if not already invoked.
201          */
202         uint16_t nb_rx_intr;
203         /* Number of queues that use the shared interrupt */
204         uint16_t nb_shared_intr;
205         /* sum(wrr(q)) for all queues within the device
206          * useful when deleting all device queues
207          */
208         uint32_t wrr_len;
209         /* Intr based queue index to start polling from, this is used
210          * if the number of shared interrupts is non-zero
211          */
212         uint16_t next_q_idx;
213         /* Intr based queue indices */
214         uint16_t *intr_queue;
215         /* Device generates a per Rx queue interrupt for queue
216          * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
217          */
218         int multi_intr_cap;
219         /* shared interrupt enabled */
220         int shared_intr_enabled;
221 };
222
223 /* Per Rx queue */
224 struct eth_rx_queue_info {
225         int queue_enabled;      /* True if added */
226         int intr_enabled;
227         uint8_t ena_vector;
228         uint16_t wt;            /* Polling weight */
229         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
230         uint64_t event;
231         struct eth_rx_vector_data vector_data;
232 };
233
234 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
235
236 static inline int
237 rxa_validate_id(uint8_t id)
238 {
239         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
240 }
241
242 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
243         if (!rxa_validate_id(id)) { \
244                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
245                 return retval; \
246         } \
247 } while (0)
248
249 static inline int
250 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
251 {
252         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
253 }
254
255 /* Greatest common divisor */
256 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
257 {
258         uint16_t r = a % b;
259
260         return r ? rxa_gcd_u16(b, r) : b;
261 }
262
263 /* Returns the next queue in the polling sequence
264  *
265  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
266  */
267 static int
268 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
269          unsigned int n, int *cw,
270          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
271          uint16_t gcd, int prev)
272 {
273         int i = prev;
274         uint16_t w;
275
276         while (1) {
277                 uint16_t q;
278                 uint16_t d;
279
280                 i = (i + 1) % n;
281                 if (i == 0) {
282                         *cw = *cw - gcd;
283                         if (*cw <= 0)
284                                 *cw = max_wt;
285                 }
286
287                 q = eth_rx_poll[i].eth_rx_qid;
288                 d = eth_rx_poll[i].eth_dev_id;
289                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
290
291                 if ((int)w >= *cw)
292                         return i;
293         }
294 }
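/*
 * Worked example (for illustration): with three polled queues q0, q1, q2 of
 * weights 4, 3 and 2, max_wt = 4 and gcd = 1. Starting from cw = -1 and
 * prev = -1, repeated calls to rxa_wrr_next() yield the index sequence
 *
 *	0, 0, 1, 0, 1, 2, 0, 1, 2
 *
 * i.e. q0 is returned 4 times, q1 3 times and q2 twice over the
 * wrr_len = 4 + 3 + 2 = 9 positions that rxa_calc_wrr_sequence() below
 * precomputes into wrr_sched[].
 */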
295
296 static inline int
297 rxa_shared_intr(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         int multi_intr_cap;
301
302         if (dev_info->dev->intr_handle == NULL)
303                 return 0;
304
305         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
306         return !multi_intr_cap ||
307                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
308 }
309
310 static inline int
311 rxa_intr_queue(struct eth_device_info *dev_info,
312         int rx_queue_id)
313 {
314         struct eth_rx_queue_info *queue_info;
315
316         queue_info = &dev_info->rx_queue[rx_queue_id];
317         return dev_info->rx_queue &&
318                 !dev_info->internal_event_port &&
319                 queue_info->queue_enabled && queue_info->wt == 0;
320 }
321
322 static inline int
323 rxa_polled_queue(struct eth_device_info *dev_info,
324         int rx_queue_id)
325 {
326         struct eth_rx_queue_info *queue_info;
327
328         queue_info = &dev_info->rx_queue[rx_queue_id];
329         return !dev_info->internal_event_port &&
330                 dev_info->rx_queue &&
331                 queue_info->queue_enabled && queue_info->wt != 0;
332 }
333
334 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
335 static int
336 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
337 {
338         uint16_t i;
339         int n, s;
340         uint16_t nbq;
341
342         nbq = dev_info->dev->data->nb_rx_queues;
343         n = 0; /* non shared count */
344         s = 0; /* shared count */
345
346         if (rx_queue_id == -1) {
347                 for (i = 0; i < nbq; i++) {
348                         if (!rxa_shared_intr(dev_info, i))
349                                 n += add ? !rxa_intr_queue(dev_info, i) :
350                                         rxa_intr_queue(dev_info, i);
351                         else
352                                 s += add ? !rxa_intr_queue(dev_info, i) :
353                                         rxa_intr_queue(dev_info, i);
354                 }
355
356                 if (s > 0) {
357                         if ((add && dev_info->nb_shared_intr == 0) ||
358                                 (!add && dev_info->nb_shared_intr))
359                                 n += 1;
360                 }
361         } else {
362                 if (!rxa_shared_intr(dev_info, rx_queue_id))
363                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
364                                 rxa_intr_queue(dev_info, rx_queue_id);
365                 else
366                         n = add ? !dev_info->nb_shared_intr :
367                                 dev_info->nb_shared_intr == 1;
368         }
369
370         return add ? n : -n;
371 }
372
373 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
374  */
375 static void
376 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
377                         struct eth_device_info *dev_info,
378                         int rx_queue_id,
379                         uint32_t *nb_rx_intr)
380 {
381         uint32_t intr_diff;
382
383         if (rx_queue_id == -1)
384                 intr_diff = dev_info->nb_rx_intr;
385         else
386                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
387
388         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
389 }
390
391 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
392  * interrupt queues could currently be poll mode Rx queues
393  */
394 static void
395 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
396                         struct eth_device_info *dev_info,
397                         int rx_queue_id,
398                         uint32_t *nb_rx_poll,
399                         uint32_t *nb_rx_intr,
400                         uint32_t *nb_wrr)
401 {
402         uint32_t intr_diff;
403         uint32_t poll_diff;
404         uint32_t wrr_len_diff;
405
406         if (rx_queue_id == -1) {
407                 intr_diff = dev_info->dev->data->nb_rx_queues -
408                                                 dev_info->nb_rx_intr;
409                 poll_diff = dev_info->nb_rx_poll;
410                 wrr_len_diff = dev_info->wrr_len;
411         } else {
412                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
413                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
414                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
415                                         0;
416         }
417
418         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate size of the eth_rx_poll and wrr_sched arrays
424  * after deleting poll mode rx queues
425  */
426 static void
427 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
428                         struct eth_device_info *dev_info,
429                         int rx_queue_id,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_wrr)
432 {
433         uint32_t poll_diff;
434         uint32_t wrr_len_diff;
435
436         if (rx_queue_id == -1) {
437                 poll_diff = dev_info->nb_rx_poll;
438                 wrr_len_diff = dev_info->wrr_len;
439         } else {
440                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
441                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
442                                         0;
443         }
444
445         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
446         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
447 }
448
449 /* Calculate nb_rx_* after adding poll mode rx queues
450  */
451 static void
452 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
453                         struct eth_device_info *dev_info,
454                         int rx_queue_id,
455                         uint16_t wt,
456                         uint32_t *nb_rx_poll,
457                         uint32_t *nb_rx_intr,
458                         uint32_t *nb_wrr)
459 {
460         uint32_t intr_diff;
461         uint32_t poll_diff;
462         uint32_t wrr_len_diff;
463
464         if (rx_queue_id == -1) {
465                 intr_diff = dev_info->nb_rx_intr;
466                 poll_diff = dev_info->dev->data->nb_rx_queues -
467                                                 dev_info->nb_rx_poll;
468                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
469                                 - dev_info->wrr_len;
470         } else {
471                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
472                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
473                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
474                                 wt - dev_info->rx_queue[rx_queue_id].wt :
475                                 wt;
476         }
477
478         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
479         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
480         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
481 }
482
483 /* Calculate nb_rx_* after adding rx_queue_id */
484 static void
485 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
486                 struct eth_device_info *dev_info,
487                 int rx_queue_id,
488                 uint16_t wt,
489                 uint32_t *nb_rx_poll,
490                 uint32_t *nb_rx_intr,
491                 uint32_t *nb_wrr)
492 {
493         if (wt != 0)
494                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
495                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
496         else
497                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
498                                         nb_rx_poll, nb_rx_intr, nb_wrr);
499 }
500
501 /* Calculate nb_rx_* after deleting rx_queue_id */
502 static void
503 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
504                 struct eth_device_info *dev_info,
505                 int rx_queue_id,
506                 uint32_t *nb_rx_poll,
507                 uint32_t *nb_rx_intr,
508                 uint32_t *nb_wrr)
509 {
510         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
511                                 nb_wrr);
512         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
513                                 nb_rx_intr);
514 }
515
516 /*
517  * Allocate the rx_poll array
518  */
519 static struct eth_rx_poll_entry *
520 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
521         uint32_t num_rx_polled)
522 {
523         size_t len;
524
525         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
526                                                         RTE_CACHE_LINE_SIZE);
527         return  rte_zmalloc_socket(rx_adapter->mem_name,
528                                 len,
529                                 RTE_CACHE_LINE_SIZE,
530                                 rx_adapter->socket_id);
531 }
532
533 /*
534  * Allocate the WRR array
535  */
536 static uint32_t *
537 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
538 {
539         size_t len;
540
541         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
542                         RTE_CACHE_LINE_SIZE);
543         return  rte_zmalloc_socket(rx_adapter->mem_name,
544                                 len,
545                                 RTE_CACHE_LINE_SIZE,
546                                 rx_adapter->socket_id);
547 }
548
549 static int
550 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
551                 uint32_t nb_poll,
552                 uint32_t nb_wrr,
553                 struct eth_rx_poll_entry **rx_poll,
554                 uint32_t **wrr_sched)
555 {
556
557         if (nb_poll == 0) {
558                 *rx_poll = NULL;
559                 *wrr_sched = NULL;
560                 return 0;
561         }
562
563         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
564         if (*rx_poll == NULL) {
565                 *wrr_sched = NULL;
566                 return -ENOMEM;
567         }
568
569         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
570         if (*wrr_sched == NULL) {
571                 rte_free(*rx_poll);
572                 return -ENOMEM;
573         }
574         return 0;
575 }
576
577 /* Precalculate WRR polling sequence for all queues in rx_adapter */
578 static void
579 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
580                 struct eth_rx_poll_entry *rx_poll,
581                 uint32_t *rx_wrr)
582 {
583         uint16_t d;
584         uint16_t q;
585         unsigned int i;
586         int prev = -1;
587         int cw = -1;
588
589         /* Initialize variables for calculation of wrr schedule */
590         uint16_t max_wrr_pos = 0;
591         unsigned int poll_q = 0;
592         uint16_t max_wt = 0;
593         uint16_t gcd = 0;
594
595         if (rx_poll == NULL)
596                 return;
597
598         /* Generate array of all queues to poll, the size of this
599          * array is poll_q
600          */
601         RTE_ETH_FOREACH_DEV(d) {
602                 uint16_t nb_rx_queues;
603                 struct eth_device_info *dev_info =
604                                 &rx_adapter->eth_devices[d];
605                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
606                 if (dev_info->rx_queue == NULL)
607                         continue;
608                 if (dev_info->internal_event_port)
609                         continue;
610                 dev_info->wrr_len = 0;
611                 for (q = 0; q < nb_rx_queues; q++) {
612                         struct eth_rx_queue_info *queue_info =
613                                 &dev_info->rx_queue[q];
614                         uint16_t wt;
615
616                         if (!rxa_polled_queue(dev_info, q))
617                                 continue;
618                         wt = queue_info->wt;
619                         rx_poll[poll_q].eth_dev_id = d;
620                         rx_poll[poll_q].eth_rx_qid = q;
621                         max_wrr_pos += wt;
622                         dev_info->wrr_len += wt;
623                         max_wt = RTE_MAX(max_wt, wt);
624                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
625                         poll_q++;
626                 }
627         }
628
629         /* Generate polling sequence based on weights */
630         prev = -1;
631         cw = -1;
632         for (i = 0; i < max_wrr_pos; i++) {
633                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
634                                      rx_poll, max_wt, gcd, prev);
635                 prev = rx_wrr[i];
636         }
637 }
638
639 static inline void
640 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
641         struct rte_ipv6_hdr **ipv6_hdr)
642 {
643         struct rte_ether_hdr *eth_hdr =
644                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
645         struct rte_vlan_hdr *vlan_hdr;
646
647         *ipv4_hdr = NULL;
648         *ipv6_hdr = NULL;
649
650         switch (eth_hdr->ether_type) {
651         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
652                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
653                 break;
654
655         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
656                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
657                 break;
658
659         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
660                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
661                 switch (vlan_hdr->eth_proto) {
662                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
663                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
664                         break;
665                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
666                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
667                         break;
668                 default:
669                         break;
670                 }
671                 break;
672
673         default:
674                 break;
675         }
676 }
677
678 /* Calculate RSS hash for IPv4/6 */
679 static inline uint32_t
680 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
681 {
682         uint32_t input_len;
683         void *tuple;
684         struct rte_ipv4_tuple ipv4_tuple;
685         struct rte_ipv6_tuple ipv6_tuple;
686         struct rte_ipv4_hdr *ipv4_hdr;
687         struct rte_ipv6_hdr *ipv6_hdr;
688
689         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
690
691         if (ipv4_hdr) {
692                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
693                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
694                 tuple = &ipv4_tuple;
695                 input_len = RTE_THASH_V4_L3_LEN;
696         } else if (ipv6_hdr) {
697                 rte_thash_load_v6_addrs(ipv6_hdr,
698                                         (union rte_thash_tuple *)&ipv6_tuple);
699                 tuple = &ipv6_tuple;
700                 input_len = RTE_THASH_V6_L3_LEN;
701         } else
702                 return 0;
703
704         return rte_softrss_be(tuple, input_len, rss_key_be);
705 }
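/*
 * Note: rte_softrss_be() expects the key in the "converted" format produced
 * by rte_convert_rss_key(). A minimal sketch of preparing rss_key_be from a
 * 40-byte key, assuming rss_key holds the application's key bytes:
 *
 *	uint8_t rss_key_be[RSS_KEY_SIZE];
 *
 *	rte_convert_rss_key((const uint32_t *)rss_key,
 *			    (uint32_t *)rss_key_be, RSS_KEY_SIZE);
 *
 * The converted key can then be passed as the rss_key_be argument above.
 */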
706
707 static inline int
708 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
709 {
710         return !!rx_adapter->enq_block_count;
711 }
712
713 static inline void
714 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
715 {
716         if (rx_adapter->rx_enq_block_start_ts)
717                 return;
718
719         rx_adapter->enq_block_count++;
720         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
721                 return;
722
723         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
724 }
725
726 static inline void
727 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
728                     struct rte_event_eth_rx_adapter_stats *stats)
729 {
730         if (unlikely(!stats->rx_enq_start_ts))
731                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
732
733         if (likely(!rxa_enq_blocked(rx_adapter)))
734                 return;
735
736         rx_adapter->enq_block_count = 0;
737         if (rx_adapter->rx_enq_block_start_ts) {
738                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
739                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
740                     rx_adapter->rx_enq_block_start_ts;
741                 rx_adapter->rx_enq_block_start_ts = 0;
742         }
743 }
744
745 /* Enqueue buffered events to event device */
746 static inline uint16_t
747 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
748 {
749         struct rte_eth_event_enqueue_buffer *buf =
750             &rx_adapter->event_enqueue_buffer;
751         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
752
753         if (!buf->count)
754                 return 0;
755
756         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
757                                         rx_adapter->event_port_id,
758                                         buf->events,
759                                         buf->count);
760         if (n != buf->count) {
761                 memmove(buf->events,
762                         &buf->events[n],
763                         (buf->count - n) * sizeof(struct rte_event));
764                 stats->rx_enq_retry++;
765         }
766
767         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
768                 rxa_enq_block_start_ts(rx_adapter);
769
770         buf->count -= n;
771         stats->rx_enq_count += n;
772
773         return n;
774 }
775
776 static inline void
777 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
778                 struct eth_rx_vector_data *vec)
779 {
780         vec->vector_ev->nb_elem = 0;
781         vec->vector_ev->port = vec->port;
782         vec->vector_ev->queue = vec->queue;
783         vec->vector_ev->attr_valid = true;
784         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
785 }
786
787 static inline uint16_t
788 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
789                         struct eth_rx_queue_info *queue_info,
790                         struct rte_eth_event_enqueue_buffer *buf,
791                         struct rte_mbuf **mbufs, uint16_t num)
792 {
793         struct rte_event *ev = &buf->events[buf->count];
794         struct eth_rx_vector_data *vec;
795         uint16_t filled, space, sz;
796
797         filled = 0;
798         vec = &queue_info->vector_data;
799
800         if (vec->vector_ev == NULL) {
801                 if (rte_mempool_get(vec->vector_pool,
802                                     (void **)&vec->vector_ev) < 0) {
803                         rte_pktmbuf_free_bulk(mbufs, num);
804                         return 0;
805                 }
806                 rxa_init_vector(rx_adapter, vec);
807         }
808         while (num) {
809                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
810                         /* Event ready. */
811                         ev->event = vec->event;
812                         ev->vec = vec->vector_ev;
813                         ev++;
814                         filled++;
815                         vec->vector_ev = NULL;
816                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
817                         if (rte_mempool_get(vec->vector_pool,
818                                             (void **)&vec->vector_ev) < 0) {
819                                 rte_pktmbuf_free_bulk(mbufs, num);
820                                 return 0;
821                         }
822                         rxa_init_vector(rx_adapter, vec);
823                 }
824
825                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
826                 sz = num > space ? space : num;
827                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
828                        sizeof(void *) * sz);
829                 vec->vector_ev->nb_elem += sz;
830                 num -= sz;
831                 mbufs += sz;
832                 vec->ts = rte_rdtsc();
833         }
834
835         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
836                 ev->event = vec->event;
837                 ev->vec = vec->vector_ev;
838                 ev++;
839                 filled++;
840                 vec->vector_ev = NULL;
841                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
842         }
843
844         return filled;
845 }
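/*
 * Event vectorization is opt-in per Rx queue. A minimal sketch of how an
 * application might request it when adding a queue, assuming the eventdev
 * PMD reports RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR; fields other than
 * rx_queue_flags (vector mempool, size and timeout limits) vary with the
 * DPDK release and are omitted here:
 *
 *	uint32_t caps;
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	rte_event_eth_rx_adapter_caps_get(evdev_id, eth_port_id, &caps);
 *	if (caps & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)
 *		qconf.rx_queue_flags |=
 *			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *	rte_event_eth_rx_adapter_queue_add(id, eth_port_id, rx_queue_id,
 *					   &qconf);
 */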
846
847 static inline void
848 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
849                 uint16_t eth_dev_id,
850                 uint16_t rx_queue_id,
851                 struct rte_mbuf **mbufs,
852                 uint16_t num)
853 {
854         uint32_t i;
855         struct eth_device_info *dev_info =
856                                         &rx_adapter->eth_devices[eth_dev_id];
857         struct eth_rx_queue_info *eth_rx_queue_info =
858                                         &dev_info->rx_queue[rx_queue_id];
859         struct rte_eth_event_enqueue_buffer *buf =
860                                         &rx_adapter->event_enqueue_buffer;
861         struct rte_event *ev = &buf->events[buf->count];
862         uint64_t event = eth_rx_queue_info->event;
863         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
864         struct rte_mbuf *m = mbufs[0];
865         uint32_t rss_mask;
866         uint32_t rss;
867         int do_rss;
868         uint16_t nb_cb;
869         uint16_t dropped;
870
871         if (!eth_rx_queue_info->ena_vector) {
872                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
873                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
874                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
875                 for (i = 0; i < num; i++) {
876                         m = mbufs[i];
877
878                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
879                                      : m->hash.rss;
880                         ev->event = event;
881                         ev->flow_id = (rss & ~flow_id_mask) |
882                                       (ev->flow_id & flow_id_mask);
883                         ev->mbuf = m;
884                         ev++;
885                 }
886         } else {
887                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
888                                               buf, mbufs, num);
889         }
890
891         if (num && dev_info->cb_fn) {
892
893                 dropped = 0;
894                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
895                                         ETH_EVENT_BUFFER_SIZE, buf->count,
896                                         &buf->events[buf->count], num,
897                                         dev_info->cb_arg, &dropped);
898                 if (unlikely(nb_cb > num))
899                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
900                                 nb_cb, num);
901                 else
902                         num = nb_cb;
903                 if (dropped)
904                         rx_adapter->stats.rx_dropped += dropped;
905         }
906
907         buf->count += num;
908 }
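/*
 * The per device callback invoked above is optional; applications install it
 * with rte_event_eth_rx_adapter_cb_register(). A minimal sketch mirroring
 * the call made in rxa_buffer_mbufs(); the callback may rewrite, reorder or
 * drop the events it is handed and returns the number of events to enqueue:
 *
 *	static uint16_t
 *	rx_cb(uint16_t eth_dev_id, uint16_t queue_id, uint32_t buf_size,
 *	      uint32_t buf_count, struct rte_event *ev, uint16_t nb_event,
 *	      void *cb_arg, uint16_t *nb_dropped)
 *	{
 *		*nb_dropped = 0;
 *		return nb_event;
 *	}
 *
 *	rte_event_eth_rx_adapter_cb_register(id, eth_port_id, rx_cb, NULL);
 */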
909
910 /* Enqueue packets from <port, q> to event buffer */
911 static inline uint32_t
912 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
913         uint16_t port_id,
914         uint16_t queue_id,
915         uint32_t rx_count,
916         uint32_t max_rx,
917         int *rxq_empty)
918 {
919         struct rte_mbuf *mbufs[BATCH_SIZE];
920         struct rte_eth_event_enqueue_buffer *buf =
921                                         &rx_adapter->event_enqueue_buffer;
922         struct rte_event_eth_rx_adapter_stats *stats =
923                                         &rx_adapter->stats;
924         uint16_t n;
925         uint32_t nb_rx = 0;
926
927         if (rxq_empty)
928                 *rxq_empty = 0;
929         /* Don't do a batch dequeue from the rx queue if there isn't
930          * enough space in the enqueue buffer.
931          */
932         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
933                 if (buf->count >= BATCH_SIZE)
934                         rxa_flush_event_buffer(rx_adapter);
935
936                 stats->rx_poll_count++;
937                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
938                 if (unlikely(!n)) {
939                         if (rxq_empty)
940                                 *rxq_empty = 1;
941                         break;
942                 }
943                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
944                 nb_rx += n;
945                 if (rx_count + nb_rx > max_rx)
946                         break;
947         }
948
949         if (buf->count > 0)
950                 rxa_flush_event_buffer(rx_adapter);
951
952         return nb_rx;
953 }
954
955 static inline void
956 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
957                 void *data)
958 {
959         uint16_t port_id;
960         uint16_t queue;
961         int err;
962         union queue_data qd;
963         struct eth_device_info *dev_info;
964         struct eth_rx_queue_info *queue_info;
965         int *intr_enabled;
966
967         qd.ptr = data;
968         port_id = qd.port;
969         queue = qd.queue;
970
971         dev_info = &rx_adapter->eth_devices[port_id];
972         queue_info = &dev_info->rx_queue[queue];
973         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
974         if (rxa_shared_intr(dev_info, queue))
975                 intr_enabled = &dev_info->shared_intr_enabled;
976         else
977                 intr_enabled = &queue_info->intr_enabled;
978
979         if (*intr_enabled) {
980                 *intr_enabled = 0;
981                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
982                 /* Entry should always be available.
983                  * The ring size equals the maximum number of interrupt
984                  * vectors supported (an interrupt vector is shared in
985                  * case of shared interrupts)
986                  */
987                 if (err)
988                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
989                                 " to ring: %s", strerror(-err));
990                 else
991                         rte_eth_dev_rx_intr_disable(port_id, queue);
992         }
993         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
994 }
995
996 static int
997 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
998                         uint32_t num_intr_vec)
999 {
1000         if (rx_adapter->num_intr_vec + num_intr_vec >
1001                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1002                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1003                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1004                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1005                 return -ENOSPC;
1006         }
1007
1008         return 0;
1009 }
1010
1011 /* Delete entries for (dev, queue) from the interrupt ring */
1012 static void
1013 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1014                         struct eth_device_info *dev_info,
1015                         uint16_t rx_queue_id)
1016 {
1017         int i, n;
1018         union queue_data qd;
1019
1020         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1021
1022         n = rte_ring_count(rx_adapter->intr_ring);
1023         for (i = 0; i < n; i++) {
1024                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1025                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1026                         if (qd.port == dev_info->dev->data->port_id &&
1027                                 qd.queue == rx_queue_id)
1028                                 continue;
1029                 } else {
1030                         if (qd.port == dev_info->dev->data->port_id)
1031                                 continue;
1032                 }
1033                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1034         }
1035
1036         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1037 }
1038
1039 /* pthread callback handling interrupt mode receive queues
1040  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1041  * interrupting queue to the adapter's ring buffer for interrupt events.
1042  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1043  * the adapter service function.
1044  */
1045 static void *
1046 rxa_intr_thread(void *arg)
1047 {
1048         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1049         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1050         int n, i;
1051
1052         while (1) {
1053                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1054                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1055                 if (unlikely(n < 0))
1056                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1057                                         n);
1058                 for (i = 0; i < n; i++) {
1059                         rxa_intr_ring_enqueue(rx_adapter,
1060                                         epoll_events[i].epdata.data);
1061                 }
1062         }
1063
1064         return NULL;
1065 }
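/*
 * The file descriptors this thread waits on are registered with the
 * adapter's epoll instance when an interrupt mode queue is added; the packed
 * queue_data travels as the epoll user data. A minimal sketch of that
 * registration, assuming the queue's union queue_data is in qd (the deletion
 * counterpart with RTE_INTR_EVENT_DEL appears in rxa_disable_intr() below):
 *
 *	rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
 *				  rx_adapter->epd,
 *				  RTE_INTR_EVENT_ADD,
 *				  qd.ptr);
 */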
1066
1067 /* Dequeue <port, q> from interrupt ring and enqueue received
1068  * mbufs to eventdev
1069  */
1070 static inline uint32_t
1071 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1072 {
1073         uint32_t n;
1074         uint32_t nb_rx = 0;
1075         int rxq_empty;
1076         struct rte_eth_event_enqueue_buffer *buf;
1077         rte_spinlock_t *ring_lock;
1078         uint8_t max_done = 0;
1079
1080         if (rx_adapter->num_rx_intr == 0)
1081                 return 0;
1082
1083         if (rte_ring_count(rx_adapter->intr_ring) == 0
1084                 && !rx_adapter->qd_valid)
1085                 return 0;
1086
1087         buf = &rx_adapter->event_enqueue_buffer;
1088         ring_lock = &rx_adapter->intr_ring_lock;
1089
1090         if (buf->count >= BATCH_SIZE)
1091                 rxa_flush_event_buffer(rx_adapter);
1092
1093         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1094                 struct eth_device_info *dev_info;
1095                 uint16_t port;
1096                 uint16_t queue;
1097                 union queue_data qd  = rx_adapter->qd;
1098                 int err;
1099
1100                 if (!rx_adapter->qd_valid) {
1101                         struct eth_rx_queue_info *queue_info;
1102
1103                         rte_spinlock_lock(ring_lock);
1104                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1105                         if (err) {
1106                                 rte_spinlock_unlock(ring_lock);
1107                                 break;
1108                         }
1109
1110                         port = qd.port;
1111                         queue = qd.queue;
1112                         rx_adapter->qd = qd;
1113                         rx_adapter->qd_valid = 1;
1114                         dev_info = &rx_adapter->eth_devices[port];
1115                         if (rxa_shared_intr(dev_info, queue))
1116                                 dev_info->shared_intr_enabled = 1;
1117                         else {
1118                                 queue_info = &dev_info->rx_queue[queue];
1119                                 queue_info->intr_enabled = 1;
1120                         }
1121                         rte_eth_dev_rx_intr_enable(port, queue);
1122                         rte_spinlock_unlock(ring_lock);
1123                 } else {
1124                         port = qd.port;
1125                         queue = qd.queue;
1126
1127                         dev_info = &rx_adapter->eth_devices[port];
1128                 }
1129
1130                 if (rxa_shared_intr(dev_info, queue)) {
1131                         uint16_t i;
1132                         uint16_t nb_queues;
1133
1134                         nb_queues = dev_info->dev->data->nb_rx_queues;
1135                         n = 0;
1136                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1137                                 uint8_t enq_buffer_full;
1138
1139                                 if (!rxa_intr_queue(dev_info, i))
1140                                         continue;
1141                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1142                                         rx_adapter->max_nb_rx,
1143                                         &rxq_empty);
1144                                 nb_rx += n;
1145
1146                                 enq_buffer_full = !rxq_empty && n == 0;
1147                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1148
1149                                 if (enq_buffer_full || max_done) {
1150                                         dev_info->next_q_idx = i;
1151                                         goto done;
1152                                 }
1153                         }
1154
1155                         rx_adapter->qd_valid = 0;
1156
1157                         /* Reinitialize for next interrupt */
1158                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1159                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1160                                                 0;
1161                 } else {
1162                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1163                                 rx_adapter->max_nb_rx,
1164                                 &rxq_empty);
1165                         rx_adapter->qd_valid = !rxq_empty;
1166                         nb_rx += n;
1167                         if (nb_rx > rx_adapter->max_nb_rx)
1168                                 break;
1169                 }
1170         }
1171
1172 done:
1173         rx_adapter->stats.rx_intr_packets += nb_rx;
1174         return nb_rx;
1175 }
1176
1177 /*
1178  * Polls receive queues added to the event adapter and enqueues received
1179  * packets to the event device.
1180  *
1181  * The receive code enqueues initially to a temporary buffer; the
1182  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1183  *
1184  * If there isn't space available in the temporary buffer, packets from the
1185  * Rx queue aren't dequeued from the eth device; this back pressures the
1186  * eth device. In virtual device environments this back pressure is relayed to
1187  * the hypervisor's switching layer where adjustments can be made to deal with
1188  * it.
1189  */
1190 static inline uint32_t
1191 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1192 {
1193         uint32_t num_queue;
1194         uint32_t nb_rx = 0;
1195         struct rte_eth_event_enqueue_buffer *buf;
1196         uint32_t wrr_pos;
1197         uint32_t max_nb_rx;
1198
1199         wrr_pos = rx_adapter->wrr_pos;
1200         max_nb_rx = rx_adapter->max_nb_rx;
1201         buf = &rx_adapter->event_enqueue_buffer;
1202
1203         /* Iterate through a WRR sequence */
1204         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1205                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1206                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1207                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1208
1209                 /* Don't do a batch dequeue from the rx queue if there isn't
1210                  * enough space in the enqueue buffer.
1211                  */
1212                 if (buf->count >= BATCH_SIZE)
1213                         rxa_flush_event_buffer(rx_adapter);
1214                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1215                         rx_adapter->wrr_pos = wrr_pos;
1216                         return nb_rx;
1217                 }
1218
1219                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1220                                 NULL);
1221                 if (nb_rx > max_nb_rx) {
1222                         rx_adapter->wrr_pos =
1223                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1224                         break;
1225                 }
1226
1227                 if (++wrr_pos == rx_adapter->wrr_len)
1228                         wrr_pos = 0;
1229         }
1230         return nb_rx;
1231 }
1232
1233 static void
1234 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1235 {
1236         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1237         struct rte_eth_event_enqueue_buffer *buf =
1238                 &rx_adapter->event_enqueue_buffer;
1239         struct rte_event *ev;
1240
1241         if (buf->count)
1242                 rxa_flush_event_buffer(rx_adapter);
1243
1244         if (vec->vector_ev->nb_elem == 0)
1245                 return;
1246         ev = &buf->events[buf->count];
1247
1248         /* Event ready. */
1249         ev->event = vec->event;
1250         ev->vec = vec->vector_ev;
1251         buf->count++;
1252
1253         vec->vector_ev = NULL;
1254         vec->ts = 0;
1255 }
1256
1257 static int
1258 rxa_service_func(void *args)
1259 {
1260         struct rte_event_eth_rx_adapter *rx_adapter = args;
1261         struct rte_event_eth_rx_adapter_stats *stats;
1262
1263         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1264                 return 0;
1265         if (!rx_adapter->rxa_started) {
1266                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1267                 return 0;
1268         }
1269
1270         if (rx_adapter->ena_vector) {
1271                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1272                     rx_adapter->vector_tmo_ticks) {
1273                         struct eth_rx_vector_data *vec;
1274
1275                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1276                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1277
1278                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1279                                         rxa_vector_expire(vec, rx_adapter);
1280                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1281                                                      vec, next);
1282                                 }
1283                         }
1284                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1285                 }
1286         }
1287
1288         stats = &rx_adapter->stats;
1289         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1290         stats->rx_packets += rxa_poll(rx_adapter);
1291         rte_spinlock_unlock(&rx_adapter->rx_lock);
1292         return 0;
1293 }
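/*
 * When the adapter uses this service function (i.e. there is no internal
 * event port), the application must run it on a service core. A minimal
 * sketch, assuming service lcore 2 is available and id is the adapter ID:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(2);
 *		rte_service_map_lcore_set(service_id, 2, 1);
 *		rte_service_runstate_set(service_id, 1);
 *		rte_service_lcore_start(2);
 *	}
 */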
1294
1295 static int
1296 rte_event_eth_rx_adapter_init(void)
1297 {
1298         const char *name = "rte_event_eth_rx_adapter_array";
1299         const struct rte_memzone *mz;
1300         unsigned int sz;
1301
1302         sz = sizeof(*event_eth_rx_adapter) *
1303             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1304         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1305
1306         mz = rte_memzone_lookup(name);
1307         if (mz == NULL) {
1308                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1309                                                  RTE_CACHE_LINE_SIZE);
1310                 if (mz == NULL) {
1311                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1312                                         PRId32, rte_errno);
1313                         return -rte_errno;
1314                 }
1315         }
1316
1317         event_eth_rx_adapter = mz->addr;
1318         return 0;
1319 }
1320
1321 static inline struct rte_event_eth_rx_adapter *
1322 rxa_id_to_adapter(uint8_t id)
1323 {
1324         return event_eth_rx_adapter ?
1325                 event_eth_rx_adapter[id] : NULL;
1326 }
1327
1328 static int
1329 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1330                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1331 {
1332         int ret;
1333         struct rte_eventdev *dev;
1334         struct rte_event_dev_config dev_conf;
1335         int started;
1336         uint8_t port_id;
1337         struct rte_event_port_conf *port_conf = arg;
1338         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1339
1340         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1341         dev_conf = dev->data->dev_conf;
1342
1343         started = dev->data->dev_started;
1344         if (started)
1345                 rte_event_dev_stop(dev_id);
1346         port_id = dev_conf.nb_event_ports;
1347         dev_conf.nb_event_ports += 1;
1348         ret = rte_event_dev_configure(dev_id, &dev_conf);
1349         if (ret) {
1350                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1351                                                 dev_id);
1352                 if (started) {
1353                         if (rte_event_dev_start(dev_id))
1354                                 return -EIO;
1355                 }
1356                 return ret;
1357         }
1358
1359         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1360         if (ret) {
1361                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1362                                         port_id);
1363                 return ret;
1364         }
1365
1366         conf->event_port_id = port_id;
1367         conf->max_nb_rx = 128;
1368         if (started)
1369                 ret = rte_event_dev_start(dev_id);
1370         rx_adapter->default_cb_arg = 1;
1371         return ret;
1372 }
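/*
 * This default callback runs when the adapter is created with
 * rte_event_eth_rx_adapter_create(), which passes the caller's event port
 * config as conf_arg. A minimal usage sketch, assuming the event device has
 * already been configured by the application:
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(id, evdev_id, &port_conf);
 */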
1373
1374 static int
1375 rxa_epoll_create1(void)
1376 {
1377 #if defined(LINUX)
1378         int fd;
1379         fd = epoll_create1(EPOLL_CLOEXEC);
1380         return fd < 0 ? -errno : fd;
1381 #elif defined(BSD)
1382         return -ENOTSUP;
1383 #endif
1384 }
1385
1386 static int
1387 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1388 {
1389         if (rx_adapter->epd != INIT_FD)
1390                 return 0;
1391
1392         rx_adapter->epd = rxa_epoll_create1();
1393         if (rx_adapter->epd < 0) {
1394                 int err = rx_adapter->epd;
1395                 rx_adapter->epd = INIT_FD;
1396                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1397                 return err;
1398         }
1399
1400         return 0;
1401 }
1402
1403 static int
1404 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1405 {
1406         int err;
1407         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1408
1409         if (rx_adapter->intr_ring)
1410                 return 0;
1411
1412         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1413                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1414                                         rte_socket_id(), 0);
1415         if (!rx_adapter->intr_ring)
1416                 return -ENOMEM;
1417
1418         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1419                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1420                                         sizeof(struct rte_epoll_event),
1421                                         RTE_CACHE_LINE_SIZE,
1422                                         rx_adapter->socket_id);
1423         if (!rx_adapter->epoll_events) {
1424                 err = -ENOMEM;
1425                 goto error;
1426         }
1427
1428         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1429
1430         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1431                         "rx-intr-thread-%d", rx_adapter->id);
1432
1433         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1434                                 NULL, rxa_intr_thread, rx_adapter);
1435         if (!err)
1436                 return 0;
1437
1438         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1439 error:
1440         rte_ring_free(rx_adapter->intr_ring);
1441         rx_adapter->intr_ring = NULL;
1442         rx_adapter->epoll_events = NULL;
1443         return err;
1444 }
1445
1446 static int
1447 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1448 {
1449         int err;
1450
1451         err = pthread_cancel(rx_adapter->rx_intr_thread);
1452         if (err)
1453                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1454                                 err);
1455
1456         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1457         if (err)
1458                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1459
1460         rte_free(rx_adapter->epoll_events);
1461         rte_ring_free(rx_adapter->intr_ring);
1462         rx_adapter->intr_ring = NULL;
1463         rx_adapter->epoll_events = NULL;
1464         return 0;
1465 }
1466
1467 static int
1468 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1469 {
1470         int ret;
1471
1472         if (rx_adapter->num_rx_intr == 0)
1473                 return 0;
1474
1475         ret = rxa_destroy_intr_thread(rx_adapter);
1476         if (ret)
1477                 return ret;
1478
1479         close(rx_adapter->epd);
1480         rx_adapter->epd = INIT_FD;
1481
1482         return ret;
1483 }
1484
1485 static int
1486 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1487         struct eth_device_info *dev_info,
1488         uint16_t rx_queue_id)
1489 {
1490         int err;
1491         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1492         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1493
1494         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1495         if (err) {
1496                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1497                         rx_queue_id);
1498                 return err;
1499         }
1500
1501         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1502                                         rx_adapter->epd,
1503                                         RTE_INTR_EVENT_DEL,
1504                                         0);
1505         if (err)
1506                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1507
1508         if (sintr)
1509                 dev_info->shared_intr_enabled = 0;
1510         else
1511                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1512         return err;
1513 }
1514
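/*
 * Remove a queue (all queues if rx_queue_id is -1) from interrupt mode;
 * a shared interrupt is disabled only when its last queue is removed,
 * and the queue is dropped from the device's intr_queue[] list
 */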
1515 static int
1516 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1517                 struct eth_device_info *dev_info,
1518                 int rx_queue_id)
1519 {
1520         int err;
1521         int i;
1522         int s;
1523
1524         if (dev_info->nb_rx_intr == 0)
1525                 return 0;
1526
1527         err = 0;
1528         if (rx_queue_id == -1) {
1529                 s = dev_info->nb_shared_intr;
1530                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1531                         int sintr;
1532                         uint16_t q;
1533
1534                         q = dev_info->intr_queue[i];
1535                         sintr = rxa_shared_intr(dev_info, q);
1536                         s -= sintr;
1537
1538                         if (!sintr || s == 0) {
1539
1540                                 err = rxa_disable_intr(rx_adapter, dev_info,
1541                                                 q);
1542                                 if (err)
1543                                         return err;
1544                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1545                                                         q);
1546                         }
1547                 }
1548         } else {
1549                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1550                         return 0;
1551                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1552                                 dev_info->nb_shared_intr == 1) {
1553                         err = rxa_disable_intr(rx_adapter, dev_info,
1554                                         rx_queue_id);
1555                         if (err)
1556                                 return err;
1557                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1558                                                 rx_queue_id);
1559                 }
1560
1561                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1562                         if (dev_info->intr_queue[i] == rx_queue_id) {
1563                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1564                                         dev_info->intr_queue[i] =
1565                                                 dev_info->intr_queue[i + 1];
1566                                 break;
1567                         }
1568                 }
1569         }
1570
1571         return err;
1572 }
1573
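/*
 * Set up interrupt mode for one Rx queue: allocate the per-device
 * intr_queue[] array if needed, create the epoll fd, register the queue
 * with it, enable the device interrupt and start the interrupt thread.
 * Failures are unwound in reverse order.
 */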
1574 static int
1575 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1576         struct eth_device_info *dev_info,
1577         uint16_t rx_queue_id)
1578 {
1579         int err, err1;
1580         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1581         union queue_data qd;
1582         int init_fd;
1583         uint16_t *intr_queue;
1584         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1585
1586         if (rxa_intr_queue(dev_info, rx_queue_id))
1587                 return 0;
1588
1589         intr_queue = dev_info->intr_queue;
1590         if (dev_info->intr_queue == NULL) {
1591                 size_t len =
1592                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1593                 dev_info->intr_queue =
1594                         rte_zmalloc_socket(
1595                                 rx_adapter->mem_name,
1596                                 len,
1597                                 0,
1598                                 rx_adapter->socket_id);
1599                 if (dev_info->intr_queue == NULL)
1600                         return -ENOMEM;
1601         }
1602
1603         init_fd = rx_adapter->epd;
1604         err = rxa_init_epd(rx_adapter);
1605         if (err)
1606                 goto err_free_queue;
1607
1608         qd.port = eth_dev_id;
1609         qd.queue = rx_queue_id;
1610
1611         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1612                                         rx_adapter->epd,
1613                                         RTE_INTR_EVENT_ADD,
1614                                         qd.ptr);
1615         if (err) {
1616                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1617                         " Rx Queue %u err %d", rx_queue_id, err);
1618                 goto err_del_fd;
1619         }
1620
1621         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1622         if (err) {
1623                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1624                                 " Rx Queue %u err %d", rx_queue_id, err);
1625
1626                 goto err_del_event;
1627         }
1628
1629         err = rxa_create_intr_thread(rx_adapter);
1630         if (!err) {
1631                 if (sintr)
1632                         dev_info->shared_intr_enabled = 1;
1633                 else
1634                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1635                 return 0;
1636         }
1637
1638
1639         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1640         if (err)
1641                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1642                                 " Rx Queue %u err %d", rx_queue_id, err);
1643 err_del_event:
1644         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1645                                         rx_adapter->epd,
1646                                         RTE_INTR_EVENT_DEL,
1647                                         0);
1648         if (err1) {
1649                 RTE_EDEV_LOG_ERR("Could not delete event for"
1650                                 " Rx Queue %u err %d", rx_queue_id, err1);
1651         }
1652 err_del_fd:
1653         if (init_fd == INIT_FD) {
1654                 close(rx_adapter->epd);
1655                 rx_adapter->epd = INIT_FD;
1656         }
1657 err_free_queue:
1658         if (intr_queue == NULL) {
1659                 rte_free(dev_info->intr_queue);
                dev_info->intr_queue = NULL;
        }
1660
1661         return err;
1662 }
1663
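/*
 * Add a queue (all queues if rx_queue_id is -1) in interrupt mode; for a
 * shared interrupt only the first queue needs to be configured, and on
 * failure the queues configured by this call are rolled back
 */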
1664 static int
1665 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1666         struct eth_device_info *dev_info,
1667         int rx_queue_id)
1668
1669 {
1670         int i, j, err;
1671         int si = -1;
1672         int shared_done = (dev_info->nb_shared_intr > 0);
1673
1674         if (rx_queue_id != -1) {
1675                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1676                         return 0;
1677                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1678         }
1679
1680         err = 0;
1681         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1682
1683                 if (rxa_shared_intr(dev_info, i) && shared_done)
1684                         continue;
1685
1686                 err = rxa_config_intr(rx_adapter, dev_info, i);
1687
1688                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1689                 if (shared_done) {
1690                         si = i;
1691                         dev_info->shared_intr_enabled = 1;
1692                 }
1693                 if (err)
1694                         break;
1695         }
1696
1697         if (err == 0)
1698                 return 0;
1699
1700         shared_done = (dev_info->nb_shared_intr > 0);
1701         for (j = 0; j < i; j++) {
1702                 if (rxa_intr_queue(dev_info, j))
1703                         continue;
1704                 if (rxa_shared_intr(dev_info, j) && si != j)
1705                         continue;
1706                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1707                 if (err)
1708                         break;
1709
1710         }
1711
1712         return err;
1713 }
1714
1715
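/*
 * Register the adapter's service function and run the application
 * supplied configuration callback to obtain the event port and the
 * Rx burst limit
 */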
1716 static int
1717 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1718 {
1719         int ret;
1720         struct rte_service_spec service;
1721         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1722
1723         if (rx_adapter->service_inited)
1724                 return 0;
1725
1726         memset(&service, 0, sizeof(service));
1727         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1728                 "rte_event_eth_rx_adapter_%d", id);
1729         service.socket_id = rx_adapter->socket_id;
1730         service.callback = rxa_service_func;
1731         service.callback_userdata = rx_adapter;
1732         /* Service function handles locking for queue add/del updates */
1733         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1734         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1735         if (ret) {
1736                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1737                         service.name, ret);
1738                 return ret;
1739         }
1740
1741         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1742                 &rx_adapter_conf, rx_adapter->conf_arg);
1743         if (ret) {
1744                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1745                         ret);
1746                 goto err_done;
1747         }
1748         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1749         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1750         rx_adapter->service_inited = 1;
1751         rx_adapter->epd = INIT_FD;
1752         return 0;
1753
1754 err_done:
1755         rte_service_component_unregister(rx_adapter->service_id);
1756         return ret;
1757 }
1758
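/*
 * Update the per-adapter and per-device queue counts when a queue is
 * added (add != 0) or deleted (add == 0)
 */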
1759 static void
1760 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1761                 struct eth_device_info *dev_info,
1762                 int32_t rx_queue_id,
1763                 uint8_t add)
1764 {
1765         struct eth_rx_queue_info *queue_info;
1766         int enabled;
1767         uint16_t i;
1768
1769         if (dev_info->rx_queue == NULL)
1770                 return;
1771
1772         if (rx_queue_id == -1) {
1773                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1774                         rxa_update_queue(rx_adapter, dev_info, i, add);
1775         } else {
1776                 queue_info = &dev_info->rx_queue[rx_queue_id];
1777                 enabled = queue_info->queue_enabled;
1778                 if (add) {
1779                         rx_adapter->nb_queues += !enabled;
1780                         dev_info->nb_dev_queues += !enabled;
1781                 } else {
1782                         rx_adapter->nb_queues -= enabled;
1783                         dev_info->nb_dev_queues -= enabled;
1784                 }
1785                 queue_info->queue_enabled = !!add;
1786         }
1787 }
1788
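/*
 * Store the vectorization parameters (size, timeout, mempool) for a
 * queue and derive the flow id used for its vector events when the
 * queue event does not already carry one
 */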
1789 static void
1790 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1791                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1792                     uint16_t port_id)
1793 {
1794 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1795         struct eth_rx_vector_data *vector_data;
1796         uint32_t flow_id;
1797
1798         vector_data = &queue_info->vector_data;
1799         vector_data->max_vector_count = vector_count;
1800         vector_data->port = port_id;
1801         vector_data->queue = qid;
1802         vector_data->vector_pool = mp;
1803         vector_data->vector_timeout_ticks =
1804                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1805         vector_data->ts = 0;
1806         flow_id = queue_info->event & 0xFFFFF;
1807         flow_id =
1808                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1809         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1810 }
1811
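/*
 * Remove a queue (all queues of the device if rx_queue_id is -1) from
 * the SW adapter, flushing any partially filled event vectors for the
 * queue before updating the poll/interrupt bookkeeping
 */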
1812 static void
1813 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1814         struct eth_device_info *dev_info,
1815         int32_t rx_queue_id)
1816 {
1817         struct eth_rx_vector_data *vec;
1818         int pollq;
1819         int intrq;
1820         int sintrq;
1821
1822
1823         if (rx_adapter->nb_queues == 0)
1824                 return;
1825
1826         if (rx_queue_id == -1) {
1827                 uint16_t nb_rx_queues;
1828                 uint16_t i;
1829
1830                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1831                 for (i = 0; i < nb_rx_queues; i++)
1832                         rxa_sw_del(rx_adapter, dev_info, i);
1833                 return;
1834         }
1835
1836         /* Push all the partial event vectors to event device. */
1837         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1838                 if (vec->queue != rx_queue_id)
1839                         continue;
1840                 rxa_vector_expire(vec, rx_adapter);
1841                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1842         }
1843
1844         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1845         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1846         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1847         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1848         rx_adapter->num_rx_polled -= pollq;
1849         dev_info->nb_rx_poll -= pollq;
1850         rx_adapter->num_rx_intr -= intrq;
1851         dev_info->nb_rx_intr -= intrq;
1852         dev_info->nb_shared_intr -= intrq && sintrq;
1853 }
1854
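/*
 * Initialize the per-queue event template and servicing weight for a
 * queue handled by the service function and update the poll/interrupt
 * counters to reflect the queue's mode
 */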
1855 static void
1856 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1857         struct eth_device_info *dev_info,
1858         int32_t rx_queue_id,
1859         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1860 {
1861         struct eth_rx_queue_info *queue_info;
1862         const struct rte_event *ev = &conf->ev;
1863         int pollq;
1864         int intrq;
1865         int sintrq;
1866         struct rte_event *qi_ev;
1867
1868         if (rx_queue_id == -1) {
1869                 uint16_t nb_rx_queues;
1870                 uint16_t i;
1871
1872                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1873                 for (i = 0; i < nb_rx_queues; i++)
1874                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1875                 return;
1876         }
1877
1878         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1879         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1880         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1881
1882         queue_info = &dev_info->rx_queue[rx_queue_id];
1883         queue_info->wt = conf->servicing_weight;
1884
1885         qi_ev = (struct rte_event *)&queue_info->event;
1886         qi_ev->event = ev->event;
1887         qi_ev->op = RTE_EVENT_OP_NEW;
1888         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1889         qi_ev->sub_event_type = 0;
1890
1891         if (conf->rx_queue_flags &
1892                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1893                 queue_info->flow_id_mask = ~0;
1894         } else
1895                 qi_ev->flow_id = 0;
1896
1897         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1898         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1899                 rx_adapter->num_rx_polled += !pollq;
1900                 dev_info->nb_rx_poll += !pollq;
1901                 rx_adapter->num_rx_intr -= intrq;
1902                 dev_info->nb_rx_intr -= intrq;
1903                 dev_info->nb_shared_intr -= intrq && sintrq;
1904         }
1905
1906         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1907                 rx_adapter->num_rx_polled -= pollq;
1908                 dev_info->nb_rx_poll -= pollq;
1909                 rx_adapter->num_rx_intr += !intrq;
1910                 dev_info->nb_rx_intr += !intrq;
1911                 dev_info->nb_shared_intr += !intrq && sintrq;
1912                 if (dev_info->nb_shared_intr == 1) {
1913                         if (dev_info->multi_intr_cap)
1914                                 dev_info->next_q_idx =
1915                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1916                         else
1917                                 dev_info->next_q_idx = 0;
1918                 }
1919         }
1920 }
1921
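/*
 * Enable event vectorization for a queue handled by the service
 * function and update the adapter wide vector timeout
 */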
1922 static void
1923 rxa_sw_event_vector_configure(
1924         struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
1925         int rx_queue_id,
1926         const struct rte_event_eth_rx_adapter_event_vector_config *config)
1927 {
1928         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1929         struct eth_rx_queue_info *queue_info;
1930         struct rte_event *qi_ev;
1931
1932         if (rx_queue_id == -1) {
1933                 uint16_t nb_rx_queues;
1934                 uint16_t i;
1935
1936                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1937                 for (i = 0; i < nb_rx_queues; i++)
1938                         rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
1939                                                       config);
1940                 return;
1941         }
1942
1943         queue_info = &dev_info->rx_queue[rx_queue_id];
1944         qi_ev = (struct rte_event *)&queue_info->event;
1945         queue_info->ena_vector = 1;
1946         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1947         rxa_set_vector_data(queue_info, config->vector_sz,
1948                             config->vector_timeout_ns, config->vector_mp,
1949                             rx_queue_id, dev_info->dev->data->port_id);
1950         rx_adapter->ena_vector = 1;
1951         rx_adapter->vector_tmo_ticks =
1952                 rx_adapter->vector_tmo_ticks ?
1953                               RTE_MIN(config->vector_timeout_ns >> 1,
1954                                 rx_adapter->vector_tmo_ticks) :
1955                               config->vector_timeout_ns >> 1;
1956 }
1957
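/*
 * Add a queue to the SW adapter: recompute the poll and WRR arrays for
 * the new queue set, move queues between poll and interrupt mode as
 * required and install the new schedule
 */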
1958 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1959                 uint16_t eth_dev_id,
1960                 int rx_queue_id,
1961                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1962 {
1963         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1964         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1965         int ret;
1966         struct eth_rx_poll_entry *rx_poll;
1967         struct eth_rx_queue_info *rx_queue;
1968         uint32_t *rx_wrr;
1969         uint16_t nb_rx_queues;
1970         uint32_t nb_rx_poll, nb_wrr;
1971         uint32_t nb_rx_intr;
1972         int num_intr_vec;
1973         uint16_t wt;
1974
1975         if (queue_conf->servicing_weight == 0) {
1976                 struct rte_eth_dev_data *data = dev_info->dev->data;
1977
1978                 temp_conf = *queue_conf;
1979                 if (!data->dev_conf.intr_conf.rxq) {
1980                         /* If Rx interrupts are disabled set wt = 1 */
1981                         temp_conf.servicing_weight = 1;
1982                 }
1983                 queue_conf = &temp_conf;
1984         }
1985
1986         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1987         rx_queue = dev_info->rx_queue;
1988         wt = queue_conf->servicing_weight;
1989
1990         if (dev_info->rx_queue == NULL) {
1991                 dev_info->rx_queue =
1992                     rte_zmalloc_socket(rx_adapter->mem_name,
1993                                        nb_rx_queues *
1994                                        sizeof(struct eth_rx_queue_info), 0,
1995                                        rx_adapter->socket_id);
1996                 if (dev_info->rx_queue == NULL)
1997                         return -ENOMEM;
1998         }
1999         rx_wrr = NULL;
2000         rx_poll = NULL;
2001
2002         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2003                         queue_conf->servicing_weight,
2004                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2005
2006         if (dev_info->dev->intr_handle)
2007                 dev_info->multi_intr_cap =
2008                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2009
2010         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2011                                 &rx_poll, &rx_wrr);
2012         if (ret)
2013                 goto err_free_rxqueue;
2014
2015         if (wt == 0) {
2016                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2017
2018                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2019                 if (ret)
2020                         goto err_free_rxqueue;
2021
2022                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2023                 if (ret)
2024                         goto err_free_rxqueue;
2025         } else {
2026
2027                 num_intr_vec = 0;
2028                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2029                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2030                                                 rx_queue_id, 0);
2031                         /* interrupt based queues are being converted to
2032                          * poll mode queues, delete the interrupt configuration
2033                          * for those.
2034                          */
2035                         ret = rxa_del_intr_queue(rx_adapter,
2036                                                 dev_info, rx_queue_id);
2037                         if (ret)
2038                                 goto err_free_rxqueue;
2039                 }
2040         }
2041
2042         if (nb_rx_intr == 0) {
2043                 ret = rxa_free_intr_resources(rx_adapter);
2044                 if (ret)
2045                         goto err_free_rxqueue;
2046         }
2047
2048         if (wt == 0) {
2049                 uint16_t i;
2050
2051                 if (rx_queue_id == -1) {
2052                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2053                                 dev_info->intr_queue[i] = i;
2054                 } else {
2055                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2056                                 dev_info->intr_queue[nb_rx_intr - 1] =
2057                                         rx_queue_id;
2058                 }
2059         }
2060
2063         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2064         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2065
2066         rte_free(rx_adapter->eth_rx_poll);
2067         rte_free(rx_adapter->wrr_sched);
2068
2069         rx_adapter->eth_rx_poll = rx_poll;
2070         rx_adapter->wrr_sched = rx_wrr;
2071         rx_adapter->wrr_len = nb_wrr;
2072         rx_adapter->num_intr_vec += num_intr_vec;
2073         return 0;
2074
2075 err_free_rxqueue:
2076         if (rx_queue == NULL) {
2077                 rte_free(dev_info->rx_queue);
2078                 dev_info->rx_queue = NULL;
2079         }
2080
2081         rte_free(rx_poll);
2082         rte_free(rx_wrr);
2083
2084         return ret;
2085 }
2086
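/*
 * Common start/stop handler: invoke the PMD start/stop callback for
 * devices with an internal event port, and toggle the service run state
 * for devices serviced by the adapter's service function
 */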
2087 static int
2088 rxa_ctrl(uint8_t id, int start)
2089 {
2090         struct rte_event_eth_rx_adapter *rx_adapter;
2091         struct rte_eventdev *dev;
2092         struct eth_device_info *dev_info;
2093         uint32_t i;
2094         int use_service = 0;
2095         int stop = !start;
2096
2097         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2098         rx_adapter = rxa_id_to_adapter(id);
2099         if (rx_adapter == NULL)
2100                 return -EINVAL;
2101
2102         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2103
2104         RTE_ETH_FOREACH_DEV(i) {
2105                 dev_info = &rx_adapter->eth_devices[i];
2106                 /* if start, check for num dev queues */
2107                 if (start && !dev_info->nb_dev_queues)
2108                         continue;
2109                 /* if stop check if dev has been started */
2110                 if (stop && !dev_info->dev_rx_started)
2111                         continue;
2112                 use_service |= !dev_info->internal_event_port;
2113                 dev_info->dev_rx_started = start;
2114                 if (dev_info->internal_event_port == 0)
2115                         continue;
2116                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2117                                                 &rte_eth_devices[i]) :
2118                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2119                                                 &rte_eth_devices[i]);
2120         }
2121
2122         if (use_service) {
2123                 rte_spinlock_lock(&rx_adapter->rx_lock);
2124                 rx_adapter->rxa_started = start;
2125                 rte_service_runstate_set(rx_adapter->service_id, start);
2126                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2127         }
2128
2129         return 0;
2130 }
2131
2132 int
2133 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2134                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2135                                 void *conf_arg)
2136 {
2137         struct rte_event_eth_rx_adapter *rx_adapter;
2138         int ret;
2139         int socket_id;
2140         uint16_t i;
2141         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
2142         const uint8_t default_rss_key[] = {
2143                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2144                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2145                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2146                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2147                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2148         };
2149
2150         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2151         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2152         if (conf_cb == NULL)
2153                 return -EINVAL;
2154
2155         if (event_eth_rx_adapter == NULL) {
2156                 ret = rte_event_eth_rx_adapter_init();
2157                 if (ret)
2158                         return ret;
2159         }
2160
2161         rx_adapter = rxa_id_to_adapter(id);
2162         if (rx_adapter != NULL) {
2163                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2164                 return -EEXIST;
2165         }
2166
2167         socket_id = rte_event_dev_socket_id(dev_id);
2168         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2169                 "rte_event_eth_rx_adapter_%d",
2170                 id);
2171
2172         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2173                         RTE_CACHE_LINE_SIZE, socket_id);
2174         if (rx_adapter == NULL) {
2175                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2176                 return -ENOMEM;
2177         }
2178
2179         rx_adapter->eventdev_id = dev_id;
2180         rx_adapter->socket_id = socket_id;
2181         rx_adapter->conf_cb = conf_cb;
2182         rx_adapter->conf_arg = conf_arg;
2183         rx_adapter->id = id;
2184         TAILQ_INIT(&rx_adapter->vector_list);
2185         strcpy(rx_adapter->mem_name, mem_name);
2186         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2187                                         RTE_MAX_ETHPORTS *
2188                                         sizeof(struct eth_device_info), 0,
2189                                         socket_id);
2190         rte_convert_rss_key((const uint32_t *)default_rss_key,
2191                         (uint32_t *)rx_adapter->rss_key_be,
2192                             RTE_DIM(default_rss_key));
2193
2194         if (rx_adapter->eth_devices == NULL) {
2195                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2196                 rte_free(rx_adapter);
2197                 return -ENOMEM;
2198         }
2199         rte_spinlock_init(&rx_adapter->rx_lock);
2200         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2201                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2202
2203         event_eth_rx_adapter[id] = rx_adapter;
2204         if (conf_cb == rxa_default_conf_cb)
2205                 rx_adapter->default_cb_arg = 1;
2206         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2207                 conf_arg);
2208         return 0;
2209 }
2210
2211 int
2212 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2213                 struct rte_event_port_conf *port_config)
2214 {
2215         struct rte_event_port_conf *pc;
2216         int ret;
2217
2218         if (port_config == NULL)
2219                 return -EINVAL;
2220         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2221
2222         pc = rte_malloc(NULL, sizeof(*pc), 0);
2223         if (pc == NULL)
2224                 return -ENOMEM;
2225         *pc = *port_config;
2226         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2227                                         rxa_default_conf_cb,
2228                                         pc);
2229         if (ret)
2230                 rte_free(pc);
2231         return ret;
2232 }
2233
2234 int
2235 rte_event_eth_rx_adapter_free(uint8_t id)
2236 {
2237         struct rte_event_eth_rx_adapter *rx_adapter;
2238
2239         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2240
2241         rx_adapter = rxa_id_to_adapter(id);
2242         if (rx_adapter == NULL)
2243                 return -EINVAL;
2244
2245         if (rx_adapter->nb_queues) {
2246                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2247                                 rx_adapter->nb_queues);
2248                 return -EBUSY;
2249         }
2250
2251         if (rx_adapter->default_cb_arg)
2252                 rte_free(rx_adapter->conf_arg);
2253         rte_free(rx_adapter->eth_devices);
2254         rte_free(rx_adapter);
2255         event_eth_rx_adapter[id] = NULL;
2256
2257         rte_eventdev_trace_eth_rx_adapter_free(id);
2258         return 0;
2259 }
2260
2261 int
2262 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2263                 uint16_t eth_dev_id,
2264                 int32_t rx_queue_id,
2265                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2266 {
2267         int ret;
2268         uint32_t cap;
2269         struct rte_event_eth_rx_adapter *rx_adapter;
2270         struct rte_eventdev *dev;
2271         struct eth_device_info *dev_info;
2272
2273         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2274         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2275
2276         rx_adapter = rxa_id_to_adapter(id);
2277         if ((rx_adapter == NULL) || (queue_conf == NULL))
2278                 return -EINVAL;
2279
2280         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2281         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2282                                                 eth_dev_id,
2283                                                 &cap);
2284         if (ret) {
2285                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2286                         " eth port %" PRIu16, id, eth_dev_id);
2287                 return ret;
2288         }
2289
2290         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2291                 && (queue_conf->rx_queue_flags &
2292                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2293                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2294                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2295                                 eth_dev_id, id);
2296                 return -EINVAL;
2297         }
2298
2299         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
2300             (queue_conf->rx_queue_flags &
2301              RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
2302                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2303                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2304                                  eth_dev_id, id);
2305                 return -EINVAL;
2306         }
2307
2308         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2309                 (rx_queue_id != -1)) {
2310                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2311                         "event queue, eth port: %" PRIu16 " adapter id: %"
2312                         PRIu8, eth_dev_id, id);
2313                 return -EINVAL;
2314         }
2315
2316         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2317                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2318                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2319                          (uint16_t)rx_queue_id);
2320                 return -EINVAL;
2321         }
2322
2323         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2324
2325         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2326                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2327                                         -ENOTSUP);
2328                 if (dev_info->rx_queue == NULL) {
2329                         dev_info->rx_queue =
2330                             rte_zmalloc_socket(rx_adapter->mem_name,
2331                                         dev_info->dev->data->nb_rx_queues *
2332                                         sizeof(struct eth_rx_queue_info), 0,
2333                                         rx_adapter->socket_id);
2334                         if (dev_info->rx_queue == NULL)
2335                                 return -ENOMEM;
2336                 }
2337
2338                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2339                                 &rte_eth_devices[eth_dev_id],
2340                                 rx_queue_id, queue_conf);
2341                 if (ret == 0) {
2342                         dev_info->internal_event_port = 1;
2343                         rxa_update_queue(rx_adapter,
2344                                         &rx_adapter->eth_devices[eth_dev_id],
2345                                         rx_queue_id,
2346                                         1);
2347                 }
2348         } else {
2349                 rte_spinlock_lock(&rx_adapter->rx_lock);
2350                 dev_info->internal_event_port = 0;
2351                 ret = rxa_init_service(rx_adapter, id);
2352                 if (ret == 0) {
2353                         uint32_t service_id = rx_adapter->service_id;
2354                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2355                                         queue_conf);
2356                         rte_service_component_runstate_set(service_id,
2357                                 rxa_sw_adapter_queue_count(rx_adapter));
2358                 }
2359                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2360         }
2361
2362         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2363                 rx_queue_id, queue_conf, ret);
2364         if (ret)
2365                 return ret;
2366
2367         return 0;
2368 }
2369
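/* Vector size and timeout limits supported by the SW implementation */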
2370 static int
2371 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2372 {
2373         limits->max_sz = MAX_VECTOR_SIZE;
2374         limits->min_sz = MIN_VECTOR_SIZE;
2375         limits->max_timeout_ns = MAX_VECTOR_NS;
2376         limits->min_timeout_ns = MIN_VECTOR_NS;
2377
2378         return 0;
2379 }
2380
2381 int
2382 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2383                                 int32_t rx_queue_id)
2384 {
2385         int ret = 0;
2386         struct rte_eventdev *dev;
2387         struct rte_event_eth_rx_adapter *rx_adapter;
2388         struct eth_device_info *dev_info;
2389         uint32_t cap;
2390         uint32_t nb_rx_poll = 0;
2391         uint32_t nb_wrr = 0;
2392         uint32_t nb_rx_intr;
2393         struct eth_rx_poll_entry *rx_poll = NULL;
2394         uint32_t *rx_wrr = NULL;
2395         int num_intr_vec;
2396
2397         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2398         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2399
2400         rx_adapter = rxa_id_to_adapter(id);
2401         if (rx_adapter == NULL)
2402                 return -EINVAL;
2403
2404         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2405         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2406                                                 eth_dev_id,
2407                                                 &cap);
2408         if (ret)
2409                 return ret;
2410
2411         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2412                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2413                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2414                          (uint16_t)rx_queue_id);
2415                 return -EINVAL;
2416         }
2417
2418         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2419
2420         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2421                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2422                                  -ENOTSUP);
2423                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2424                                                 &rte_eth_devices[eth_dev_id],
2425                                                 rx_queue_id);
2426                 if (ret == 0) {
2427                         rxa_update_queue(rx_adapter,
2428                                         &rx_adapter->eth_devices[eth_dev_id],
2429                                         rx_queue_id,
2430                                         0);
2431                         if (dev_info->nb_dev_queues == 0) {
2432                                 rte_free(dev_info->rx_queue);
2433                                 dev_info->rx_queue = NULL;
2434                         }
2435                 }
2436         } else {
2437                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2438                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2439
2440                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2441                         &rx_poll, &rx_wrr);
2442                 if (ret)
2443                         return ret;
2444
2445                 rte_spinlock_lock(&rx_adapter->rx_lock);
2446
2447                 num_intr_vec = 0;
2448                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2449
2450                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2451                                                 rx_queue_id, 0);
2452                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2453                                         rx_queue_id);
2454                         if (ret)
2455                                 goto unlock_ret;
2456                 }
2457
2458                 if (nb_rx_intr == 0) {
2459                         ret = rxa_free_intr_resources(rx_adapter);
2460                         if (ret)
2461                                 goto unlock_ret;
2462                 }
2463
2464                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2465                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2466
2467                 rte_free(rx_adapter->eth_rx_poll);
2468                 rte_free(rx_adapter->wrr_sched);
2469
2470                 if (nb_rx_intr == 0) {
2471                         rte_free(dev_info->intr_queue);
2472                         dev_info->intr_queue = NULL;
2473                 }
2474
2475                 rx_adapter->eth_rx_poll = rx_poll;
2476                 rx_adapter->wrr_sched = rx_wrr;
2477                 rx_adapter->wrr_len = nb_wrr;
2478                 rx_adapter->num_intr_vec += num_intr_vec;
2479
2480                 if (dev_info->nb_dev_queues == 0) {
2481                         rte_free(dev_info->rx_queue);
2482                         dev_info->rx_queue = NULL;
2483                 }
2484 unlock_ret:
2485                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2486                 if (ret) {
2487                         rte_free(rx_poll);
2488                         rte_free(rx_wrr);
2489                         return ret;
2490                 }
2491
2492                 rte_service_component_runstate_set(rx_adapter->service_id,
2493                                 rxa_sw_adapter_queue_count(rx_adapter));
2494         }
2495
2496         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2497                 rx_queue_id, ret);
2498         return ret;
2499 }
2500
2501 int
2502 rte_event_eth_rx_adapter_queue_event_vector_config(
2503         uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
2504         struct rte_event_eth_rx_adapter_event_vector_config *config)
2505 {
2506         struct rte_event_eth_rx_adapter_vector_limits limits;
2507         struct rte_event_eth_rx_adapter *rx_adapter;
2508         struct rte_eventdev *dev;
2509         uint32_t cap;
2510         int ret;
2511
2512         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2513         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2514
2515         rx_adapter = rxa_id_to_adapter(id);
2516         if ((rx_adapter == NULL) || (config == NULL))
2517                 return -EINVAL;
2518
2519         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2520         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2521                                                 eth_dev_id, &cap);
2522         if (ret) {
2523                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2524                                  " eth port %" PRIu16,
2525                                  id, eth_dev_id);
2526                 return ret;
2527         }
2528
2529         if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
2530                 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2531                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2532                                  eth_dev_id, id);
2533                 return -EINVAL;
2534         }
2535
2536         ret = rte_event_eth_rx_adapter_vector_limits_get(
2537                 rx_adapter->eventdev_id, eth_dev_id, &limits);
2538         if (ret) {
2539                 RTE_EDEV_LOG_ERR("Failed to get vector limits edev %" PRIu8
2540                                  " eth port %" PRIu16,
2541                                  rx_adapter->eventdev_id, eth_dev_id);
2542                 return ret;
2543         }
2544
2545         if (config->vector_sz < limits.min_sz ||
2546             config->vector_sz > limits.max_sz ||
2547             config->vector_timeout_ns < limits.min_timeout_ns ||
2548             config->vector_timeout_ns > limits.max_timeout_ns ||
2549             config->vector_mp == NULL) {
2550                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2551                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2552                                  eth_dev_id, id);
2553                 return -EINVAL;
2554         }
2555         if (config->vector_mp->elt_size <
2556             (sizeof(struct rte_event_vector) +
2557              (sizeof(uintptr_t) * config->vector_sz))) {
2558                 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2559                                  " eth port: %" PRIu16 " adapter id: %" PRIu8,
2560                                  eth_dev_id, id);
2561                 return -EINVAL;
2562         }
2563
2564         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2565                 RTE_FUNC_PTR_OR_ERR_RET(
2566                         *dev->dev_ops->eth_rx_adapter_event_vector_config,
2567                         -ENOTSUP);
2568                 ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
2569                         dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
2570         } else {
2571                 rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
2572                                               rx_queue_id, config);
2573         }
2574
2575         return ret;
2576 }
2577
2578 int
2579 rte_event_eth_rx_adapter_vector_limits_get(
2580         uint8_t dev_id, uint16_t eth_port_id,
2581         struct rte_event_eth_rx_adapter_vector_limits *limits)
2582 {
2583         struct rte_eventdev *dev;
2584         uint32_t cap;
2585         int ret;
2586
2587         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2588         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2589
2590         if (limits == NULL)
2591                 return -EINVAL;
2592
2593         dev = &rte_eventdevs[dev_id];
2594
2595         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2596         if (ret) {
2597                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2598                                  " eth port %" PRIu16,
2599                                  dev_id, eth_port_id);
2600                 return ret;
2601         }
2602
2603         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2604                 RTE_FUNC_PTR_OR_ERR_RET(
2605                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2606                         -ENOTSUP);
2607                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2608                         dev, &rte_eth_devices[eth_port_id], limits);
2609         } else {
2610                 ret = rxa_sw_vector_limits(limits);
2611         }
2612
2613         return ret;
2614 }
2615
2616 int
2617 rte_event_eth_rx_adapter_start(uint8_t id)
2618 {
2619         rte_eventdev_trace_eth_rx_adapter_start(id);
2620         return rxa_ctrl(id, 1);
2621 }
2622
2623 int
2624 rte_event_eth_rx_adapter_stop(uint8_t id)
2625 {
2626         rte_eventdev_trace_eth_rx_adapter_stop(id);
2627         return rxa_ctrl(id, 0);
2628 }
2629
2630 int
2631 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2632                                struct rte_event_eth_rx_adapter_stats *stats)
2633 {
2634         struct rte_event_eth_rx_adapter *rx_adapter;
2635         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2636         struct rte_event_eth_rx_adapter_stats dev_stats;
2637         struct rte_eventdev *dev;
2638         struct eth_device_info *dev_info;
2639         uint32_t i;
2640         int ret;
2641
2642         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2643
2644         rx_adapter = rxa_id_to_adapter(id);
2645         if (rx_adapter == NULL || stats == NULL)
2646                 return -EINVAL;
2647
2648         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2649         memset(stats, 0, sizeof(*stats));
2650         RTE_ETH_FOREACH_DEV(i) {
2651                 dev_info = &rx_adapter->eth_devices[i];
2652                 if (dev_info->internal_event_port == 0 ||
2653                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2654                         continue;
2655                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2656                                                 &rte_eth_devices[i],
2657                                                 &dev_stats);
2658                 if (ret)
2659                         continue;
2660                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2661                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2662         }
2663
2664         if (rx_adapter->service_inited)
2665                 *stats = rx_adapter->stats;
2666
2667         stats->rx_packets += dev_stats_sum.rx_packets;
2668         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2669         return 0;
2670 }
2671
2672 int
2673 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2674 {
2675         struct rte_event_eth_rx_adapter *rx_adapter;
2676         struct rte_eventdev *dev;
2677         struct eth_device_info *dev_info;
2678         uint32_t i;
2679
2680         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2681
2682         rx_adapter = rxa_id_to_adapter(id);
2683         if (rx_adapter == NULL)
2684                 return -EINVAL;
2685
2686         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2687         RTE_ETH_FOREACH_DEV(i) {
2688                 dev_info = &rx_adapter->eth_devices[i];
2689                 if (dev_info->internal_event_port == 0 ||
2690                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2691                         continue;
2692                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2693                                                         &rte_eth_devices[i]);
2694         }
2695
2696         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2697         return 0;
2698 }
2699
2700 int
2701 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2702 {
2703         struct rte_event_eth_rx_adapter *rx_adapter;
2704
2705         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2706
2707         rx_adapter = rxa_id_to_adapter(id);
2708         if (rx_adapter == NULL || service_id == NULL)
2709                 return -EINVAL;
2710
2711         if (rx_adapter->service_inited)
2712                 *service_id = rx_adapter->service_id;
2713
2714         return rx_adapter->service_inited ? 0 : -ESRCH;
2715 }
2716
2717 int
2718 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2719                                         uint16_t eth_dev_id,
2720                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2721                                         void *cb_arg)
2722 {
2723         struct rte_event_eth_rx_adapter *rx_adapter;
2724         struct eth_device_info *dev_info;
2725         uint32_t cap;
2726         int ret;
2727
2728         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2729         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2730
2731         rx_adapter = rxa_id_to_adapter(id);
2732         if (rx_adapter == NULL)
2733                 return -EINVAL;
2734
2735         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2736         if (dev_info->rx_queue == NULL)
2737                 return -EINVAL;
2738
2739         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2740                                                 eth_dev_id,
2741                                                 &cap);
2742         if (ret) {
2743                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2744                         " eth port %" PRIu16, id, eth_dev_id);
2745                 return ret;
2746         }
2747
2748         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2749                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2750                                 PRIu16, eth_dev_id);
2751                 return -EINVAL;
2752         }
2753
2754         rte_spinlock_lock(&rx_adapter->rx_lock);
2755         dev_info->cb_fn = cb_fn;
2756         dev_info->cb_arg = cb_arg;
2757         rte_spinlock_unlock(&rx_adapter->rx_lock);
2758
2759         return 0;
2760 }