eventdev: change Rx adapter callback and stats structure
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
35 /* Sentinel value to detect an uninitialized file handle */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
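/* A single pointer-sized value is what both the epoll data field and the
 * interrupt ring carry, so the <port, queue> pair is packed into qd.ptr by
 * rxa_config_intr() (RTE_INTR_EVENT_ADD) and unpacked again in
 * rxa_intr_ring_enqueue() and rxa_intr_ring_dequeue().
 */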
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Number of interrupt driven Rx queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_epoll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if the default conf callback is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Rx callback */
151         rte_event_eth_rx_adapter_cb_fn cb_fn;
152         /* Rx callback argument */
153         void *cb_arg;
154         /* Set if ethdev->eventdev packet transfer uses a
155          * hardware mechanism
156          */
157         uint8_t internal_event_port;
158         /* Set if the adapter is processing rx queues for
159          * this eth device and packet processing has been
160          * started, allows for the code to know if the PMD
161          * rx_adapter_stop callback needs to be invoked
162          */
163         uint8_t dev_rx_started;
164         /* Number of queues added for this device */
165         uint16_t nb_dev_queues;
166         /* Number of poll based queues
167          * If nb_rx_poll > 0, the start callback will
168          * be invoked if not already invoked
169          */
170         uint16_t nb_rx_poll;
171         /* Number of interrupt based queues
172          * If nb_rx_intr > 0, the start callback will
173          * be invoked if not already invoked.
174          */
175         uint16_t nb_rx_intr;
176         /* Number of queues that use the shared interrupt */
177         uint16_t nb_shared_intr;
178         /* sum(wrr(q)) for all queues within the device
179          * useful when deleting all device queues
180          */
181         uint32_t wrr_len;
182         /* Intr based queue index to start polling from, this is used
183          * if the number of shared interrupts is non-zero
184          */
185         uint16_t next_q_idx;
186         /* Intr based queue indices */
187         uint16_t *intr_queue;
188         /* Set if the device generates a per Rx queue interrupt
189          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
190          */
191         int multi_intr_cap;
192         /* shared interrupt enabled */
193         int shared_intr_enabled;
194 };
195
196 /* Per Rx queue */
197 struct eth_rx_queue_info {
198         int queue_enabled;      /* True if added */
199         int intr_enabled;
200         uint16_t wt;            /* Polling weight */
201         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
202         uint64_t event;
203 };
204
205 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
206
207 static inline int
208 rxa_validate_id(uint8_t id)
209 {
210         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
211 }
212
213 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
214         if (!rxa_validate_id(id)) { \
215                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
216                 return retval; \
217         } \
218 } while (0)
219
220 static inline int
221 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
222 {
223         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
224 }
225
226 /* Greatest common divisor */
227 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
228 {
229         uint16_t r = a % b;
230
231         return r ? rxa_gcd_u16(b, r) : b;
232 }
233
234 /* Returns the next queue in the polling sequence
235  *
236  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
237  */
238 static int
239 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
240          unsigned int n, int *cw,
241          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
242          uint16_t gcd, int prev)
243 {
244         int i = prev;
245         uint16_t w;
246
247         while (1) {
248                 uint16_t q;
249                 uint16_t d;
250
251                 i = (i + 1) % n;
252                 if (i == 0) {
253                         *cw = *cw - gcd;
254                         if (*cw <= 0)
255                                 *cw = max_wt;
256                 }
257
258                 q = eth_rx_poll[i].eth_rx_qid;
259                 d = eth_rx_poll[i].eth_dev_id;
260                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
261
262                 if ((int)w >= *cw)
263                         return i;
264         }
265 }
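/* Illustrative walk-through of the scheduling loop above (the weights are
 * made up for the example): with three polled queues of weights {4, 2, 2},
 * max_wt = 4 and gcd = 2, successive calls return the poll entry indices
 * 0 0 1 2 0 0 1 2, i.e. each queue is visited in proportion to its weight
 * over one full WRR cycle of max_wrr_pos = 8 slots.
 */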
266
267 static inline int
268 rxa_shared_intr(struct eth_device_info *dev_info,
269         int rx_queue_id)
270 {
271         int multi_intr_cap;
272
273         if (dev_info->dev->intr_handle == NULL)
274                 return 0;
275
276         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
277         return !multi_intr_cap ||
278                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
279 }
280
281 static inline int
282 rxa_intr_queue(struct eth_device_info *dev_info,
283         int rx_queue_id)
284 {
285         struct eth_rx_queue_info *queue_info;
286
287         queue_info = &dev_info->rx_queue[rx_queue_id];
288         return dev_info->rx_queue &&
289                 !dev_info->internal_event_port &&
290                 queue_info->queue_enabled && queue_info->wt == 0;
291 }
292
293 static inline int
294 rxa_polled_queue(struct eth_device_info *dev_info,
295         int rx_queue_id)
296 {
297         struct eth_rx_queue_info *queue_info;
298
299         queue_info = &dev_info->rx_queue[rx_queue_id];
300         return !dev_info->internal_event_port &&
301                 dev_info->rx_queue &&
302                 queue_info->queue_enabled && queue_info->wt != 0;
303 }
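/* The weight supplied when a queue is added selects its servicing mode: a
 * zero weight makes rxa_intr_queue() true (interrupt mode), a non-zero
 * weight makes rxa_polled_queue() true (poll mode). rxa_calc_nb_post_add()
 * below branches on the same condition.
 */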
304
305 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
306 static int
307 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
308 {
309         uint16_t i;
310         int n, s;
311         uint16_t nbq;
312
313         nbq = dev_info->dev->data->nb_rx_queues;
314         n = 0; /* non shared count */
315         s = 0; /* shared count */
316
317         if (rx_queue_id == -1) {
318                 for (i = 0; i < nbq; i++) {
319                         if (!rxa_shared_intr(dev_info, i))
320                                 n += add ? !rxa_intr_queue(dev_info, i) :
321                                         rxa_intr_queue(dev_info, i);
322                         else
323                                 s += add ? !rxa_intr_queue(dev_info, i) :
324                                         rxa_intr_queue(dev_info, i);
325                 }
326
327                 if (s > 0) {
328                         if ((add && dev_info->nb_shared_intr == 0) ||
329                                 (!add && dev_info->nb_shared_intr))
330                                 n += 1;
331                 }
332         } else {
333                 if (!rxa_shared_intr(dev_info, rx_queue_id))
334                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
335                                 rxa_intr_queue(dev_info, rx_queue_id);
336                 else
337                         n = add ? !dev_info->nb_shared_intr :
338                                 dev_info->nb_shared_intr == 1;
339         }
340
341         return add ? n : -n;
342 }
343
344 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
345  */
346 static void
347 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
348                         struct eth_device_info *dev_info,
349                         int rx_queue_id,
350                         uint32_t *nb_rx_intr)
351 {
352         uint32_t intr_diff;
353
354         if (rx_queue_id == -1)
355                 intr_diff = dev_info->nb_rx_intr;
356         else
357                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
358
359         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
360 }
361
362 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
363  * interrupt queues could currently be poll mode Rx queues
364  */
365 static void
366 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
367                         struct eth_device_info *dev_info,
368                         int rx_queue_id,
369                         uint32_t *nb_rx_poll,
370                         uint32_t *nb_rx_intr,
371                         uint32_t *nb_wrr)
372 {
373         uint32_t intr_diff;
374         uint32_t poll_diff;
375         uint32_t wrr_len_diff;
376
377         if (rx_queue_id == -1) {
378                 intr_diff = dev_info->dev->data->nb_rx_queues -
379                                                 dev_info->nb_rx_intr;
380                 poll_diff = dev_info->nb_rx_poll;
381                 wrr_len_diff = dev_info->wrr_len;
382         } else {
383                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
384                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
385                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
386                                         0;
387         }
388
389         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
390         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
391         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
392 }
393
394 /* Calculate size of the eth_rx_poll and wrr_sched arrays
395  * after deleting poll mode rx queues
396  */
397 static void
398 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
399                         struct eth_device_info *dev_info,
400                         int rx_queue_id,
401                         uint32_t *nb_rx_poll,
402                         uint32_t *nb_wrr)
403 {
404         uint32_t poll_diff;
405         uint32_t wrr_len_diff;
406
407         if (rx_queue_id == -1) {
408                 poll_diff = dev_info->nb_rx_poll;
409                 wrr_len_diff = dev_info->wrr_len;
410         } else {
411                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
412                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
413                                         0;
414         }
415
416         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
417         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
418 }
419
420 /* Calculate nb_rx_* after adding poll mode rx queues
421  */
422 static void
423 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
424                         struct eth_device_info *dev_info,
425                         int rx_queue_id,
426                         uint16_t wt,
427                         uint32_t *nb_rx_poll,
428                         uint32_t *nb_rx_intr,
429                         uint32_t *nb_wrr)
430 {
431         uint32_t intr_diff;
432         uint32_t poll_diff;
433         uint32_t wrr_len_diff;
434
435         if (rx_queue_id == -1) {
436                 intr_diff = dev_info->nb_rx_intr;
437                 poll_diff = dev_info->dev->data->nb_rx_queues -
438                                                 dev_info->nb_rx_poll;
439                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
440                                 - dev_info->wrr_len;
441         } else {
442                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
443                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
444                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
445                                 wt - dev_info->rx_queue[rx_queue_id].wt :
446                                 wt;
447         }
448
449         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
450         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
451         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
452 }
453
454 /* Calculate nb_rx_* after adding rx_queue_id */
455 static void
456 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
457                 struct eth_device_info *dev_info,
458                 int rx_queue_id,
459                 uint16_t wt,
460                 uint32_t *nb_rx_poll,
461                 uint32_t *nb_rx_intr,
462                 uint32_t *nb_wrr)
463 {
464         if (wt != 0)
465                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
466                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
467         else
468                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
469                                         nb_rx_poll, nb_rx_intr, nb_wrr);
470 }
471
472 /* Calculate nb_rx_* after deleting rx_queue_id */
473 static void
474 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
475                 struct eth_device_info *dev_info,
476                 int rx_queue_id,
477                 uint32_t *nb_rx_poll,
478                 uint32_t *nb_rx_intr,
479                 uint32_t *nb_wrr)
480 {
481         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
482                                 nb_wrr);
483         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
484                                 nb_rx_intr);
485 }
486
487 /*
488  * Allocate the rx_poll array
489  */
490 static struct eth_rx_poll_entry *
491 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
492         uint32_t num_rx_polled)
493 {
494         size_t len;
495
496         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
497                                                         RTE_CACHE_LINE_SIZE);
498         return  rte_zmalloc_socket(rx_adapter->mem_name,
499                                 len,
500                                 RTE_CACHE_LINE_SIZE,
501                                 rx_adapter->socket_id);
502 }
503
504 /*
505  * Allocate the WRR array
506  */
507 static uint32_t *
508 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
509 {
510         size_t len;
511
512         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
513                         RTE_CACHE_LINE_SIZE);
514         return  rte_zmalloc_socket(rx_adapter->mem_name,
515                                 len,
516                                 RTE_CACHE_LINE_SIZE,
517                                 rx_adapter->socket_id);
518 }
519
520 static int
521 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
522                 uint32_t nb_poll,
523                 uint32_t nb_wrr,
524                 struct eth_rx_poll_entry **rx_poll,
525                 uint32_t **wrr_sched)
526 {
527
528         if (nb_poll == 0) {
529                 *rx_poll = NULL;
530                 *wrr_sched = NULL;
531                 return 0;
532         }
533
534         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
535         if (*rx_poll == NULL) {
536                 *wrr_sched = NULL;
537                 return -ENOMEM;
538         }
539
540         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
541         if (*wrr_sched == NULL) {
542                 rte_free(*rx_poll);
543                 return -ENOMEM;
544         }
545         return 0;
546 }
547
548 /* Precalculate WRR polling sequence for all queues in rx_adapter */
549 static void
550 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
551                 struct eth_rx_poll_entry *rx_poll,
552                 uint32_t *rx_wrr)
553 {
554         uint16_t d;
555         uint16_t q;
556         unsigned int i;
557         int prev = -1;
558         int cw = -1;
559
560         /* Initialize variables for calculation of wrr schedule */
561         uint16_t max_wrr_pos = 0;
562         unsigned int poll_q = 0;
563         uint16_t max_wt = 0;
564         uint16_t gcd = 0;
565
566         if (rx_poll == NULL)
567                 return;
568
569         /* Generate array of all queues to poll, the size of this
570          * array is poll_q
571          */
572         RTE_ETH_FOREACH_DEV(d) {
573                 uint16_t nb_rx_queues;
574                 struct eth_device_info *dev_info =
575                                 &rx_adapter->eth_devices[d];
576                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
577                 if (dev_info->rx_queue == NULL)
578                         continue;
579                 if (dev_info->internal_event_port)
580                         continue;
581                 dev_info->wrr_len = 0;
582                 for (q = 0; q < nb_rx_queues; q++) {
583                         struct eth_rx_queue_info *queue_info =
584                                 &dev_info->rx_queue[q];
585                         uint16_t wt;
586
587                         if (!rxa_polled_queue(dev_info, q))
588                                 continue;
589                         wt = queue_info->wt;
590                         rx_poll[poll_q].eth_dev_id = d;
591                         rx_poll[poll_q].eth_rx_qid = q;
592                         max_wrr_pos += wt;
593                         dev_info->wrr_len += wt;
594                         max_wt = RTE_MAX(max_wt, wt);
595                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
596                         poll_q++;
597                 }
598         }
599
600         /* Generate polling sequence based on weights */
601         prev = -1;
602         cw = -1;
603         for (i = 0; i < max_wrr_pos; i++) {
604                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
605                                      rx_poll, max_wt, gcd, prev);
606                 prev = rx_wrr[i];
607         }
608 }
609
610 static inline void
611 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
612         struct rte_ipv6_hdr **ipv6_hdr)
613 {
614         struct rte_ether_hdr *eth_hdr =
615                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
616         struct rte_vlan_hdr *vlan_hdr;
617
618         *ipv4_hdr = NULL;
619         *ipv6_hdr = NULL;
620
621         switch (eth_hdr->ether_type) {
622         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
623                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
624                 break;
625
626         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
627                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
628                 break;
629
630         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
631                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
632                 switch (vlan_hdr->eth_proto) {
633                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
634                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
635                         break;
636                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
637                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
638                         break;
639                 default:
640                         break;
641                 }
642                 break;
643
644         default:
645                 break;
646         }
647 }
648
649 /* Calculate RSS hash for IPv4/6 */
650 static inline uint32_t
651 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
652 {
653         uint32_t input_len;
654         void *tuple;
655         struct rte_ipv4_tuple ipv4_tuple;
656         struct rte_ipv6_tuple ipv6_tuple;
657         struct rte_ipv4_hdr *ipv4_hdr;
658         struct rte_ipv6_hdr *ipv6_hdr;
659
660         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
661
662         if (ipv4_hdr) {
663                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
664                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
665                 tuple = &ipv4_tuple;
666                 input_len = RTE_THASH_V4_L3_LEN;
667         } else if (ipv6_hdr) {
668                 rte_thash_load_v6_addrs(ipv6_hdr,
669                                         (union rte_thash_tuple *)&ipv6_tuple);
670                 tuple = &ipv6_tuple;
671                 input_len = RTE_THASH_V6_L3_LEN;
672         } else
673                 return 0;
674
675         return rte_softrss_be(tuple, input_len, rss_key_be);
676 }
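/* The software hash above is only computed when the mbuf carries no
 * PKT_RX_RSS_HASH value and the application did not supply a flow id for
 * the queue; see the do_rss computation in rxa_buffer_mbufs().
 */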
677
678 static inline int
679 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
680 {
681         return !!rx_adapter->enq_block_count;
682 }
683
684 static inline void
685 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
686 {
687         if (rx_adapter->rx_enq_block_start_ts)
688                 return;
689
690         rx_adapter->enq_block_count++;
691         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
692                 return;
693
694         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
695 }
696
697 static inline void
698 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
699                     struct rte_event_eth_rx_adapter_stats *stats)
700 {
701         if (unlikely(!stats->rx_enq_start_ts))
702                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
703
704         if (likely(!rxa_enq_blocked(rx_adapter)))
705                 return;
706
707         rx_adapter->enq_block_count = 0;
708         if (rx_adapter->rx_enq_block_start_ts) {
709                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
710                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
711                     rx_adapter->rx_enq_block_start_ts;
712                 rx_adapter->rx_enq_block_start_ts = 0;
713         }
714 }
715
716 /* Enqueue buffered events to event device */
717 static inline uint16_t
718 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
719 {
720         struct rte_eth_event_enqueue_buffer *buf =
721             &rx_adapter->event_enqueue_buffer;
722         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
723
724         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
725                                         rx_adapter->event_port_id,
726                                         buf->events,
727                                         buf->count);
728         if (n != buf->count) {
729                 memmove(buf->events,
730                         &buf->events[n],
731                         (buf->count - n) * sizeof(struct rte_event));
732                 stats->rx_enq_retry++;
733         }
734
735         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
736                 rxa_enq_block_start_ts(rx_adapter);
737
738         buf->count -= n;
739         stats->rx_enq_count += n;
740
741         return n;
742 }
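/* Events are injected with RTE_EVENT_OP_NEW semantics via
 * rte_event_enqueue_new_burst(); whatever the event device does not accept
 * is compacted to the front of the buffer and retried on a later flush,
 * with rx_enq_retry counting the partial enqueues.
 */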
743
744 static inline void
745 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
746                 uint16_t eth_dev_id,
747                 uint16_t rx_queue_id,
748                 struct rte_mbuf **mbufs,
749                 uint16_t num)
750 {
751         uint32_t i;
752         struct eth_device_info *dev_info =
753                                         &rx_adapter->eth_devices[eth_dev_id];
754         struct eth_rx_queue_info *eth_rx_queue_info =
755                                         &dev_info->rx_queue[rx_queue_id];
756         struct rte_eth_event_enqueue_buffer *buf =
757                                         &rx_adapter->event_enqueue_buffer;
758         struct rte_event *ev = &buf->events[buf->count];
759         uint64_t event = eth_rx_queue_info->event;
760         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
761         struct rte_mbuf *m = mbufs[0];
762         uint32_t rss_mask;
763         uint32_t rss;
764         int do_rss;
765         uint64_t ts;
766         uint16_t nb_cb;
767         uint16_t dropped;
768
769         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
772
773         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
774                 ts = rte_get_tsc_cycles();
775                 for (i = 0; i < num; i++) {
776                         m = mbufs[i];
777
778                         m->timestamp = ts;
779                         m->ol_flags |= PKT_RX_TIMESTAMP;
780                 }
781         }
782
783         for (i = 0; i < num; i++) {
784                 m = mbufs[i];
785
786                 rss = do_rss ?
787                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
788                         m->hash.rss;
789                 ev->event = event;
790                 ev->flow_id = (rss & ~flow_id_mask) |
791                                 (ev->flow_id & flow_id_mask);
792                 ev->mbuf = m;
793                 ev++;
794         }
795
796         if (dev_info->cb_fn) {
797
798                 dropped = 0;
799                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
800                                         ETH_EVENT_BUFFER_SIZE, buf->count, ev,
801                                         num, dev_info->cb_arg, &dropped);
802                 if (unlikely(nb_cb > num))
803                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
804                                 nb_cb, num);
805                 else
806                         num = nb_cb;
807                 if (dropped)
808                         rx_adapter->stats.rx_dropped += dropped;
809         }
810
811         buf->count += num;
812 }
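/* Flow id selection above: flow_id_mask is ~0 when the application supplied
 * a flow id with the queue conf, so the id encoded in 'event' is preserved;
 * otherwise the mask is 0 and the (hardware or software) RSS hash becomes
 * the flow id.
 */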
813
814 /* Enqueue packets from <port, q> to event buffer */
815 static inline uint32_t
816 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
817         uint16_t port_id,
818         uint16_t queue_id,
819         uint32_t rx_count,
820         uint32_t max_rx,
821         int *rxq_empty)
822 {
823         struct rte_mbuf *mbufs[BATCH_SIZE];
824         struct rte_eth_event_enqueue_buffer *buf =
825                                         &rx_adapter->event_enqueue_buffer;
826         struct rte_event_eth_rx_adapter_stats *stats =
827                                         &rx_adapter->stats;
828         uint16_t n;
829         uint32_t nb_rx = 0;
830
831         if (rxq_empty)
832                 *rxq_empty = 0;
833         /* Don't do a batch dequeue from the rx queue if there isn't
834          * enough space in the enqueue buffer.
835          */
836         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
837                 if (buf->count >= BATCH_SIZE)
838                         rxa_flush_event_buffer(rx_adapter);
839
840                 stats->rx_poll_count++;
841                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
842                 if (unlikely(!n)) {
843                         if (rxq_empty)
844                                 *rxq_empty = 1;
845                         break;
846                 }
847                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
848                 nb_rx += n;
849                 if (rx_count + nb_rx > max_rx)
850                         break;
851         }
852
853         if (buf->count > 0)
854                 rxa_flush_event_buffer(rx_adapter);
855
856         return nb_rx;
857 }
858
859 static inline void
860 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
861                 void *data)
862 {
863         uint16_t port_id;
864         uint16_t queue;
865         int err;
866         union queue_data qd;
867         struct eth_device_info *dev_info;
868         struct eth_rx_queue_info *queue_info;
869         int *intr_enabled;
870
871         qd.ptr = data;
872         port_id = qd.port;
873         queue = qd.queue;
874
875         dev_info = &rx_adapter->eth_devices[port_id];
876         queue_info = &dev_info->rx_queue[queue];
877         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
878         if (rxa_shared_intr(dev_info, queue))
879                 intr_enabled = &dev_info->shared_intr_enabled;
880         else
881                 intr_enabled = &queue_info->intr_enabled;
882
883         if (*intr_enabled) {
884                 *intr_enabled = 0;
885                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
886                 /* Entry should always be available.
887                  * The ring size equals the maximum number of interrupt
888                  * vectors supported (an interrupt vector is shared in
889                  * case of shared interrupts)
890                  */
891                 if (err)
892                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
893                                 " to ring: %s", strerror(-err));
894                 else
895                         rte_eth_dev_rx_intr_disable(port_id, queue);
896         }
897         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
898 }
899
900 static int
901 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
902                         uint32_t num_intr_vec)
903 {
904         if (rx_adapter->num_intr_vec + num_intr_vec >
905                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
906                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
907                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
908                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
909                 return -ENOSPC;
910         }
911
912         return 0;
913 }
914
915 /* Delete entries for (dev, queue) from the interrupt ring */
916 static void
917 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
918                         struct eth_device_info *dev_info,
919                         uint16_t rx_queue_id)
920 {
921         int i, n;
922         union queue_data qd;
923
924         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
925
926         n = rte_ring_count(rx_adapter->intr_ring);
927         for (i = 0; i < n; i++) {
928                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
929                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
930                         if (qd.port == dev_info->dev->data->port_id &&
931                                 qd.queue == rx_queue_id)
932                                 continue;
933                 } else {
934                         if (qd.port == dev_info->dev->data->port_id)
935                                 continue;
936                 }
937                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
938         }
939
940         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
941 }
942
943 /* pthread callback handling interrupt mode receive queues
944  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
945  * interrupting queue to the adapter's ring buffer for interrupt events.
946  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
947  * the adapter service function.
948  */
949 static void *
950 rxa_intr_thread(void *arg)
951 {
952         struct rte_event_eth_rx_adapter *rx_adapter = arg;
953         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
954         int n, i;
955
956         while (1) {
957                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
958                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
959                 if (unlikely(n < 0))
960                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
961                                         n);
962                 for (i = 0; i < n; i++) {
963                         rxa_intr_ring_enqueue(rx_adapter,
964                                         epoll_events[i].epdata.data);
965                 }
966         }
967
968         return NULL;
969 }
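/* The loop above never terminates on its own; rxa_destroy_intr_thread()
 * stops the thread with pthread_cancel() followed by pthread_join()
 * (see rxa_free_intr_resources()).
 */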
970
971 /* Dequeue <port, q> from interrupt ring and enqueue received
972  * mbufs to eventdev
973  */
974 static inline uint32_t
975 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
976 {
977         uint32_t n;
978         uint32_t nb_rx = 0;
979         int rxq_empty;
980         struct rte_eth_event_enqueue_buffer *buf;
981         rte_spinlock_t *ring_lock;
982         uint8_t max_done = 0;
983
984         if (rx_adapter->num_rx_intr == 0)
985                 return 0;
986
987         if (rte_ring_count(rx_adapter->intr_ring) == 0
988                 && !rx_adapter->qd_valid)
989                 return 0;
990
991         buf = &rx_adapter->event_enqueue_buffer;
992         ring_lock = &rx_adapter->intr_ring_lock;
993
994         if (buf->count >= BATCH_SIZE)
995                 rxa_flush_event_buffer(rx_adapter);
996
997         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
998                 struct eth_device_info *dev_info;
999                 uint16_t port;
1000                 uint16_t queue;
1001                 union queue_data qd  = rx_adapter->qd;
1002                 int err;
1003
1004                 if (!rx_adapter->qd_valid) {
1005                         struct eth_rx_queue_info *queue_info;
1006
1007                         rte_spinlock_lock(ring_lock);
1008                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1009                         if (err) {
1010                                 rte_spinlock_unlock(ring_lock);
1011                                 break;
1012                         }
1013
1014                         port = qd.port;
1015                         queue = qd.queue;
1016                         rx_adapter->qd = qd;
1017                         rx_adapter->qd_valid = 1;
1018                         dev_info = &rx_adapter->eth_devices[port];
1019                         if (rxa_shared_intr(dev_info, queue))
1020                                 dev_info->shared_intr_enabled = 1;
1021                         else {
1022                                 queue_info = &dev_info->rx_queue[queue];
1023                                 queue_info->intr_enabled = 1;
1024                         }
1025                         rte_eth_dev_rx_intr_enable(port, queue);
1026                         rte_spinlock_unlock(ring_lock);
1027                 } else {
1028                         port = qd.port;
1029                         queue = qd.queue;
1030
1031                         dev_info = &rx_adapter->eth_devices[port];
1032                 }
1033
1034                 if (rxa_shared_intr(dev_info, queue)) {
1035                         uint16_t i;
1036                         uint16_t nb_queues;
1037
1038                         nb_queues = dev_info->dev->data->nb_rx_queues;
1039                         n = 0;
1040                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1041                                 uint8_t enq_buffer_full;
1042
1043                                 if (!rxa_intr_queue(dev_info, i))
1044                                         continue;
1045                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1046                                         rx_adapter->max_nb_rx,
1047                                         &rxq_empty);
1048                                 nb_rx += n;
1049
1050                                 enq_buffer_full = !rxq_empty && n == 0;
1051                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1052
1053                                 if (enq_buffer_full || max_done) {
1054                                         dev_info->next_q_idx = i;
1055                                         goto done;
1056                                 }
1057                         }
1058
1059                         rx_adapter->qd_valid = 0;
1060
1061                         /* Reinitialize for next interrupt */
1062                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1063                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1064                                                 0;
1065                 } else {
1066                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1067                                 rx_adapter->max_nb_rx,
1068                                 &rxq_empty);
1069                         rx_adapter->qd_valid = !rxq_empty;
1070                         nb_rx += n;
1071                         if (nb_rx > rx_adapter->max_nb_rx)
1072                                 break;
1073                 }
1074         }
1075
1076 done:
1077         rx_adapter->stats.rx_intr_packets += nb_rx;
1078         return nb_rx;
1079 }
1080
1081 /*
1082  * Polls receive queues added to the event adapter and enqueues received
1083  * packets to the event device.
1084  *
1085  * The receive code enqueues initially to a temporary buffer; the
1086  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1087  *
1088  * If there isn't space available in the temporary buffer, packets from the
1089  * Rx queue aren't dequeued from the eth device; this back pressures the
1090  * eth device. In virtual device environments, the back pressure is relayed
1091  * to the hypervisor's switching layer, where adjustments can be made to
1092  * deal with it.
1093  */
1094 static inline uint32_t
1095 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1096 {
1097         uint32_t num_queue;
1098         uint32_t nb_rx = 0;
1099         struct rte_eth_event_enqueue_buffer *buf;
1100         uint32_t wrr_pos;
1101         uint32_t max_nb_rx;
1102
1103         wrr_pos = rx_adapter->wrr_pos;
1104         max_nb_rx = rx_adapter->max_nb_rx;
1105         buf = &rx_adapter->event_enqueue_buffer;
1106
1107         /* Iterate through a WRR sequence */
1108         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1109                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1110                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1111                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1112
1113                 /* Don't do a batch dequeue from the rx queue if there isn't
1114                  * enough space in the enqueue buffer.
1115                  */
1116                 if (buf->count >= BATCH_SIZE)
1117                         rxa_flush_event_buffer(rx_adapter);
1118                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1119                         rx_adapter->wrr_pos = wrr_pos;
1120                         return nb_rx;
1121                 }
1122
1123                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1124                                 NULL);
1125                 if (nb_rx > max_nb_rx) {
1126                         rx_adapter->wrr_pos =
1127                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1128                         break;
1129                 }
1130
1131                 if (++wrr_pos == rx_adapter->wrr_len)
1132                         wrr_pos = 0;
1133         }
1134         return nb_rx;
1135 }
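/* On the two early exits above (enqueue buffer full, max_nb_rx reached)
 * rx_adapter->wrr_pos is updated, so the next service invocation resumes
 * the weighted round robin sequence where this one stopped rather than
 * restarting from the beginning of wrr_sched[].
 */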
1136
1137 static int
1138 rxa_service_func(void *args)
1139 {
1140         struct rte_event_eth_rx_adapter *rx_adapter = args;
1141         struct rte_event_eth_rx_adapter_stats *stats;
1142
1143         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1144                 return 0;
1145         if (!rx_adapter->rxa_started) {
1146                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1147                 return 0;
1148         }
1149
1150         stats = &rx_adapter->stats;
1151         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1152         stats->rx_packets += rxa_poll(rx_adapter);
1153         rte_spinlock_unlock(&rx_adapter->rx_lock);
1154         return 0;
1155 }
1156
1157 static int
1158 rte_event_eth_rx_adapter_init(void)
1159 {
1160         const char *name = "rte_event_eth_rx_adapter_array";
1161         const struct rte_memzone *mz;
1162         unsigned int sz;
1163
1164         sz = sizeof(*event_eth_rx_adapter) *
1165             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1166         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1167
1168         mz = rte_memzone_lookup(name);
1169         if (mz == NULL) {
1170                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1171                                                  RTE_CACHE_LINE_SIZE);
1172                 if (mz == NULL) {
1173                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1174                                         PRId32, rte_errno);
1175                         return -rte_errno;
1176                 }
1177         }
1178
1179         event_eth_rx_adapter = mz->addr;
1180         return 0;
1181 }
1182
1183 static inline struct rte_event_eth_rx_adapter *
1184 rxa_id_to_adapter(uint8_t id)
1185 {
1186         return event_eth_rx_adapter ?
1187                 event_eth_rx_adapter[id] : NULL;
1188 }
1189
1190 static int
1191 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1192                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1193 {
1194         int ret;
1195         struct rte_eventdev *dev;
1196         struct rte_event_dev_config dev_conf;
1197         int started;
1198         uint8_t port_id;
1199         struct rte_event_port_conf *port_conf = arg;
1200         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1201
1202         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1203         dev_conf = dev->data->dev_conf;
1204
1205         started = dev->data->dev_started;
1206         if (started)
1207                 rte_event_dev_stop(dev_id);
1208         port_id = dev_conf.nb_event_ports;
1209         dev_conf.nb_event_ports += 1;
1210         ret = rte_event_dev_configure(dev_id, &dev_conf);
1211         if (ret) {
1212                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1213                                                 dev_id);
1214                 if (started) {
1215                         if (rte_event_dev_start(dev_id))
1216                                 return -EIO;
1217                 }
1218                 return ret;
1219         }
1220
1221         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1222         if (ret) {
1223                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1224                                         port_id);
1225                 return ret;
1226         }
1227
1228         conf->event_port_id = port_id;
1229         conf->max_nb_rx = 128;
1230         if (started)
1231                 ret = rte_event_dev_start(dev_id);
1232         rx_adapter->default_cb_arg = 1;
1233         return ret;
1234 }
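/* Sketch of how this default callback is typically reached (illustrative;
 * the numeric values are placeholders): an application creating the adapter
 * with rte_event_eth_rx_adapter_create() passes a port configuration that
 * arrives here as 'arg' and is used to set up the extra event port the
 * adapter enqueues into.
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *	rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 */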
1235
1236 static int
1237 rxa_epoll_create1(void)
1238 {
1239 #if defined(LINUX)
1240         int fd;
1241         fd = epoll_create1(EPOLL_CLOEXEC);
1242         return fd < 0 ? -errno : fd;
1243 #elif defined(BSD)
1244         return -ENOTSUP;
1245 #endif
1246 }
1247
1248 static int
1249 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1250 {
1251         if (rx_adapter->epd != INIT_FD)
1252                 return 0;
1253
1254         rx_adapter->epd = rxa_epoll_create1();
1255         if (rx_adapter->epd < 0) {
1256                 int err = rx_adapter->epd;
1257                 rx_adapter->epd = INIT_FD;
1258                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1259                 return err;
1260         }
1261
1262         return 0;
1263 }
1264
1265 static int
1266 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1267 {
1268         int err;
1269         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1270
1271         if (rx_adapter->intr_ring)
1272                 return 0;
1273
1274         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1275                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1276                                         rte_socket_id(), 0);
1277         if (!rx_adapter->intr_ring)
1278                 return -ENOMEM;
1279
1280         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1281                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1282                                         sizeof(struct rte_epoll_event),
1283                                         RTE_CACHE_LINE_SIZE,
1284                                         rx_adapter->socket_id);
1285         if (!rx_adapter->epoll_events) {
1286                 err = -ENOMEM;
1287                 goto error;
1288         }
1289
1290         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1291
1292         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1293                         "rx-intr-thread-%d", rx_adapter->id);
1294
1295         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1296                                 NULL, rxa_intr_thread, rx_adapter);
1297         if (!err) {
1298                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1299                 return 0;
1300         }
1301
1302         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1303 error:
1304         rte_ring_free(rx_adapter->intr_ring);
1305         rx_adapter->intr_ring = NULL;
1306         rx_adapter->epoll_events = NULL;
1307         return err;
1308 }
1309
1310 static int
1311 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1312 {
1313         int err;
1314
1315         err = pthread_cancel(rx_adapter->rx_intr_thread);
1316         if (err)
1317                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1318                                 err);
1319
1320         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1321         if (err)
1322                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1323
1324         rte_free(rx_adapter->epoll_events);
1325         rte_ring_free(rx_adapter->intr_ring);
1326         rx_adapter->intr_ring = NULL;
1327         rx_adapter->epoll_events = NULL;
1328         return 0;
1329 }
1330
1331 static int
1332 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1333 {
1334         int ret;
1335
1336         if (rx_adapter->num_rx_intr == 0)
1337                 return 0;
1338
1339         ret = rxa_destroy_intr_thread(rx_adapter);
1340         if (ret)
1341                 return ret;
1342
1343         close(rx_adapter->epd);
1344         rx_adapter->epd = INIT_FD;
1345
1346         return ret;
1347 }
1348
1349 static int
1350 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1351         struct eth_device_info *dev_info,
1352         uint16_t rx_queue_id)
1353 {
1354         int err;
1355         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1356         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1357
1358         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1359         if (err) {
1360                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1361                         rx_queue_id);
1362                 return err;
1363         }
1364
1365         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1366                                         rx_adapter->epd,
1367                                         RTE_INTR_EVENT_DEL,
1368                                         0);
1369         if (err)
1370                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1371
1372         if (sintr)
1373                 dev_info->shared_intr_enabled = 0;
1374         else
1375                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1376         return err;
1377 }
1378
1379 static int
1380 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1381                 struct eth_device_info *dev_info,
1382                 int rx_queue_id)
1383 {
1384         int err;
1385         int i;
1386         int s;
1387
1388         if (dev_info->nb_rx_intr == 0)
1389                 return 0;
1390
1391         err = 0;
1392         if (rx_queue_id == -1) {
1393                 s = dev_info->nb_shared_intr;
1394                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1395                         int sintr;
1396                         uint16_t q;
1397
1398                         q = dev_info->intr_queue[i];
1399                         sintr = rxa_shared_intr(dev_info, q);
1400                         s -= sintr;
1401
1402                         if (!sintr || s == 0) {
1403
1404                                 err = rxa_disable_intr(rx_adapter, dev_info,
1405                                                 q);
1406                                 if (err)
1407                                         return err;
1408                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1409                                                         q);
1410                         }
1411                 }
1412         } else {
1413                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1414                         return 0;
1415                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1416                                 dev_info->nb_shared_intr == 1) {
1417                         err = rxa_disable_intr(rx_adapter, dev_info,
1418                                         rx_queue_id);
1419                         if (err)
1420                                 return err;
1421                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1422                                                 rx_queue_id);
1423                 }
1424
1425                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1426                         if (dev_info->intr_queue[i] == rx_queue_id) {
1427                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1428                                         dev_info->intr_queue[i] =
1429                                                 dev_info->intr_queue[i + 1];
1430                                 break;
1431                         }
1432                 }
1433         }
1434
1435         return err;
1436 }
1437
1438 static int
1439 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1440         struct eth_device_info *dev_info,
1441         uint16_t rx_queue_id)
1442 {
1443         int err, err1;
1444         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1445         union queue_data qd;
1446         int init_fd;
1447         uint16_t *intr_queue;
1448         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1449
1450         if (rxa_intr_queue(dev_info, rx_queue_id))
1451                 return 0;
1452
1453         intr_queue = dev_info->intr_queue;
1454         if (dev_info->intr_queue == NULL) {
1455                 size_t len =
1456                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1457                 dev_info->intr_queue =
1458                         rte_zmalloc_socket(
1459                                 rx_adapter->mem_name,
1460                                 len,
1461                                 0,
1462                                 rx_adapter->socket_id);
1463                 if (dev_info->intr_queue == NULL)
1464                         return -ENOMEM;
1465         }
1466
1467         init_fd = rx_adapter->epd;
1468         err = rxa_init_epd(rx_adapter);
1469         if (err)
1470                 goto err_free_queue;
1471
1472         qd.port = eth_dev_id;
1473         qd.queue = rx_queue_id;
1474
1475         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1476                                         rx_adapter->epd,
1477                                         RTE_INTR_EVENT_ADD,
1478                                         qd.ptr);
1479         if (err) {
1480                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1481                         " Rx Queue %u err %d", rx_queue_id, err);
1482                 goto err_del_fd;
1483         }
1484
1485         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1486         if (err) {
1487                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1488                                 " Rx Queue %u err %d", rx_queue_id, err);
1489
1490                 goto err_del_event;
1491         }
1492
1493         err = rxa_create_intr_thread(rx_adapter);
1494         if (!err) {
1495                 if (sintr)
1496                         dev_info->shared_intr_enabled = 1;
1497                 else
1498                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1499                 return 0;
1500         }
1501
1502
1503         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1504         if (err)
1505                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1506                                 " Rx Queue %u err %d", rx_queue_id, err);
1507 err_del_event:
1508         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1509                                         rx_adapter->epd,
1510                                         RTE_INTR_EVENT_DEL,
1511                                         NULL);
1512         if (err1) {
1513                 RTE_EDEV_LOG_ERR("Could not delete event for"
1514                                 " Rx Queue %u err %d", rx_queue_id, err1);
1515         }
1516 err_del_fd:
1517         if (init_fd == INIT_FD) {
1518                 close(rx_adapter->epd);
1519                 rx_adapter->epd = INIT_FD;
1520         }
1521 err_free_queue:
1522         if (intr_queue == NULL) {
1523                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1524
1525         return err;
1526 }
1527
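/*
 * Enable interrupt mode for one Rx queue, or for all queues of the device
 * when rx_queue_id is -1. For devices that share a single interrupt across
 * queues, only the first shared queue needs to be configured; if a later
 * queue fails, interrupts enabled earlier in this call are disabled again.
 */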
1528 static int
1529 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1530         struct eth_device_info *dev_info,
1531         int rx_queue_id)
1532
1533 {
1534         int i, j, err;
1535         int si = -1;
1536         int shared_done = (dev_info->nb_shared_intr > 0);
1537
1538         if (rx_queue_id != -1) {
1539                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1540                         return 0;
1541                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1542         }
1543
1544         err = 0;
1545         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1546
1547                 if (rxa_shared_intr(dev_info, i) && shared_done)
1548                         continue;
1549
1550                 err = rxa_config_intr(rx_adapter, dev_info, i);
1551
1552                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1553                 if (shared_done) {
1554                         si = i;
1555                         dev_info->shared_intr_enabled = 1;
1556                 }
1557                 if (err)
1558                         break;
1559         }
1560
1561         if (err == 0)
1562                 return 0;
1563
1564         shared_done = (dev_info->nb_shared_intr > 0);
1565         for (j = 0; j < i; j++) {
1566                 if (rxa_intr_queue(dev_info, j))
1567                         continue;
1568                 if (rxa_shared_intr(dev_info, j) && si != j)
1569                         continue;
1570                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1571                 if (err)
1572                         break;
1573
1574         }
1575
1576         return err;
1577 }
1578
1579
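/*
 * One-time service setup for the adapter: register the Rx service function
 * with the service core framework and invoke the application supplied
 * configuration callback to obtain the event port id and max_nb_rx limit.
 */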
1580 static int
1581 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1582 {
1583         int ret;
1584         struct rte_service_spec service;
1585         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1586
1587         if (rx_adapter->service_inited)
1588                 return 0;
1589
1590         memset(&service, 0, sizeof(service));
1591         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1592                 "rte_event_eth_rx_adapter_%d", id);
1593         service.socket_id = rx_adapter->socket_id;
1594         service.callback = rxa_service_func;
1595         service.callback_userdata = rx_adapter;
1596         /* Service function handles locking for queue add/del updates */
1597         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1598         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1599         if (ret) {
1600                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1601                         service.name, ret);
1602                 return ret;
1603         }
1604
1605         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1606                 &rx_adapter_conf, rx_adapter->conf_arg);
1607         if (ret) {
1608                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1609                         ret);
1610                 goto err_done;
1611         }
1612         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1613         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1614         rx_adapter->service_inited = 1;
1615         rx_adapter->epd = INIT_FD;
1616         return 0;
1617
1618 err_done:
1619         rte_service_component_unregister(rx_adapter->service_id);
1620         return ret;
1621 }
1622
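/*
 * Mark an Rx queue (or all queues when rx_queue_id is -1) as added to or
 * removed from the adapter, keeping the adapter and per-device queue counts
 * consistent with the queue_enabled flag.
 */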
1623 static void
1624 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1625                 struct eth_device_info *dev_info,
1626                 int32_t rx_queue_id,
1627                 uint8_t add)
1628 {
1629         struct eth_rx_queue_info *queue_info;
1630         int enabled;
1631         uint16_t i;
1632
1633         if (dev_info->rx_queue == NULL)
1634                 return;
1635
1636         if (rx_queue_id == -1) {
1637                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1638                         rxa_update_queue(rx_adapter, dev_info, i, add);
1639         } else {
1640                 queue_info = &dev_info->rx_queue[rx_queue_id];
1641                 enabled = queue_info->queue_enabled;
1642                 if (add) {
1643                         rx_adapter->nb_queues += !enabled;
1644                         dev_info->nb_dev_queues += !enabled;
1645                 } else {
1646                         rx_adapter->nb_queues -= enabled;
1647                         dev_info->nb_dev_queues -= enabled;
1648                 }
1649                 queue_info->queue_enabled = !!add;
1650         }
1651 }
1652
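/*
 * Remove an Rx queue (or all queues when rx_queue_id is -1) from the
 * software adapter's bookkeeping, adjusting the polled, interrupt and
 * shared interrupt queue counts.
 */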
1653 static void
1654 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1655         struct eth_device_info *dev_info,
1656         int32_t rx_queue_id)
1657 {
1658         int pollq;
1659         int intrq;
1660         int sintrq;
1661
1662
1663         if (rx_adapter->nb_queues == 0)
1664                 return;
1665
1666         if (rx_queue_id == -1) {
1667                 uint16_t nb_rx_queues;
1668                 uint16_t i;
1669
1670                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1671                 for (i = 0; i < nb_rx_queues; i++)
1672                         rxa_sw_del(rx_adapter, dev_info, i);
1673                 return;
1674         }
1675
1676         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1677         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1678         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1679         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1680         rx_adapter->num_rx_polled -= pollq;
1681         dev_info->nb_rx_poll -= pollq;
1682         rx_adapter->num_rx_intr -= intrq;
1683         dev_info->nb_rx_intr -= intrq;
1684         dev_info->nb_shared_intr -= intrq && sintrq;
1685 }
1686
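/*
 * Add an Rx queue (or all queues when rx_queue_id is -1) to the software
 * adapter: record its servicing weight, build the event template used when
 * enqueuing packets from this queue, and move the queue between the polled
 * and interrupt driven accounting as required.
 */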
1687 static void
1688 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1689         struct eth_device_info *dev_info,
1690         int32_t rx_queue_id,
1691         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1692 {
1693         struct eth_rx_queue_info *queue_info;
1694         const struct rte_event *ev = &conf->ev;
1695         int pollq;
1696         int intrq;
1697         int sintrq;
1698         struct rte_event *qi_ev;
1699
1700         if (rx_queue_id == -1) {
1701                 uint16_t nb_rx_queues;
1702                 uint16_t i;
1703
1704                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1705                 for (i = 0; i < nb_rx_queues; i++)
1706                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1707                 return;
1708         }
1709
1710         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1711         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1712         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1713
1714         queue_info = &dev_info->rx_queue[rx_queue_id];
1715         queue_info->wt = conf->servicing_weight;
1716
1717         qi_ev = (struct rte_event *)&queue_info->event;
1718         qi_ev->event = ev->event;
1719         qi_ev->op = RTE_EVENT_OP_NEW;
1720         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1721         qi_ev->sub_event_type = 0;
1722
1723         if (conf->rx_queue_flags &
1724                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1725                 queue_info->flow_id_mask = ~0;
1726         } else {
1727                 qi_ev->flow_id = 0;
             }
1728
1729         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1730         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1731                 rx_adapter->num_rx_polled += !pollq;
1732                 dev_info->nb_rx_poll += !pollq;
1733                 rx_adapter->num_rx_intr -= intrq;
1734                 dev_info->nb_rx_intr -= intrq;
1735                 dev_info->nb_shared_intr -= intrq && sintrq;
1736         }
1737
1738         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1739                 rx_adapter->num_rx_polled -= pollq;
1740                 dev_info->nb_rx_poll -= pollq;
1741                 rx_adapter->num_rx_intr += !intrq;
1742                 dev_info->nb_rx_intr += !intrq;
1743                 dev_info->nb_shared_intr += !intrq && sintrq;
1744                 if (dev_info->nb_shared_intr == 1) {
1745                         if (dev_info->multi_intr_cap)
1746                                 dev_info->next_q_idx =
1747                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1748                         else
1749                                 dev_info->next_q_idx = 0;
1750                 }
1751         }
1752 }
1753
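/*
 * Add a queue to the service based adapter. A servicing weight of zero
 * selects interrupt mode when the ethdev has Rx interrupts configured,
 * otherwise the weight is forced to one and the queue is polled; the poll
 * array and WRR schedule are then rebuilt to include the new queue.
 */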
1754 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1755                 uint16_t eth_dev_id,
1756                 int rx_queue_id,
1757                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1758 {
1759         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1760         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1761         int ret;
1762         struct eth_rx_poll_entry *rx_poll;
1763         struct eth_rx_queue_info *rx_queue;
1764         uint32_t *rx_wrr;
1765         uint16_t nb_rx_queues;
1766         uint32_t nb_rx_poll, nb_wrr;
1767         uint32_t nb_rx_intr;
1768         int num_intr_vec;
1769         uint16_t wt;
1770
1771         if (queue_conf->servicing_weight == 0) {
1772                 struct rte_eth_dev_data *data = dev_info->dev->data;
1773
1774                 temp_conf = *queue_conf;
1775                 if (!data->dev_conf.intr_conf.rxq) {
1776                         /* If Rx interrupts are disabled, set wt = 1 */
1777                         temp_conf.servicing_weight = 1;
1778                 }
1779                 queue_conf = &temp_conf;
1780         }
1781
1782         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1783         rx_queue = dev_info->rx_queue;
1784         wt = queue_conf->servicing_weight;
1785
1786         if (dev_info->rx_queue == NULL) {
1787                 dev_info->rx_queue =
1788                     rte_zmalloc_socket(rx_adapter->mem_name,
1789                                        nb_rx_queues *
1790                                        sizeof(struct eth_rx_queue_info), 0,
1791                                        rx_adapter->socket_id);
1792                 if (dev_info->rx_queue == NULL)
1793                         return -ENOMEM;
1794         }
1795         rx_wrr = NULL;
1796         rx_poll = NULL;
1797
1798         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1799                         queue_conf->servicing_weight,
1800                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1801
1802         if (dev_info->dev->intr_handle)
1803                 dev_info->multi_intr_cap =
1804                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1805
1806         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1807                                 &rx_poll, &rx_wrr);
1808         if (ret)
1809                 goto err_free_rxqueue;
1810
1811         if (wt == 0) {
1812                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1813
1814                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1815                 if (ret)
1816                         goto err_free_rxqueue;
1817
1818                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1819                 if (ret)
1820                         goto err_free_rxqueue;
1821         } else {
1822
1823                 num_intr_vec = 0;
1824                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1825                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1826                                                 rx_queue_id, 0);
1827                         /* Interrupt based queues are being converted to
1828                          * poll mode queues; delete the interrupt
1829                          * configuration for those queues.
1830                          */
1831                         ret = rxa_del_intr_queue(rx_adapter,
1832                                                 dev_info, rx_queue_id);
1833                         if (ret)
1834                                 goto err_free_rxqueue;
1835                 }
1836         }
1837
1838         if (nb_rx_intr == 0) {
1839                 ret = rxa_free_intr_resources(rx_adapter);
1840                 if (ret)
1841                         goto err_free_rxqueue;
1842         }
1843
1844         if (wt == 0) {
1845                 uint16_t i;
1846
1847                 if (rx_queue_id == -1) {
1848                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1849                                 dev_info->intr_queue[i] = i;
1850                 } else {
1851                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1852                                 dev_info->intr_queue[nb_rx_intr - 1] =
1853                                         rx_queue_id;
1854                 }
1855         }
1856
1857
1858
1859         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1860         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1861
1862         rte_free(rx_adapter->eth_rx_poll);
1863         rte_free(rx_adapter->wrr_sched);
1864
1865         rx_adapter->eth_rx_poll = rx_poll;
1866         rx_adapter->wrr_sched = rx_wrr;
1867         rx_adapter->wrr_len = nb_wrr;
1868         rx_adapter->num_intr_vec += num_intr_vec;
1869         return 0;
1870
1871 err_free_rxqueue:
1872         if (rx_queue == NULL) {
1873                 rte_free(dev_info->rx_queue);
1874                 dev_info->rx_queue = NULL;
1875         }
1876
1877         rte_free(rx_poll);
1878         rte_free(rx_wrr);
1879
1880         return ret;
1881 }
1882
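/*
 * Common start/stop handler: devices with an internal event port are
 * started or stopped through the eventdev PMD callbacks, while the service
 * based path toggles the adapter's service run state under rx_lock.
 */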
1883 static int
1884 rxa_ctrl(uint8_t id, int start)
1885 {
1886         struct rte_event_eth_rx_adapter *rx_adapter;
1887         struct rte_eventdev *dev;
1888         struct eth_device_info *dev_info;
1889         uint32_t i;
1890         int use_service = 0;
1891         int stop = !start;
1892
1893         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1894         rx_adapter = rxa_id_to_adapter(id);
1895         if (rx_adapter == NULL)
1896                 return -EINVAL;
1897
1898         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1899
1900         RTE_ETH_FOREACH_DEV(i) {
1901                 dev_info = &rx_adapter->eth_devices[i];
1902                 /* if starting, skip devices with no queues added */
1903                 if (start && !dev_info->nb_dev_queues)
1904                         continue;
1905                 /* if stopping, skip devices that were never started */
1906                 if (stop && !dev_info->dev_rx_started)
1907                         continue;
1908                 use_service |= !dev_info->internal_event_port;
1909                 dev_info->dev_rx_started = start;
1910                 if (dev_info->internal_event_port == 0)
1911                         continue;
1912                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1913                                                 &rte_eth_devices[i]) :
1914                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1915                                                 &rte_eth_devices[i]);
1916         }
1917
1918         if (use_service) {
1919                 rte_spinlock_lock(&rx_adapter->rx_lock);
1920                 rx_adapter->rxa_started = start;
1921                 rte_service_runstate_set(rx_adapter->service_id, start);
1922                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1923         }
1924
1925         return 0;
1926 }
1927
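/*
 * Create an adapter that uses the caller supplied conf_cb to configure the
 * event port. The adapter structure, the per-port device info array and the
 * big-endian copy of the default RSS key (used to compute flow IDs) are
 * allocated and initialized here; for the service based path, the service
 * itself is registered on the first queue add.
 */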
1928 int
1929 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1930                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1931                                 void *conf_arg)
1932 {
1933         struct rte_event_eth_rx_adapter *rx_adapter;
1934         int ret;
1935         int socket_id;
1936         uint16_t i;
1937         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1938         const uint8_t default_rss_key[] = {
1939                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1940                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1941                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1942                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1943                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1944         };
1945
1946         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1947         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1948         if (conf_cb == NULL)
1949                 return -EINVAL;
1950
1951         if (event_eth_rx_adapter == NULL) {
1952                 ret = rte_event_eth_rx_adapter_init();
1953                 if (ret)
1954                         return ret;
1955         }
1956
1957         rx_adapter = rxa_id_to_adapter(id);
1958         if (rx_adapter != NULL) {
1959                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1960                 return -EEXIST;
1961         }
1962
1963         socket_id = rte_event_dev_socket_id(dev_id);
1964         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1965                 "rte_event_eth_rx_adapter_%d",
1966                 id);
1967
1968         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1969                         RTE_CACHE_LINE_SIZE, socket_id);
1970         if (rx_adapter == NULL) {
1971                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1972                 return -ENOMEM;
1973         }
1974
1975         rx_adapter->eventdev_id = dev_id;
1976         rx_adapter->socket_id = socket_id;
1977         rx_adapter->conf_cb = conf_cb;
1978         rx_adapter->conf_arg = conf_arg;
1979         rx_adapter->id = id;
1980         strcpy(rx_adapter->mem_name, mem_name);
1981         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1982                                         RTE_MAX_ETHPORTS *
1983                                         sizeof(struct eth_device_info), 0,
1984                                         socket_id);
1985         rte_convert_rss_key((const uint32_t *)default_rss_key,
1986                         (uint32_t *)rx_adapter->rss_key_be,
1987                             RTE_DIM(default_rss_key));
1988
1989         if (rx_adapter->eth_devices == NULL) {
1990                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1991                 rte_free(rx_adapter);
1992                 return -ENOMEM;
1993         }
1994         rte_spinlock_init(&rx_adapter->rx_lock);
1995         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1996                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1997
1998         event_eth_rx_adapter[id] = rx_adapter;
1999         if (conf_cb == rxa_default_conf_cb)
2000                 rx_adapter->default_cb_arg = 1;
2001         return 0;
2002 }
2003
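/*
 * Convenience wrapper around rte_event_eth_rx_adapter_create_ext() that
 * copies the supplied event port configuration and uses the default
 * configuration callback. A minimal usage sketch (error handling omitted;
 * adapter id 0, event device 0, ethdev port 0 and the numeric values are
 * illustrative assumptions):
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qc;
 *
 *	memset(&qc, 0, sizeof(qc));
 *	qc.ev.queue_id = 0;
 *	qc.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qc.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qc.servicing_weight = 1;
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &pc);
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qc);
 *	rte_event_eth_rx_adapter_start(0);
 */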
2004 int
2005 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2006                 struct rte_event_port_conf *port_config)
2007 {
2008         struct rte_event_port_conf *pc;
2009         int ret;
2010
2011         if (port_config == NULL)
2012                 return -EINVAL;
2013         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2014
2015         pc = rte_malloc(NULL, sizeof(*pc), 0);
2016         if (pc == NULL)
2017                 return -ENOMEM;
2018         *pc = *port_config;
2019         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2020                                         rxa_default_conf_cb,
2021                                         pc);
2022         if (ret)
2023                 rte_free(pc);
2024         return ret;
2025 }
2026
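/*
 * Free an adapter. Fails with -EBUSY while Rx queues are still added; the
 * caller must delete all queues first.
 */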
2027 int
2028 rte_event_eth_rx_adapter_free(uint8_t id)
2029 {
2030         struct rte_event_eth_rx_adapter *rx_adapter;
2031
2032         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2033
2034         rx_adapter = rxa_id_to_adapter(id);
2035         if (rx_adapter == NULL)
2036                 return -EINVAL;
2037
2038         if (rx_adapter->nb_queues) {
2039                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2040                                 rx_adapter->nb_queues);
2041                 return -EBUSY;
2042         }
2043
2044         if (rx_adapter->default_cb_arg)
2045                 rte_free(rx_adapter->conf_arg);
2046         rte_free(rx_adapter->eth_devices);
2047         rte_free(rx_adapter);
2048         event_eth_rx_adapter[id] = NULL;
2049
2050         return 0;
2051 }
2052
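/*
 * Add an Rx queue (or all queues of the port when rx_queue_id is -1) to the
 * adapter. Devices advertising RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT
 * are handled through the eventdev PMD queue add op; otherwise the queue is
 * added to the service based adapter under rx_lock and the service run
 * state is updated.
 */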
2053 int
2054 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2055                 uint16_t eth_dev_id,
2056                 int32_t rx_queue_id,
2057                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2058 {
2059         int ret;
2060         uint32_t cap;
2061         struct rte_event_eth_rx_adapter *rx_adapter;
2062         struct rte_eventdev *dev;
2063         struct eth_device_info *dev_info;
2064
2065         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2066         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2067
2068         rx_adapter = rxa_id_to_adapter(id);
2069         if ((rx_adapter == NULL) || (queue_conf == NULL))
2070                 return -EINVAL;
2071
2072         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2073         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2074                                                 eth_dev_id,
2075                                                 &cap);
2076         if (ret) {
2077                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2078                         " eth port %" PRIu16, id, eth_dev_id);
2079                 return ret;
2080         }
2081
2082         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2083                 && (queue_conf->rx_queue_flags &
2084                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2085                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2086                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2087                                 eth_dev_id, id);
2088                 return -EINVAL;
2089         }
2090
2091         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2092                 (rx_queue_id != -1)) {
2093                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2094                         "event queue, eth port: %" PRIu16 " adapter id: %"
2095                         PRIu8, eth_dev_id, id);
2096                 return -EINVAL;
2097         }
2098
2099         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2100                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2101                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2102                          (uint16_t)rx_queue_id);
2103                 return -EINVAL;
2104         }
2105
2106         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2107
2108         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2109                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2110                                         -ENOTSUP);
2111                 if (dev_info->rx_queue == NULL) {
2112                         dev_info->rx_queue =
2113                             rte_zmalloc_socket(rx_adapter->mem_name,
2114                                         dev_info->dev->data->nb_rx_queues *
2115                                         sizeof(struct eth_rx_queue_info), 0,
2116                                         rx_adapter->socket_id);
2117                         if (dev_info->rx_queue == NULL)
2118                                 return -ENOMEM;
2119                 }
2120
2121                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2122                                 &rte_eth_devices[eth_dev_id],
2123                                 rx_queue_id, queue_conf);
2124                 if (ret == 0) {
2125                         dev_info->internal_event_port = 1;
2126                         rxa_update_queue(rx_adapter,
2127                                         &rx_adapter->eth_devices[eth_dev_id],
2128                                         rx_queue_id,
2129                                         1);
2130                 }
2131         } else {
2132                 rte_spinlock_lock(&rx_adapter->rx_lock);
2133                 dev_info->internal_event_port = 0;
2134                 ret = rxa_init_service(rx_adapter, id);
2135                 if (ret == 0) {
2136                         uint32_t service_id = rx_adapter->service_id;
2137                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2138                                         queue_conf);
2139                         rte_service_component_runstate_set(service_id,
2140                                 rxa_sw_adapter_queue_count(rx_adapter));
2141                 }
2142                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2143         }
2144
2145         if (ret)
2146                 return ret;
2147
2148         return 0;
2149 }
2150
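/*
 * Delete an Rx queue (or all queues of the port when rx_queue_id is -1).
 * On the service based path the poll array and WRR schedule are rebuilt
 * without the queue, and interrupt resources are released once the last
 * interrupt driven queue is removed.
 */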
2151 int
2152 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2153                                 int32_t rx_queue_id)
2154 {
2155         int ret = 0;
2156         struct rte_eventdev *dev;
2157         struct rte_event_eth_rx_adapter *rx_adapter;
2158         struct eth_device_info *dev_info;
2159         uint32_t cap;
2160         uint32_t nb_rx_poll = 0;
2161         uint32_t nb_wrr = 0;
2162         uint32_t nb_rx_intr;
2163         struct eth_rx_poll_entry *rx_poll = NULL;
2164         uint32_t *rx_wrr = NULL;
2165         int num_intr_vec;
2166
2167         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2168         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2169
2170         rx_adapter = rxa_id_to_adapter(id);
2171         if (rx_adapter == NULL)
2172                 return -EINVAL;
2173
2174         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2175         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2176                                                 eth_dev_id,
2177                                                 &cap);
2178         if (ret)
2179                 return ret;
2180
2181         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2182                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2183                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2184                          (uint16_t)rx_queue_id);
2185                 return -EINVAL;
2186         }
2187
2188         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2189
2190         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2191                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2192                                  -ENOTSUP);
2193                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2194                                                 &rte_eth_devices[eth_dev_id],
2195                                                 rx_queue_id);
2196                 if (ret == 0) {
2197                         rxa_update_queue(rx_adapter,
2198                                         &rx_adapter->eth_devices[eth_dev_id],
2199                                         rx_queue_id,
2200                                         0);
2201                         if (dev_info->nb_dev_queues == 0) {
2202                                 rte_free(dev_info->rx_queue);
2203                                 dev_info->rx_queue = NULL;
2204                         }
2205                 }
2206         } else {
2207                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2208                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2209
2210                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2211                         &rx_poll, &rx_wrr);
2212                 if (ret)
2213                         return ret;
2214
2215                 rte_spinlock_lock(&rx_adapter->rx_lock);
2216
2217                 num_intr_vec = 0;
2218                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2219
2220                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2221                                                 rx_queue_id, 0);
2222                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2223                                         rx_queue_id);
2224                         if (ret)
2225                                 goto unlock_ret;
2226                 }
2227
2228                 if (nb_rx_intr == 0) {
2229                         ret = rxa_free_intr_resources(rx_adapter);
2230                         if (ret)
2231                                 goto unlock_ret;
2232                 }
2233
2234                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2235                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2236
2237                 rte_free(rx_adapter->eth_rx_poll);
2238                 rte_free(rx_adapter->wrr_sched);
2239
2240                 if (nb_rx_intr == 0) {
2241                         rte_free(dev_info->intr_queue);
2242                         dev_info->intr_queue = NULL;
2243                 }
2244
2245                 rx_adapter->eth_rx_poll = rx_poll;
2246                 rx_adapter->wrr_sched = rx_wrr;
2247                 rx_adapter->wrr_len = nb_wrr;
2248                 rx_adapter->num_intr_vec += num_intr_vec;
2249
2250                 if (dev_info->nb_dev_queues == 0) {
2251                         rte_free(dev_info->rx_queue);
2252                         dev_info->rx_queue = NULL;
2253                 }
2254 unlock_ret:
2255                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2256                 if (ret) {
2257                         rte_free(rx_poll);
2258                         rte_free(rx_wrr);
2259                         return ret;
2260                 }
2261
2262                 rte_service_component_runstate_set(rx_adapter->service_id,
2263                                 rxa_sw_adapter_queue_count(rx_adapter));
2264         }
2265
2266         return ret;
2267 }
2268
2269 int
2270 rte_event_eth_rx_adapter_start(uint8_t id)
2271 {
2272         return rxa_ctrl(id, 1);
2273 }
2274
2275 int
2276 rte_event_eth_rx_adapter_stop(uint8_t id)
2277 {
2278         return rxa_ctrl(id, 0);
2279 }
2280
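/*
 * Retrieve adapter statistics: the software adapter's counters plus the
 * rx_packets and rx_enq_count totals reported by PMDs that implement an
 * internal event port.
 */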
2281 int
2282 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2283                                struct rte_event_eth_rx_adapter_stats *stats)
2284 {
2285         struct rte_event_eth_rx_adapter *rx_adapter;
2286         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2287         struct rte_event_eth_rx_adapter_stats dev_stats;
2288         struct rte_eventdev *dev;
2289         struct eth_device_info *dev_info;
2290         uint32_t i;
2291         int ret;
2292
2293         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2294
2295         rx_adapter = rxa_id_to_adapter(id);
2296         if (rx_adapter == NULL || stats == NULL)
2297                 return -EINVAL;
2298
2299         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2300         memset(stats, 0, sizeof(*stats));
2301         RTE_ETH_FOREACH_DEV(i) {
2302                 dev_info = &rx_adapter->eth_devices[i];
2303                 if (dev_info->internal_event_port == 0 ||
2304                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2305                         continue;
2306                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2307                                                 &rte_eth_devices[i],
2308                                                 &dev_stats);
2309                 if (ret)
2310                         continue;
2311                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2312                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2313         }
2314
2315         if (rx_adapter->service_inited)
2316                 *stats = rx_adapter->stats;
2317
2318         stats->rx_packets += dev_stats_sum.rx_packets;
2319         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2320         return 0;
2321 }
2322
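/* Reset both the PMD reported and the software adapter statistics */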
2323 int
2324 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2325 {
2326         struct rte_event_eth_rx_adapter *rx_adapter;
2327         struct rte_eventdev *dev;
2328         struct eth_device_info *dev_info;
2329         uint32_t i;
2330
2331         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2332
2333         rx_adapter = rxa_id_to_adapter(id);
2334         if (rx_adapter == NULL)
2335                 return -EINVAL;
2336
2337         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2338         RTE_ETH_FOREACH_DEV(i) {
2339                 dev_info = &rx_adapter->eth_devices[i];
2340                 if (dev_info->internal_event_port == 0 ||
2341                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2342                         continue;
2343                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2344                                                         &rte_eth_devices[i]);
2345         }
2346
2347         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2348         return 0;
2349 }
2350
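/* Return the adapter's service id, or -ESRCH if no service was set up */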
2351 int
2352 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2353 {
2354         struct rte_event_eth_rx_adapter *rx_adapter;
2355
2356         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2357
2358         rx_adapter = rxa_id_to_adapter(id);
2359         if (rx_adapter == NULL || service_id == NULL)
2360                 return -EINVAL;
2361
2362         if (rx_adapter->service_inited)
2363                 *service_id = rx_adapter->service_id;
2364
2365         return rx_adapter->service_inited ? 0 : -ESRCH;
2366 }
2367
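/*
 * Register a per-port callback and argument used by the service based Rx
 * path. The port must already have queues added to the adapter and must not
 * use an internal event port.
 */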
2368 int
2369 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2370                                         uint16_t eth_dev_id,
2371                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2372                                         void *cb_arg)
2373 {
2374         struct rte_event_eth_rx_adapter *rx_adapter;
2375         struct eth_device_info *dev_info;
2376         uint32_t cap;
2377         int ret;
2378
2379         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2380         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2381
2382         rx_adapter = rxa_id_to_adapter(id);
2383         if (rx_adapter == NULL)
2384                 return -EINVAL;
2385
2386         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2387         if (dev_info->rx_queue == NULL)
2388                 return -EINVAL;
2389
2390         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2391                                                 eth_dev_id,
2392                                                 &cap);
2393         if (ret) {
2394                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2395                         " eth port %" PRIu16, id, eth_dev_id);
2396                 return ret;
2397         }
2398
2399         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2400                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2401                                 PRIu16, eth_dev_id);
2402                 return -EINVAL;
2403         }
2404
2405         rte_spinlock_lock(&rx_adapter->rx_lock);
2406         dev_info->cb_fn = cb_fn;
2407         dev_info->cb_arg = cb_arg;
2408         rte_spinlock_unlock(&rx_adapter->rx_lock);
2409
2410         return 0;
2411 }