[dpdk.git] / lib / eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
21
22 #include "rte_eventdev.h"
23 #include "eventdev_pmd.h"
24 #include "rte_eventdev_trace.h"
25 #include "rte_event_eth_rx_adapter.h"
26
27 #define BATCH_SIZE              32
28 #define BLOCK_CNT_THRESHOLD     10
29 #define ETH_EVENT_BUFFER_SIZE   (6*BATCH_SIZE)
30 #define MAX_VECTOR_SIZE         1024
31 #define MIN_VECTOR_SIZE         4
32 #define MAX_VECTOR_NS           1E9
33 #define MIN_VECTOR_NS           1E5
34
35 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
36 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
37
38 #define RSS_KEY_SIZE    40
39 /* value written to intr thread pipe to signal thread exit */
40 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
41 /* Sentinel value to detect an uninitialized file descriptor */
42 #define INIT_FD         -1
43
44 /*
45  * Used to store port and queue ID of interrupting Rx queue
46  */
47 union queue_data {
48         RTE_STD_C11
49         void *ptr;
50         struct {
51                 uint16_t port;
52                 uint16_t queue;
53         };
54 };
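/* Note: the union lets a <port, queue> pair be carried directly in the
 * void * user data of an epoll event or an intr_ring entry, avoiding a
 * separate allocation per interrupting queue.
 */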
55
56 /*
57  * There is an instance of this struct per polled Rx queue added to the
58  * adapter
59  */
60 struct eth_rx_poll_entry {
61         /* Eth port to poll */
62         uint16_t eth_dev_id;
63         /* Eth rx queue to poll */
64         uint16_t eth_rx_qid;
65 };
66
67 struct eth_rx_vector_data {
68         TAILQ_ENTRY(eth_rx_vector_data) next;
69         uint16_t port;
70         uint16_t queue;
71         uint16_t max_vector_count;
72         uint64_t event;
73         uint64_t ts;
74         uint64_t vector_timeout_ticks;
75         struct rte_mempool *vector_pool;
76         struct rte_event_vector *vector_ev;
77 } __rte_cache_aligned;
78
79 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
80
81 /* Instance per adapter */
82 struct rte_eth_event_enqueue_buffer {
83         /* Count of events in this buffer */
84         uint16_t count;
85         /* Array of events in this buffer */
86         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
87         /* Event enqueue happens from head */
88         uint16_t head;
89         /* New packets from rte_eth_rx_burst is enqued from tail */
90         uint16_t tail;
91         /* last element in the buffer before rollover */
92         uint16_t last;
93         uint16_t last_mask;
94 };
95
96 struct rte_event_eth_rx_adapter {
97         /* RSS key */
98         uint8_t rss_key_be[RSS_KEY_SIZE];
99         /* Event device identifier */
100         uint8_t eventdev_id;
101         /* Per ethernet device structure */
102         struct eth_device_info *eth_devices;
103         /* Event port identifier */
104         uint8_t event_port_id;
105         /* Lock to serialize config updates with service function */
106         rte_spinlock_t rx_lock;
107         /* Max mbufs processed in any service function invocation */
108         uint32_t max_nb_rx;
109         /* Receive queues that need to be polled */
110         struct eth_rx_poll_entry *eth_rx_poll;
111         /* Size of the eth_rx_poll array */
112         uint16_t num_rx_polled;
113         /* Weighted round robin schedule */
114         uint32_t *wrr_sched;
115         /* wrr_sched[] size */
116         uint32_t wrr_len;
117         /* Next entry in wrr[] to begin polling */
118         uint32_t wrr_pos;
119         /* Event burst buffer */
120         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
121         /* Vector enable flag */
122         uint8_t ena_vector;
123         /* Timestamp of previous vector expiry list traversal */
124         uint64_t prev_expiry_ts;
125         /* Minimum ticks to wait before traversing expiry list */
126         uint64_t vector_tmo_ticks;
127         /* vector list */
128         struct eth_rx_vector_data_list vector_list;
129         /* Per adapter stats */
130         struct rte_event_eth_rx_adapter_stats stats;
131         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
132         uint16_t enq_block_count;
133         /* Block start ts */
134         uint64_t rx_enq_block_start_ts;
135         /* epoll fd used to wait for Rx interrupts */
136         int epd;
137         /* Num of interrupt driven interrupt queues */
138         uint32_t num_rx_intr;
139         /* Used to send <dev id, queue id> of interrupting Rx queues from
140          * the interrupt thread to the Rx thread
141          */
142         struct rte_ring *intr_ring;
143         /* Rx Queue data (dev id, queue id) for the last non-empty
144          * queue polled
145          */
146         union queue_data qd;
147         /* queue_data is valid */
148         int qd_valid;
149         /* Interrupt ring lock, synchronizes Rx thread
150          * and interrupt thread
151          */
152         rte_spinlock_t intr_ring_lock;
153         /* event array passed to rte_poll_wait */
154         struct rte_epoll_event *epoll_events;
155         /* Count of interrupt vectors in use */
156         uint32_t num_intr_vec;
157         /* Thread blocked on Rx interrupts */
158         pthread_t rx_intr_thread;
159         /* Configuration callback for rte_service configuration */
160         rte_event_eth_rx_adapter_conf_cb conf_cb;
161         /* Configuration callback argument */
162         void *conf_arg;
163         /* Set if  default_cb is being used */
164         int default_cb_arg;
165         /* Service initialization state */
166         uint8_t service_inited;
167         /* Total count of Rx queues in adapter */
168         uint32_t nb_queues;
169         /* Memory allocation name */
170         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
171         /* Socket identifier cached from eventdev */
172         int socket_id;
173         /* Per adapter EAL service */
174         uint32_t service_id;
175         /* Adapter started flag */
176         uint8_t rxa_started;
177         /* Adapter ID */
178         uint8_t id;
179 } __rte_cache_aligned;
180
181 /* Per eth device */
182 struct eth_device_info {
183         struct rte_eth_dev *dev;
184         struct eth_rx_queue_info *rx_queue;
185         /* Rx callback */
186         rte_event_eth_rx_adapter_cb_fn cb_fn;
187         /* Rx callback argument */
188         void *cb_arg;
189         /* Set if ethdev->eventdev packet transfer uses a
190          * hardware mechanism
191          */
192         uint8_t internal_event_port;
193         /* Set if the adapter is processing rx queues for
194          * this eth device and packet processing has been
195          * started; lets the code know whether the PMD
196          * rx_adapter_stop callback needs to be invoked
197          */
198         uint8_t dev_rx_started;
199         /* Number of queues added for this device */
200         uint16_t nb_dev_queues;
201         /* Number of poll based queues
202          * If nb_rx_poll > 0, the start callback will
203          * be invoked if not already invoked
204          */
205         uint16_t nb_rx_poll;
206         /* Number of interrupt based queues
207          * If nb_rx_intr > 0, the start callback will
208          * be invoked if not already invoked.
209          */
210         uint16_t nb_rx_intr;
211         /* Number of queues that use the shared interrupt */
212         uint16_t nb_shared_intr;
213         /* sum(wrr(q)) for all queues within the device;
214          * useful when deleting all device queues
215          */
216         uint32_t wrr_len;
217         /* Intr based queue index to start polling from; this is used
218          * if the number of shared interrupts is non-zero
219          */
220         uint16_t next_q_idx;
221         /* Intr based queue indices */
222         uint16_t *intr_queue;
223         /* Set if the device generates per Rx queue interrupts
224          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
225          */
226         int multi_intr_cap;
227         /* shared interrupt enabled */
228         int shared_intr_enabled;
229 };
230
231 /* Per Rx queue */
232 struct eth_rx_queue_info {
233         int queue_enabled;      /* True if added */
234         int intr_enabled;
235         uint8_t ena_vector;
236         uint16_t wt;            /* Polling weight */
237         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
238         uint64_t event;
239         struct eth_rx_vector_data vector_data;
240 };
241
242 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
243
244 /* Enable dynamic timestamp field in mbuf */
245 static uint64_t event_eth_rx_timestamp_dynflag;
246 static int event_eth_rx_timestamp_dynfield_offset = -1;
247
248 static inline rte_mbuf_timestamp_t *
249 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
250 {
251         return RTE_MBUF_DYNFIELD(mbuf,
252                 event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
253 }
254
255 static inline int
256 rxa_validate_id(uint8_t id)
257 {
258         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
259 }
260
261 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
262         if (!rxa_validate_id(id)) { \
263                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
264                 return retval; \
265         } \
266 } while (0)
267
268 static inline int
269 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
270 {
271         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
272 }
273
274 /* Greatest common divisor */
275 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
276 {
277         uint16_t r = a % b;
278
279         return r ? rxa_gcd_u16(b, r) : b;
280 }
281
282 /* Returns the next queue in the polling sequence
283  *
284  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
285  */
286 static int
287 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
288          unsigned int n, int *cw,
289          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
290          uint16_t gcd, int prev)
291 {
292         int i = prev;
293         uint16_t w;
294
295         while (1) {
296                 uint16_t q;
297                 uint16_t d;
298
299                 i = (i + 1) % n;
300                 if (i == 0) {
301                         *cw = *cw - gcd;
302                         if (*cw <= 0)
303                                 *cw = max_wt;
304                 }
305
306                 q = eth_rx_poll[i].eth_rx_qid;
307                 d = eth_rx_poll[i].eth_dev_id;
308                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
309
310                 if ((int)w >= *cw)
311                         return i;
312         }
313 }
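/* Illustrative example (not part of the original source): with three polled
 * queues q0, q1, q2 of weights 5, 1 and 1 (max_wt = 5, gcd = 1), repeated
 * calls starting from prev = -1 and cw = -1 yield the sequence
 * q0 q0 q0 q0 q0 q1 q2, i.e. each queue appears in proportion to its weight
 * within one WRR period of sum(weights) = 7 slots.
 */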
314
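/* Returns non-zero if the queue has to use the device's shared interrupt,
 * i.e. the device cannot generate per-queue interrupts or the queue index
 * is beyond RTE_MAX_RXTX_INTR_VEC_ID - 1.
 */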
315 static inline int
316 rxa_shared_intr(struct eth_device_info *dev_info,
317         int rx_queue_id)
318 {
319         int multi_intr_cap;
320
321         if (dev_info->dev->intr_handle == NULL)
322                 return 0;
323
324         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
325         return !multi_intr_cap ||
326                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
327 }
328
329 static inline int
330 rxa_intr_queue(struct eth_device_info *dev_info,
331         int rx_queue_id)
332 {
333         struct eth_rx_queue_info *queue_info;
334
335         queue_info = &dev_info->rx_queue[rx_queue_id];
336         return dev_info->rx_queue &&
337                 !dev_info->internal_event_port &&
338                 queue_info->queue_enabled && queue_info->wt == 0;
339 }
340
341 static inline int
342 rxa_polled_queue(struct eth_device_info *dev_info,
343         int rx_queue_id)
344 {
345         struct eth_rx_queue_info *queue_info;
346
347         queue_info = &dev_info->rx_queue[rx_queue_id];
348         return !dev_info->internal_event_port &&
349                 dev_info->rx_queue &&
350                 queue_info->queue_enabled && queue_info->wt != 0;
351 }
352
353 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
354 static int
355 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
356 {
357         uint16_t i;
358         int n, s;
359         uint16_t nbq;
360
361         nbq = dev_info->dev->data->nb_rx_queues;
362         n = 0; /* non shared count */
363         s = 0; /* shared count */
364
365         if (rx_queue_id == -1) {
366                 for (i = 0; i < nbq; i++) {
367                         if (!rxa_shared_intr(dev_info, i))
368                                 n += add ? !rxa_intr_queue(dev_info, i) :
369                                         rxa_intr_queue(dev_info, i);
370                         else
371                                 s += add ? !rxa_intr_queue(dev_info, i) :
372                                         rxa_intr_queue(dev_info, i);
373                 }
374
375                 if (s > 0) {
376                         if ((add && dev_info->nb_shared_intr == 0) ||
377                                 (!add && dev_info->nb_shared_intr))
378                                 n += 1;
379                 }
380         } else {
381                 if (!rxa_shared_intr(dev_info, rx_queue_id))
382                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
383                                 rxa_intr_queue(dev_info, rx_queue_id);
384                 else
385                         n = add ? !dev_info->nb_shared_intr :
386                                 dev_info->nb_shared_intr == 1;
387         }
388
389         return add ? n : -n;
390 }
391
392 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
393  */
394 static void
395 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
396                         struct eth_device_info *dev_info,
397                         int rx_queue_id,
398                         uint32_t *nb_rx_intr)
399 {
400         uint32_t intr_diff;
401
402         if (rx_queue_id == -1)
403                 intr_diff = dev_info->nb_rx_intr;
404         else
405                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
406
407         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
408 }
409
410 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
411  * interrupt queues could currently be poll mode Rx queues
412  */
413 static void
414 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
415                         struct eth_device_info *dev_info,
416                         int rx_queue_id,
417                         uint32_t *nb_rx_poll,
418                         uint32_t *nb_rx_intr,
419                         uint32_t *nb_wrr)
420 {
421         uint32_t intr_diff;
422         uint32_t poll_diff;
423         uint32_t wrr_len_diff;
424
425         if (rx_queue_id == -1) {
426                 intr_diff = dev_info->dev->data->nb_rx_queues -
427                                                 dev_info->nb_rx_intr;
428                 poll_diff = dev_info->nb_rx_poll;
429                 wrr_len_diff = dev_info->wrr_len;
430         } else {
431                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
432                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
433                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
434                                         0;
435         }
436
437         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
438         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
439         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
440 }
441
442 /* Calculate size of the eth_rx_poll and wrr_sched arrays
443  * after deleting poll mode rx queues
444  */
445 static void
446 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
447                         struct eth_device_info *dev_info,
448                         int rx_queue_id,
449                         uint32_t *nb_rx_poll,
450                         uint32_t *nb_wrr)
451 {
452         uint32_t poll_diff;
453         uint32_t wrr_len_diff;
454
455         if (rx_queue_id == -1) {
456                 poll_diff = dev_info->nb_rx_poll;
457                 wrr_len_diff = dev_info->wrr_len;
458         } else {
459                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
460                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
461                                         0;
462         }
463
464         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
465         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
466 }
467
468 /* Calculate nb_rx_* after adding poll mode rx queues
469  */
470 static void
471 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
472                         struct eth_device_info *dev_info,
473                         int rx_queue_id,
474                         uint16_t wt,
475                         uint32_t *nb_rx_poll,
476                         uint32_t *nb_rx_intr,
477                         uint32_t *nb_wrr)
478 {
479         uint32_t intr_diff;
480         uint32_t poll_diff;
481         uint32_t wrr_len_diff;
482
483         if (rx_queue_id == -1) {
484                 intr_diff = dev_info->nb_rx_intr;
485                 poll_diff = dev_info->dev->data->nb_rx_queues -
486                                                 dev_info->nb_rx_poll;
487                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
488                                 - dev_info->wrr_len;
489         } else {
490                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
491                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
492                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
493                                 wt - dev_info->rx_queue[rx_queue_id].wt :
494                                 wt;
495         }
496
497         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
498         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
499         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
500 }
501
502 /* Calculate nb_rx_* after adding rx_queue_id */
503 static void
504 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
505                 struct eth_device_info *dev_info,
506                 int rx_queue_id,
507                 uint16_t wt,
508                 uint32_t *nb_rx_poll,
509                 uint32_t *nb_rx_intr,
510                 uint32_t *nb_wrr)
511 {
512         if (wt != 0)
513                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
514                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
515         else
516                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
517                                         nb_rx_poll, nb_rx_intr, nb_wrr);
518 }
519
520 /* Calculate nb_rx_* after deleting rx_queue_id */
521 static void
522 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
523                 struct eth_device_info *dev_info,
524                 int rx_queue_id,
525                 uint32_t *nb_rx_poll,
526                 uint32_t *nb_rx_intr,
527                 uint32_t *nb_wrr)
528 {
529         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
530                                 nb_wrr);
531         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
532                                 nb_rx_intr);
533 }
534
535 /*
536  * Allocate the rx_poll array
537  */
538 static struct eth_rx_poll_entry *
539 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
540         uint32_t num_rx_polled)
541 {
542         size_t len;
543
544         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
545                                                         RTE_CACHE_LINE_SIZE);
546         return  rte_zmalloc_socket(rx_adapter->mem_name,
547                                 len,
548                                 RTE_CACHE_LINE_SIZE,
549                                 rx_adapter->socket_id);
550 }
551
552 /*
553  * Allocate the WRR array
554  */
555 static uint32_t *
556 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
557 {
558         size_t len;
559
560         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
561                         RTE_CACHE_LINE_SIZE);
562         return  rte_zmalloc_socket(rx_adapter->mem_name,
563                                 len,
564                                 RTE_CACHE_LINE_SIZE,
565                                 rx_adapter->socket_id);
566 }
567
568 static int
569 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
570                 uint32_t nb_poll,
571                 uint32_t nb_wrr,
572                 struct eth_rx_poll_entry **rx_poll,
573                 uint32_t **wrr_sched)
574 {
575
576         if (nb_poll == 0) {
577                 *rx_poll = NULL;
578                 *wrr_sched = NULL;
579                 return 0;
580         }
581
582         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
583         if (*rx_poll == NULL) {
584                 *wrr_sched = NULL;
585                 return -ENOMEM;
586         }
587
588         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
589         if (*wrr_sched == NULL) {
590                 rte_free(*rx_poll);
591                 return -ENOMEM;
592         }
593         return 0;
594 }
595
596 /* Precalculate WRR polling sequence for all queues in rx_adapter */
597 static void
598 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
599                 struct eth_rx_poll_entry *rx_poll,
600                 uint32_t *rx_wrr)
601 {
602         uint16_t d;
603         uint16_t q;
604         unsigned int i;
605         int prev = -1;
606         int cw = -1;
607
608         /* Initialize variables for calculation of wrr schedule */
609         uint16_t max_wrr_pos = 0;
610         unsigned int poll_q = 0;
611         uint16_t max_wt = 0;
612         uint16_t gcd = 0;
613
614         if (rx_poll == NULL)
615                 return;
616
617         /* Generate the array of all queues to poll; the size of this
618          * array is poll_q
619          */
620         RTE_ETH_FOREACH_DEV(d) {
621                 uint16_t nb_rx_queues;
622                 struct eth_device_info *dev_info =
623                                 &rx_adapter->eth_devices[d];
624                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
625                 if (dev_info->rx_queue == NULL)
626                         continue;
627                 if (dev_info->internal_event_port)
628                         continue;
629                 dev_info->wrr_len = 0;
630                 for (q = 0; q < nb_rx_queues; q++) {
631                         struct eth_rx_queue_info *queue_info =
632                                 &dev_info->rx_queue[q];
633                         uint16_t wt;
634
635                         if (!rxa_polled_queue(dev_info, q))
636                                 continue;
637                         wt = queue_info->wt;
638                         rx_poll[poll_q].eth_dev_id = d;
639                         rx_poll[poll_q].eth_rx_qid = q;
640                         max_wrr_pos += wt;
641                         dev_info->wrr_len += wt;
642                         max_wt = RTE_MAX(max_wt, wt);
643                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
644                         poll_q++;
645                 }
646         }
647
648         /* Generate polling sequence based on weights */
649         prev = -1;
650         cw = -1;
651         for (i = 0; i < max_wrr_pos; i++) {
652                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
653                                      rx_poll, max_wt, gcd, prev);
654                 prev = rx_wrr[i];
655         }
656 }
657
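/* Extract pointers to the IPv4 or IPv6 header from the mbuf; a single VLAN
 * tag is handled, any other ether type leaves both pointers NULL.
 */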
658 static inline void
659 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
660         struct rte_ipv6_hdr **ipv6_hdr)
661 {
662         struct rte_ether_hdr *eth_hdr =
663                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
664         struct rte_vlan_hdr *vlan_hdr;
665
666         *ipv4_hdr = NULL;
667         *ipv6_hdr = NULL;
668
669         switch (eth_hdr->ether_type) {
670         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
671                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
672                 break;
673
674         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
675                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
676                 break;
677
678         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
679                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
680                 switch (vlan_hdr->eth_proto) {
681                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
682                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
683                         break;
684                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
685                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
686                         break;
687                 default:
688                         break;
689                 }
690                 break;
691
692         default:
693                 break;
694         }
695 }
696
697 /* Calculate RSS hash for IPv4/6 */
698 static inline uint32_t
699 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
700 {
701         uint32_t input_len;
702         void *tuple;
703         struct rte_ipv4_tuple ipv4_tuple;
704         struct rte_ipv6_tuple ipv6_tuple;
705         struct rte_ipv4_hdr *ipv4_hdr;
706         struct rte_ipv6_hdr *ipv6_hdr;
707
708         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
709
710         if (ipv4_hdr) {
711                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
712                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
713                 tuple = &ipv4_tuple;
714                 input_len = RTE_THASH_V4_L3_LEN;
715         } else if (ipv6_hdr) {
716                 rte_thash_load_v6_addrs(ipv6_hdr,
717                                         (union rte_thash_tuple *)&ipv6_tuple);
718                 tuple = &ipv6_tuple;
719                 input_len = RTE_THASH_V6_L3_LEN;
720         } else
721                 return 0;
722
723         return rte_softrss_be(tuple, input_len, rss_key_be);
724 }
725
726 static inline int
727 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
728 {
729         return !!rx_adapter->enq_block_count;
730 }
731
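/* Blocked-cycles accounting: rxa_flush_event_buffer() calls
 * rxa_enq_block_start_ts() whenever an enqueue makes no progress and
 * rxa_enq_block_end_ts() once events are enqueued again. The start timestamp
 * is only latched after BLOCK_CNT_THRESHOLD consecutive blocked attempts;
 * the elapsed cycles are then added to stats->rx_enq_block_cycles.
 */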
732 static inline void
733 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
734 {
735         if (rx_adapter->rx_enq_block_start_ts)
736                 return;
737
738         rx_adapter->enq_block_count++;
739         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
740                 return;
741
742         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
743 }
744
745 static inline void
746 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
747                     struct rte_event_eth_rx_adapter_stats *stats)
748 {
749         if (unlikely(!stats->rx_enq_start_ts))
750                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
751
752         if (likely(!rxa_enq_blocked(rx_adapter)))
753                 return;
754
755         rx_adapter->enq_block_count = 0;
756         if (rx_adapter->rx_enq_block_start_ts) {
757                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
758                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
759                     rx_adapter->rx_enq_block_start_ts;
760                 rx_adapter->rx_enq_block_start_ts = 0;
761         }
762 }
763
764 /* Enqueue buffered events to event device */
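/* If the circular buffer has wrapped (buf->last != 0), events in
 * [head, last) are enqueued first; once they are all accepted, the events in
 * [0, tail) are enqueued and the wrap marker is cleared.
 */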
765 static inline uint16_t
766 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
767 {
768         struct rte_eth_event_enqueue_buffer *buf =
769             &rx_adapter->event_enqueue_buffer;
770         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
771         uint16_t count = buf->last ? buf->last - buf->head : buf->count;
772
773         if (!count)
774                 return 0;
775
776         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
777                                         rx_adapter->event_port_id,
778                                         &buf->events[buf->head],
779                                         count);
780         if (n != count)
781                 stats->rx_enq_retry++;
782
783         buf->head += n;
784
785         if (buf->last && n == count) {
786                 uint16_t n1;
787
788                 n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
789                                         rx_adapter->event_port_id,
790                                         &buf->events[0],
791                                         buf->tail);
792
793                 if (n1 != buf->tail)
794                         stats->rx_enq_retry++;
795
796                 buf->last = 0;
797                 buf->head = n1;
798                 buf->last_mask = 0;
799                 n += n1;
800         }
801
802         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
803                 rxa_enq_block_start_ts(rx_adapter);
804
805         buf->count -= n;
806         stats->rx_enq_count += n;
807
808         return n;
809 }
810
811 static inline void
812 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
813                 struct eth_rx_vector_data *vec)
814 {
815         vec->vector_ev->nb_elem = 0;
816         vec->vector_ev->port = vec->port;
817         vec->vector_ev->queue = vec->queue;
818         vec->vector_ev->attr_valid = true;
819         TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
820 }
821
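/* Aggregate received mbufs into rte_event_vector objects for the queue.
 * Each time a vector reaches max_vector_count it is emitted as an event into
 * the enqueue buffer and a fresh vector is taken from the mempool; the
 * return value is the number of vector events produced.
 */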
822 static inline uint16_t
823 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
824                         struct eth_rx_queue_info *queue_info,
825                         struct rte_eth_event_enqueue_buffer *buf,
826                         struct rte_mbuf **mbufs, uint16_t num)
827 {
828         struct rte_event *ev = &buf->events[buf->count];
829         struct eth_rx_vector_data *vec;
830         uint16_t filled, space, sz;
831
832         filled = 0;
833         vec = &queue_info->vector_data;
834
835         if (vec->vector_ev == NULL) {
836                 if (rte_mempool_get(vec->vector_pool,
837                                     (void **)&vec->vector_ev) < 0) {
838                         rte_pktmbuf_free_bulk(mbufs, num);
839                         return 0;
840                 }
841                 rxa_init_vector(rx_adapter, vec);
842         }
843         while (num) {
844                 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
845                         /* Event ready. */
846                         ev->event = vec->event;
847                         ev->vec = vec->vector_ev;
848                         ev++;
849                         filled++;
850                         vec->vector_ev = NULL;
851                         TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
852                         if (rte_mempool_get(vec->vector_pool,
853                                             (void **)&vec->vector_ev) < 0) {
854                                 rte_pktmbuf_free_bulk(mbufs, num);
855                                 return 0;
856                         }
857                         rxa_init_vector(rx_adapter, vec);
858                 }
859
860                 space = vec->max_vector_count - vec->vector_ev->nb_elem;
861                 sz = num > space ? space : num;
862                 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
863                        sizeof(void *) * sz);
864                 vec->vector_ev->nb_elem += sz;
865                 num -= sz;
866                 mbufs += sz;
867                 vec->ts = rte_rdtsc();
868         }
869
870         if (vec->vector_ev->nb_elem == vec->max_vector_count) {
871                 ev->event = vec->event;
872                 ev->vec = vec->vector_ev;
873                 ev++;
874                 filled++;
875                 vec->vector_ev = NULL;
876                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
877         }
878
879         return filled;
880 }
881
882 static inline void
883 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
884                 uint16_t eth_dev_id,
885                 uint16_t rx_queue_id,
886                 struct rte_mbuf **mbufs,
887                 uint16_t num)
888 {
889         uint32_t i;
890         struct eth_device_info *dev_info =
891                                         &rx_adapter->eth_devices[eth_dev_id];
892         struct eth_rx_queue_info *eth_rx_queue_info =
893                                         &dev_info->rx_queue[rx_queue_id];
894         struct rte_eth_event_enqueue_buffer *buf =
895                                         &rx_adapter->event_enqueue_buffer;
896         uint16_t new_tail = buf->tail;
897         uint64_t event = eth_rx_queue_info->event;
898         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
899         struct rte_mbuf *m = mbufs[0];
900         uint32_t rss_mask;
901         uint32_t rss;
902         int do_rss;
903         uint16_t nb_cb;
904         uint16_t dropped;
905         uint64_t ts, ts_mask;
906
907         if (!eth_rx_queue_info->ena_vector) {
908                 ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
909                                                 0 : rte_get_tsc_cycles();
910
911                 /* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
912                  * otherwise 0
913                  */
914                 ts_mask = (uint64_t)(!(m->ol_flags &
915                                        event_eth_rx_timestamp_dynflag)) - 1ULL;
916
917                 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
918                 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
919                 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
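                /* Software Toeplitz hash is only computed when the NIC did
                 * not provide an RSS hash and the application did not supply
                 * a flow id for this queue.
                 */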
920                 for (i = 0; i < num; i++) {
921                         struct rte_event *ev;
922
923                         m = mbufs[i];
924                         *rxa_timestamp_dynfield(m) = ts |
925                                         (*rxa_timestamp_dynfield(m) & ts_mask);
926
927                         ev = &buf->events[new_tail];
928
929                         rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
930                                      : m->hash.rss;
931                         ev->event = event;
932                         ev->flow_id = (rss & ~flow_id_mask) |
933                                       (ev->flow_id & flow_id_mask);
934                         ev->mbuf = m;
935                         new_tail++;
936                 }
937         } else {
938                 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
939                                               buf, mbufs, num);
940         }
941
942         if (num && dev_info->cb_fn) {
943
944                 dropped = 0;
945                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
946                                        buf->last |
947                                        (RTE_DIM(buf->events) & ~buf->last_mask),
948                                        buf->count >= BATCH_SIZE ?
949                                                 buf->count - BATCH_SIZE : 0,
950                                        &buf->events[buf->tail],
951                                        num,
952                                        dev_info->cb_arg,
953                                        &dropped);
954                 if (unlikely(nb_cb > num))
955                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
956                                 nb_cb, num);
957                 else
958                         num = nb_cb;
959                 if (dropped)
960                         rx_adapter->stats.rx_dropped += dropped;
961         }
962
963         buf->count += num;
964         buf->tail += num;
965 }
966
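/* Check for space for another BATCH_SIZE events in the enqueue buffer.
 * When the tail would run past the end of the events[] array and at least
 * BATCH_SIZE events have already been drained from the head, the wrap point
 * is recorded in buf->last and the tail restarts at index 0.
 */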
967 static inline bool
968 rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
969 {
970         uint32_t nb_req = buf->tail + BATCH_SIZE;
971
972         if (!buf->last) {
973                 if (nb_req <= RTE_DIM(buf->events))
974                         return true;
975
976                 if (buf->head >= BATCH_SIZE) {
977                         buf->last_mask = ~0;
978                         buf->last = buf->tail;
979                         buf->tail = 0;
980                         return true;
981                 }
982         }
983
984         return nb_req <= buf->head;
985 }
986
987 /* Enqueue packets from <port, q> to the event buffer */
988 static inline uint32_t
989 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
990         uint16_t port_id,
991         uint16_t queue_id,
992         uint32_t rx_count,
993         uint32_t max_rx,
994         int *rxq_empty)
995 {
996         struct rte_mbuf *mbufs[BATCH_SIZE];
997         struct rte_eth_event_enqueue_buffer *buf =
998                                         &rx_adapter->event_enqueue_buffer;
999         struct rte_event_eth_rx_adapter_stats *stats =
1000                                         &rx_adapter->stats;
1001         uint16_t n;
1002         uint32_t nb_rx = 0;
1003
1004         if (rxq_empty)
1005                 *rxq_empty = 0;
1006         /* Don't do a batch dequeue from the rx queue if there isn't
1007          * enough space in the enqueue buffer.
1008          */
1009         while (rxa_pkt_buf_available(buf)) {
1010                 if (buf->count >= BATCH_SIZE)
1011                         rxa_flush_event_buffer(rx_adapter);
1012
1013                 stats->rx_poll_count++;
1014                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1015                 if (unlikely(!n)) {
1016                         if (rxq_empty)
1017                                 *rxq_empty = 1;
1018                         break;
1019                 }
1020                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
1021                 nb_rx += n;
1022                 if (rx_count + nb_rx > max_rx)
1023                         break;
1024         }
1025
1026         if (buf->count > 0)
1027                 rxa_flush_event_buffer(rx_adapter);
1028
1029         return nb_rx;
1030 }
1031
1032 static inline void
1033 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
1034                 void *data)
1035 {
1036         uint16_t port_id;
1037         uint16_t queue;
1038         int err;
1039         union queue_data qd;
1040         struct eth_device_info *dev_info;
1041         struct eth_rx_queue_info *queue_info;
1042         int *intr_enabled;
1043
1044         qd.ptr = data;
1045         port_id = qd.port;
1046         queue = qd.queue;
1047
1048         dev_info = &rx_adapter->eth_devices[port_id];
1049         queue_info = &dev_info->rx_queue[queue];
1050         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1051         if (rxa_shared_intr(dev_info, queue))
1052                 intr_enabled = &dev_info->shared_intr_enabled;
1053         else
1054                 intr_enabled = &queue_info->intr_enabled;
1055
1056         if (*intr_enabled) {
1057                 *intr_enabled = 0;
1058                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1059                 /* Entry should always be available.
1060                  * The ring size equals the maximum number of interrupt
1061                  * vectors supported (an interrupt vector is shared in
1062                  * case of shared interrupts)
1063                  */
1064                 if (err)
1065                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1066                                 " to ring: %s", strerror(-err));
1067                 else
1068                         rte_eth_dev_rx_intr_disable(port_id, queue);
1069         }
1070         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1071 }
1072
1073 static int
1074 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
1075                         uint32_t num_intr_vec)
1076 {
1077         if (rx_adapter->num_intr_vec + num_intr_vec >
1078                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
1079                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1080                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1081                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1082                 return -ENOSPC;
1083         }
1084
1085         return 0;
1086 }
1087
1088 /* Delete entries for (dev, queue) from the interrupt ring */
1089 static void
1090 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1091                         struct eth_device_info *dev_info,
1092                         uint16_t rx_queue_id)
1093 {
1094         int i, n;
1095         union queue_data qd;
1096
1097         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1098
1099         n = rte_ring_count(rx_adapter->intr_ring);
1100         for (i = 0; i < n; i++) {
1101                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1102                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1103                         if (qd.port == dev_info->dev->data->port_id &&
1104                                 qd.queue == rx_queue_id)
1105                                 continue;
1106                 } else {
1107                         if (qd.port == dev_info->dev->data->port_id)
1108                                 continue;
1109                 }
1110                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1111         }
1112
1113         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1114 }
1115
1116 /* Thread function handling interrupt mode receive queues.
1117  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1118  * interrupting queue to the adapter's ring buffer for interrupt events.
1119  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1120  * the adapter service function.
1121  */
1122 static void *
1123 rxa_intr_thread(void *arg)
1124 {
1125         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1126         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1127         int n, i;
1128
1129         while (1) {
1130                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1131                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1132                 if (unlikely(n < 0))
1133                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1134                                         n);
1135                 for (i = 0; i < n; i++) {
1136                         rxa_intr_ring_enqueue(rx_adapter,
1137                                         epoll_events[i].epdata.data);
1138                 }
1139         }
1140
1141         return NULL;
1142 }
1143
1144 /* Dequeue <port, q> from interrupt ring and enqueue received
1145  * mbufs to eventdev
1146  */
1147 static inline uint32_t
1148 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1149 {
1150         uint32_t n;
1151         uint32_t nb_rx = 0;
1152         int rxq_empty;
1153         struct rte_eth_event_enqueue_buffer *buf;
1154         rte_spinlock_t *ring_lock;
1155         uint8_t max_done = 0;
1156
1157         if (rx_adapter->num_rx_intr == 0)
1158                 return 0;
1159
1160         if (rte_ring_count(rx_adapter->intr_ring) == 0
1161                 && !rx_adapter->qd_valid)
1162                 return 0;
1163
1164         buf = &rx_adapter->event_enqueue_buffer;
1165         ring_lock = &rx_adapter->intr_ring_lock;
1166
1167         if (buf->count >= BATCH_SIZE)
1168                 rxa_flush_event_buffer(rx_adapter);
1169
1170         while (rxa_pkt_buf_available(buf)) {
1171                 struct eth_device_info *dev_info;
1172                 uint16_t port;
1173                 uint16_t queue;
1174                 union queue_data qd  = rx_adapter->qd;
1175                 int err;
1176
1177                 if (!rx_adapter->qd_valid) {
1178                         struct eth_rx_queue_info *queue_info;
1179
1180                         rte_spinlock_lock(ring_lock);
1181                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1182                         if (err) {
1183                                 rte_spinlock_unlock(ring_lock);
1184                                 break;
1185                         }
1186
1187                         port = qd.port;
1188                         queue = qd.queue;
1189                         rx_adapter->qd = qd;
1190                         rx_adapter->qd_valid = 1;
1191                         dev_info = &rx_adapter->eth_devices[port];
1192                         if (rxa_shared_intr(dev_info, queue))
1193                                 dev_info->shared_intr_enabled = 1;
1194                         else {
1195                                 queue_info = &dev_info->rx_queue[queue];
1196                                 queue_info->intr_enabled = 1;
1197                         }
1198                         rte_eth_dev_rx_intr_enable(port, queue);
1199                         rte_spinlock_unlock(ring_lock);
1200                 } else {
1201                         port = qd.port;
1202                         queue = qd.queue;
1203
1204                         dev_info = &rx_adapter->eth_devices[port];
1205                 }
1206
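                /* For a shared interrupt, any interrupt mode queue of the
                 * device may have data; poll them starting from next_q_idx
                 * and remember where to resume if the enqueue buffer fills
                 * up or max_nb_rx is reached.
                 */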
1207                 if (rxa_shared_intr(dev_info, queue)) {
1208                         uint16_t i;
1209                         uint16_t nb_queues;
1210
1211                         nb_queues = dev_info->dev->data->nb_rx_queues;
1212                         n = 0;
1213                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1214                                 uint8_t enq_buffer_full;
1215
1216                                 if (!rxa_intr_queue(dev_info, i))
1217                                         continue;
1218                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1219                                         rx_adapter->max_nb_rx,
1220                                         &rxq_empty);
1221                                 nb_rx += n;
1222
1223                                 enq_buffer_full = !rxq_empty && n == 0;
1224                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1225
1226                                 if (enq_buffer_full || max_done) {
1227                                         dev_info->next_q_idx = i;
1228                                         goto done;
1229                                 }
1230                         }
1231
1232                         rx_adapter->qd_valid = 0;
1233
1234                         /* Reinitialize for next interrupt */
1235                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1236                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1237                                                 0;
1238                 } else {
1239                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1240                                 rx_adapter->max_nb_rx,
1241                                 &rxq_empty);
1242                         rx_adapter->qd_valid = !rxq_empty;
1243                         nb_rx += n;
1244                         if (nb_rx > rx_adapter->max_nb_rx)
1245                                 break;
1246                 }
1247         }
1248
1249 done:
1250         rx_adapter->stats.rx_intr_packets += nb_rx;
1251         return nb_rx;
1252 }
1253
1254 /*
1255  * Polls receive queues added to the event adapter and enqueues received
1256  * packets to the event device.
1257  *
1258  * The receive code enqueues initially to a temporary buffer; the
1259  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1260  *
1261  * If there isn't space available in the temporary buffer, packets from the
1262  * Rx queue aren't dequeued from the eth device; this back pressures the
1263  * eth device. In virtual device environments this back pressure is relayed to
1264  * the hypervisor's switching layer, where adjustments can be made to deal with
1265  * it.
1266  */
1267 static inline uint32_t
1268 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1269 {
1270         uint32_t num_queue;
1271         uint32_t nb_rx = 0;
1272         struct rte_eth_event_enqueue_buffer *buf;
1273         uint32_t wrr_pos;
1274         uint32_t max_nb_rx;
1275
1276         wrr_pos = rx_adapter->wrr_pos;
1277         max_nb_rx = rx_adapter->max_nb_rx;
1278         buf = &rx_adapter->event_enqueue_buffer;
1279
1280         /* Iterate through a WRR sequence */
1281         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1282                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1283                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1284                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1285
1286                 /* Don't do a batch dequeue from the rx queue if there isn't
1287                  * enough space in the enqueue buffer.
1288                  */
1289                 if (buf->count >= BATCH_SIZE)
1290                         rxa_flush_event_buffer(rx_adapter);
1291                 if (!rxa_pkt_buf_available(buf)) {
1292                         rx_adapter->wrr_pos = wrr_pos;
1293                         return nb_rx;
1294                 }
1295
1296                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1297                                 NULL);
1298                 if (nb_rx > max_nb_rx) {
1299                         rx_adapter->wrr_pos =
1300                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1301                         break;
1302                 }
1303
1304                 if (++wrr_pos == rx_adapter->wrr_len)
1305                         wrr_pos = 0;
1306         }
1307         return nb_rx;
1308 }
1309
1310 static void
1311 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1312 {
1313         struct rte_event_eth_rx_adapter *rx_adapter = arg;
1314         struct rte_eth_event_enqueue_buffer *buf =
1315                 &rx_adapter->event_enqueue_buffer;
1316         struct rte_event *ev;
1317
1318         if (buf->count)
1319                 rxa_flush_event_buffer(rx_adapter);
1320
1321         if (vec->vector_ev->nb_elem == 0)
1322                 return;
1323         ev = &buf->events[buf->count];
1324
1325         /* Event ready. */
1326         ev->event = vec->event;
1327         ev->vec = vec->vector_ev;
1328         buf->count++;
1329
1330         vec->vector_ev = NULL;
1331         vec->ts = 0;
1332 }
1333
1334 static int
1335 rxa_service_func(void *args)
1336 {
1337         struct rte_event_eth_rx_adapter *rx_adapter = args;
1338         struct rte_event_eth_rx_adapter_stats *stats;
1339
1340         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1341                 return 0;
1342         if (!rx_adapter->rxa_started) {
1343                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1344                 return 0;
1345         }
1346
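        /* Flush event vectors whose aggregation timeout has elapsed; the
         * expiry list is traversed at most once every vector_tmo_ticks.
         */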
1347         if (rx_adapter->ena_vector) {
1348                 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1349                     rx_adapter->vector_tmo_ticks) {
1350                         struct eth_rx_vector_data *vec;
1351
1352                         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1353                                 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1354
1355                                 if (elapsed_time >= vec->vector_timeout_ticks) {
1356                                         rxa_vector_expire(vec, rx_adapter);
1357                                         TAILQ_REMOVE(&rx_adapter->vector_list,
1358                                                      vec, next);
1359                                 }
1360                         }
1361                         rx_adapter->prev_expiry_ts = rte_rdtsc();
1362                 }
1363         }
1364
1365         stats = &rx_adapter->stats;
1366         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1367         stats->rx_packets += rxa_poll(rx_adapter);
1368         rte_spinlock_unlock(&rx_adapter->rx_lock);
1369         return 0;
1370 }
1371
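/* Reserve (or look up) the memzone backing the global array of adapter
 * pointers; rxa_id_to_adapter() indexes into this array.
 */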
1372 static int
1373 rte_event_eth_rx_adapter_init(void)
1374 {
1375         const char *name = "rte_event_eth_rx_adapter_array";
1376         const struct rte_memzone *mz;
1377         unsigned int sz;
1378
1379         sz = sizeof(*event_eth_rx_adapter) *
1380             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1381         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1382
1383         mz = rte_memzone_lookup(name);
1384         if (mz == NULL) {
1385                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1386                                                  RTE_CACHE_LINE_SIZE);
1387                 if (mz == NULL) {
1388                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1389                                         PRId32, rte_errno);
1390                         return -rte_errno;
1391                 }
1392         }
1393
1394         event_eth_rx_adapter = mz->addr;
1395         return 0;
1396 }
1397
1398 static inline struct rte_event_eth_rx_adapter *
1399 rxa_id_to_adapter(uint8_t id)
1400 {
1401         return event_eth_rx_adapter ?
1402                 event_eth_rx_adapter[id] : NULL;
1403 }
1404
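/* Default adapter configuration callback: adds one event port by
 * reconfiguring the event device (stopping and restarting it if it was
 * already running), sets the port up with the rte_event_port_conf passed in
 * as the callback argument and reports that port back to the caller.
 */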
1405 static int
1406 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1407                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1408 {
1409         int ret;
1410         struct rte_eventdev *dev;
1411         struct rte_event_dev_config dev_conf;
1412         int started;
1413         uint8_t port_id;
1414         struct rte_event_port_conf *port_conf = arg;
1415         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1416
1417         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1418         dev_conf = dev->data->dev_conf;
1419
1420         started = dev->data->dev_started;
1421         if (started)
1422                 rte_event_dev_stop(dev_id);
1423         port_id = dev_conf.nb_event_ports;
1424         dev_conf.nb_event_ports += 1;
1425         ret = rte_event_dev_configure(dev_id, &dev_conf);
1426         if (ret) {
1427                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1428                                                 dev_id);
1429                 if (started) {
1430                         if (rte_event_dev_start(dev_id))
1431                                 return -EIO;
1432                 }
1433                 return ret;
1434         }
1435
1436         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1437         if (ret) {
1438                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1439                                         port_id);
1440                 return ret;
1441         }
1442
1443         conf->event_port_id = port_id;
1444         conf->max_nb_rx = 128;
1445         if (started)
1446                 ret = rte_event_dev_start(dev_id);
1447         rx_adapter->default_cb_arg = 1;
1448         return ret;
1449 }
1450
1451 static int
1452 rxa_epoll_create1(void)
1453 {
1454 #if defined(LINUX)
1455         int fd;
1456         fd = epoll_create1(EPOLL_CLOEXEC);
1457         return fd < 0 ? -errno : fd;
1458 #elif defined(BSD)
1459         return -ENOTSUP;
1460 #endif
1461 }
1462
1463 static int
1464 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1465 {
1466         if (rx_adapter->epd != INIT_FD)
1467                 return 0;
1468
1469         rx_adapter->epd = rxa_epoll_create1();
1470         if (rx_adapter->epd < 0) {
1471                 int err = rx_adapter->epd;
1472                 rx_adapter->epd = INIT_FD;
1473                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1474                 return err;
1475         }
1476
1477         return 0;
1478 }
1479
1480 static int
1481 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1482 {
1483         int err;
1484         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1485
1486         if (rx_adapter->intr_ring)
1487                 return 0;
1488
1489         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1490                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1491                                         rte_socket_id(), 0);
1492         if (!rx_adapter->intr_ring)
1493                 return -ENOMEM;
1494
1495         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1496                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1497                                         sizeof(struct rte_epoll_event),
1498                                         RTE_CACHE_LINE_SIZE,
1499                                         rx_adapter->socket_id);
1500         if (!rx_adapter->epoll_events) {
1501                 err = -ENOMEM;
1502                 goto error;
1503         }
1504
1505         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1506
1507         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1508                         "rx-intr-thread-%d", rx_adapter->id);
1509
1510         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1511                                 NULL, rxa_intr_thread, rx_adapter);
1512         if (!err)
1513                 return 0;
1514
1515         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d", err);
1516         rte_free(rx_adapter->epoll_events);
1517 error:
1518         rte_ring_free(rx_adapter->intr_ring);
1519         rx_adapter->intr_ring = NULL;
1520         rx_adapter->epoll_events = NULL;
1521         return err;
1522 }
1523
1524 static int
1525 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1526 {
1527         int err;
1528
1529         err = pthread_cancel(rx_adapter->rx_intr_thread);
1530         if (err)
1531                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d",
1532                                 err);
1533
1534         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1535         if (err)
1536                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d", err);
1537
1538         rte_free(rx_adapter->epoll_events);
1539         rte_ring_free(rx_adapter->intr_ring);
1540         rx_adapter->intr_ring = NULL;
1541         rx_adapter->epoll_events = NULL;
1542         return 0;
1543 }
1544
1545 static int
1546 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1547 {
1548         int ret;
1549
1550         if (rx_adapter->num_rx_intr == 0)
1551                 return 0;
1552
1553         ret = rxa_destroy_intr_thread(rx_adapter);
1554         if (ret)
1555                 return ret;
1556
1557         close(rx_adapter->epd);
1558         rx_adapter->epd = INIT_FD;
1559
1560         return ret;
1561 }
1562
1563 static int
1564 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1565         struct eth_device_info *dev_info,
1566         uint16_t rx_queue_id)
1567 {
1568         int err;
1569         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1570         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1571
1572         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1573         if (err) {
1574                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1575                         rx_queue_id);
1576                 return err;
1577         }
1578
1579         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1580                                         rx_adapter->epd,
1581                                         RTE_INTR_EVENT_DEL,
1582                                         0);
1583         if (err)
1584                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1585
1586         if (sintr)
1587                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1588         else
1589                 dev_info->shared_intr_enabled = 0;
1590         return err;
1591 }
1592
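     /*
      * Remove one queue (or all queues when rx_queue_id is -1) from interrupt
      * mode: disable the device interrupt when the queue has a dedicated
      * interrupt or is the last user of a shared one, purge its entries from
      * the interrupt ring and, for a single queue, drop it from the
      * intr_queue[] array.
      */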
1593 static int
1594 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1595                 struct eth_device_info *dev_info,
1596                 int rx_queue_id)
1597 {
1598         int err;
1599         int i;
1600         int s;
1601
1602         if (dev_info->nb_rx_intr == 0)
1603                 return 0;
1604
1605         err = 0;
1606         if (rx_queue_id == -1) {
1607                 s = dev_info->nb_shared_intr;
1608                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1609                         int sintr;
1610                         uint16_t q;
1611
1612                         q = dev_info->intr_queue[i];
1613                         sintr = rxa_shared_intr(dev_info, q);
1614                         s -= sintr;
1615
1616                         if (!sintr || s == 0) {
1617
1618                                 err = rxa_disable_intr(rx_adapter, dev_info,
1619                                                 q);
1620                                 if (err)
1621                                         return err;
1622                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1623                                                         q);
1624                         }
1625                 }
1626         } else {
1627                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1628                         return 0;
1629                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1630                                 dev_info->nb_shared_intr == 1) {
1631                         err = rxa_disable_intr(rx_adapter, dev_info,
1632                                         rx_queue_id);
1633                         if (err)
1634                                 return err;
1635                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1636                                                 rx_queue_id);
1637                 }
1638
1639                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1640                         if (dev_info->intr_queue[i] == rx_queue_id) {
1641                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1642                                         dev_info->intr_queue[i] =
1643                                                 dev_info->intr_queue[i + 1];
1644                                 break;
1645                         }
1646                 }
1647         }
1648
1649         return err;
1650 }
1651
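     /*
      * Put a queue into interrupt mode: allocate the per-device intr_queue[]
      * array if needed, create the epoll fd, register the (port, queue) pair
      * with epoll, enable the device Rx interrupt and start the interrupt
      * thread. Each step is unwound in reverse order on failure.
      */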
1652 static int
1653 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1654         struct eth_device_info *dev_info,
1655         uint16_t rx_queue_id)
1656 {
1657         int err, err1;
1658         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1659         union queue_data qd;
1660         int init_fd;
1661         uint16_t *intr_queue;
1662         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1663
1664         if (rxa_intr_queue(dev_info, rx_queue_id))
1665                 return 0;
1666
1667         intr_queue = dev_info->intr_queue;
1668         if (dev_info->intr_queue == NULL) {
1669                 size_t len =
1670                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1671                 dev_info->intr_queue =
1672                         rte_zmalloc_socket(
1673                                 rx_adapter->mem_name,
1674                                 len,
1675                                 0,
1676                                 rx_adapter->socket_id);
1677                 if (dev_info->intr_queue == NULL)
1678                         return -ENOMEM;
1679         }
1680
1681         init_fd = rx_adapter->epd;
1682         err = rxa_init_epd(rx_adapter);
1683         if (err)
1684                 goto err_free_queue;
1685
1686         qd.port = eth_dev_id;
1687         qd.queue = rx_queue_id;
1688
1689         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1690                                         rx_adapter->epd,
1691                                         RTE_INTR_EVENT_ADD,
1692                                         qd.ptr);
1693         if (err) {
1694                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1695                         " Rx Queue %u err %d", rx_queue_id, err);
1696                 goto err_del_fd;
1697         }
1698
1699         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1700         if (err) {
1701                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1702                                 " Rx Queue %u err %d", rx_queue_id, err);
1703
1704                 goto err_del_event;
1705         }
1706
1707         err = rxa_create_intr_thread(rx_adapter);
1708         if (!err)  {
1709                 if (sintr)
1710                         dev_info->shared_intr_enabled = 1;
1711                 else
1712                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1713                 return 0;
1714         }
1715
1716
1717         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1718         if (err)
1719                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1720                                 " Rx Queue %u err %d", rx_queue_id, err);
1721 err_del_event:
1722         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1723                                         rx_adapter->epd,
1724                                         RTE_INTR_EVENT_DEL,
1725                                         0);
1726         if (err1) {
1727                 RTE_EDEV_LOG_ERR("Could not delete event for"
1728                                 " Rx Queue %u err %d", rx_queue_id, err1);
1729         }
1730 err_del_fd:
1731         if (init_fd == INIT_FD) {
1732                 close(rx_adapter->epd);
1733                 rx_adapter->epd = INIT_FD;
1734         }
1735 err_free_queue:
1736         if (intr_queue == NULL) {
1737                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1738
1739         return err;
1740 }
1741
1742 static int
1743 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1744         struct eth_device_info *dev_info,
1745         int rx_queue_id)
1746
1747 {
1748         int i, j, err;
1749         int si = -1;
1750         int shared_done = (dev_info->nb_shared_intr > 0);
1751
1752         if (rx_queue_id != -1) {
1753                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1754                         return 0;
1755                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1756         }
1757
1758         err = 0;
1759         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1760
1761                 if (rxa_shared_intr(dev_info, i) && shared_done)
1762                         continue;
1763
1764                 err = rxa_config_intr(rx_adapter, dev_info, i);
1765
1766                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1767                 if (shared_done) {
1768                         si = i;
1769                         dev_info->shared_intr_enabled = 1;
1770                 }
1771                 if (err)
1772                         break;
1773         }
1774
1775         if (err == 0)
1776                 return 0;
1777
1778         shared_done = (dev_info->nb_shared_intr > 0);
1779         for (j = 0; j < i; j++) {
1780                 if (rxa_intr_queue(dev_info, j))
1781                         continue;
1782                 if (rxa_shared_intr(dev_info, j) && si != j)
1783                         continue;
1784                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1785                 if (err)
1786                         break;
1787
1788         }
1789
1790         return err;
1791 }
1792
1793
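     /*
      * One-time registration of the adapter's service function and invocation
      * of the configuration callback that supplies the event port used to
      * enqueue Rx events; the resulting port id and max_nb_rx are recorded in
      * the adapter.
      */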
1794 static int
1795 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1796 {
1797         int ret;
1798         struct rte_service_spec service;
1799         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1800
1801         if (rx_adapter->service_inited)
1802                 return 0;
1803
1804         memset(&service, 0, sizeof(service));
1805         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1806                 "rte_event_eth_rx_adapter_%d", id);
1807         service.socket_id = rx_adapter->socket_id;
1808         service.callback = rxa_service_func;
1809         service.callback_userdata = rx_adapter;
1810         /* Service function handles locking for queue add/del updates */
1811         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1812         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1813         if (ret) {
1814                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1815                         service.name, ret);
1816                 return ret;
1817         }
1818
1819         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1820                 &rx_adapter_conf, rx_adapter->conf_arg);
1821         if (ret) {
1822                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1823                         ret);
1824                 goto err_done;
1825         }
1826         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1827         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1828         rx_adapter->service_inited = 1;
1829         rx_adapter->epd = INIT_FD;
1830         return 0;
1831
1832 err_done:
1833         rte_service_component_unregister(rx_adapter->service_id);
1834         return ret;
1835 }
1836
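     /*
      * Mark a queue (or all queues when rx_queue_id is -1) as enabled or
      * disabled and keep the adapter and per-device queue counts in sync.
      */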
1837 static void
1838 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1839                 struct eth_device_info *dev_info,
1840                 int32_t rx_queue_id,
1841                 uint8_t add)
1842 {
1843         struct eth_rx_queue_info *queue_info;
1844         int enabled;
1845         uint16_t i;
1846
1847         if (dev_info->rx_queue == NULL)
1848                 return;
1849
1850         if (rx_queue_id == -1) {
1851                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1852                         rxa_update_queue(rx_adapter, dev_info, i, add);
1853         } else {
1854                 queue_info = &dev_info->rx_queue[rx_queue_id];
1855                 enabled = queue_info->queue_enabled;
1856                 if (add) {
1857                         rx_adapter->nb_queues += !enabled;
1858                         dev_info->nb_dev_queues += !enabled;
1859                 } else {
1860                         rx_adapter->nb_queues -= enabled;
1861                         dev_info->nb_dev_queues -= enabled;
1862                 }
1863                 queue_info->queue_enabled = !!add;
1864         }
1865 }
1866
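     /*
      * Record the vectorization parameters for a queue: maximum vector size,
      * mempool, timeout converted from nanoseconds to timer ticks, and the
      * event template whose flow id defaults to a (queue, port) combination
      * when the caller left it as zero.
      */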
1867 static void
1868 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1869                     uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1870                     uint16_t port_id)
1871 {
1872 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
1873         struct eth_rx_vector_data *vector_data;
1874         uint32_t flow_id;
1875
1876         vector_data = &queue_info->vector_data;
1877         vector_data->max_vector_count = vector_count;
1878         vector_data->port = port_id;
1879         vector_data->queue = qid;
1880         vector_data->vector_pool = mp;
1881         vector_data->vector_timeout_ticks =
1882                 NSEC2TICK(vector_ns, rte_get_timer_hz());
1883         vector_data->ts = 0;
1884         flow_id = queue_info->event & 0xFFFFF;
1885         flow_id =
1886                 flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1887         vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
1888 }
1889
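     /*
      * Software path for queue delete: flush any partially filled event
      * vectors for the queue to the event device, mark the queue disabled and
      * update the polled/interrupt queue counters.
      */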
1890 static void
1891 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1892         struct eth_device_info *dev_info,
1893         int32_t rx_queue_id)
1894 {
1895         struct eth_rx_vector_data *vec;
1896         int pollq;
1897         int intrq;
1898         int sintrq;
1899
1900
1901         if (rx_adapter->nb_queues == 0)
1902                 return;
1903
1904         if (rx_queue_id == -1) {
1905                 uint16_t nb_rx_queues;
1906                 uint16_t i;
1907
1908                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1909                 for (i = 0; i < nb_rx_queues; i++)
1910                         rxa_sw_del(rx_adapter, dev_info, i);
1911                 return;
1912         }
1913
1914         /* Push all the partial event vectors to event device. */
1915         TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1916                 if (vec->queue != rx_queue_id)
1917                         continue;
1918                 rxa_vector_expire(vec, rx_adapter);
1919                 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1920         }
1921
1922         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1923         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1924         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1925         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1926         rx_adapter->num_rx_polled -= pollq;
1927         dev_info->nb_rx_poll -= pollq;
1928         rx_adapter->num_rx_intr -= intrq;
1929         dev_info->nb_rx_intr -= intrq;
1930         dev_info->nb_shared_intr -= intrq && sintrq;
1931 }
1932
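     /*
      * Software path helper for queue add: store the servicing weight and the
      * event template, enable vectorization when requested and adjust the
      * poll/interrupt counters according to the queue's new mode.
      */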
1933 static void
1934 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1935         struct eth_device_info *dev_info,
1936         int32_t rx_queue_id,
1937         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1938 {
1939         struct eth_rx_queue_info *queue_info;
1940         const struct rte_event *ev = &conf->ev;
1941         int pollq;
1942         int intrq;
1943         int sintrq;
1944         struct rte_event *qi_ev;
1945
1946         if (rx_queue_id == -1) {
1947                 uint16_t nb_rx_queues;
1948                 uint16_t i;
1949
1950                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1951                 for (i = 0; i < nb_rx_queues; i++)
1952                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1953                 return;
1954         }
1955
1956         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1957         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1958         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1959
1960         queue_info = &dev_info->rx_queue[rx_queue_id];
1961         queue_info->wt = conf->servicing_weight;
1962
1963         qi_ev = (struct rte_event *)&queue_info->event;
1964         qi_ev->event = ev->event;
1965         qi_ev->op = RTE_EVENT_OP_NEW;
1966         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1967         qi_ev->sub_event_type = 0;
1968
1969         if (conf->rx_queue_flags &
1970                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1971                 queue_info->flow_id_mask = ~0;
1972         } else
1973                 qi_ev->flow_id = 0;
1974
1975         if (conf->rx_queue_flags &
1976             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
1977                 queue_info->ena_vector = 1;
1978                 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1979                 rxa_set_vector_data(queue_info, conf->vector_sz,
1980                                     conf->vector_timeout_ns, conf->vector_mp,
1981                                     rx_queue_id, dev_info->dev->data->port_id);
1982                 rx_adapter->ena_vector = 1;
1983                 rx_adapter->vector_tmo_ticks =
1984                         rx_adapter->vector_tmo_ticks ?
1985                                       RTE_MIN(queue_info->vector_data
1986                                                         .vector_timeout_ticks >>
1987                                                 1,
1988                                         rx_adapter->vector_tmo_ticks) :
1989                                 queue_info->vector_data.vector_timeout_ticks >>
1990                                         1;
1991         }
1992
1993         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1994         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1995                 rx_adapter->num_rx_polled += !pollq;
1996                 dev_info->nb_rx_poll += !pollq;
1997                 rx_adapter->num_rx_intr -= intrq;
1998                 dev_info->nb_rx_intr -= intrq;
1999                 dev_info->nb_shared_intr -= intrq && sintrq;
2000         }
2001
2002         if (rxa_intr_queue(dev_info, rx_queue_id)) {
2003                 rx_adapter->num_rx_polled -= pollq;
2004                 dev_info->nb_rx_poll -= pollq;
2005                 rx_adapter->num_rx_intr += !intrq;
2006                 dev_info->nb_rx_intr += !intrq;
2007                 dev_info->nb_shared_intr += !intrq && sintrq;
2008                 if (dev_info->nb_shared_intr == 1) {
2009                         if (dev_info->multi_intr_cap)
2010                                 dev_info->next_q_idx =
2011                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
2012                         else
2013                                 dev_info->next_q_idx = 0;
2014                 }
2015         }
2016 }
2017
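     /*
      * Software (service function) path for queue add: recompute the number
      * of polled and interrupt-driven queues, reallocate the poll and WRR
      * arrays, switch the queue between interrupt and poll mode as needed and
      * regenerate the WRR sequence.
      */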
2018 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
2019                 uint16_t eth_dev_id,
2020                 int rx_queue_id,
2021                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2022 {
2023         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2024         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2025         int ret;
2026         struct eth_rx_poll_entry *rx_poll;
2027         struct eth_rx_queue_info *rx_queue;
2028         uint32_t *rx_wrr;
2029         uint16_t nb_rx_queues;
2030         uint32_t nb_rx_poll, nb_wrr;
2031         uint32_t nb_rx_intr;
2032         int num_intr_vec;
2033         uint16_t wt;
2034
2035         if (queue_conf->servicing_weight == 0) {
2036                 struct rte_eth_dev_data *data = dev_info->dev->data;
2037
2038                 temp_conf = *queue_conf;
2039                 if (!data->dev_conf.intr_conf.rxq) {
2040                         /* If Rx interrupts are disabled set wt = 1 */
2041                         temp_conf.servicing_weight = 1;
2042                 }
2043                 queue_conf = &temp_conf;
2044         }
2045
2046         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2047         rx_queue = dev_info->rx_queue;
2048         wt = queue_conf->servicing_weight;
2049
2050         if (dev_info->rx_queue == NULL) {
2051                 dev_info->rx_queue =
2052                     rte_zmalloc_socket(rx_adapter->mem_name,
2053                                        nb_rx_queues *
2054                                        sizeof(struct eth_rx_queue_info), 0,
2055                                        rx_adapter->socket_id);
2056                 if (dev_info->rx_queue == NULL)
2057                         return -ENOMEM;
2058         }
2059         rx_wrr = NULL;
2060         rx_poll = NULL;
2061
2062         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2063                         queue_conf->servicing_weight,
2064                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2065
2066         if (dev_info->dev->intr_handle)
2067                 dev_info->multi_intr_cap =
2068                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
2069
2070         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2071                                 &rx_poll, &rx_wrr);
2072         if (ret)
2073                 goto err_free_rxqueue;
2074
2075         if (wt == 0) {
2076                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2077
2078                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2079                 if (ret)
2080                         goto err_free_rxqueue;
2081
2082                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2083                 if (ret)
2084                         goto err_free_rxqueue;
2085         } else {
2086
2087                 num_intr_vec = 0;
2088                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2089                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2090                                                 rx_queue_id, 0);
2091                         /* interrupt based queues are being converted to
2092                          * poll mode queues, delete the interrupt configuration
2093                          * for those.
2094                          */
2095                         ret = rxa_del_intr_queue(rx_adapter,
2096                                                 dev_info, rx_queue_id);
2097                         if (ret)
2098                                 goto err_free_rxqueue;
2099                 }
2100         }
2101
2102         if (nb_rx_intr == 0) {
2103                 ret = rxa_free_intr_resources(rx_adapter);
2104                 if (ret)
2105                         goto err_free_rxqueue;
2106         }
2107
2108         if (wt == 0) {
2109                 uint16_t i;
2110
2111                 if (rx_queue_id  == -1) {
2112                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2113                                 dev_info->intr_queue[i] = i;
2114                 } else {
2115                         if (!rxa_intr_queue(dev_info, rx_queue_id))
2116                                 dev_info->intr_queue[nb_rx_intr - 1] =
2117                                         rx_queue_id;
2118                 }
2119         }
2120
2121
2122
2123         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2124         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2125
2126         rte_free(rx_adapter->eth_rx_poll);
2127         rte_free(rx_adapter->wrr_sched);
2128
2129         rx_adapter->eth_rx_poll = rx_poll;
2130         rx_adapter->wrr_sched = rx_wrr;
2131         rx_adapter->wrr_len = nb_wrr;
2132         rx_adapter->num_intr_vec += num_intr_vec;
2133         return 0;
2134
2135 err_free_rxqueue:
2136         if (rx_queue == NULL) {
2137                 rte_free(dev_info->rx_queue);
2138                 dev_info->rx_queue = NULL;
2139         }
2140
2141         rte_free(rx_poll);
2142         rte_free(rx_wrr);
2143
2144         return ret;
2145 }
2146
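     /*
      * Common start/stop handler: invoke the PMD start/stop callback for
      * devices with an internal event port and toggle the service run state
      * for the software path.
      */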
2147 static int
2148 rxa_ctrl(uint8_t id, int start)
2149 {
2150         struct rte_event_eth_rx_adapter *rx_adapter;
2151         struct rte_eventdev *dev;
2152         struct eth_device_info *dev_info;
2153         uint32_t i;
2154         int use_service = 0;
2155         int stop = !start;
2156
2157         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2158         rx_adapter = rxa_id_to_adapter(id);
2159         if (rx_adapter == NULL)
2160                 return -EINVAL;
2161
2162         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2163
2164         RTE_ETH_FOREACH_DEV(i) {
2165                 dev_info = &rx_adapter->eth_devices[i];
2166                 /* if start, check for num dev queues */
2167                 if (start && !dev_info->nb_dev_queues)
2168                         continue;
2169                 /* if stop, check if dev has been started */
2170                 if (stop && !dev_info->dev_rx_started)
2171                         continue;
2172                 use_service |= !dev_info->internal_event_port;
2173                 dev_info->dev_rx_started = start;
2174                 if (dev_info->internal_event_port == 0)
2175                         continue;
2176                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2177                                                 &rte_eth_devices[i]) :
2178                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2179                                                 &rte_eth_devices[i]);
2180         }
2181
2182         if (use_service) {
2183                 rte_spinlock_lock(&rx_adapter->rx_lock);
2184                 rx_adapter->rxa_started = start;
2185                 rte_service_runstate_set(rx_adapter->service_id, start);
2186                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2187         }
2188
2189         return 0;
2190 }
2191
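     /*
      * Illustrative usage sketch (not part of the library): a typical
      * application creates the adapter, adds Rx queues and starts it. The
      * adapter/device/port ids and the port configuration values below are
      * placeholders.
      *
      *	struct rte_event_port_conf pc = { .new_event_threshold = 4096,
      *					  .dequeue_depth = 16,
      *					  .enqueue_depth = 16 };
      *	struct rte_event_eth_rx_adapter_queue_conf qc = { 0 };
      *
      *	qc.ev.queue_id = 0;
      *	qc.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
      *	qc.servicing_weight = 1;
      *	rte_event_eth_rx_adapter_create(0, 0, &pc);
      *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qc); // -1: all queues
      *	rte_event_eth_rx_adapter_start(0);
      */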
2192 int
2193 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2194                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
2195                                 void *conf_arg)
2196 {
2197         struct rte_event_eth_rx_adapter *rx_adapter;
2198         int ret;
2199         int socket_id;
2200         uint16_t i;
2201         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2202         const uint8_t default_rss_key[] = {
2203                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2204                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2205                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2206                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2207                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2208         };
2209
2210         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2211         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2212         if (conf_cb == NULL)
2213                 return -EINVAL;
2214
2215         if (event_eth_rx_adapter == NULL) {
2216                 ret = rte_event_eth_rx_adapter_init();
2217                 if (ret)
2218                         return ret;
2219         }
2220
2221         rx_adapter = rxa_id_to_adapter(id);
2222         if (rx_adapter != NULL) {
2223                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2224                 return -EEXIST;
2225         }
2226
2227         socket_id = rte_event_dev_socket_id(dev_id);
2228         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2229                 "rte_event_eth_rx_adapter_%d",
2230                 id);
2231
2232         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2233                         RTE_CACHE_LINE_SIZE, socket_id);
2234         if (rx_adapter == NULL) {
2235                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2236                 return -ENOMEM;
2237         }
2238
2239         rx_adapter->eventdev_id = dev_id;
2240         rx_adapter->socket_id = socket_id;
2241         rx_adapter->conf_cb = conf_cb;
2242         rx_adapter->conf_arg = conf_arg;
2243         rx_adapter->id = id;
2244         TAILQ_INIT(&rx_adapter->vector_list);
2245         strcpy(rx_adapter->mem_name, mem_name);
2246         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2247                                         RTE_MAX_ETHPORTS *
2248                                         sizeof(struct eth_device_info), 0,
2249                                         socket_id);
2250         rte_convert_rss_key((const uint32_t *)default_rss_key,
2251                         (uint32_t *)rx_adapter->rss_key_be,
2252                             RTE_DIM(default_rss_key));
2253
2254         if (rx_adapter->eth_devices == NULL) {
2255                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2256                 rte_free(rx_adapter);
2257                 return -ENOMEM;
2258         }
2259         rte_spinlock_init(&rx_adapter->rx_lock);
2260         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2261                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2262
2263         event_eth_rx_adapter[id] = rx_adapter;
2264         if (conf_cb == rxa_default_conf_cb)
2265                 rx_adapter->default_cb_arg = 1;
2266
2267         if (rte_mbuf_dyn_rx_timestamp_register(
2268                         &event_eth_rx_timestamp_dynfield_offset,
2269                         &event_eth_rx_timestamp_dynflag) != 0) {
2270                 RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf");
2271                 return -rte_errno;
2272         }
2273
2274         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2275                 conf_arg);
2276         return 0;
2277 }
2278
2279 int
2280 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2281                 struct rte_event_port_conf *port_config)
2282 {
2283         struct rte_event_port_conf *pc;
2284         int ret;
2285
2286         if (port_config == NULL)
2287                 return -EINVAL;
2288         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2289
2290         pc = rte_malloc(NULL, sizeof(*pc), 0);
2291         if (pc == NULL)
2292                 return -ENOMEM;
2293         *pc = *port_config;
2294         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2295                                         rxa_default_conf_cb,
2296                                         pc);
2297         if (ret)
2298                 rte_free(pc);
2299         return ret;
2300 }
2301
2302 int
2303 rte_event_eth_rx_adapter_free(uint8_t id)
2304 {
2305         struct rte_event_eth_rx_adapter *rx_adapter;
2306
2307         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2308
2309         rx_adapter = rxa_id_to_adapter(id);
2310         if (rx_adapter == NULL)
2311                 return -EINVAL;
2312
2313         if (rx_adapter->nb_queues) {
2314                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2315                                 rx_adapter->nb_queues);
2316                 return -EBUSY;
2317         }
2318
2319         if (rx_adapter->default_cb_arg)
2320                 rte_free(rx_adapter->conf_arg);
2321         rte_free(rx_adapter->eth_devices);
2322         rte_free(rx_adapter);
2323         event_eth_rx_adapter[id] = NULL;
2324
2325         rte_eventdev_trace_eth_rx_adapter_free(id);
2326         return 0;
2327 }
2328
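     /*
      * Add an Rx queue (or all queues of the port when rx_queue_id is -1) to
      * the adapter. Devices that provide an internal event port are handed
      * the request directly through their PMD callback; otherwise the queue
      * is registered with the adapter's service function under rx_lock.
      */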
2329 int
2330 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2331                 uint16_t eth_dev_id,
2332                 int32_t rx_queue_id,
2333                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2334 {
2335         int ret;
2336         uint32_t cap;
2337         struct rte_event_eth_rx_adapter *rx_adapter;
2338         struct rte_eventdev *dev;
2339         struct eth_device_info *dev_info;
2340         struct rte_event_eth_rx_adapter_vector_limits limits;
2341
2342         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2343         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2344
2345         rx_adapter = rxa_id_to_adapter(id);
2346         if ((rx_adapter == NULL) || (queue_conf == NULL))
2347                 return -EINVAL;
2348
2349         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2350         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2351                                                 eth_dev_id,
2352                                                 &cap);
2353         if (ret) {
2354                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2355                         " eth port %" PRIu16, id, eth_dev_id);
2356                 return ret;
2357         }
2358
2359         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2360                 && (queue_conf->rx_queue_flags &
2361                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2362                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2363                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2364                                 eth_dev_id, id);
2365                 return -EINVAL;
2366         }
2367
2368         if (queue_conf->rx_queue_flags &
2369             RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2370
2371                 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2372                         RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2373                                          " eth port: %" PRIu16
2374                                          " adapter id: %" PRIu8,
2375                                          eth_dev_id, id);
2376                         return -EINVAL;
2377                 }
2378
2379                 ret = rte_event_eth_rx_adapter_vector_limits_get(
2380                         rx_adapter->eventdev_id, eth_dev_id, &limits);
2381                 if (ret < 0) {
2382                         RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2383                                          " eth port: %" PRIu16
2384                                          " adapter id: %" PRIu8,
2385                                          eth_dev_id, id);
2386                         return -EINVAL;
2387                 }
2388                 if (queue_conf->vector_sz < limits.min_sz ||
2389                     queue_conf->vector_sz > limits.max_sz ||
2390                     queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2391                     queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2392                     queue_conf->vector_mp == NULL) {
2393                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2394                                          " eth port: %" PRIu16
2395                                          " adapter id: %" PRIu8,
2396                                          eth_dev_id, id);
2397                         return -EINVAL;
2398                 }
2399                 if (queue_conf->vector_mp->elt_size <
2400                     (sizeof(struct rte_event_vector) +
2401                      (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2402                         RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2403                                          " eth port: %" PRIu16
2404                                          " adapter id: %" PRIu8,
2405                                          eth_dev_id, id);
2406                         return -EINVAL;
2407                 }
2408         }
2409
2410         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2411                 (rx_queue_id != -1)) {
2412                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2413                         "event queue, eth port: %" PRIu16 " adapter id: %"
2414                         PRIu8, eth_dev_id, id);
2415                 return -EINVAL;
2416         }
2417
2418         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2419                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2420                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2421                          (uint16_t)rx_queue_id);
2422                 return -EINVAL;
2423         }
2424
2425         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2426
2427         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2428                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2429                                         -ENOTSUP);
2430                 if (dev_info->rx_queue == NULL) {
2431                         dev_info->rx_queue =
2432                             rte_zmalloc_socket(rx_adapter->mem_name,
2433                                         dev_info->dev->data->nb_rx_queues *
2434                                         sizeof(struct eth_rx_queue_info), 0,
2435                                         rx_adapter->socket_id);
2436                         if (dev_info->rx_queue == NULL)
2437                                 return -ENOMEM;
2438                 }
2439
2440                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2441                                 &rte_eth_devices[eth_dev_id],
2442                                 rx_queue_id, queue_conf);
2443                 if (ret == 0) {
2444                         dev_info->internal_event_port = 1;
2445                         rxa_update_queue(rx_adapter,
2446                                         &rx_adapter->eth_devices[eth_dev_id],
2447                                         rx_queue_id,
2448                                         1);
2449                 }
2450         } else {
2451                 rte_spinlock_lock(&rx_adapter->rx_lock);
2452                 dev_info->internal_event_port = 0;
2453                 ret = rxa_init_service(rx_adapter, id);
2454                 if (ret == 0) {
2455                         uint32_t service_id = rx_adapter->service_id;
2456                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2457                                         queue_conf);
2458                         rte_service_component_runstate_set(service_id,
2459                                 rxa_sw_adapter_queue_count(rx_adapter));
2460                 }
2461                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2462         }
2463
2464         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2465                 rx_queue_id, queue_conf, ret);
2466         if (ret)
2467                 return ret;
2468
2469         return 0;
2470 }
2471
2472 static int
2473 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2474 {
2475         limits->max_sz = MAX_VECTOR_SIZE;
2476         limits->min_sz = MIN_VECTOR_SIZE;
2477         limits->max_timeout_ns = MAX_VECTOR_NS;
2478         limits->min_timeout_ns = MIN_VECTOR_NS;
2479
2480         return 0;
2481 }
2482
2483 int
2484 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2485                                 int32_t rx_queue_id)
2486 {
2487         int ret = 0;
2488         struct rte_eventdev *dev;
2489         struct rte_event_eth_rx_adapter *rx_adapter;
2490         struct eth_device_info *dev_info;
2491         uint32_t cap;
2492         uint32_t nb_rx_poll = 0;
2493         uint32_t nb_wrr = 0;
2494         uint32_t nb_rx_intr;
2495         struct eth_rx_poll_entry *rx_poll = NULL;
2496         uint32_t *rx_wrr = NULL;
2497         int num_intr_vec;
2498
2499         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2500         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2501
2502         rx_adapter = rxa_id_to_adapter(id);
2503         if (rx_adapter == NULL)
2504                 return -EINVAL;
2505
2506         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2507         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2508                                                 eth_dev_id,
2509                                                 &cap);
2510         if (ret)
2511                 return ret;
2512
2513         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2514                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2515                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2516                          (uint16_t)rx_queue_id);
2517                 return -EINVAL;
2518         }
2519
2520         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2521
2522         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2523                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2524                                  -ENOTSUP);
2525                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2526                                                 &rte_eth_devices[eth_dev_id],
2527                                                 rx_queue_id);
2528                 if (ret == 0) {
2529                         rxa_update_queue(rx_adapter,
2530                                         &rx_adapter->eth_devices[eth_dev_id],
2531                                         rx_queue_id,
2532                                         0);
2533                         if (dev_info->nb_dev_queues == 0) {
2534                                 rte_free(dev_info->rx_queue);
2535                                 dev_info->rx_queue = NULL;
2536                         }
2537                 }
2538         } else {
2539                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2540                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2541
2542                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2543                         &rx_poll, &rx_wrr);
2544                 if (ret)
2545                         return ret;
2546
2547                 rte_spinlock_lock(&rx_adapter->rx_lock);
2548
2549                 num_intr_vec = 0;
2550                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2551
2552                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2553                                                 rx_queue_id, 0);
2554                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2555                                         rx_queue_id);
2556                         if (ret)
2557                                 goto unlock_ret;
2558                 }
2559
2560                 if (nb_rx_intr == 0) {
2561                         ret = rxa_free_intr_resources(rx_adapter);
2562                         if (ret)
2563                                 goto unlock_ret;
2564                 }
2565
2566                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2567                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2568
2569                 rte_free(rx_adapter->eth_rx_poll);
2570                 rte_free(rx_adapter->wrr_sched);
2571
2572                 if (nb_rx_intr == 0) {
2573                         rte_free(dev_info->intr_queue);
2574                         dev_info->intr_queue = NULL;
2575                 }
2576
2577                 rx_adapter->eth_rx_poll = rx_poll;
2578                 rx_adapter->wrr_sched = rx_wrr;
2579                 rx_adapter->wrr_len = nb_wrr;
2580                 rx_adapter->num_intr_vec += num_intr_vec;
2581
2582                 if (dev_info->nb_dev_queues == 0) {
2583                         rte_free(dev_info->rx_queue);
2584                         dev_info->rx_queue = NULL;
2585                 }
2586 unlock_ret:
2587                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2588                 if (ret) {
2589                         rte_free(rx_poll);
2590                         rte_free(rx_wrr);
2591                         return ret;
2592                 }
2593
2594                 rte_service_component_runstate_set(rx_adapter->service_id,
2595                                 rxa_sw_adapter_queue_count(rx_adapter));
2596         }
2597
2598         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2599                 rx_queue_id, ret);
2600         return ret;
2601 }
2602
2603 int
2604 rte_event_eth_rx_adapter_vector_limits_get(
2605         uint8_t dev_id, uint16_t eth_port_id,
2606         struct rte_event_eth_rx_adapter_vector_limits *limits)
2607 {
2608         struct rte_eventdev *dev;
2609         uint32_t cap;
2610         int ret;
2611
2612         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2613         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2614
2615         if (limits == NULL)
2616                 return -EINVAL;
2617
2618         dev = &rte_eventdevs[dev_id];
2619
2620         ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2621         if (ret) {
2622                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2623                                  " eth port %" PRIu16,
2624                                  dev_id, eth_port_id);
2625                 return ret;
2626         }
2627
2628         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2629                 RTE_FUNC_PTR_OR_ERR_RET(
2630                         *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2631                         -ENOTSUP);
2632                 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2633                         dev, &rte_eth_devices[eth_port_id], limits);
2634         } else {
2635                 ret = rxa_sw_vector_limits(limits);
2636         }
2637
2638         return ret;
2639 }
2640
2641 int
2642 rte_event_eth_rx_adapter_start(uint8_t id)
2643 {
2644         rte_eventdev_trace_eth_rx_adapter_start(id);
2645         return rxa_ctrl(id, 1);
2646 }
2647
2648 int
2649 rte_event_eth_rx_adapter_stop(uint8_t id)
2650 {
2651         rte_eventdev_trace_eth_rx_adapter_stop(id);
2652         return rxa_ctrl(id, 0);
2653 }
2654
2655 int
2656 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2657                                struct rte_event_eth_rx_adapter_stats *stats)
2658 {
2659         struct rte_event_eth_rx_adapter *rx_adapter;
2660         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2661         struct rte_event_eth_rx_adapter_stats dev_stats;
2662         struct rte_eventdev *dev;
2663         struct eth_device_info *dev_info;
2664         uint32_t i;
2665         int ret;
2666
2667         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2668
2669         rx_adapter = rxa_id_to_adapter(id);
2670         if (rx_adapter  == NULL || stats == NULL)
2671                 return -EINVAL;
2672
2673         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2674         memset(stats, 0, sizeof(*stats));
2675         RTE_ETH_FOREACH_DEV(i) {
2676                 dev_info = &rx_adapter->eth_devices[i];
2677                 if (dev_info->internal_event_port == 0 ||
2678                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2679                         continue;
2680                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2681                                                 &rte_eth_devices[i],
2682                                                 &dev_stats);
2683                 if (ret)
2684                         continue;
2685                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2686                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2687         }
2688
2689         if (rx_adapter->service_inited)
2690                 *stats = rx_adapter->stats;
2691
2692         stats->rx_packets += dev_stats_sum.rx_packets;
2693         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2694         return 0;
2695 }
2696
2697 int
2698 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2699 {
2700         struct rte_event_eth_rx_adapter *rx_adapter;
2701         struct rte_eventdev *dev;
2702         struct eth_device_info *dev_info;
2703         uint32_t i;
2704
2705         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2706
2707         rx_adapter = rxa_id_to_adapter(id);
2708         if (rx_adapter == NULL)
2709                 return -EINVAL;
2710
2711         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2712         RTE_ETH_FOREACH_DEV(i) {
2713                 dev_info = &rx_adapter->eth_devices[i];
2714                 if (dev_info->internal_event_port == 0 ||
2715                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2716                         continue;
2717                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2718                                                         &rte_eth_devices[i]);
2719         }
2720
2721         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2722         return 0;
2723 }
2724
2725 int
2726 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2727 {
2728         struct rte_event_eth_rx_adapter *rx_adapter;
2729
2730         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2731
2732         rx_adapter = rxa_id_to_adapter(id);
2733         if (rx_adapter == NULL || service_id == NULL)
2734                 return -EINVAL;
2735
2736         if (rx_adapter->service_inited)
2737                 *service_id = rx_adapter->service_id;
2738
2739         return rx_adapter->service_inited ? 0 : -ESRCH;
2740 }
2741
2742 int
2743 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2744                                         uint16_t eth_dev_id,
2745                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2746                                         void *cb_arg)
2747 {
2748         struct rte_event_eth_rx_adapter *rx_adapter;
2749         struct eth_device_info *dev_info;
2750         uint32_t cap;
2751         int ret;
2752
2753         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2754         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2755
2756         rx_adapter = rxa_id_to_adapter(id);
2757         if (rx_adapter == NULL)
2758                 return -EINVAL;
2759
2760         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2761         if (dev_info->rx_queue == NULL)
2762                 return -EINVAL;
2763
2764         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2765                                                 eth_dev_id,
2766                                                 &cap);
2767         if (ret) {
2768                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2769                         " eth port %" PRIu16, id, eth_dev_id);
2770                 return ret;
2771         }
2772
2773         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2774                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2775                                 PRIu16, eth_dev_id);
2776                 return -EINVAL;
2777         }
2778
2779         rte_spinlock_lock(&rx_adapter->rx_lock);
2780         dev_info->cb_fn = cb_fn;
2781         dev_info->cb_arg = cb_arg;
2782         rte_spinlock_unlock(&rx_adapter->rx_lock);
2783
2784         return 0;
2785 }