eventdev: optimize Rx adapter enqueue
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
35 /* Sentinel value to detect an uninitialized file handle */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
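
/*
 * Illustrative example of how this union is used: rxa_config_intr() packs
 * the pair into the pointer-sized epoll user data,
 *
 *	union queue_data qd;
 *	qd.port = eth_dev_id;
 *	qd.queue = rx_queue_id;
 *	rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, epd,
 *				RTE_INTR_EVENT_ADD, qd.ptr);
 *
 * and rxa_intr_ring_enqueue() recovers <port, queue> by assigning the
 * received pointer back to qd.ptr and reading qd.port and qd.queue.
 */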
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Number of interrupt driven Rx queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_epoll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if the default conf callback is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Rx callback */
151         rte_event_eth_rx_adapter_cb_fn cb_fn;
152         /* Rx callback argument */
153         void *cb_arg;
154         /* Set if ethdev->eventdev packet transfer uses a
155          * hardware mechanism
156          */
157         uint8_t internal_event_port;
158         /* Set if the adapter is processing rx queues for
159          * this eth device and packet processing has been
160          * started, allows for the code to know if the PMD
161          * rx_adapter_stop callback needs to be invoked
162          */
163         uint8_t dev_rx_started;
164         /* Number of queues added for this device */
165         uint16_t nb_dev_queues;
166         /* Number of poll based queues
167          * If nb_rx_poll > 0, the start callback will
168          * be invoked if not already invoked
169          */
170         uint16_t nb_rx_poll;
171         /* Number of interrupt based queues
172          * If nb_rx_intr > 0, the start callback will
173          * be invoked if not already invoked.
174          */
175         uint16_t nb_rx_intr;
176         /* Number of queues that use the shared interrupt */
177         uint16_t nb_shared_intr;
178         /* sum(wrr(q)) for all queues within the device;
179          * useful when deleting all device queues
180          */
181         uint32_t wrr_len;
182         /* Intr based queue index to start polling from; used
183          * if the number of shared interrupts is non-zero
184          */
185         uint16_t next_q_idx;
186         /* Intr based queue indices */
187         uint16_t *intr_queue;
188         /* Device generates a per Rx queue interrupt for queue
189          * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
190          */
191         int multi_intr_cap;
192         /* shared interrupt enabled */
193         int shared_intr_enabled;
194 };
195
196 /* Per Rx queue */
197 struct eth_rx_queue_info {
198         int queue_enabled;      /* True if added */
199         int intr_enabled;
200         uint16_t wt;            /* Polling weight */
201         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
202         uint64_t event;
203 };
204
205 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
206
207 static inline int
208 rxa_validate_id(uint8_t id)
209 {
210         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
211 }
212
213 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
214         if (!rxa_validate_id(id)) { \
215                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
216                 return retval; \
217         } \
218 } while (0)
219
220 static inline int
221 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
222 {
223         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
224 }
225
226 /* Greatest common divisor */
227 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
228 {
229         uint16_t r = a % b;
230
231         return r ? rxa_gcd_u16(b, r) : b;
232 }
233
234 /* Returns the next queue in the polling sequence
235  *
236  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
237  */
238 static int
239 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
240          unsigned int n, int *cw,
241          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
242          uint16_t gcd, int prev)
243 {
244         int i = prev;
245         uint16_t w;
246
247         while (1) {
248                 uint16_t q;
249                 uint16_t d;
250
251                 i = (i + 1) % n;
252                 if (i == 0) {
253                         *cw = *cw - gcd;
254                         if (*cw <= 0)
255                                 *cw = max_wt;
256                 }
257
258                 q = eth_rx_poll[i].eth_rx_qid;
259                 d = eth_rx_poll[i].eth_dev_id;
260                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
261
262                 if ((int)w >= *cw)
263                         return i;
264         }
265 }
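
/*
 * Worked example (illustrative): with three polled queues stored in
 * eth_rx_poll[] with weights 4, 2 and 1, max_wt = 4 and gcd = 1, repeated
 * calls to rxa_wrr_next() starting from prev = -1 and cw = -1 produce the
 * 7-entry (4 + 2 + 1) schedule
 *
 *	wrr_sched[] = { 0, 0, 0, 1, 0, 1, 2 }
 *
 * i.e. each queue appears in the sequence as many times as its weight and
 * the heaviest queue is selected first.
 */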
266
267 static inline int
268 rxa_shared_intr(struct eth_device_info *dev_info,
269         int rx_queue_id)
270 {
271         int multi_intr_cap;
272
273         if (dev_info->dev->intr_handle == NULL)
274                 return 0;
275
276         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
277         return !multi_intr_cap ||
278                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
279 }
280
281 static inline int
282 rxa_intr_queue(struct eth_device_info *dev_info,
283         int rx_queue_id)
284 {
285         struct eth_rx_queue_info *queue_info;
286
287         queue_info = &dev_info->rx_queue[rx_queue_id];
288         return dev_info->rx_queue &&
289                 !dev_info->internal_event_port &&
290                 queue_info->queue_enabled && queue_info->wt == 0;
291 }
292
293 static inline int
294 rxa_polled_queue(struct eth_device_info *dev_info,
295         int rx_queue_id)
296 {
297         struct eth_rx_queue_info *queue_info;
298
299         queue_info = &dev_info->rx_queue[rx_queue_id];
300         return !dev_info->internal_event_port &&
301                 dev_info->rx_queue &&
302                 queue_info->queue_enabled && queue_info->wt != 0;
303 }
304
305 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
306 static int
307 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
308 {
309         uint16_t i;
310         int n, s;
311         uint16_t nbq;
312
313         nbq = dev_info->dev->data->nb_rx_queues;
314         n = 0; /* non shared count */
315         s = 0; /* shared count */
316
317         if (rx_queue_id == -1) {
318                 for (i = 0; i < nbq; i++) {
319                         if (!rxa_shared_intr(dev_info, i))
320                                 n += add ? !rxa_intr_queue(dev_info, i) :
321                                         rxa_intr_queue(dev_info, i);
322                         else
323                                 s += add ? !rxa_intr_queue(dev_info, i) :
324                                         rxa_intr_queue(dev_info, i);
325                 }
326
327                 if (s > 0) {
328                         if ((add && dev_info->nb_shared_intr == 0) ||
329                                 (!add && dev_info->nb_shared_intr))
330                                 n += 1;
331                 }
332         } else {
333                 if (!rxa_shared_intr(dev_info, rx_queue_id))
334                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
335                                 rxa_intr_queue(dev_info, rx_queue_id);
336                 else
337                         n = add ? !dev_info->nb_shared_intr :
338                                 dev_info->nb_shared_intr == 1;
339         }
340
341         return add ? n : -n;
342 }
343
344 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
345  */
346 static void
347 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
348                         struct eth_device_info *dev_info,
349                         int rx_queue_id,
350                         uint32_t *nb_rx_intr)
351 {
352         uint32_t intr_diff;
353
354         if (rx_queue_id == -1)
355                 intr_diff = dev_info->nb_rx_intr;
356         else
357                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
358
359         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
360 }
361
362 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly
363  * added interrupt queues may currently be poll mode Rx queues
364  */
365 static void
366 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
367                         struct eth_device_info *dev_info,
368                         int rx_queue_id,
369                         uint32_t *nb_rx_poll,
370                         uint32_t *nb_rx_intr,
371                         uint32_t *nb_wrr)
372 {
373         uint32_t intr_diff;
374         uint32_t poll_diff;
375         uint32_t wrr_len_diff;
376
377         if (rx_queue_id == -1) {
378                 intr_diff = dev_info->dev->data->nb_rx_queues -
379                                                 dev_info->nb_rx_intr;
380                 poll_diff = dev_info->nb_rx_poll;
381                 wrr_len_diff = dev_info->wrr_len;
382         } else {
383                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
384                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
385                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
386                                         0;
387         }
388
389         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
390         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
391         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
392 }
393
394 /* Calculate size of the eth_rx_poll and wrr_sched arrays
395  * after deleting poll mode rx queues
396  */
397 static void
398 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
399                         struct eth_device_info *dev_info,
400                         int rx_queue_id,
401                         uint32_t *nb_rx_poll,
402                         uint32_t *nb_wrr)
403 {
404         uint32_t poll_diff;
405         uint32_t wrr_len_diff;
406
407         if (rx_queue_id == -1) {
408                 poll_diff = dev_info->nb_rx_poll;
409                 wrr_len_diff = dev_info->wrr_len;
410         } else {
411                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
412                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
413                                         0;
414         }
415
416         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
417         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
418 }
419
420 /* Calculate nb_rx_* after adding poll mode rx queues
421  */
422 static void
423 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
424                         struct eth_device_info *dev_info,
425                         int rx_queue_id,
426                         uint16_t wt,
427                         uint32_t *nb_rx_poll,
428                         uint32_t *nb_rx_intr,
429                         uint32_t *nb_wrr)
430 {
431         uint32_t intr_diff;
432         uint32_t poll_diff;
433         uint32_t wrr_len_diff;
434
435         if (rx_queue_id == -1) {
436                 intr_diff = dev_info->nb_rx_intr;
437                 poll_diff = dev_info->dev->data->nb_rx_queues -
438                                                 dev_info->nb_rx_poll;
439                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
440                                 - dev_info->wrr_len;
441         } else {
442                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
443                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
444                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
445                                 wt - dev_info->rx_queue[rx_queue_id].wt :
446                                 wt;
447         }
448
449         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
450         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
451         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
452 }
453
454 /* Calculate nb_rx_* after adding rx_queue_id */
455 static void
456 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
457                 struct eth_device_info *dev_info,
458                 int rx_queue_id,
459                 uint16_t wt,
460                 uint32_t *nb_rx_poll,
461                 uint32_t *nb_rx_intr,
462                 uint32_t *nb_wrr)
463 {
464         if (wt != 0)
465                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
466                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
467         else
468                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
469                                         nb_rx_poll, nb_rx_intr, nb_wrr);
470 }
471
472 /* Calculate nb_rx_* after deleting rx_queue_id */
473 static void
474 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
475                 struct eth_device_info *dev_info,
476                 int rx_queue_id,
477                 uint32_t *nb_rx_poll,
478                 uint32_t *nb_rx_intr,
479                 uint32_t *nb_wrr)
480 {
481         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
482                                 nb_wrr);
483         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
484                                 nb_rx_intr);
485 }
486
487 /*
488  * Allocate the rx_poll array
489  */
490 static struct eth_rx_poll_entry *
491 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
492         uint32_t num_rx_polled)
493 {
494         size_t len;
495
496         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
497                                                         RTE_CACHE_LINE_SIZE);
498         return  rte_zmalloc_socket(rx_adapter->mem_name,
499                                 len,
500                                 RTE_CACHE_LINE_SIZE,
501                                 rx_adapter->socket_id);
502 }
503
504 /*
505  * Allocate the WRR array
506  */
507 static uint32_t *
508 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
509 {
510         size_t len;
511
512         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
513                         RTE_CACHE_LINE_SIZE);
514         return  rte_zmalloc_socket(rx_adapter->mem_name,
515                                 len,
516                                 RTE_CACHE_LINE_SIZE,
517                                 rx_adapter->socket_id);
518 }
519
520 static int
521 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
522                 uint32_t nb_poll,
523                 uint32_t nb_wrr,
524                 struct eth_rx_poll_entry **rx_poll,
525                 uint32_t **wrr_sched)
526 {
527
528         if (nb_poll == 0) {
529                 *rx_poll = NULL;
530                 *wrr_sched = NULL;
531                 return 0;
532         }
533
534         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
535         if (*rx_poll == NULL) {
536                 *wrr_sched = NULL;
537                 return -ENOMEM;
538         }
539
540         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
541         if (*wrr_sched == NULL) {
542                 rte_free(*rx_poll);
543                 return -ENOMEM;
544         }
545         return 0;
546 }
547
548 /* Precalculate WRR polling sequence for all queues in rx_adapter */
549 static void
550 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
551                 struct eth_rx_poll_entry *rx_poll,
552                 uint32_t *rx_wrr)
553 {
554         uint16_t d;
555         uint16_t q;
556         unsigned int i;
557         int prev = -1;
558         int cw = -1;
559
560         /* Initialize variables for calculation of wrr schedule */
561         uint16_t max_wrr_pos = 0;
562         unsigned int poll_q = 0;
563         uint16_t max_wt = 0;
564         uint16_t gcd = 0;
565
566         if (rx_poll == NULL)
567                 return;
568
569         /* Generate the array of all queues to poll; the size of this
570          * array is poll_q
571          */
572         RTE_ETH_FOREACH_DEV(d) {
573                 uint16_t nb_rx_queues;
574                 struct eth_device_info *dev_info =
575                                 &rx_adapter->eth_devices[d];
576                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
577                 if (dev_info->rx_queue == NULL)
578                         continue;
579                 if (dev_info->internal_event_port)
580                         continue;
581                 dev_info->wrr_len = 0;
582                 for (q = 0; q < nb_rx_queues; q++) {
583                         struct eth_rx_queue_info *queue_info =
584                                 &dev_info->rx_queue[q];
585                         uint16_t wt;
586
587                         if (!rxa_polled_queue(dev_info, q))
588                                 continue;
589                         wt = queue_info->wt;
590                         rx_poll[poll_q].eth_dev_id = d;
591                         rx_poll[poll_q].eth_rx_qid = q;
592                         max_wrr_pos += wt;
593                         dev_info->wrr_len += wt;
594                         max_wt = RTE_MAX(max_wt, wt);
595                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
596                         poll_q++;
597                 }
598         }
599
600         /* Generate polling sequence based on weights */
601         prev = -1;
602         cw = -1;
603         for (i = 0; i < max_wrr_pos; i++) {
604                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
605                                      rx_poll, max_wt, gcd, prev);
606                 prev = rx_wrr[i];
607         }
608 }
609
610 static inline void
611 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
612         struct rte_ipv6_hdr **ipv6_hdr)
613 {
614         struct rte_ether_hdr *eth_hdr =
615                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
616         struct rte_vlan_hdr *vlan_hdr;
617
618         *ipv4_hdr = NULL;
619         *ipv6_hdr = NULL;
620
621         switch (eth_hdr->ether_type) {
622         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
623                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
624                 break;
625
626         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
627                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
628                 break;
629
630         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
631                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
632                 switch (vlan_hdr->eth_proto) {
633                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
634                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
635                         break;
636                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
637                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
638                         break;
639                 default:
640                         break;
641                 }
642                 break;
643
644         default:
645                 break;
646         }
647 }
648
649 /* Calculate RSS hash for IPv4/6 */
650 static inline uint32_t
651 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
652 {
653         uint32_t input_len;
654         void *tuple;
655         struct rte_ipv4_tuple ipv4_tuple;
656         struct rte_ipv6_tuple ipv6_tuple;
657         struct rte_ipv4_hdr *ipv4_hdr;
658         struct rte_ipv6_hdr *ipv6_hdr;
659
660         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
661
662         if (ipv4_hdr) {
663                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
664                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
665                 tuple = &ipv4_tuple;
666                 input_len = RTE_THASH_V4_L3_LEN;
667         } else if (ipv6_hdr) {
668                 rte_thash_load_v6_addrs(ipv6_hdr,
669                                         (union rte_thash_tuple *)&ipv6_tuple);
670                 tuple = &ipv6_tuple;
671                 input_len = RTE_THASH_V6_L3_LEN;
672         } else
673                 return 0;
674
675         return rte_softrss_be(tuple, input_len, rss_key_be);
676 }
677
678 static inline int
679 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
680 {
681         return !!rx_adapter->enq_block_count;
682 }
683
684 static inline void
685 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
686 {
687         if (rx_adapter->rx_enq_block_start_ts)
688                 return;
689
690         rx_adapter->enq_block_count++;
691         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
692                 return;
693
694         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
695 }
696
697 static inline void
698 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
699                     struct rte_event_eth_rx_adapter_stats *stats)
700 {
701         if (unlikely(!stats->rx_enq_start_ts))
702                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
703
704         if (likely(!rxa_enq_blocked(rx_adapter)))
705                 return;
706
707         rx_adapter->enq_block_count = 0;
708         if (rx_adapter->rx_enq_block_start_ts) {
709                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
710                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
711                     rx_adapter->rx_enq_block_start_ts;
712                 rx_adapter->rx_enq_block_start_ts = 0;
713         }
714 }
715
716 /* Enqueue buffered events to event device */
717 static inline uint16_t
718 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
719 {
720         struct rte_eth_event_enqueue_buffer *buf =
721             &rx_adapter->event_enqueue_buffer;
722         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
723
724         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
725                                         rx_adapter->event_port_id,
726                                         buf->events,
727                                         buf->count);
728         if (n != buf->count) {
729                 memmove(buf->events,
730                         &buf->events[n],
731                         (buf->count - n) * sizeof(struct rte_event));
732                 stats->rx_enq_retry++;
733         }
734
735         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
736                 rxa_enq_block_start_ts(rx_adapter);
737
738         buf->count -= n;
739         stats->rx_enq_count += n;
740
741         return n;
742 }
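
/*
 * Example of the partial enqueue handling above (illustrative): if the
 * event device accepts only 24 of 32 buffered events, the remaining 8 are
 * moved to the front of buf->events by the memmove(), buf->count drops to
 * 8, rx_enq_retry is incremented and the leftover events are retried on a
 * later flush.
 */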
743
744 static inline void
745 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
746                 uint16_t eth_dev_id,
747                 uint16_t rx_queue_id,
748                 struct rte_mbuf **mbufs,
749                 uint16_t num)
750 {
751         uint32_t i;
752         struct eth_device_info *dev_info =
753                                         &rx_adapter->eth_devices[eth_dev_id];
754         struct eth_rx_queue_info *eth_rx_queue_info =
755                                         &dev_info->rx_queue[rx_queue_id];
756         struct rte_eth_event_enqueue_buffer *buf =
757                                         &rx_adapter->event_enqueue_buffer;
758         struct rte_event *ev = &buf->events[buf->count];
759         uint64_t event = eth_rx_queue_info->event;
760         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
761         struct rte_mbuf *m = mbufs[0];
762         uint32_t rss_mask;
763         uint32_t rss;
764         int do_rss;
765         uint64_t ts;
766         struct rte_mbuf *cb_mbufs[BATCH_SIZE];
767         uint16_t nb_cb;
768
769         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
772
773         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
774                 ts = rte_get_tsc_cycles();
775                 for (i = 0; i < num; i++) {
776                         m = mbufs[i];
777
778                         m->timestamp = ts;
779                         m->ol_flags |= PKT_RX_TIMESTAMP;
780                 }
781         }
782
783
784         nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
785                                                 ETH_EVENT_BUFFER_SIZE,
786                                                 buf->count, mbufs,
787                                                 num,
788                                                 dev_info->cb_arg,
789                                                 cb_mbufs) :
790                                                 num;
791         if (nb_cb < num) {
792                 mbufs = cb_mbufs;
793                 num = nb_cb;
794         }
795
796         for (i = 0; i < num; i++) {
797                 m = mbufs[i];
798
799                 rss = do_rss ?
800                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
801                         m->hash.rss;
802                 ev->event = event;
803                 ev->flow_id = (rss & ~flow_id_mask) |
804                                 (ev->flow_id & flow_id_mask);
805                 ev->mbuf = m;
806                 ev++;
807         }
808
809         buf->count += num;
810 }
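
/*
 * Note on the flow id composition in rxa_buffer_mbufs() (illustrative):
 * flow_id_mask is ~0 when the application supplied a flow id for the queue
 * and 0 otherwise. With flow_id_mask == ~0 the expression
 *
 *	ev->flow_id = (rss & ~flow_id_mask) | (ev->flow_id & flow_id_mask);
 *
 * keeps the flow id taken from the per queue event template, while with
 * flow_id_mask == 0 it substitutes the RSS hash, i.e. either the PMD
 * provided m->hash.rss or the software hash from rxa_do_softrss().
 */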
811
812 /* Enqueue packets from <port, q> to the event buffer */
813 static inline uint32_t
814 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
815         uint16_t port_id,
816         uint16_t queue_id,
817         uint32_t rx_count,
818         uint32_t max_rx,
819         int *rxq_empty)
820 {
821         struct rte_mbuf *mbufs[BATCH_SIZE];
822         struct rte_eth_event_enqueue_buffer *buf =
823                                         &rx_adapter->event_enqueue_buffer;
824         struct rte_event_eth_rx_adapter_stats *stats =
825                                         &rx_adapter->stats;
826         uint16_t n;
827         uint32_t nb_rx = 0;
828
829         if (rxq_empty)
830                 *rxq_empty = 0;
831         /* Don't do a batch dequeue from the rx queue if there isn't
832          * enough space in the enqueue buffer.
833          */
834         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
835                 if (buf->count >= BATCH_SIZE)
836                         rxa_flush_event_buffer(rx_adapter);
837
838                 stats->rx_poll_count++;
839                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
840                 if (unlikely(!n)) {
841                         if (rxq_empty)
842                                 *rxq_empty = 1;
843                         break;
844                 }
845                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
846                 nb_rx += n;
847                 if (rx_count + nb_rx > max_rx)
848                         break;
849         }
850
851         if (buf->count > 0)
852                 rxa_flush_event_buffer(rx_adapter);
853
854         return nb_rx;
855 }
856
857 static inline void
858 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
859                 void *data)
860 {
861         uint16_t port_id;
862         uint16_t queue;
863         int err;
864         union queue_data qd;
865         struct eth_device_info *dev_info;
866         struct eth_rx_queue_info *queue_info;
867         int *intr_enabled;
868
869         qd.ptr = data;
870         port_id = qd.port;
871         queue = qd.queue;
872
873         dev_info = &rx_adapter->eth_devices[port_id];
874         queue_info = &dev_info->rx_queue[queue];
875         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
876         if (rxa_shared_intr(dev_info, queue))
877                 intr_enabled = &dev_info->shared_intr_enabled;
878         else
879                 intr_enabled = &queue_info->intr_enabled;
880
881         if (*intr_enabled) {
882                 *intr_enabled = 0;
883                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
884                 /* Entry should always be available.
885                  * The ring size equals the maximum number of interrupt
886                  * vectors supported (an interrupt vector is shared in
887                  * case of shared interrupts)
888                  */
889                 if (err)
890                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
891                                 " to ring: %s", strerror(-err));
892                 else
893                         rte_eth_dev_rx_intr_disable(port_id, queue);
894         }
895         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
896 }
897
898 static int
899 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
900                         uint32_t num_intr_vec)
901 {
902         if (rx_adapter->num_intr_vec + num_intr_vec >
903                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
904                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
905                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
906                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
907                 return -ENOSPC;
908         }
909
910         return 0;
911 }
912
913 /* Delete entries for (dev, queue) from the interrupt ring */
914 static void
915 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
916                         struct eth_device_info *dev_info,
917                         uint16_t rx_queue_id)
918 {
919         int i, n;
920         union queue_data qd;
921
922         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
923
924         n = rte_ring_count(rx_adapter->intr_ring);
925         for (i = 0; i < n; i++) {
926                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
927                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
928                         if (qd.port == dev_info->dev->data->port_id &&
929                                 qd.queue == rx_queue_id)
930                                 continue;
931                 } else {
932                         if (qd.port == dev_info->dev->data->port_id)
933                                 continue;
934                 }
935                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
936         }
937
938         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
939 }
940
941 /* pthread callback handling interrupt mode receive queues
942  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
943  * interrupting queue to the adapter's ring buffer for interrupt events.
944  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
945  * the adapter service function.
946  */
947 static void *
948 rxa_intr_thread(void *arg)
949 {
950         struct rte_event_eth_rx_adapter *rx_adapter = arg;
951         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
952         int n, i;
953
954         while (1) {
955                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
956                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
957                 if (unlikely(n < 0))
958                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
959                                         n);
960                 for (i = 0; i < n; i++) {
961                         rxa_intr_ring_enqueue(rx_adapter,
962                                         epoll_events[i].epdata.data);
963                 }
964         }
965
966         return NULL;
967 }
968
969 /* Dequeue <port, q> from interrupt ring and enqueue received
970  * mbufs to eventdev
971  */
972 static inline uint32_t
973 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
974 {
975         uint32_t n;
976         uint32_t nb_rx = 0;
977         int rxq_empty;
978         struct rte_eth_event_enqueue_buffer *buf;
979         rte_spinlock_t *ring_lock;
980         uint8_t max_done = 0;
981
982         if (rx_adapter->num_rx_intr == 0)
983                 return 0;
984
985         if (rte_ring_count(rx_adapter->intr_ring) == 0
986                 && !rx_adapter->qd_valid)
987                 return 0;
988
989         buf = &rx_adapter->event_enqueue_buffer;
990         ring_lock = &rx_adapter->intr_ring_lock;
991
992         if (buf->count >= BATCH_SIZE)
993                 rxa_flush_event_buffer(rx_adapter);
994
995         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
996                 struct eth_device_info *dev_info;
997                 uint16_t port;
998                 uint16_t queue;
999                 union queue_data qd  = rx_adapter->qd;
1000                 int err;
1001
1002                 if (!rx_adapter->qd_valid) {
1003                         struct eth_rx_queue_info *queue_info;
1004
1005                         rte_spinlock_lock(ring_lock);
1006                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1007                         if (err) {
1008                                 rte_spinlock_unlock(ring_lock);
1009                                 break;
1010                         }
1011
1012                         port = qd.port;
1013                         queue = qd.queue;
1014                         rx_adapter->qd = qd;
1015                         rx_adapter->qd_valid = 1;
1016                         dev_info = &rx_adapter->eth_devices[port];
1017                         if (rxa_shared_intr(dev_info, queue))
1018                                 dev_info->shared_intr_enabled = 1;
1019                         else {
1020                                 queue_info = &dev_info->rx_queue[queue];
1021                                 queue_info->intr_enabled = 1;
1022                         }
1023                         rte_eth_dev_rx_intr_enable(port, queue);
1024                         rte_spinlock_unlock(ring_lock);
1025                 } else {
1026                         port = qd.port;
1027                         queue = qd.queue;
1028
1029                         dev_info = &rx_adapter->eth_devices[port];
1030                 }
1031
1032                 if (rxa_shared_intr(dev_info, queue)) {
1033                         uint16_t i;
1034                         uint16_t nb_queues;
1035
1036                         nb_queues = dev_info->dev->data->nb_rx_queues;
1037                         n = 0;
1038                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1039                                 uint8_t enq_buffer_full;
1040
1041                                 if (!rxa_intr_queue(dev_info, i))
1042                                         continue;
1043                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1044                                         rx_adapter->max_nb_rx,
1045                                         &rxq_empty);
1046                                 nb_rx += n;
1047
1048                                 enq_buffer_full = !rxq_empty && n == 0;
1049                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1050
1051                                 if (enq_buffer_full || max_done) {
1052                                         dev_info->next_q_idx = i;
1053                                         goto done;
1054                                 }
1055                         }
1056
1057                         rx_adapter->qd_valid = 0;
1058
1059                         /* Reinitialize for next interrupt */
1060                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1061                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1062                                                 0;
1063                 } else {
1064                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1065                                 rx_adapter->max_nb_rx,
1066                                 &rxq_empty);
1067                         rx_adapter->qd_valid = !rxq_empty;
1068                         nb_rx += n;
1069                         if (nb_rx > rx_adapter->max_nb_rx)
1070                                 break;
1071                 }
1072         }
1073
1074 done:
1075         rx_adapter->stats.rx_intr_packets += nb_rx;
1076         return nb_rx;
1077 }
1078
1079 /*
1080  * Polls receive queues added to the event adapter and enqueues received
1081  * packets to the event device.
1082  *
1083  * The receive code enqueues initially to a temporary buffer; the
1084  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1085  *
1086  * If there isn't space available in the temporary buffer, packets from the
1087  * Rx queue aren't dequeued from the eth device; this back pressures the
1088  * eth device. In virtual device environments the back pressure is relayed
1089  * to the hypervisor's switching layer, where adjustments can be made to
1090  * deal with it.
1091  */
1092 static inline uint32_t
1093 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1094 {
1095         uint32_t num_queue;
1096         uint32_t nb_rx = 0;
1097         struct rte_eth_event_enqueue_buffer *buf;
1098         uint32_t wrr_pos;
1099         uint32_t max_nb_rx;
1100
1101         wrr_pos = rx_adapter->wrr_pos;
1102         max_nb_rx = rx_adapter->max_nb_rx;
1103         buf = &rx_adapter->event_enqueue_buffer;
1104
1105         /* Iterate through a WRR sequence */
1106         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1107                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1108                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1109                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1110
1111                 /* Don't do a batch dequeue from the rx queue if there isn't
1112                  * enough space in the enqueue buffer.
1113                  */
1114                 if (buf->count >= BATCH_SIZE)
1115                         rxa_flush_event_buffer(rx_adapter);
1116                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1117                         rx_adapter->wrr_pos = wrr_pos;
1118                         return nb_rx;
1119                 }
1120
1121                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1122                                 NULL);
1123                 if (nb_rx > max_nb_rx) {
1124                         rx_adapter->wrr_pos =
1125                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1126                         break;
1127                 }
1128
1129                 if (++wrr_pos == rx_adapter->wrr_len)
1130                         wrr_pos = 0;
1131         }
1132         return nb_rx;
1133 }
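
/*
 * Sizing note (illustrative, using the defaults above): with BATCH_SIZE of
 * 32 and ETH_EVENT_BUFFER_SIZE of 4 * 32 = 128 events, rxa_poll() flushes
 * the enqueue buffer whenever it holds at least 32 events and stops
 * dequeuing from the ethernet device once fewer than 32 free slots remain,
 * which is the back pressure described in the comment above.
 */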
1134
1135 static int
1136 rxa_service_func(void *args)
1137 {
1138         struct rte_event_eth_rx_adapter *rx_adapter = args;
1139         struct rte_event_eth_rx_adapter_stats *stats;
1140
1141         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1142                 return 0;
1143         if (!rx_adapter->rxa_started) {
1144                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1145                 return 0;
1146         }
1147
1148         stats = &rx_adapter->stats;
1149         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1150         stats->rx_packets += rxa_poll(rx_adapter);
1151         rte_spinlock_unlock(&rx_adapter->rx_lock);
1152         return 0;
1153 }
1154
1155 static int
1156 rte_event_eth_rx_adapter_init(void)
1157 {
1158         const char *name = "rte_event_eth_rx_adapter_array";
1159         const struct rte_memzone *mz;
1160         unsigned int sz;
1161
1162         sz = sizeof(*event_eth_rx_adapter) *
1163             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1164         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1165
1166         mz = rte_memzone_lookup(name);
1167         if (mz == NULL) {
1168                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1169                                                  RTE_CACHE_LINE_SIZE);
1170                 if (mz == NULL) {
1171                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1172                                         PRId32, rte_errno);
1173                         return -rte_errno;
1174                 }
1175         }
1176
1177         event_eth_rx_adapter = mz->addr;
1178         return 0;
1179 }
1180
1181 static inline struct rte_event_eth_rx_adapter *
1182 rxa_id_to_adapter(uint8_t id)
1183 {
1184         return event_eth_rx_adapter ?
1185                 event_eth_rx_adapter[id] : NULL;
1186 }
1187
1188 static int
1189 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1190                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1191 {
1192         int ret;
1193         struct rte_eventdev *dev;
1194         struct rte_event_dev_config dev_conf;
1195         int started;
1196         uint8_t port_id;
1197         struct rte_event_port_conf *port_conf = arg;
1198         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1199
1200         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1201         dev_conf = dev->data->dev_conf;
1202
1203         started = dev->data->dev_started;
1204         if (started)
1205                 rte_event_dev_stop(dev_id);
1206         port_id = dev_conf.nb_event_ports;
1207         dev_conf.nb_event_ports += 1;
1208         ret = rte_event_dev_configure(dev_id, &dev_conf);
1209         if (ret) {
1210                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1211                                                 dev_id);
1212                 if (started) {
1213                         if (rte_event_dev_start(dev_id))
1214                                 return -EIO;
1215                 }
1216                 return ret;
1217         }
1218
1219         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1220         if (ret) {
1221                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1222                                         port_id);
1223                 return ret;
1224         }
1225
1226         conf->event_port_id = port_id;
1227         conf->max_nb_rx = 128;
1228         if (started)
1229                 ret = rte_event_dev_start(dev_id);
1230         rx_adapter->default_cb_arg = 1;
1231         return ret;
1232 }
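
/*
 * Usage sketch (illustrative): applications normally reach this default
 * callback by creating the adapter with rte_event_eth_rx_adapter_create(),
 * which registers rxa_default_conf_cb() with the supplied event port
 * configuration as its argument, e.g. with placeholder values
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	ret = rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 *
 * The callback only uses this configuration when setting up the extra
 * event port it adds above.
 */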
1233
1234 static int
1235 rxa_epoll_create1(void)
1236 {
1237 #if defined(LINUX)
1238         int fd;
1239         fd = epoll_create1(EPOLL_CLOEXEC);
1240         return fd < 0 ? -errno : fd;
1241 #elif defined(BSD)
1242         return -ENOTSUP;
1243 #endif
1244 }
1245
1246 static int
1247 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1248 {
1249         if (rx_adapter->epd != INIT_FD)
1250                 return 0;
1251
1252         rx_adapter->epd = rxa_epoll_create1();
1253         if (rx_adapter->epd < 0) {
1254                 int err = rx_adapter->epd;
1255                 rx_adapter->epd = INIT_FD;
1256                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1257                 return err;
1258         }
1259
1260         return 0;
1261 }
1262
1263 static int
1264 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1265 {
1266         int err;
1267         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1268
1269         if (rx_adapter->intr_ring)
1270                 return 0;
1271
1272         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1273                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1274                                         rte_socket_id(), 0);
1275         if (!rx_adapter->intr_ring)
1276                 return -ENOMEM;
1277
1278         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1279                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1280                                         sizeof(struct rte_epoll_event),
1281                                         RTE_CACHE_LINE_SIZE,
1282                                         rx_adapter->socket_id);
1283         if (!rx_adapter->epoll_events) {
1284                 err = -ENOMEM;
1285                 goto error;
1286         }
1287
1288         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1289
1290         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1291                         "rx-intr-thread-%d", rx_adapter->id);
1292
1293         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1294                                 NULL, rxa_intr_thread, rx_adapter);
1295         if (!err) {
1296                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1297                 return 0;
1298         }
1299
1300         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1301 error:
1302         rte_ring_free(rx_adapter->intr_ring);
1303         rx_adapter->intr_ring = NULL;
1304         rx_adapter->epoll_events = NULL;
1305         return err;
1306 }
1307
1308 static int
1309 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1310 {
1311         int err;
1312
1313         err = pthread_cancel(rx_adapter->rx_intr_thread);
1314         if (err)
1315                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1316                                 err);
1317
1318         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1319         if (err)
1320                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1321
1322         rte_free(rx_adapter->epoll_events);
1323         rte_ring_free(rx_adapter->intr_ring);
1324         rx_adapter->intr_ring = NULL;
1325         rx_adapter->epoll_events = NULL;
1326         return 0;
1327 }
1328
1329 static int
1330 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1331 {
1332         int ret;
1333
1334         if (rx_adapter->num_rx_intr == 0)
1335                 return 0;
1336
1337         ret = rxa_destroy_intr_thread(rx_adapter);
1338         if (ret)
1339                 return ret;
1340
1341         close(rx_adapter->epd);
1342         rx_adapter->epd = INIT_FD;
1343
1344         return ret;
1345 }
1346
1347 static int
1348 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1349         struct eth_device_info *dev_info,
1350         uint16_t rx_queue_id)
1351 {
1352         int err;
1353         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1354         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1355
1356         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1357         if (err) {
1358                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1359                         rx_queue_id);
1360                 return err;
1361         }
1362
1363         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1364                                         rx_adapter->epd,
1365                                         RTE_INTR_EVENT_DEL,
1366                                         0);
1367         if (err)
1368                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1369
1370         if (sintr)
1371                 dev_info->shared_intr_enabled = 0;
1372         else
1373                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1374         return err;
1375 }
1376
1377 static int
1378 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1379                 struct eth_device_info *dev_info,
1380                 int rx_queue_id)
1381 {
1382         int err;
1383         int i;
1384         int s;
1385
1386         if (dev_info->nb_rx_intr == 0)
1387                 return 0;
1388
1389         err = 0;
1390         if (rx_queue_id == -1) {
1391                 s = dev_info->nb_shared_intr;
1392                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1393                         int sintr;
1394                         uint16_t q;
1395
1396                         q = dev_info->intr_queue[i];
1397                         sintr = rxa_shared_intr(dev_info, q);
1398                         s -= sintr;
1399
1400                         if (!sintr || s == 0) {
1401
1402                                 err = rxa_disable_intr(rx_adapter, dev_info,
1403                                                 q);
1404                                 if (err)
1405                                         return err;
1406                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1407                                                         q);
1408                         }
1409                 }
1410         } else {
1411                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1412                         return 0;
1413                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1414                                 dev_info->nb_shared_intr == 1) {
1415                         err = rxa_disable_intr(rx_adapter, dev_info,
1416                                         rx_queue_id);
1417                         if (err)
1418                                 return err;
1419                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1420                                                 rx_queue_id);
1421                 }
1422
1423                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1424                         if (dev_info->intr_queue[i] == rx_queue_id) {
1425                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1426                                         dev_info->intr_queue[i] =
1427                                                 dev_info->intr_queue[i + 1];
1428                                 break;
1429                         }
1430                 }
1431         }
1432
1433         return err;
1434 }
1435
1436 static int
1437 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1438         struct eth_device_info *dev_info,
1439         uint16_t rx_queue_id)
1440 {
1441         int err, err1;
1442         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1443         union queue_data qd;
1444         int init_fd;
1445         uint16_t *intr_queue;
1446         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1447
1448         if (rxa_intr_queue(dev_info, rx_queue_id))
1449                 return 0;
1450
1451         intr_queue = dev_info->intr_queue;
1452         if (dev_info->intr_queue == NULL) {
1453                 size_t len =
1454                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1455                 dev_info->intr_queue =
1456                         rte_zmalloc_socket(
1457                                 rx_adapter->mem_name,
1458                                 len,
1459                                 0,
1460                                 rx_adapter->socket_id);
1461                 if (dev_info->intr_queue == NULL)
1462                         return -ENOMEM;
1463         }
1464
1465         init_fd = rx_adapter->epd;
1466         err = rxa_init_epd(rx_adapter);
1467         if (err)
1468                 goto err_free_queue;
1469
1470         qd.port = eth_dev_id;
1471         qd.queue = rx_queue_id;
1472
1473         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1474                                         rx_adapter->epd,
1475                                         RTE_INTR_EVENT_ADD,
1476                                         qd.ptr);
1477         if (err) {
1478                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1479                         " Rx Queue %u err %d", rx_queue_id, err);
1480                 goto err_del_fd;
1481         }
1482
1483         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1484         if (err) {
1485                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1486                                 " Rx Queue %u err %d", rx_queue_id, err);
1487
1488                 goto err_del_event;
1489         }
1490
1491         err = rxa_create_intr_thread(rx_adapter);
1492         if (!err)  {
1493                 if (sintr)
1494                         dev_info->shared_intr_enabled = 1;
1495                 else
1496                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1497                 return 0;
1498         }
1499
1501         err1 = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1502         if (err1)
1503                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1504                                 " Rx Queue %u err %d", rx_queue_id, err1);
1505 err_del_event:
1506         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1507                                         rx_adapter->epd,
1508                                         RTE_INTR_EVENT_DEL,
1509                                         0);
1510         if (err1) {
1511                 RTE_EDEV_LOG_ERR("Could not delete event for"
1512                                 " Rx Queue %u err %d", rx_queue_id, err1);
1513         }
1514 err_del_fd:
1515         if (init_fd == INIT_FD) {
1516                 close(rx_adapter->epd);
1517                 rx_adapter->epd = INIT_FD;
1518         }
1519 err_free_queue:
1520         if (intr_queue == NULL) {
1521                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1522
1523         return err;
1524 }
1525
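/*
 * Enable interrupt mode for one queue, or for all Rx queues of the
 * device when rx_queue_id is -1. For devices using a shared interrupt
 * only the first such queue needs to be configured; on failure, the
 * queues configured earlier in the loop are rolled back.
 */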
1526 static int
1527 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1528         struct eth_device_info *dev_info,
1529         int rx_queue_id)
1531 {
1532         int i, j, err;
1533         int si = -1;
1534         int shared_done = (dev_info->nb_shared_intr > 0);
1535
1536         if (rx_queue_id != -1) {
1537                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1538                         return 0;
1539                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1540         }
1541
1542         err = 0;
1543         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1545                 if (rxa_shared_intr(dev_info, i) && shared_done)
1546                         continue;
1547
1548                 err = rxa_config_intr(rx_adapter, dev_info, i);
1549
1550                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1551                 if (shared_done) {
1552                         si = i;
1553                         dev_info->shared_intr_enabled = 1;
1554                 }
1555                 if (err)
1556                         break;
1557         }
1558
1559         if (err == 0)
1560                 return 0;
1561
1562         shared_done = (dev_info->nb_shared_intr > 0);
1563         for (j = 0; j < i; j++) {
1564                 if (rxa_intr_queue(dev_info, j))
1565                         continue;
1566                 if (rxa_shared_intr(dev_info, j) && si != j)
1567                         continue;
1568                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1569                 if (err)
1570                         break;
1572         }
1573
1574         return err;
1575 }
1576
1577
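/*
 * One time service setup: register the adapter's service function
 * (marked MT safe since it does its own locking for queue add/del) and
 * run the application supplied configuration callback to obtain the
 * event port and max_nb_rx. The service component is unregistered if
 * the callback fails.
 */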
1578 static int
1579 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1580 {
1581         int ret;
1582         struct rte_service_spec service;
1583         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1584
1585         if (rx_adapter->service_inited)
1586                 return 0;
1587
1588         memset(&service, 0, sizeof(service));
1589         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1590                 "rte_event_eth_rx_adapter_%d", id);
1591         service.socket_id = rx_adapter->socket_id;
1592         service.callback = rxa_service_func;
1593         service.callback_userdata = rx_adapter;
1594         /* Service function handles locking for queue add/del updates */
1595         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1596         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1597         if (ret) {
1598                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1599                         service.name, ret);
1600                 return ret;
1601         }
1602
1603         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1604                 &rx_adapter_conf, rx_adapter->conf_arg);
1605         if (ret) {
1606                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1607                         ret);
1608                 goto err_done;
1609         }
1610         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1611         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1612         rx_adapter->service_inited = 1;
1613         rx_adapter->epd = INIT_FD;
1614         return 0;
1615
1616 err_done:
1617         rte_service_component_unregister(rx_adapter->service_id);
1618         return ret;
1619 }
1620
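/*
 * Track queue membership: update the queue_enabled flag together with
 * the adapter and per device queue counts. An rx_queue_id of -1
 * applies the update to every Rx queue of the device.
 */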
1621 static void
1622 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1623                 struct eth_device_info *dev_info,
1624                 int32_t rx_queue_id,
1625                 uint8_t add)
1626 {
1627         struct eth_rx_queue_info *queue_info;
1628         int enabled;
1629         uint16_t i;
1630
1631         if (dev_info->rx_queue == NULL)
1632                 return;
1633
1634         if (rx_queue_id == -1) {
1635                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1636                         rxa_update_queue(rx_adapter, dev_info, i, add);
1637         } else {
1638                 queue_info = &dev_info->rx_queue[rx_queue_id];
1639                 enabled = queue_info->queue_enabled;
1640                 if (add) {
1641                         rx_adapter->nb_queues += !enabled;
1642                         dev_info->nb_dev_queues += !enabled;
1643                 } else {
1644                         rx_adapter->nb_queues -= enabled;
1645                         dev_info->nb_dev_queues -= enabled;
1646                 }
1647                 queue_info->queue_enabled = !!add;
1648         }
1649 }
1650
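/*
 * Remove a queue, or all queues when rx_queue_id is -1, from the
 * service based adapter's bookkeeping and adjust the polled, interrupt
 * and shared interrupt queue counts.
 */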
1651 static void
1652 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1653         struct eth_device_info *dev_info,
1654         int32_t rx_queue_id)
1655 {
1656         int pollq;
1657         int intrq;
1658         int sintrq;
1659
1660
1661         if (rx_adapter->nb_queues == 0)
1662                 return;
1663
1664         if (rx_queue_id == -1) {
1665                 uint16_t nb_rx_queues;
1666                 uint16_t i;
1667
1668                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1669                 for (i = 0; i < nb_rx_queues; i++)
1670                         rxa_sw_del(rx_adapter, dev_info, i);
1671                 return;
1672         }
1673
1674         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1675         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1676         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1677         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1678         rx_adapter->num_rx_polled -= pollq;
1679         dev_info->nb_rx_poll -= pollq;
1680         rx_adapter->num_rx_intr -= intrq;
1681         dev_info->nb_rx_intr -= intrq;
1682         dev_info->nb_shared_intr -= intrq && sintrq;
1683 }
1684
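/*
 * Record a queue in the service based adapter: store the servicing
 * weight, build the event template used when enqueuing mbufs from this
 * queue and update the polled/interrupt mode counters. When the queue
 * becomes the device's first shared interrupt queue, next_q_idx is
 * reset.
 */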
1685 static void
1686 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1687         struct eth_device_info *dev_info,
1688         int32_t rx_queue_id,
1689         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1690 {
1691         struct eth_rx_queue_info *queue_info;
1692         const struct rte_event *ev = &conf->ev;
1693         int pollq;
1694         int intrq;
1695         int sintrq;
1696         struct rte_event *qi_ev;
1697
1698         if (rx_queue_id == -1) {
1699                 uint16_t nb_rx_queues;
1700                 uint16_t i;
1701
1702                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1703                 for (i = 0; i < nb_rx_queues; i++)
1704                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1705                 return;
1706         }
1707
1708         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1709         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1710         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1711
1712         queue_info = &dev_info->rx_queue[rx_queue_id];
1713         queue_info->wt = conf->servicing_weight;
1714
1715         qi_ev = (struct rte_event *)&queue_info->event;
1716         qi_ev->event = ev->event;
1717         qi_ev->op = RTE_EVENT_OP_NEW;
1718         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1719         qi_ev->sub_event_type = 0;
1720
1721         if (conf->rx_queue_flags &
1722                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1723                 queue_info->flow_id_mask = ~0;
1724         } else
1725                 qi_ev->flow_id = 0;
1726
1727         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1728         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1729                 rx_adapter->num_rx_polled += !pollq;
1730                 dev_info->nb_rx_poll += !pollq;
1731                 rx_adapter->num_rx_intr -= intrq;
1732                 dev_info->nb_rx_intr -= intrq;
1733                 dev_info->nb_shared_intr -= intrq && sintrq;
1734         }
1735
1736         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1737                 rx_adapter->num_rx_polled -= pollq;
1738                 dev_info->nb_rx_poll -= pollq;
1739                 rx_adapter->num_rx_intr += !intrq;
1740                 dev_info->nb_rx_intr += !intrq;
1741                 dev_info->nb_shared_intr += !intrq && sintrq;
1742                 if (dev_info->nb_shared_intr == 1) {
1743                         if (dev_info->multi_intr_cap)
1744                                 dev_info->next_q_idx =
1745                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1746                         else
1747                                 dev_info->next_q_idx = 0;
1748                 }
1749         }
1750 }
1751
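/*
 * Add a queue, or all queues, to the service based adapter. A
 * servicing weight of zero selects interrupt mode if Rx interrupts are
 * enabled on the port, otherwise the weight is forced to one. The poll
 * and WRR arrays are reallocated for the post-add queue set and the
 * WRR polling sequence is recomputed.
 */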
1752 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1753                 uint16_t eth_dev_id,
1754                 int rx_queue_id,
1755                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1756 {
1757         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1758         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1759         int ret;
1760         struct eth_rx_poll_entry *rx_poll;
1761         struct eth_rx_queue_info *rx_queue;
1762         uint32_t *rx_wrr;
1763         uint16_t nb_rx_queues;
1764         uint32_t nb_rx_poll, nb_wrr;
1765         uint32_t nb_rx_intr;
1766         int num_intr_vec;
1767         uint16_t wt;
1768
1769         if (queue_conf->servicing_weight == 0) {
1770                 struct rte_eth_dev_data *data = dev_info->dev->data;
1771
1772                 temp_conf = *queue_conf;
1773                 if (!data->dev_conf.intr_conf.rxq) {
1774                         /* If Rx interrupts are disabled set wt = 1 */
1775                         temp_conf.servicing_weight = 1;
1776                 }
1777                 queue_conf = &temp_conf;
1778         }
1779
1780         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1781         rx_queue = dev_info->rx_queue;
1782         wt = queue_conf->servicing_weight;
1783
1784         if (dev_info->rx_queue == NULL) {
1785                 dev_info->rx_queue =
1786                     rte_zmalloc_socket(rx_adapter->mem_name,
1787                                        nb_rx_queues *
1788                                        sizeof(struct eth_rx_queue_info), 0,
1789                                        rx_adapter->socket_id);
1790                 if (dev_info->rx_queue == NULL)
1791                         return -ENOMEM;
1792         }
1793         rx_wrr = NULL;
1794         rx_poll = NULL;
1795
1796         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1797                         queue_conf->servicing_weight,
1798                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1799
1800         if (dev_info->dev->intr_handle)
1801                 dev_info->multi_intr_cap =
1802                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1803
1804         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1805                                 &rx_poll, &rx_wrr);
1806         if (ret)
1807                 goto err_free_rxqueue;
1808
1809         if (wt == 0) {
1810                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1811
1812                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1813                 if (ret)
1814                         goto err_free_rxqueue;
1815
1816                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1817                 if (ret)
1818                         goto err_free_rxqueue;
1819         } else {
1821                 num_intr_vec = 0;
1822                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1823                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1824                                                 rx_queue_id, 0);
1825                         /* interrupt based queues are being converted to
1826                          * poll mode queues, delete the interrupt configuration
1827                          * for those.
1828                          */
1829                         ret = rxa_del_intr_queue(rx_adapter,
1830                                                 dev_info, rx_queue_id);
1831                         if (ret)
1832                                 goto err_free_rxqueue;
1833                 }
1834         }
1835
1836         if (nb_rx_intr == 0) {
1837                 ret = rxa_free_intr_resources(rx_adapter);
1838                 if (ret)
1839                         goto err_free_rxqueue;
1840         }
1841
1842         if (wt == 0) {
1843                 uint16_t i;
1844
1845                 if (rx_queue_id == -1) {
1846                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1847                                 dev_info->intr_queue[i] = i;
1848                 } else {
1849                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1850                                 dev_info->intr_queue[nb_rx_intr - 1] =
1851                                         rx_queue_id;
1852                 }
1853         }
1854
1855
1857         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1858         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1859
1860         rte_free(rx_adapter->eth_rx_poll);
1861         rte_free(rx_adapter->wrr_sched);
1862
1863         rx_adapter->eth_rx_poll = rx_poll;
1864         rx_adapter->wrr_sched = rx_wrr;
1865         rx_adapter->wrr_len = nb_wrr;
1866         rx_adapter->num_intr_vec += num_intr_vec;
1867         return 0;
1868
1869 err_free_rxqueue:
1870         if (rx_queue == NULL) {
1871                 rte_free(dev_info->rx_queue);
1872                 dev_info->rx_queue = NULL;
1873         }
1874
1875         rte_free(rx_poll);
1876         rte_free(rx_wrr);
1877
1878         return ret;
1879 }
1880
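/*
 * Common start/stop handler: ports with an internal event port are
 * started/stopped through the PMD op, the remaining ports are handled
 * by updating the service run state under rx_lock.
 */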
1881 static int
1882 rxa_ctrl(uint8_t id, int start)
1883 {
1884         struct rte_event_eth_rx_adapter *rx_adapter;
1885         struct rte_eventdev *dev;
1886         struct eth_device_info *dev_info;
1887         uint32_t i;
1888         int use_service = 0;
1889         int stop = !start;
1890
1891         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1892         rx_adapter = rxa_id_to_adapter(id);
1893         if (rx_adapter == NULL)
1894                 return -EINVAL;
1895
1896         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1897
1898         RTE_ETH_FOREACH_DEV(i) {
1899                 dev_info = &rx_adapter->eth_devices[i];
1900                 /* if start, check for num dev queues */
1901                 if (start && !dev_info->nb_dev_queues)
1902                         continue;
1903                 /* if stop, check if dev has been started */
1904                 if (stop && !dev_info->dev_rx_started)
1905                         continue;
1906                 use_service |= !dev_info->internal_event_port;
1907                 dev_info->dev_rx_started = start;
1908                 if (dev_info->internal_event_port == 0)
1909                         continue;
1910                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1911                                                 &rte_eth_devices[i]) :
1912                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1913                                                 &rte_eth_devices[i]);
1914         }
1915
1916         if (use_service) {
1917                 rte_spinlock_lock(&rx_adapter->rx_lock);
1918                 rx_adapter->rxa_started = start;
1919                 rte_service_runstate_set(rx_adapter->service_id, start);
1920                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1921         }
1922
1923         return 0;
1924 }
1925
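/*
 * Allocate and initialize an adapter instance: the per port device
 * info array, rx_lock and the default RSS key converted to big endian
 * form (rss_key_be).
 */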
1926 int
1927 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1928                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1929                                 void *conf_arg)
1930 {
1931         struct rte_event_eth_rx_adapter *rx_adapter;
1932         int ret;
1933         int socket_id;
1934         uint16_t i;
1935         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1936         const uint8_t default_rss_key[] = {
1937                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1938                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1939                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1940                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1941                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1942         };
1943
1944         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1945         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1946         if (conf_cb == NULL)
1947                 return -EINVAL;
1948
1949         if (event_eth_rx_adapter == NULL) {
1950                 ret = rte_event_eth_rx_adapter_init();
1951                 if (ret)
1952                         return ret;
1953         }
1954
1955         rx_adapter = rxa_id_to_adapter(id);
1956         if (rx_adapter != NULL) {
1957                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1958                 return -EEXIST;
1959         }
1960
1961         socket_id = rte_event_dev_socket_id(dev_id);
1962         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1963                 "rte_event_eth_rx_adapter_%d",
1964                 id);
1965
1966         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1967                         RTE_CACHE_LINE_SIZE, socket_id);
1968         if (rx_adapter == NULL) {
1969                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1970                 return -ENOMEM;
1971         }
1972
1973         rx_adapter->eventdev_id = dev_id;
1974         rx_adapter->socket_id = socket_id;
1975         rx_adapter->conf_cb = conf_cb;
1976         rx_adapter->conf_arg = conf_arg;
1977         rx_adapter->id = id;
1978         strcpy(rx_adapter->mem_name, mem_name);
1979         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1980                                         RTE_MAX_ETHPORTS *
1981                                         sizeof(struct eth_device_info), 0,
1982                                         socket_id);
1983         rte_convert_rss_key((const uint32_t *)default_rss_key,
1984                         (uint32_t *)rx_adapter->rss_key_be,
1985                         RTE_DIM(default_rss_key));
1986
1987         if (rx_adapter->eth_devices == NULL) {
1988                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1989                 rte_free(rx_adapter);
1990                 return -ENOMEM;
1991         }
1992         rte_spinlock_init(&rx_adapter->rx_lock);
1993         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1994                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1995
1996         event_eth_rx_adapter[id] = rx_adapter;
1997         if (conf_cb == rxa_default_conf_cb)
1998                 rx_adapter->default_cb_arg = 1;
1999         return 0;
2000 }
2001
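/*
 * Convenience wrapper around rte_event_eth_rx_adapter_create_ext():
 * copies the port configuration and installs the default configuration
 * callback; the copy is freed in rte_event_eth_rx_adapter_free() when
 * default_cb_arg is set.
 */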
2002 int
2003 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2004                 struct rte_event_port_conf *port_config)
2005 {
2006         struct rte_event_port_conf *pc;
2007         int ret;
2008
2009         if (port_config == NULL)
2010                 return -EINVAL;
2011         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2012
2013         pc = rte_malloc(NULL, sizeof(*pc), 0);
2014         if (pc == NULL)
2015                 return -ENOMEM;
2016         *pc = *port_config;
2017         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2018                                         rxa_default_conf_cb,
2019                                         pc);
2020         if (ret)
2021                 rte_free(pc);
2022         return ret;
2023 }
2024
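/*
 * The adapter can only be freed after all its Rx queues have been
 * deleted; -EBUSY is returned otherwise.
 */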
2025 int
2026 rte_event_eth_rx_adapter_free(uint8_t id)
2027 {
2028         struct rte_event_eth_rx_adapter *rx_adapter;
2029
2030         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2031
2032         rx_adapter = rxa_id_to_adapter(id);
2033         if (rx_adapter == NULL)
2034                 return -EINVAL;
2035
2036         if (rx_adapter->nb_queues) {
2037                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2038                                 rx_adapter->nb_queues);
2039                 return -EBUSY;
2040         }
2041
2042         if (rx_adapter->default_cb_arg)
2043                 rte_free(rx_adapter->conf_arg);
2044         rte_free(rx_adapter->eth_devices);
2045         rte_free(rx_adapter);
2046         event_eth_rx_adapter[id] = NULL;
2047
2048         return 0;
2049 }
2050
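/*
 * Queue add: devices with the INTERNAL_PORT capability delegate to the
 * PMD's queue add op; all other devices use the SW service path, where
 * the service is initialized on first use and its run state follows
 * the adapter's queue count.
 */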
2051 int
2052 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2053                 uint16_t eth_dev_id,
2054                 int32_t rx_queue_id,
2055                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2056 {
2057         int ret;
2058         uint32_t cap;
2059         struct rte_event_eth_rx_adapter *rx_adapter;
2060         struct rte_eventdev *dev;
2061         struct eth_device_info *dev_info;
2062
2063         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2064         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2065
2066         rx_adapter = rxa_id_to_adapter(id);
2067         if ((rx_adapter == NULL) || (queue_conf == NULL))
2068                 return -EINVAL;
2069
2070         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2071         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2072                                                 eth_dev_id,
2073                                                 &cap);
2074         if (ret) {
2075                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2076                         " eth port %" PRIu16, id, eth_dev_id);
2077                 return ret;
2078         }
2079
2080         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2081                 && (queue_conf->rx_queue_flags &
2082                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2083                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2084                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2085                                 eth_dev_id, id);
2086                 return -EINVAL;
2087         }
2088
2089         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2090                 (rx_queue_id != -1)) {
2091                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2092                         "event queue, eth port: %" PRIu16 " adapter id: %"
2093                         PRIu8, eth_dev_id, id);
2094                 return -EINVAL;
2095         }
2096
2097         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2098                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2099                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2100                          (uint16_t)rx_queue_id);
2101                 return -EINVAL;
2102         }
2103
2104         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2105
2106         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2107                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2108                                         -ENOTSUP);
2109                 if (dev_info->rx_queue == NULL) {
2110                         dev_info->rx_queue =
2111                             rte_zmalloc_socket(rx_adapter->mem_name,
2112                                         dev_info->dev->data->nb_rx_queues *
2113                                         sizeof(struct eth_rx_queue_info), 0,
2114                                         rx_adapter->socket_id);
2115                         if (dev_info->rx_queue == NULL)
2116                                 return -ENOMEM;
2117                 }
2118
2119                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2120                                 &rte_eth_devices[eth_dev_id],
2121                                 rx_queue_id, queue_conf);
2122                 if (ret == 0) {
2123                         dev_info->internal_event_port = 1;
2124                         rxa_update_queue(rx_adapter,
2125                                         &rx_adapter->eth_devices[eth_dev_id],
2126                                         rx_queue_id,
2127                                         1);
2128                 }
2129         } else {
2130                 rte_spinlock_lock(&rx_adapter->rx_lock);
2131                 dev_info->internal_event_port = 0;
2132                 ret = rxa_init_service(rx_adapter, id);
2133                 if (ret == 0) {
2134                         uint32_t service_id = rx_adapter->service_id;
2135                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2136                                         queue_conf);
2137                         rte_service_component_runstate_set(service_id,
2138                                 rxa_sw_adapter_queue_count(rx_adapter));
2139                 }
2140                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2141         }
2142
2143         return ret;
2147 }
2148
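/*
 * Queue del mirrors queue add: the PMD op is used for internal event
 * port devices; for the SW path, poll and WRR arrays sized for the
 * post-delete state are allocated before taking rx_lock so the update
 * itself cannot fail on memory allocation.
 */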
2149 int
2150 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2151                                 int32_t rx_queue_id)
2152 {
2153         int ret = 0;
2154         struct rte_eventdev *dev;
2155         struct rte_event_eth_rx_adapter *rx_adapter;
2156         struct eth_device_info *dev_info;
2157         uint32_t cap;
2158         uint32_t nb_rx_poll = 0;
2159         uint32_t nb_wrr = 0;
2160         uint32_t nb_rx_intr;
2161         struct eth_rx_poll_entry *rx_poll = NULL;
2162         uint32_t *rx_wrr = NULL;
2163         int num_intr_vec;
2164
2165         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2166         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2167
2168         rx_adapter = rxa_id_to_adapter(id);
2169         if (rx_adapter == NULL)
2170                 return -EINVAL;
2171
2172         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2173         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2174                                                 eth_dev_id,
2175                                                 &cap);
2176         if (ret)
2177                 return ret;
2178
2179         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2180                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2181                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2182                          (uint16_t)rx_queue_id);
2183                 return -EINVAL;
2184         }
2185
2186         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2187
2188         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2189                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2190                                  -ENOTSUP);
2191                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2192                                                 &rte_eth_devices[eth_dev_id],
2193                                                 rx_queue_id);
2194                 if (ret == 0) {
2195                         rxa_update_queue(rx_adapter,
2196                                         &rx_adapter->eth_devices[eth_dev_id],
2197                                         rx_queue_id,
2198                                         0);
2199                         if (dev_info->nb_dev_queues == 0) {
2200                                 rte_free(dev_info->rx_queue);
2201                                 dev_info->rx_queue = NULL;
2202                         }
2203                 }
2204         } else {
2205                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2206                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2207
2208                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2209                         &rx_poll, &rx_wrr);
2210                 if (ret)
2211                         return ret;
2212
2213                 rte_spinlock_lock(&rx_adapter->rx_lock);
2214
2215                 num_intr_vec = 0;
2216                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2218                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2219                                                 rx_queue_id, 0);
2220                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2221                                         rx_queue_id);
2222                         if (ret)
2223                                 goto unlock_ret;
2224                 }
2225
2226                 if (nb_rx_intr == 0) {
2227                         ret = rxa_free_intr_resources(rx_adapter);
2228                         if (ret)
2229                                 goto unlock_ret;
2230                 }
2231
2232                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2233                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2234
2235                 rte_free(rx_adapter->eth_rx_poll);
2236                 rte_free(rx_adapter->wrr_sched);
2237
2238                 if (nb_rx_intr == 0) {
2239                         rte_free(dev_info->intr_queue);
2240                         dev_info->intr_queue = NULL;
2241                 }
2242
2243                 rx_adapter->eth_rx_poll = rx_poll;
2244                 rx_adapter->wrr_sched = rx_wrr;
2245                 rx_adapter->wrr_len = nb_wrr;
2246                 rx_adapter->num_intr_vec += num_intr_vec;
2247
2248                 if (dev_info->nb_dev_queues == 0) {
2249                         rte_free(dev_info->rx_queue);
2250                         dev_info->rx_queue = NULL;
2251                 }
2252 unlock_ret:
2253                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2254                 if (ret) {
2255                         rte_free(rx_poll);
2256                         rte_free(rx_wrr);
2257                         return ret;
2258                 }
2259
2260                 rte_service_component_runstate_set(rx_adapter->service_id,
2261                                 rxa_sw_adapter_queue_count(rx_adapter));
2262         }
2263
2264         return ret;
2265 }
2266
2267 int
2268 rte_event_eth_rx_adapter_start(uint8_t id)
2269 {
2270         return rxa_ctrl(id, 1);
2271 }
2272
2273 int
2274 rte_event_eth_rx_adapter_stop(uint8_t id)
2275 {
2276         return rxa_ctrl(id, 0);
2277 }
2278
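/*
 * Adapter statistics are the SW service counters plus, for devices
 * with an internal event port, the counters reported by the PMD's
 * stats get op.
 */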
2279 int
2280 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2281                                struct rte_event_eth_rx_adapter_stats *stats)
2282 {
2283         struct rte_event_eth_rx_adapter *rx_adapter;
2284         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2285         struct rte_event_eth_rx_adapter_stats dev_stats;
2286         struct rte_eventdev *dev;
2287         struct eth_device_info *dev_info;
2288         uint32_t i;
2289         int ret;
2290
2291         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2292
2293         rx_adapter = rxa_id_to_adapter(id);
2294         if (rx_adapter == NULL || stats == NULL)
2295                 return -EINVAL;
2296
2297         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2298         memset(stats, 0, sizeof(*stats));
2299         RTE_ETH_FOREACH_DEV(i) {
2300                 dev_info = &rx_adapter->eth_devices[i];
2301                 if (dev_info->internal_event_port == 0 ||
2302                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2303                         continue;
2304                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2305                                                 &rte_eth_devices[i],
2306                                                 &dev_stats);
2307                 if (ret)
2308                         continue;
2309                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2310                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2311         }
2312
2313         if (rx_adapter->service_inited)
2314                 *stats = rx_adapter->stats;
2315
2316         stats->rx_packets += dev_stats_sum.rx_packets;
2317         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2318         return 0;
2319 }
2320
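/* Reset both the PMD maintained counters and the SW service counters. */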
2321 int
2322 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2323 {
2324         struct rte_event_eth_rx_adapter *rx_adapter;
2325         struct rte_eventdev *dev;
2326         struct eth_device_info *dev_info;
2327         uint32_t i;
2328
2329         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2330
2331         rx_adapter = rxa_id_to_adapter(id);
2332         if (rx_adapter == NULL)
2333                 return -EINVAL;
2334
2335         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2336         RTE_ETH_FOREACH_DEV(i) {
2337                 dev_info = &rx_adapter->eth_devices[i];
2338                 if (dev_info->internal_event_port == 0 ||
2339                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2340                         continue;
2341                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2342                                                         &rte_eth_devices[i]);
2343         }
2344
2345         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2346         return 0;
2347 }
2348
2349 int
2350 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2351 {
2352         struct rte_event_eth_rx_adapter *rx_adapter;
2353
2354         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2355
2356         rx_adapter = rxa_id_to_adapter(id);
2357         if (rx_adapter == NULL || service_id == NULL)
2358                 return -EINVAL;
2359
2360         if (rx_adapter->service_inited)
2361                 *service_id = rx_adapter->service_id;
2362
2363         return rx_adapter->service_inited ? 0 : -ESRCH;
2364 }
2365
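/*
 * The Rx callback applies only to the SW service path and is therefore
 * rejected for devices with an internal event port; the callback and
 * its argument are updated under rx_lock.
 */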
2366 int
2367 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2368                                         uint16_t eth_dev_id,
2369                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2370                                         void *cb_arg)
2371 {
2372         struct rte_event_eth_rx_adapter *rx_adapter;
2373         struct eth_device_info *dev_info;
2374         uint32_t cap;
2375         int ret;
2376
2377         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2378         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2379
2380         rx_adapter = rxa_id_to_adapter(id);
2381         if (rx_adapter == NULL)
2382                 return -EINVAL;
2383
2384         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2385         if (dev_info->rx_queue == NULL)
2386                 return -EINVAL;
2387
2388         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2389                                                 eth_dev_id,
2390                                                 &cap);
2391         if (ret) {
2392                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2393                         " eth port %" PRIu16, id, eth_dev_id);
2394                 return ret;
2395         }
2396
2397         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2398                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2399                                 PRIu16, eth_dev_id);
2400                 return -EINVAL;
2401         }
2402
2403         rte_spinlock_lock(&rx_adapter->rx_lock);
2404         dev_info->cb_fn = cb_fn;
2405         dev_info->cb_arg = cb_arg;
2406         rte_spinlock_unlock(&rx_adapter->rx_lock);
2407
2408         return 0;
2409 }