lib/librte_eventdev/rte_event_eth_rx_adapter.c (dpdk.git)
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
29
30 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
31 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
32
33 #define RSS_KEY_SIZE    40
34 /* value written to intr thread pipe to signal thread exit */
35 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
36 /* Sentinel value to detect an uninitialized file handle */
37 #define INIT_FD         -1
38
39 /*
40  * Used to store port and queue ID of interrupting Rx queue
41  */
42 union queue_data {
43         RTE_STD_C11
44         void *ptr;
45         struct {
46                 uint16_t port;
47                 uint16_t queue;
48         };
49 };
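/*
 * Illustrative sketch (not part of the adapter logic): the port and queue
 * of an interrupting Rx queue are packed into the epoll user data pointer
 * through this union, e.g.
 *
 *     union queue_data qd;
 *     qd.port = eth_dev_id;
 *     qd.queue = rx_queue_id;
 *     rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, rx_adapter->epd,
 *                               RTE_INTR_EVENT_ADD, qd.ptr);
 *
 * and unpacked on the receive side by assigning qd.ptr = data and reading
 * qd.port/qd.queue, as done in rxa_config_intr() and rxa_intr_ring_enqueue()
 * below.
 */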
50
51 /*
52  * There is an instance of this struct per polled Rx queue added to the
53  * adapter
54  */
55 struct eth_rx_poll_entry {
56         /* Eth port to poll */
57         uint16_t eth_dev_id;
58         /* Eth rx queue to poll */
59         uint16_t eth_rx_qid;
60 };
61
62 /* Instance per adapter */
63 struct rte_eth_event_enqueue_buffer {
64         /* Count of events in this buffer */
65         uint16_t count;
66         /* Array of events in this buffer */
67         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
68 };
69
70 struct rte_event_eth_rx_adapter {
71         /* RSS key */
72         uint8_t rss_key_be[RSS_KEY_SIZE];
73         /* Event device identifier */
74         uint8_t eventdev_id;
75         /* Per ethernet device structure */
76         struct eth_device_info *eth_devices;
77         /* Event port identifier */
78         uint8_t event_port_id;
79         /* Lock to serialize config updates with service function */
80         rte_spinlock_t rx_lock;
81         /* Max mbufs processed in any service function invocation */
82         uint32_t max_nb_rx;
83         /* Receive queues that need to be polled */
84         struct eth_rx_poll_entry *eth_rx_poll;
85         /* Size of the eth_rx_poll array */
86         uint16_t num_rx_polled;
87         /* Weighted round robin schedule */
88         uint32_t *wrr_sched;
89         /* wrr_sched[] size */
90         uint32_t wrr_len;
91         /* Next entry in wrr[] to begin polling */
92         uint32_t wrr_pos;
93         /* Event burst buffer */
94         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
95         /* Per adapter stats */
96         struct rte_event_eth_rx_adapter_stats stats;
97         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
98         uint16_t enq_block_count;
99         /* Block start ts */
100         uint64_t rx_enq_block_start_ts;
101         /* epoll fd used to wait for Rx interrupts */
102         int epd;
103         /* Num of interrupt driven interrupt queues */
104         uint32_t num_rx_intr;
105         /* Used to send <dev id, queue id> of interrupting Rx queues from
106          * the interrupt thread to the Rx thread
107          */
108         struct rte_ring *intr_ring;
109         /* Rx Queue data (dev id, queue id) for the last non-empty
110          * queue polled
111          */
112         union queue_data qd;
113         /* queue_data is valid */
114         int qd_valid;
115         /* Interrupt ring lock, synchronizes Rx thread
116          * and interrupt thread
117          */
118         rte_spinlock_t intr_ring_lock;
119         /* event array passed to rte_poll_wait */
120         struct rte_epoll_event *epoll_events;
121         /* Count of interrupt vectors in use */
122         uint32_t num_intr_vec;
123         /* Thread blocked on Rx interrupts */
124         pthread_t rx_intr_thread;
125         /* Configuration callback for rte_service configuration */
126         rte_event_eth_rx_adapter_conf_cb conf_cb;
127         /* Configuration callback argument */
128         void *conf_arg;
129         /* Set if  default_cb is being used */
130         int default_cb_arg;
131         /* Service initialization state */
132         uint8_t service_inited;
133         /* Total count of Rx queues in adapter */
134         uint32_t nb_queues;
135         /* Memory allocation name */
136         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
137         /* Socket identifier cached from eventdev */
138         int socket_id;
139         /* Per adapter EAL service */
140         uint32_t service_id;
141         /* Adapter started flag */
142         uint8_t rxa_started;
143         /* Adapter ID */
144         uint8_t id;
145 } __rte_cache_aligned;
146
147 /* Per eth device */
148 struct eth_device_info {
149         struct rte_eth_dev *dev;
150         struct eth_rx_queue_info *rx_queue;
151         /* Rx callback */
152         rte_event_eth_rx_adapter_cb_fn cb_fn;
153         /* Rx callback argument */
154         void *cb_arg;
155         /* Set if ethdev->eventdev packet transfer uses a
156          * hardware mechanism
157          */
158         uint8_t internal_event_port;
159         /* Set if the adapter is processing Rx queues for
160          * this eth device and packet processing has been
161          * started; this lets the code know whether the PMD
162          * rx_adapter_stop callback needs to be invoked
163          */
164         uint8_t dev_rx_started;
165         /* Number of queues added for this device */
166         uint16_t nb_dev_queues;
167         /* Number of poll based queues
168          * If nb_rx_poll > 0, the start callback will
169          * be invoked if not already invoked
170          */
171         uint16_t nb_rx_poll;
172         /* Number of interrupt based queues
173          * If nb_rx_intr > 0, the start callback will
174          * be invoked if not already invoked.
175          */
176         uint16_t nb_rx_intr;
177         /* Number of queues that use the shared interrupt */
178         uint16_t nb_shared_intr;
179         /* sum(wrr(q)) for all queues within the device
180          * useful when deleting all device queues
181          */
182         uint32_t wrr_len;
183         /* Intr based queue index to start polling from; this is used
184          * if the number of shared interrupts is non-zero
185          */
186         uint16_t next_q_idx;
187         /* Intr based queue indices */
188         uint16_t *intr_queue;
189         /* Device generates a per Rx queue interrupt for
190          * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
191          */
192         int multi_intr_cap;
193         /* shared interrupt enabled */
194         int shared_intr_enabled;
195 };
196
197 /* Per Rx queue */
198 struct eth_rx_queue_info {
199         int queue_enabled;      /* True if added */
200         int intr_enabled;
201         uint16_t wt;            /* Polling weight */
202         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
203         uint64_t event;
204 };
205
206 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
207
208 static inline int
209 rxa_validate_id(uint8_t id)
210 {
211         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
212 }
213
214 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
215         if (!rxa_validate_id(id)) { \
216                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
217                 return retval; \
218         } \
219 } while (0)
220
221 static inline int
222 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
223 {
224         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
225 }
226
227 /* Greatest common divisor */
228 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
229 {
230         uint16_t r = a % b;
231
232         return r ? rxa_gcd_u16(b, r) : b;
233 }
234
235 /* Returns the next queue in the polling sequence
236  *
237  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
238  */
239 static int
240 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
241          unsigned int n, int *cw,
242          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
243          uint16_t gcd, int prev)
244 {
245         int i = prev;
246         uint16_t w;
247
248         while (1) {
249                 uint16_t q;
250                 uint16_t d;
251
252                 i = (i + 1) % n;
253                 if (i == 0) {
254                         *cw = *cw - gcd;
255                         if (*cw <= 0)
256                                 *cw = max_wt;
257                 }
258
259                 q = eth_rx_poll[i].eth_rx_qid;
260                 d = eth_rx_poll[i].eth_dev_id;
261                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
262
263                 if ((int)w >= *cw)
264                         return i;
265         }
266 }
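/*
 * Worked example, illustrative only: with two polled queues of weights 2
 * and 1 (so max_wt = 2, gcd = 1, n = 2), successive calls starting from
 * prev = -1 and cw = -1 return the poll indices 0, 0, 1; the heavier queue
 * is visited twice for every visit of the lighter one. The complete
 * sequence is precomputed into wrr_sched[] by rxa_calc_wrr_sequence()
 * below.
 */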
267
268 static inline int
269 rxa_shared_intr(struct eth_device_info *dev_info,
270         int rx_queue_id)
271 {
272         int multi_intr_cap;
273
274         if (dev_info->dev->intr_handle == NULL)
275                 return 0;
276
277         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
278         return !multi_intr_cap ||
279                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
280 }
281
282 static inline int
283 rxa_intr_queue(struct eth_device_info *dev_info,
284         int rx_queue_id)
285 {
286         struct eth_rx_queue_info *queue_info;
287
288         queue_info = &dev_info->rx_queue[rx_queue_id];
289         return dev_info->rx_queue &&
290                 !dev_info->internal_event_port &&
291                 queue_info->queue_enabled && queue_info->wt == 0;
292 }
293
294 static inline int
295 rxa_polled_queue(struct eth_device_info *dev_info,
296         int rx_queue_id)
297 {
298         struct eth_rx_queue_info *queue_info;
299
300         queue_info = &dev_info->rx_queue[rx_queue_id];
301         return !dev_info->internal_event_port &&
302                 dev_info->rx_queue &&
303                 queue_info->queue_enabled && queue_info->wt != 0;
304 }
305
306 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
307 static int
308 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
309 {
310         uint16_t i;
311         int n, s;
312         uint16_t nbq;
313
314         nbq = dev_info->dev->data->nb_rx_queues;
315         n = 0; /* non shared count */
316         s = 0; /* shared count */
317
318         if (rx_queue_id == -1) {
319                 for (i = 0; i < nbq; i++) {
320                         if (!rxa_shared_intr(dev_info, i))
321                                 n += add ? !rxa_intr_queue(dev_info, i) :
322                                         rxa_intr_queue(dev_info, i);
323                         else
324                                 s += add ? !rxa_intr_queue(dev_info, i) :
325                                         rxa_intr_queue(dev_info, i);
326                 }
327
328                 if (s > 0) {
329                         if ((add && dev_info->nb_shared_intr == 0) ||
330                                 (!add && dev_info->nb_shared_intr))
331                                 n += 1;
332                 }
333         } else {
334                 if (!rxa_shared_intr(dev_info, rx_queue_id))
335                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
336                                 rxa_intr_queue(dev_info, rx_queue_id);
337                 else
338                         n = add ? !dev_info->nb_shared_intr :
339                                 dev_info->nb_shared_intr == 1;
340         }
341
342         return add ? n : -n;
343 }
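/*
 * Illustrative examples, assuming a device with 4 Rx queues and no queue
 * currently in interrupt mode:
 * - if every queue has its own interrupt vector, adding all queues
 *   (rx_queue_id == -1, add == 1) returns 4, one vector per queue;
 * - if all queues share one interrupt and no shared interrupt is in use
 *   yet (nb_shared_intr == 0), the same call returns 1, since a single
 *   shared vector covers them all.
 */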
344
345 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
346  */
347 static void
348 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
349                         struct eth_device_info *dev_info,
350                         int rx_queue_id,
351                         uint32_t *nb_rx_intr)
352 {
353         uint32_t intr_diff;
354
355         if (rx_queue_id == -1)
356                 intr_diff = dev_info->nb_rx_intr;
357         else
358                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
359
360         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
361 }
362
363 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
364  * interrupt queues could currently be poll mode Rx queues
365  */
366 static void
367 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
368                         struct eth_device_info *dev_info,
369                         int rx_queue_id,
370                         uint32_t *nb_rx_poll,
371                         uint32_t *nb_rx_intr,
372                         uint32_t *nb_wrr)
373 {
374         uint32_t intr_diff;
375         uint32_t poll_diff;
376         uint32_t wrr_len_diff;
377
378         if (rx_queue_id == -1) {
379                 intr_diff = dev_info->dev->data->nb_rx_queues -
380                                                 dev_info->nb_rx_intr;
381                 poll_diff = dev_info->nb_rx_poll;
382                 wrr_len_diff = dev_info->wrr_len;
383         } else {
384                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
385                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
386                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
387                                         0;
388         }
389
390         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
391         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
392         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
393 }
394
395 /* Calculate size of the eth_rx_poll and wrr_sched arrays
396  * after deleting poll mode rx queues
397  */
398 static void
399 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
400                         struct eth_device_info *dev_info,
401                         int rx_queue_id,
402                         uint32_t *nb_rx_poll,
403                         uint32_t *nb_wrr)
404 {
405         uint32_t poll_diff;
406         uint32_t wrr_len_diff;
407
408         if (rx_queue_id == -1) {
409                 poll_diff = dev_info->nb_rx_poll;
410                 wrr_len_diff = dev_info->wrr_len;
411         } else {
412                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
413                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
414                                         0;
415         }
416
417         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
418         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
419 }
420
421 /* Calculate nb_rx_* after adding poll mode rx queues
422  */
423 static void
424 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
425                         struct eth_device_info *dev_info,
426                         int rx_queue_id,
427                         uint16_t wt,
428                         uint32_t *nb_rx_poll,
429                         uint32_t *nb_rx_intr,
430                         uint32_t *nb_wrr)
431 {
432         uint32_t intr_diff;
433         uint32_t poll_diff;
434         uint32_t wrr_len_diff;
435
436         if (rx_queue_id == -1) {
437                 intr_diff = dev_info->nb_rx_intr;
438                 poll_diff = dev_info->dev->data->nb_rx_queues -
439                                                 dev_info->nb_rx_poll;
440                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
441                                 - dev_info->wrr_len;
442         } else {
443                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
444                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
445                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
446                                 wt - dev_info->rx_queue[rx_queue_id].wt :
447                                 wt;
448         }
449
450         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
451         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
452         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
453 }
454
455 /* Calculate nb_rx_* after adding rx_queue_id */
456 static void
457 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
458                 struct eth_device_info *dev_info,
459                 int rx_queue_id,
460                 uint16_t wt,
461                 uint32_t *nb_rx_poll,
462                 uint32_t *nb_rx_intr,
463                 uint32_t *nb_wrr)
464 {
465         if (wt != 0)
466                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
467                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
468         else
469                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
470                                         nb_rx_poll, nb_rx_intr, nb_wrr);
471 }
472
473 /* Calculate nb_rx_* after deleting rx_queue_id */
474 static void
475 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
476                 struct eth_device_info *dev_info,
477                 int rx_queue_id,
478                 uint32_t *nb_rx_poll,
479                 uint32_t *nb_rx_intr,
480                 uint32_t *nb_wrr)
481 {
482         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
483                                 nb_wrr);
484         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
485                                 nb_rx_intr);
486 }
487
488 /*
489  * Allocate the rx_poll array
490  */
491 static struct eth_rx_poll_entry *
492 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
493         uint32_t num_rx_polled)
494 {
495         size_t len;
496
497         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
498                                                         RTE_CACHE_LINE_SIZE);
499         return  rte_zmalloc_socket(rx_adapter->mem_name,
500                                 len,
501                                 RTE_CACHE_LINE_SIZE,
502                                 rx_adapter->socket_id);
503 }
504
505 /*
506  * Allocate the WRR array
507  */
508 static uint32_t *
509 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
510 {
511         size_t len;
512
513         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
514                         RTE_CACHE_LINE_SIZE);
515         return  rte_zmalloc_socket(rx_adapter->mem_name,
516                                 len,
517                                 RTE_CACHE_LINE_SIZE,
518                                 rx_adapter->socket_id);
519 }
520
521 static int
522 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
523                 uint32_t nb_poll,
524                 uint32_t nb_wrr,
525                 struct eth_rx_poll_entry **rx_poll,
526                 uint32_t **wrr_sched)
527 {
528
529         if (nb_poll == 0) {
530                 *rx_poll = NULL;
531                 *wrr_sched = NULL;
532                 return 0;
533         }
534
535         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
536         if (*rx_poll == NULL) {
537                 *wrr_sched = NULL;
538                 return -ENOMEM;
539         }
540
541         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
542         if (*wrr_sched == NULL) {
543                 rte_free(*rx_poll);
544                 return -ENOMEM;
545         }
546         return 0;
547 }
548
549 /* Precalculate WRR polling sequence for all queues in rx_adapter */
550 static void
551 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
552                 struct eth_rx_poll_entry *rx_poll,
553                 uint32_t *rx_wrr)
554 {
555         uint16_t d;
556         uint16_t q;
557         unsigned int i;
558         int prev = -1;
559         int cw = -1;
560
561         /* Initialize variables for calculation of wrr schedule */
562         uint16_t max_wrr_pos = 0;
563         unsigned int poll_q = 0;
564         uint16_t max_wt = 0;
565         uint16_t gcd = 0;
566
567         if (rx_poll == NULL)
568                 return;
569
570         /* Generate array of all queues to poll, the size of this
571          * array is poll_q
572          */
573         RTE_ETH_FOREACH_DEV(d) {
574                 uint16_t nb_rx_queues;
575                 struct eth_device_info *dev_info =
576                                 &rx_adapter->eth_devices[d];
577                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
578                 if (dev_info->rx_queue == NULL)
579                         continue;
580                 if (dev_info->internal_event_port)
581                         continue;
582                 dev_info->wrr_len = 0;
583                 for (q = 0; q < nb_rx_queues; q++) {
584                         struct eth_rx_queue_info *queue_info =
585                                 &dev_info->rx_queue[q];
586                         uint16_t wt;
587
588                         if (!rxa_polled_queue(dev_info, q))
589                                 continue;
590                         wt = queue_info->wt;
591                         rx_poll[poll_q].eth_dev_id = d;
592                         rx_poll[poll_q].eth_rx_qid = q;
593                         max_wrr_pos += wt;
594                         dev_info->wrr_len += wt;
595                         max_wt = RTE_MAX(max_wt, wt);
596                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
597                         poll_q++;
598                 }
599         }
600
601         /* Generate polling sequence based on weights */
602         prev = -1;
603         cw = -1;
604         for (i = 0; i < max_wrr_pos; i++) {
605                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
606                                      rx_poll, max_wt, gcd, prev);
607                 prev = rx_wrr[i];
608         }
609 }
610
611 static inline void
612 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
613         struct rte_ipv6_hdr **ipv6_hdr)
614 {
615         struct rte_ether_hdr *eth_hdr =
616                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
617         struct rte_vlan_hdr *vlan_hdr;
618
619         *ipv4_hdr = NULL;
620         *ipv6_hdr = NULL;
621
622         switch (eth_hdr->ether_type) {
623         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
624                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
625                 break;
626
627         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
628                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
629                 break;
630
631         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
632                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
633                 switch (vlan_hdr->eth_proto) {
634                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
635                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
636                         break;
637                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
638                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
639                         break;
640                 default:
641                         break;
642                 }
643                 break;
644
645         default:
646                 break;
647         }
648 }
649
650 /* Calculate RSS hash for IPv4/6 */
651 static inline uint32_t
652 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
653 {
654         uint32_t input_len;
655         void *tuple;
656         struct rte_ipv4_tuple ipv4_tuple;
657         struct rte_ipv6_tuple ipv6_tuple;
658         struct rte_ipv4_hdr *ipv4_hdr;
659         struct rte_ipv6_hdr *ipv6_hdr;
660
661         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
662
663         if (ipv4_hdr) {
664                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
665                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
666                 tuple = &ipv4_tuple;
667                 input_len = RTE_THASH_V4_L3_LEN;
668         } else if (ipv6_hdr) {
669                 rte_thash_load_v6_addrs(ipv6_hdr,
670                                         (union rte_thash_tuple *)&ipv6_tuple);
671                 tuple = &ipv6_tuple;
672                 input_len = RTE_THASH_V6_L3_LEN;
673         } else
674                 return 0;
675
676         return rte_softrss_be(tuple, input_len, rss_key_be);
677 }
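/*
 * Note: rss_key_be is assumed to already hold the RSS key converted to the
 * byte order expected by rte_softrss_be(). A minimal sketch of preparing
 * such a key with the rte_thash helper (hypothetical application key):
 *
 *     uint8_t key[RSS_KEY_SIZE];      filled with the application's key
 *     uint8_t key_be[RSS_KEY_SIZE];
 *     rte_convert_rss_key((const uint32_t *)key, (uint32_t *)key_be,
 *                         RSS_KEY_SIZE);
 *
 * The adapter performs a similar conversion of its default key when it is
 * created.
 */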
678
679 static inline int
680 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
681 {
682         return !!rx_adapter->enq_block_count;
683 }
684
685 static inline void
686 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
687 {
688         if (rx_adapter->rx_enq_block_start_ts)
689                 return;
690
691         rx_adapter->enq_block_count++;
692         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
693                 return;
694
695         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
696 }
697
698 static inline void
699 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
700                     struct rte_event_eth_rx_adapter_stats *stats)
701 {
702         if (unlikely(!stats->rx_enq_start_ts))
703                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
704
705         if (likely(!rxa_enq_blocked(rx_adapter)))
706                 return;
707
708         rx_adapter->enq_block_count = 0;
709         if (rx_adapter->rx_enq_block_start_ts) {
710                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
711                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
712                     rx_adapter->rx_enq_block_start_ts;
713                 rx_adapter->rx_enq_block_start_ts = 0;
714         }
715 }
716
717 /* Enqueue buffered events to event device */
718 static inline uint16_t
719 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
720 {
721         struct rte_eth_event_enqueue_buffer *buf =
722             &rx_adapter->event_enqueue_buffer;
723         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
724
725         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
726                                         rx_adapter->event_port_id,
727                                         buf->events,
728                                         buf->count);
729         if (n != buf->count) {
730                 memmove(buf->events,
731                         &buf->events[n],
732                         (buf->count - n) * sizeof(struct rte_event));
733                 stats->rx_enq_retry++;
734         }
735
736         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
737                 rxa_enq_block_start_ts(rx_adapter);
738
739         buf->count -= n;
740         stats->rx_enq_count += n;
741
742         return n;
743 }
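/*
 * All events produced by the adapter are new events, so the OP_NEW
 * optimized enqueue variant rte_event_enqueue_new_burst() is used above.
 * Events that could not be enqueued are compacted to the front of the
 * buffer and retried on the next flush, while the blocked-cycle statistics
 * are tracked through rxa_enq_block_start_ts()/rxa_enq_block_end_ts().
 */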
744
745 static inline void
746 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
747                 uint16_t eth_dev_id,
748                 uint16_t rx_queue_id,
749                 struct rte_mbuf **mbufs,
750                 uint16_t num)
751 {
752         uint32_t i;
753         struct eth_device_info *dev_info =
754                                         &rx_adapter->eth_devices[eth_dev_id];
755         struct eth_rx_queue_info *eth_rx_queue_info =
756                                         &dev_info->rx_queue[rx_queue_id];
757         struct rte_eth_event_enqueue_buffer *buf =
758                                         &rx_adapter->event_enqueue_buffer;
759         struct rte_event *ev = &buf->events[buf->count];
760         uint64_t event = eth_rx_queue_info->event;
761         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
762         struct rte_mbuf *m = mbufs[0];
763         uint32_t rss_mask;
764         uint32_t rss;
765         int do_rss;
766         uint64_t ts;
767         uint16_t nb_cb;
768         uint16_t dropped;
769
770         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
771         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
772         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
773
774         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
775                 ts = rte_get_tsc_cycles();
776                 for (i = 0; i < num; i++) {
777                         m = mbufs[i];
778
779                         m->timestamp = ts;
780                         m->ol_flags |= PKT_RX_TIMESTAMP;
781                 }
782         }
783
784         for (i = 0; i < num; i++) {
785                 m = mbufs[i];
786
787                 rss = do_rss ?
788                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
789                         m->hash.rss;
790                 ev->event = event;
791                 ev->flow_id = (rss & ~flow_id_mask) |
792                                 (ev->flow_id & flow_id_mask);
793                 ev->mbuf = m;
794                 ev++;
795         }
796
797         if (dev_info->cb_fn) {
798
799                 dropped = 0;
800                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
801                                         ETH_EVENT_BUFFER_SIZE, buf->count, ev,
802                                         num, dev_info->cb_arg, &dropped);
803                 if (unlikely(nb_cb > num))
804                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
805                                 nb_cb, num);
806                 else
807                         num = nb_cb;
808                 if (dropped)
809                         rx_adapter->stats.rx_dropped += dropped;
810         }
811
812         buf->count += num;
813 }
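/*
 * Flow id selection above, illustrated: flow_id_mask is ~0 when the
 * application supplied a flow id at queue add time and 0 otherwise
 * (see struct eth_rx_queue_info), so
 *
 *     ev->flow_id = (rss & ~flow_id_mask) | (ev->flow_id & flow_id_mask)
 *
 * resolves to the RSS hash when no flow id was supplied and to the
 * configured flow id when one was.
 */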
814
815 /* Enqueue packets from <port, q> to event buffer */
816 static inline uint32_t
817 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
818         uint16_t port_id,
819         uint16_t queue_id,
820         uint32_t rx_count,
821         uint32_t max_rx,
822         int *rxq_empty)
823 {
824         struct rte_mbuf *mbufs[BATCH_SIZE];
825         struct rte_eth_event_enqueue_buffer *buf =
826                                         &rx_adapter->event_enqueue_buffer;
827         struct rte_event_eth_rx_adapter_stats *stats =
828                                         &rx_adapter->stats;
829         uint16_t n;
830         uint32_t nb_rx = 0;
831
832         if (rxq_empty)
833                 *rxq_empty = 0;
834         /* Don't do a batch dequeue from the rx queue if there isn't
835          * enough space in the enqueue buffer.
836          */
837         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
838                 if (buf->count >= BATCH_SIZE)
839                         rxa_flush_event_buffer(rx_adapter);
840
841                 stats->rx_poll_count++;
842                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
843                 if (unlikely(!n)) {
844                         if (rxq_empty)
845                                 *rxq_empty = 1;
846                         break;
847                 }
848                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
849                 nb_rx += n;
850                 if (rx_count + nb_rx > max_rx)
851                         break;
852         }
853
854         if (buf->count > 0)
855                 rxa_flush_event_buffer(rx_adapter);
856
857         return nb_rx;
858 }
859
860 static inline void
861 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
862                 void *data)
863 {
864         uint16_t port_id;
865         uint16_t queue;
866         int err;
867         union queue_data qd;
868         struct eth_device_info *dev_info;
869         struct eth_rx_queue_info *queue_info;
870         int *intr_enabled;
871
872         qd.ptr = data;
873         port_id = qd.port;
874         queue = qd.queue;
875
876         dev_info = &rx_adapter->eth_devices[port_id];
877         queue_info = &dev_info->rx_queue[queue];
878         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
879         if (rxa_shared_intr(dev_info, queue))
880                 intr_enabled = &dev_info->shared_intr_enabled;
881         else
882                 intr_enabled = &queue_info->intr_enabled;
883
884         if (*intr_enabled) {
885                 *intr_enabled = 0;
886                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
887                 /* Entry should always be available.
888                  * The ring size equals the maximum number of interrupt
889                  * vectors supported (an interrupt vector is shared in
890                  * case of shared interrupts)
891                  */
892                 if (err)
893                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
894                                 " to ring: %s", strerror(-err));
895                 else
896                         rte_eth_dev_rx_intr_disable(port_id, queue);
897         }
898         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
899 }
900
901 static int
902 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
903                         uint32_t num_intr_vec)
904 {
905         if (rx_adapter->num_intr_vec + num_intr_vec >
906                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
907                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
908                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
909                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
910                 return -ENOSPC;
911         }
912
913         return 0;
914 }
915
916 /* Delete entries for (dev, queue) from the interrupt ring */
917 static void
918 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
919                         struct eth_device_info *dev_info,
920                         uint16_t rx_queue_id)
921 {
922         int i, n;
923         union queue_data qd;
924
925         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
926
927         n = rte_ring_count(rx_adapter->intr_ring);
928         for (i = 0; i < n; i++) {
929                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
930                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
931                         if (qd.port == dev_info->dev->data->port_id &&
932                                 qd.queue == rx_queue_id)
933                                 continue;
934                 } else {
935                         if (qd.port == dev_info->dev->data->port_id)
936                                 continue;
937                 }
938                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
939         }
940
941         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
942 }
943
944 /* pthread callback handling interrupt mode receive queues
945  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
946  * interrupting queue to the adapter's ring buffer for interrupt events.
947  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
948  * the adapter service function.
949  */
950 static void *
951 rxa_intr_thread(void *arg)
952 {
953         struct rte_event_eth_rx_adapter *rx_adapter = arg;
954         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
955         int n, i;
956
957         while (1) {
958                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
959                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
960                 if (unlikely(n < 0))
961                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
962                                         n);
963                 for (i = 0; i < n; i++) {
964                         rxa_intr_ring_enqueue(rx_adapter,
965                                         epoll_events[i].epdata.data);
966                 }
967         }
968
969         return NULL;
970 }
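/*
 * The loop above runs until the thread is cancelled; teardown happens in
 * rxa_destroy_intr_thread() further below, which calls pthread_cancel()
 * and pthread_join() on rx_intr_thread.
 */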
971
972 /* Dequeue <port, q> from interrupt ring and enqueue received
973  * mbufs to eventdev
974  */
975 static inline uint32_t
976 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
977 {
978         uint32_t n;
979         uint32_t nb_rx = 0;
980         int rxq_empty;
981         struct rte_eth_event_enqueue_buffer *buf;
982         rte_spinlock_t *ring_lock;
983         uint8_t max_done = 0;
984
985         if (rx_adapter->num_rx_intr == 0)
986                 return 0;
987
988         if (rte_ring_count(rx_adapter->intr_ring) == 0
989                 && !rx_adapter->qd_valid)
990                 return 0;
991
992         buf = &rx_adapter->event_enqueue_buffer;
993         ring_lock = &rx_adapter->intr_ring_lock;
994
995         if (buf->count >= BATCH_SIZE)
996                 rxa_flush_event_buffer(rx_adapter);
997
998         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
999                 struct eth_device_info *dev_info;
1000                 uint16_t port;
1001                 uint16_t queue;
1002                 union queue_data qd  = rx_adapter->qd;
1003                 int err;
1004
1005                 if (!rx_adapter->qd_valid) {
1006                         struct eth_rx_queue_info *queue_info;
1007
1008                         rte_spinlock_lock(ring_lock);
1009                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1010                         if (err) {
1011                                 rte_spinlock_unlock(ring_lock);
1012                                 break;
1013                         }
1014
1015                         port = qd.port;
1016                         queue = qd.queue;
1017                         rx_adapter->qd = qd;
1018                         rx_adapter->qd_valid = 1;
1019                         dev_info = &rx_adapter->eth_devices[port];
1020                         if (rxa_shared_intr(dev_info, queue))
1021                                 dev_info->shared_intr_enabled = 1;
1022                         else {
1023                                 queue_info = &dev_info->rx_queue[queue];
1024                                 queue_info->intr_enabled = 1;
1025                         }
1026                         rte_eth_dev_rx_intr_enable(port, queue);
1027                         rte_spinlock_unlock(ring_lock);
1028                 } else {
1029                         port = qd.port;
1030                         queue = qd.queue;
1031
1032                         dev_info = &rx_adapter->eth_devices[port];
1033                 }
1034
1035                 if (rxa_shared_intr(dev_info, queue)) {
1036                         uint16_t i;
1037                         uint16_t nb_queues;
1038
1039                         nb_queues = dev_info->dev->data->nb_rx_queues;
1040                         n = 0;
1041                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1042                                 uint8_t enq_buffer_full;
1043
1044                                 if (!rxa_intr_queue(dev_info, i))
1045                                         continue;
1046                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1047                                         rx_adapter->max_nb_rx,
1048                                         &rxq_empty);
1049                                 nb_rx += n;
1050
1051                                 enq_buffer_full = !rxq_empty && n == 0;
1052                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1053
1054                                 if (enq_buffer_full || max_done) {
1055                                         dev_info->next_q_idx = i;
1056                                         goto done;
1057                                 }
1058                         }
1059
1060                         rx_adapter->qd_valid = 0;
1061
1062                         /* Reinitialize for next interrupt */
1063                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1064                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1065                                                 0;
1066                 } else {
1067                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1068                                 rx_adapter->max_nb_rx,
1069                                 &rxq_empty);
1070                         rx_adapter->qd_valid = !rxq_empty;
1071                         nb_rx += n;
1072                         if (nb_rx > rx_adapter->max_nb_rx)
1073                                 break;
1074                 }
1075         }
1076
1077 done:
1078         rx_adapter->stats.rx_intr_packets += nb_rx;
1079         return nb_rx;
1080 }
1081
1082 /*
1083  * Polls receive queues added to the event adapter and enqueues received
1084  * packets to the event device.
1085  *
1086  * The receive code enqueues initially to a temporary buffer; the
1087  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1088  *
1089  * If there isn't space available in the temporary buffer, packets from the
1090  * Rx queue aren't dequeued from the eth device. This back pressures the
1091  * eth device; in virtual device environments the back pressure is relayed to
1092  * the hypervisor's switching layer, where adjustments can be made to deal
1093  * with it.
1094  */
1095 static inline uint32_t
1096 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1097 {
1098         uint32_t num_queue;
1099         uint32_t nb_rx = 0;
1100         struct rte_eth_event_enqueue_buffer *buf;
1101         uint32_t wrr_pos;
1102         uint32_t max_nb_rx;
1103
1104         wrr_pos = rx_adapter->wrr_pos;
1105         max_nb_rx = rx_adapter->max_nb_rx;
1106         buf = &rx_adapter->event_enqueue_buffer;
1107
1108         /* Iterate through a WRR sequence */
1109         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1110                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1111                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1112                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1113
1114                 /* Don't do a batch dequeue from the rx queue if there isn't
1115                  * enough space in the enqueue buffer.
1116                  */
1117                 if (buf->count >= BATCH_SIZE)
1118                         rxa_flush_event_buffer(rx_adapter);
1119                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1120                         rx_adapter->wrr_pos = wrr_pos;
1121                         return nb_rx;
1122                 }
1123
1124                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1125                                 NULL);
1126                 if (nb_rx > max_nb_rx) {
1127                         rx_adapter->wrr_pos =
1128                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1129                         break;
1130                 }
1131
1132                 if (++wrr_pos == rx_adapter->wrr_len)
1133                         wrr_pos = 0;
1134         }
1135         return nb_rx;
1136 }
1137
1138 static int
1139 rxa_service_func(void *args)
1140 {
1141         struct rte_event_eth_rx_adapter *rx_adapter = args;
1142         struct rte_event_eth_rx_adapter_stats *stats;
1143
1144         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1145                 return 0;
1146         if (!rx_adapter->rxa_started) {
1147                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1148                 return 0;
1149         }
1150
1151         stats = &rx_adapter->stats;
1152         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1153         stats->rx_packets += rxa_poll(rx_adapter);
1154         rte_spinlock_unlock(&rx_adapter->rx_lock);
1155         return 0;
1156 }
1157
1158 static int
1159 rte_event_eth_rx_adapter_init(void)
1160 {
1161         const char *name = "rte_event_eth_rx_adapter_array";
1162         const struct rte_memzone *mz;
1163         unsigned int sz;
1164
1165         sz = sizeof(*event_eth_rx_adapter) *
1166             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1167         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1168
1169         mz = rte_memzone_lookup(name);
1170         if (mz == NULL) {
1171                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1172                                                  RTE_CACHE_LINE_SIZE);
1173                 if (mz == NULL) {
1174                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1175                                         PRId32, rte_errno);
1176                         return -rte_errno;
1177                 }
1178         }
1179
1180         event_eth_rx_adapter = mz->addr;
1181         return 0;
1182 }
1183
1184 static inline struct rte_event_eth_rx_adapter *
1185 rxa_id_to_adapter(uint8_t id)
1186 {
1187         return event_eth_rx_adapter ?
1188                 event_eth_rx_adapter[id] : NULL;
1189 }
1190
1191 static int
1192 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1193                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1194 {
1195         int ret;
1196         struct rte_eventdev *dev;
1197         struct rte_event_dev_config dev_conf;
1198         int started;
1199         uint8_t port_id;
1200         struct rte_event_port_conf *port_conf = arg;
1201         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1202
1203         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1204         dev_conf = dev->data->dev_conf;
1205
1206         started = dev->data->dev_started;
1207         if (started)
1208                 rte_event_dev_stop(dev_id);
1209         port_id = dev_conf.nb_event_ports;
1210         dev_conf.nb_event_ports += 1;
1211         ret = rte_event_dev_configure(dev_id, &dev_conf);
1212         if (ret) {
1213                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1214                                                 dev_id);
1215                 if (started) {
1216                         if (rte_event_dev_start(dev_id))
1217                                 return -EIO;
1218                 }
1219                 return ret;
1220         }
1221
1222         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1223         if (ret) {
1224                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1225                                         port_id);
1226                 return ret;
1227         }
1228
1229         conf->event_port_id = port_id;
1230         conf->max_nb_rx = 128;
1231         if (started)
1232                 ret = rte_event_dev_start(dev_id);
1233         rx_adapter->default_cb_arg = 1;
1234         return ret;
1235 }
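/*
 * Minimal usage sketch (illustrative, values are placeholders): an
 * application relying on this default callback creates the adapter with
 * rte_event_eth_rx_adapter_create(), passing the event port configuration
 * used for the port set up above:
 *
 *     struct rte_event_port_conf port_conf = {
 *             .new_event_threshold = 4096,
 *             .dequeue_depth = 16,
 *             .enqueue_depth = 16,
 *     };
 *     rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 */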
1236
1237 static int
1238 rxa_epoll_create1(void)
1239 {
1240 #if defined(LINUX)
1241         int fd;
1242         fd = epoll_create1(EPOLL_CLOEXEC);
1243         return fd < 0 ? -errno : fd;
1244 #elif defined(BSD)
1245         return -ENOTSUP;
1246 #endif
1247 }
1248
1249 static int
1250 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1251 {
1252         if (rx_adapter->epd != INIT_FD)
1253                 return 0;
1254
1255         rx_adapter->epd = rxa_epoll_create1();
1256         if (rx_adapter->epd < 0) {
1257                 int err = rx_adapter->epd;
1258                 rx_adapter->epd = INIT_FD;
1259                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1260                 return err;
1261         }
1262
1263         return 0;
1264 }
1265
1266 static int
1267 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1268 {
1269         int err;
1270         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1271
1272         if (rx_adapter->intr_ring)
1273                 return 0;
1274
1275         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1276                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1277                                         rte_socket_id(), 0);
1278         if (!rx_adapter->intr_ring)
1279                 return -ENOMEM;
1280
1281         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1282                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1283                                         sizeof(struct rte_epoll_event),
1284                                         RTE_CACHE_LINE_SIZE,
1285                                         rx_adapter->socket_id);
1286         if (!rx_adapter->epoll_events) {
1287                 err = -ENOMEM;
1288                 goto error;
1289         }
1290
1291         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1292
1293         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1294                         "rx-intr-thread-%d", rx_adapter->id);
1295
1296         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1297                                 NULL, rxa_intr_thread, rx_adapter);
1298         if (!err) {
1299                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1300                 return 0;
1301         }
1302
1303         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1304 error:
1305         rte_ring_free(rx_adapter->intr_ring);
1306         rx_adapter->intr_ring = NULL;
1307         rx_adapter->epoll_events = NULL;
1308         return err;
1309 }
1310
1311 static int
1312 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1313 {
1314         int err;
1315
1316         err = pthread_cancel(rx_adapter->rx_intr_thread);
1317         if (err)
1318                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1319                                 err);
1320
1321         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1322         if (err)
1323                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1324
1325         rte_free(rx_adapter->epoll_events);
1326         rte_ring_free(rx_adapter->intr_ring);
1327         rx_adapter->intr_ring = NULL;
1328         rx_adapter->epoll_events = NULL;
1329         return 0;
1330 }
1331
1332 static int
1333 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1334 {
1335         int ret;
1336
1337         if (rx_adapter->num_rx_intr == 0)
1338                 return 0;
1339
1340         ret = rxa_destroy_intr_thread(rx_adapter);
1341         if (ret)
1342                 return ret;
1343
1344         close(rx_adapter->epd);
1345         rx_adapter->epd = INIT_FD;
1346
1347         return ret;
1348 }
1349
1350 static int
1351 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1352         struct eth_device_info *dev_info,
1353         uint16_t rx_queue_id)
1354 {
1355         int err;
1356         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1357         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1358
1359         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1360         if (err) {
1361                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1362                         rx_queue_id);
1363                 return err;
1364         }
1365
1366         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1367                                         rx_adapter->epd,
1368                                         RTE_INTR_EVENT_DEL,
1369                                         0);
1370         if (err)
1371                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1372
1373         if (sintr)
1374                 dev_info->shared_intr_enabled = 0;
1375         else
1376                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1377         return err;
1378 }
1379
1380 static int
1381 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1382                 struct eth_device_info *dev_info,
1383                 int rx_queue_id)
1384 {
1385         int err;
1386         int i;
1387         int s;
1388
1389         if (dev_info->nb_rx_intr == 0)
1390                 return 0;
1391
1392         err = 0;
1393         if (rx_queue_id == -1) {
1394                 s = dev_info->nb_shared_intr;
1395                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1396                         int sintr;
1397                         uint16_t q;
1398
1399                         q = dev_info->intr_queue[i];
1400                         sintr = rxa_shared_intr(dev_info, q);
1401                         s -= sintr;
1402
1403                         if (!sintr || s == 0) {
1404
1405                                 err = rxa_disable_intr(rx_adapter, dev_info,
1406                                                 q);
1407                                 if (err)
1408                                         return err;
1409                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1410                                                         q);
1411                         }
1412                 }
1413         } else {
1414                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1415                         return 0;
1416                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1417                                 dev_info->nb_shared_intr == 1) {
1418                         err = rxa_disable_intr(rx_adapter, dev_info,
1419                                         rx_queue_id);
1420                         if (err)
1421                                 return err;
1422                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1423                                                 rx_queue_id);
1424                 }
1425
1426                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1427                         if (dev_info->intr_queue[i] == rx_queue_id) {
1428                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1429                                         dev_info->intr_queue[i] =
1430                                                 dev_info->intr_queue[i + 1];
1431                                 break;
1432                         }
1433                 }
1434         }
1435
1436         return err;
1437 }
1438
1439 static int
1440 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1441         struct eth_device_info *dev_info,
1442         uint16_t rx_queue_id)
1443 {
1444         int err, err1;
1445         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1446         union queue_data qd;
1447         int init_fd;
1448         uint16_t *intr_queue;
1449         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1450
1451         if (rxa_intr_queue(dev_info, rx_queue_id))
1452                 return 0;
1453
1454         intr_queue = dev_info->intr_queue;
1455         if (dev_info->intr_queue == NULL) {
1456                 size_t len =
1457                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1458                 dev_info->intr_queue =
1459                         rte_zmalloc_socket(
1460                                 rx_adapter->mem_name,
1461                                 len,
1462                                 0,
1463                                 rx_adapter->socket_id);
1464                 if (dev_info->intr_queue == NULL)
1465                         return -ENOMEM;
1466         }
1467
1468         init_fd = rx_adapter->epd;
1469         err = rxa_init_epd(rx_adapter);
1470         if (err)
1471                 goto err_free_queue;
1472
1473         qd.port = eth_dev_id;
1474         qd.queue = rx_queue_id;
1475
1476         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1477                                         rx_adapter->epd,
1478                                         RTE_INTR_EVENT_ADD,
1479                                         qd.ptr);
1480         if (err) {
1481                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1482                         " Rx Queue %u err %d", rx_queue_id, err);
1483                 goto err_del_fd;
1484         }
1485
1486         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1487         if (err) {
1488                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1489                                 " Rx Queue %u err %d", rx_queue_id, err);
1490
1491                 goto err_del_event;
1492         }
1493
1494         err = rxa_create_intr_thread(rx_adapter);
1495         if (!err) {
1496                 if (sintr)
1497                         dev_info->shared_intr_enabled = 1;
1498                 else
1499                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1500                 return 0;
1501         }
1502
1503
1504         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1505         if (err)
1506                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1507                                 " Rx Queue %u err %d", rx_queue_id, err);
1508 err_del_event:
1509         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1510                                         rx_adapter->epd,
1511                                         RTE_INTR_EVENT_DEL,
1512                                         0);
1513         if (err1) {
1514                 RTE_EDEV_LOG_ERR("Could not delete event for"
1515                                 " Rx Queue %u err %d", rx_queue_id, err1);
1516         }
1517 err_del_fd:
1518         if (init_fd == INIT_FD) {
1519                 close(rx_adapter->epd);
1520                 rx_adapter->epd = INIT_FD;
1521         }
1522 err_free_queue:
1523         if (intr_queue == NULL) {
1524                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1525
1526         return err;
1527 }
1528
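     /*
      * Configure Rx interrupts for one queue, or for every Rx queue of the
      * device when rx_queue_id is -1. Queues sharing an interrupt vector
      * only need to be configured once; if any queue fails, interrupts
      * enabled earlier by this call are disabled again before returning.
      */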
1529 static int
1530 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1531         struct eth_device_info *dev_info,
1532         int rx_queue_id)
1533
1534 {
1535         int i, j, err;
1536         int si = -1;
1537         int shared_done = (dev_info->nb_shared_intr > 0);
1538
1539         if (rx_queue_id != -1) {
1540                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1541                         return 0;
1542                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1543         }
1544
1545         err = 0;
1546         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1547
1548                 if (rxa_shared_intr(dev_info, i) && shared_done)
1549                         continue;
1550
1551                 err = rxa_config_intr(rx_adapter, dev_info, i);
1552
1553                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1554                 if (shared_done) {
1555                         si = i;
1556                         dev_info->shared_intr_enabled = 1;
1557                 }
1558                 if (err)
1559                         break;
1560         }
1561
1562         if (err == 0)
1563                 return 0;
1564
1565         shared_done = (dev_info->nb_shared_intr > 0);
1566         for (j = 0; j < i; j++) {
1567                 if (rxa_intr_queue(dev_info, j))
1568                         continue;
1569                 if (rxa_shared_intr(dev_info, j) && si != j)
1570                         continue;
1571                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1572                 if (err)
1573                         break;
1574
1575         }
1576
1577         return err;
1578 }
1579
1580
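     /*
      * One-time registration of the adapter's service function with the
      * service core framework. The application supplied conf_cb is invoked
      * to obtain the event port id and max_nb_rx limit; the service
      * component is unregistered again if the callback fails.
      */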
1581 static int
1582 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1583 {
1584         int ret;
1585         struct rte_service_spec service;
1586         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1587
1588         if (rx_adapter->service_inited)
1589                 return 0;
1590
1591         memset(&service, 0, sizeof(service));
1592         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1593                 "rte_event_eth_rx_adapter_%d", id);
1594         service.socket_id = rx_adapter->socket_id;
1595         service.callback = rxa_service_func;
1596         service.callback_userdata = rx_adapter;
1597         /* Service function handles locking for queue add/del updates */
1598         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1599         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1600         if (ret) {
1601                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1602                         service.name, ret);
1603                 return ret;
1604         }
1605
1606         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1607                 &rx_adapter_conf, rx_adapter->conf_arg);
1608         if (ret) {
1609                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1610                         ret);
1611                 goto err_done;
1612         }
1613         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1614         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1615         rx_adapter->service_inited = 1;
1616         rx_adapter->epd = INIT_FD;
1617         return 0;
1618
1619 err_done:
1620         rte_service_component_unregister(rx_adapter->service_id);
1621         return ret;
1622 }
1623
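     /*
      * Mark a queue (or all queues of the device when rx_queue_id is -1)
      * as enabled or disabled and keep the adapter and per-device queue
      * counts consistent.
      */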
1624 static void
1625 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1626                 struct eth_device_info *dev_info,
1627                 int32_t rx_queue_id,
1628                 uint8_t add)
1629 {
1630         struct eth_rx_queue_info *queue_info;
1631         int enabled;
1632         uint16_t i;
1633
1634         if (dev_info->rx_queue == NULL)
1635                 return;
1636
1637         if (rx_queue_id == -1) {
1638                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1639                         rxa_update_queue(rx_adapter, dev_info, i, add);
1640         } else {
1641                 queue_info = &dev_info->rx_queue[rx_queue_id];
1642                 enabled = queue_info->queue_enabled;
1643                 if (add) {
1644                         rx_adapter->nb_queues += !enabled;
1645                         dev_info->nb_dev_queues += !enabled;
1646                 } else {
1647                         rx_adapter->nb_queues -= enabled;
1648                         dev_info->nb_dev_queues -= enabled;
1649                 }
1650                 queue_info->queue_enabled = !!add;
1651         }
1652 }
1653
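     /*
      * Remove a queue (or all queues when rx_queue_id is -1) from the
      * software adapter's bookkeeping: clear its enabled flag and adjust
      * the polled, interrupt and shared-interrupt queue counts.
      */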
1654 static void
1655 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1656         struct eth_device_info *dev_info,
1657         int32_t rx_queue_id)
1658 {
1659         int pollq;
1660         int intrq;
1661         int sintrq;
1662
1663
1664         if (rx_adapter->nb_queues == 0)
1665                 return;
1666
1667         if (rx_queue_id == -1) {
1668                 uint16_t nb_rx_queues;
1669                 uint16_t i;
1670
1671                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1672                 for (i = 0; i < nb_rx_queues; i++)
1673                         rxa_sw_del(rx_adapter, dev_info, i);
1674                 return;
1675         }
1676
1677         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1678         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1679         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1680         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1681         rx_adapter->num_rx_polled -= pollq;
1682         dev_info->nb_rx_poll -= pollq;
1683         rx_adapter->num_rx_intr -= intrq;
1684         dev_info->nb_rx_intr -= intrq;
1685         dev_info->nb_shared_intr -= intrq && sintrq;
1686 }
1687
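     /*
      * Record a queue (or all queues when rx_queue_id is -1) in the
      * software adapter: store the servicing weight, build the event
      * template used when enqueuing packets from this queue and move the
      * queue between the polled and interrupt-driven accounting as needed.
      */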
1688 static void
1689 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1690         struct eth_device_info *dev_info,
1691         int32_t rx_queue_id,
1692         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1693 {
1694         struct eth_rx_queue_info *queue_info;
1695         const struct rte_event *ev = &conf->ev;
1696         int pollq;
1697         int intrq;
1698         int sintrq;
1699         struct rte_event *qi_ev;
1700
1701         if (rx_queue_id == -1) {
1702                 uint16_t nb_rx_queues;
1703                 uint16_t i;
1704
1705                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1706                 for (i = 0; i < nb_rx_queues; i++)
1707                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1708                 return;
1709         }
1710
1711         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1712         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1713         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1714
1715         queue_info = &dev_info->rx_queue[rx_queue_id];
1716         queue_info->wt = conf->servicing_weight;
1717
1718         qi_ev = (struct rte_event *)&queue_info->event;
1719         qi_ev->event = ev->event;
1720         qi_ev->op = RTE_EVENT_OP_NEW;
1721         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1722         qi_ev->sub_event_type = 0;
1723
1724         if (conf->rx_queue_flags &
1725                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1726                 queue_info->flow_id_mask = ~0;
1727         } else
1728                 qi_ev->flow_id = 0;
1729
1730         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1731         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1732                 rx_adapter->num_rx_polled += !pollq;
1733                 dev_info->nb_rx_poll += !pollq;
1734                 rx_adapter->num_rx_intr -= intrq;
1735                 dev_info->nb_rx_intr -= intrq;
1736                 dev_info->nb_shared_intr -= intrq && sintrq;
1737         }
1738
1739         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1740                 rx_adapter->num_rx_polled -= pollq;
1741                 dev_info->nb_rx_poll -= pollq;
1742                 rx_adapter->num_rx_intr += !intrq;
1743                 dev_info->nb_rx_intr += !intrq;
1744                 dev_info->nb_shared_intr += !intrq && sintrq;
1745                 if (dev_info->nb_shared_intr == 1) {
1746                         if (dev_info->multi_intr_cap)
1747                                 dev_info->next_q_idx =
1748                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1749                         else
1750                                 dev_info->next_q_idx = 0;
1751                 }
1752         }
1753 }
1754
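     /*
      * Software (service core) queue add path: choose poll or interrupt
      * mode from the servicing weight, allocate the new poll and WRR
      * arrays, reconfigure interrupts as required and finally install the
      * recomputed weighted round robin schedule in the adapter.
      */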
1755 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1756                 uint16_t eth_dev_id,
1757                 int rx_queue_id,
1758                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1759 {
1760         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1761         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1762         int ret;
1763         struct eth_rx_poll_entry *rx_poll;
1764         struct eth_rx_queue_info *rx_queue;
1765         uint32_t *rx_wrr;
1766         uint16_t nb_rx_queues;
1767         uint32_t nb_rx_poll, nb_wrr;
1768         uint32_t nb_rx_intr;
1769         int num_intr_vec;
1770         uint16_t wt;
1771
1772         if (queue_conf->servicing_weight == 0) {
1773                 struct rte_eth_dev_data *data = dev_info->dev->data;
1774
1775                 temp_conf = *queue_conf;
1776                 if (!data->dev_conf.intr_conf.rxq) {
1777                         /* If Rx interrupts are disabled, set wt = 1 */
1778                         temp_conf.servicing_weight = 1;
1779                 }
1780                 queue_conf = &temp_conf;
1781         }
1782
1783         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1784         rx_queue = dev_info->rx_queue;
1785         wt = queue_conf->servicing_weight;
1786
1787         if (dev_info->rx_queue == NULL) {
1788                 dev_info->rx_queue =
1789                     rte_zmalloc_socket(rx_adapter->mem_name,
1790                                        nb_rx_queues *
1791                                        sizeof(struct eth_rx_queue_info), 0,
1792                                        rx_adapter->socket_id);
1793                 if (dev_info->rx_queue == NULL)
1794                         return -ENOMEM;
1795         }
1796         rx_wrr = NULL;
1797         rx_poll = NULL;
1798
1799         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1800                         queue_conf->servicing_weight,
1801                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1802
1803         if (dev_info->dev->intr_handle)
1804                 dev_info->multi_intr_cap =
1805                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1806
1807         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1808                                 &rx_poll, &rx_wrr);
1809         if (ret)
1810                 goto err_free_rxqueue;
1811
1812         if (wt == 0) {
1813                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1814
1815                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1816                 if (ret)
1817                         goto err_free_rxqueue;
1818
1819                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1820                 if (ret)
1821                         goto err_free_rxqueue;
1822         } else {
1823
1824                 num_intr_vec = 0;
1825                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1826                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1827                                                 rx_queue_id, 0);
1828                         /* Interrupt based queues are being converted to
1829                          * poll mode queues; delete the interrupt configuration
1830                          * for those.
1831                          */
1832                         ret = rxa_del_intr_queue(rx_adapter,
1833                                                 dev_info, rx_queue_id);
1834                         if (ret)
1835                                 goto err_free_rxqueue;
1836                 }
1837         }
1838
1839         if (nb_rx_intr == 0) {
1840                 ret = rxa_free_intr_resources(rx_adapter);
1841                 if (ret)
1842                         goto err_free_rxqueue;
1843         }
1844
1845         if (wt == 0) {
1846                 uint16_t i;
1847
1848                 if (rx_queue_id == -1) {
1849                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1850                                 dev_info->intr_queue[i] = i;
1851                 } else {
1852                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1853                                 dev_info->intr_queue[nb_rx_intr - 1] =
1854                                         rx_queue_id;
1855                 }
1856         }
1857
1858
1859
1860         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1861         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1862
1863         rte_free(rx_adapter->eth_rx_poll);
1864         rte_free(rx_adapter->wrr_sched);
1865
1866         rx_adapter->eth_rx_poll = rx_poll;
1867         rx_adapter->wrr_sched = rx_wrr;
1868         rx_adapter->wrr_len = nb_wrr;
1869         rx_adapter->num_intr_vec += num_intr_vec;
1870         return 0;
1871
1872 err_free_rxqueue:
1873         if (rx_queue == NULL) {
1874                 rte_free(dev_info->rx_queue);
1875                 dev_info->rx_queue = NULL;
1876         }
1877
1878         rte_free(rx_poll);
1879         rte_free(rx_wrr);
1880
1881         return ret;
1882 }
1883
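     /*
      * Common start/stop handler: devices with an internal event port are
      * started or stopped through the eventdev PMD ops, the remaining
      * devices are handled by switching the run state of the adapter's
      * service function.
      */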
1884 static int
1885 rxa_ctrl(uint8_t id, int start)
1886 {
1887         struct rte_event_eth_rx_adapter *rx_adapter;
1888         struct rte_eventdev *dev;
1889         struct eth_device_info *dev_info;
1890         uint32_t i;
1891         int use_service = 0;
1892         int stop = !start;
1893
1894         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1895         rx_adapter = rxa_id_to_adapter(id);
1896         if (rx_adapter == NULL)
1897                 return -EINVAL;
1898
1899         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1900
1901         RTE_ETH_FOREACH_DEV(i) {
1902                 dev_info = &rx_adapter->eth_devices[i];
1903                 /* if start, check for num dev queues */
1904                 if (start && !dev_info->nb_dev_queues)
1905                         continue;
1906                 /* if stop, check if dev has been started */
1907                 if (stop && !dev_info->dev_rx_started)
1908                         continue;
1909                 use_service |= !dev_info->internal_event_port;
1910                 dev_info->dev_rx_started = start;
1911                 if (dev_info->internal_event_port == 0)
1912                         continue;
1913                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1914                                                 &rte_eth_devices[i]) :
1915                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1916                                                 &rte_eth_devices[i]);
1917         }
1918
1919         if (use_service) {
1920                 rte_spinlock_lock(&rx_adapter->rx_lock);
1921                 rx_adapter->rxa_started = start;
1922                 rte_service_runstate_set(rx_adapter->service_id, start);
1923                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1924         }
1925
1926         return 0;
1927 }
1928
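     /*
      * Create an adapter that uses a caller supplied configuration
      * callback. The default Toeplitz RSS key is converted with
      * rte_convert_rss_key() so the service function can compute flow ids
      * in software.
      */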
1929 int
1930 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1931                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1932                                 void *conf_arg)
1933 {
1934         struct rte_event_eth_rx_adapter *rx_adapter;
1935         int ret;
1936         int socket_id;
1937         uint16_t i;
1938         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1939         const uint8_t default_rss_key[] = {
1940                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1941                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1942                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1943                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1944                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1945         };
1946
1947         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1948         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1949         if (conf_cb == NULL)
1950                 return -EINVAL;
1951
1952         if (event_eth_rx_adapter == NULL) {
1953                 ret = rte_event_eth_rx_adapter_init();
1954                 if (ret)
1955                         return ret;
1956         }
1957
1958         rx_adapter = rxa_id_to_adapter(id);
1959         if (rx_adapter != NULL) {
1960                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1961                 return -EEXIST;
1962         }
1963
1964         socket_id = rte_event_dev_socket_id(dev_id);
1965         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1966                 "rte_event_eth_rx_adapter_%d",
1967                 id);
1968
1969         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1970                         RTE_CACHE_LINE_SIZE, socket_id);
1971         if (rx_adapter == NULL) {
1972                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1973                 return -ENOMEM;
1974         }
1975
1976         rx_adapter->eventdev_id = dev_id;
1977         rx_adapter->socket_id = socket_id;
1978         rx_adapter->conf_cb = conf_cb;
1979         rx_adapter->conf_arg = conf_arg;
1980         rx_adapter->id = id;
1981         strcpy(rx_adapter->mem_name, mem_name);
1982         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1983                                         RTE_MAX_ETHPORTS *
1984                                         sizeof(struct eth_device_info), 0,
1985                                         socket_id);
1986         rte_convert_rss_key((const uint32_t *)default_rss_key,
1987                         (uint32_t *)rx_adapter->rss_key_be,
1988                         RTE_DIM(default_rss_key));
1989
1990         if (rx_adapter->eth_devices == NULL) {
1991                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1992                 rte_free(rx_adapter);
1993                 return -ENOMEM;
1994         }
1995         rte_spinlock_init(&rx_adapter->rx_lock);
1996         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1997                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1998
1999         event_eth_rx_adapter[id] = rx_adapter;
2000         if (conf_cb == rxa_default_conf_cb)
2001                 rx_adapter->default_cb_arg = 1;
2002         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2003                 conf_arg);
2004         return 0;
2005 }
2006
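     /*
      * Create an adapter using the default configuration callback; the
      * supplied event port configuration is used when that callback sets
      * up the adapter's event port. A minimal usage sketch follows; the
      * identifiers (adapter_id, evdev_id, eth_port_id, ev_qid and the
      * port_conf contents) are application choices, not values defined by
      * this library:
      *
      *     struct rte_event_port_conf port_conf = { ... };
      *     struct rte_event_eth_rx_adapter_queue_conf qconf = {
      *             .servicing_weight = 1,
      *             .ev.queue_id = ev_qid,
      *             .ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
      *     };
      *
      *     rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &port_conf);
      *     rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port_id, -1, &qconf);
      *     rte_event_eth_rx_adapter_start(adapter_id);
      */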
2007 int
2008 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2009                 struct rte_event_port_conf *port_config)
2010 {
2011         struct rte_event_port_conf *pc;
2012         int ret;
2013
2014         if (port_config == NULL)
2015                 return -EINVAL;
2016         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2017
2018         pc = rte_malloc(NULL, sizeof(*pc), 0);
2019         if (pc == NULL)
2020                 return -ENOMEM;
2021         *pc = *port_config;
2022         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2023                                         rxa_default_conf_cb,
2024                                         pc);
2025         if (ret)
2026                 rte_free(pc);
2027         return ret;
2028 }
2029
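     /* Freeing fails with -EBUSY while any Rx queues remain on the adapter */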
2030 int
2031 rte_event_eth_rx_adapter_free(uint8_t id)
2032 {
2033         struct rte_event_eth_rx_adapter *rx_adapter;
2034
2035         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2036
2037         rx_adapter = rxa_id_to_adapter(id);
2038         if (rx_adapter == NULL)
2039                 return -EINVAL;
2040
2041         if (rx_adapter->nb_queues) {
2042                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2043                                 rx_adapter->nb_queues);
2044                 return -EBUSY;
2045         }
2046
2047         if (rx_adapter->default_cb_arg)
2048                 rte_free(rx_adapter->conf_arg);
2049         rte_free(rx_adapter->eth_devices);
2050         rte_free(rx_adapter);
2051         event_eth_rx_adapter[id] = NULL;
2052
2053         rte_eventdev_trace_eth_rx_adapter_free(id);
2054         return 0;
2055 }
2056
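     /*
      * Add an Rx queue (or every queue of the port when rx_queue_id is -1)
      * to the adapter. Devices whose driver implements an internal event
      * port are handed to the eventdev PMD directly; all other devices use
      * the service core path (rxa_sw_add) under the adapter lock.
      */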
2057 int
2058 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2059                 uint16_t eth_dev_id,
2060                 int32_t rx_queue_id,
2061                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2062 {
2063         int ret;
2064         uint32_t cap;
2065         struct rte_event_eth_rx_adapter *rx_adapter;
2066         struct rte_eventdev *dev;
2067         struct eth_device_info *dev_info;
2068
2069         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2070         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2071
2072         rx_adapter = rxa_id_to_adapter(id);
2073         if ((rx_adapter == NULL) || (queue_conf == NULL))
2074                 return -EINVAL;
2075
2076         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2077         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2078                                                 eth_dev_id,
2079                                                 &cap);
2080         if (ret) {
2081                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2082                         " eth port %" PRIu16, id, eth_dev_id);
2083                 return ret;
2084         }
2085
2086         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2087                 && (queue_conf->rx_queue_flags &
2088                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2089                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2090                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2091                                 eth_dev_id, id);
2092                 return -EINVAL;
2093         }
2094
2095         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2096                 (rx_queue_id != -1)) {
2097                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2098                         "event queue, eth port: %" PRIu16 " adapter id: %"
2099                         PRIu8, eth_dev_id, id);
2100                 return -EINVAL;
2101         }
2102
2103         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2104                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2105                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2106                          (uint16_t)rx_queue_id);
2107                 return -EINVAL;
2108         }
2109
2110         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2111
2112         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2113                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2114                                         -ENOTSUP);
2115                 if (dev_info->rx_queue == NULL) {
2116                         dev_info->rx_queue =
2117                             rte_zmalloc_socket(rx_adapter->mem_name,
2118                                         dev_info->dev->data->nb_rx_queues *
2119                                         sizeof(struct eth_rx_queue_info), 0,
2120                                         rx_adapter->socket_id);
2121                         if (dev_info->rx_queue == NULL)
2122                                 return -ENOMEM;
2123                 }
2124
2125                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2126                                 &rte_eth_devices[eth_dev_id],
2127                                 rx_queue_id, queue_conf);
2128                 if (ret == 0) {
2129                         dev_info->internal_event_port = 1;
2130                         rxa_update_queue(rx_adapter,
2131                                         &rx_adapter->eth_devices[eth_dev_id],
2132                                         rx_queue_id,
2133                                         1);
2134                 }
2135         } else {
2136                 rte_spinlock_lock(&rx_adapter->rx_lock);
2137                 dev_info->internal_event_port = 0;
2138                 ret = rxa_init_service(rx_adapter, id);
2139                 if (ret == 0) {
2140                         uint32_t service_id = rx_adapter->service_id;
2141                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2142                                         queue_conf);
2143                         rte_service_component_runstate_set(service_id,
2144                                 rxa_sw_adapter_queue_count(rx_adapter));
2145                 }
2146                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2147         }
2148
2149         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2150                 rx_queue_id, queue_conf, ret);
2151         if (ret)
2152                 return ret;
2153
2154         return 0;
2155 }
2156
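     /*
      * Delete an Rx queue (or every queue of the port when rx_queue_id is
      * -1). The service core path rebuilds the poll and WRR arrays without
      * the deleted queue and releases interrupt resources once no
      * interrupt-driven queues remain.
      */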
2157 int
2158 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2159                                 int32_t rx_queue_id)
2160 {
2161         int ret = 0;
2162         struct rte_eventdev *dev;
2163         struct rte_event_eth_rx_adapter *rx_adapter;
2164         struct eth_device_info *dev_info;
2165         uint32_t cap;
2166         uint32_t nb_rx_poll = 0;
2167         uint32_t nb_wrr = 0;
2168         uint32_t nb_rx_intr;
2169         struct eth_rx_poll_entry *rx_poll = NULL;
2170         uint32_t *rx_wrr = NULL;
2171         int num_intr_vec;
2172
2173         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2174         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2175
2176         rx_adapter = rxa_id_to_adapter(id);
2177         if (rx_adapter == NULL)
2178                 return -EINVAL;
2179
2180         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2181         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2182                                                 eth_dev_id,
2183                                                 &cap);
2184         if (ret)
2185                 return ret;
2186
2187         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2188                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2189                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2190                          (uint16_t)rx_queue_id);
2191                 return -EINVAL;
2192         }
2193
2194         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2195
2196         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2197                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2198                                  -ENOTSUP);
2199                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2200                                                 &rte_eth_devices[eth_dev_id],
2201                                                 rx_queue_id);
2202                 if (ret == 0) {
2203                         rxa_update_queue(rx_adapter,
2204                                         &rx_adapter->eth_devices[eth_dev_id],
2205                                         rx_queue_id,
2206                                         0);
2207                         if (dev_info->nb_dev_queues == 0) {
2208                                 rte_free(dev_info->rx_queue);
2209                                 dev_info->rx_queue = NULL;
2210                         }
2211                 }
2212         } else {
2213                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2214                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2215
2216                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2217                         &rx_poll, &rx_wrr);
2218                 if (ret)
2219                         return ret;
2220
2221                 rte_spinlock_lock(&rx_adapter->rx_lock);
2222
2223                 num_intr_vec = 0;
2224                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2225
2226                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2227                                                 rx_queue_id, 0);
2228                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2229                                         rx_queue_id);
2230                         if (ret)
2231                                 goto unlock_ret;
2232                 }
2233
2234                 if (nb_rx_intr == 0) {
2235                         ret = rxa_free_intr_resources(rx_adapter);
2236                         if (ret)
2237                                 goto unlock_ret;
2238                 }
2239
2240                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2241                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2242
2243                 rte_free(rx_adapter->eth_rx_poll);
2244                 rte_free(rx_adapter->wrr_sched);
2245
2246                 if (nb_rx_intr == 0) {
2247                         rte_free(dev_info->intr_queue);
2248                         dev_info->intr_queue = NULL;
2249                 }
2250
2251                 rx_adapter->eth_rx_poll = rx_poll;
2252                 rx_adapter->wrr_sched = rx_wrr;
2253                 rx_adapter->wrr_len = nb_wrr;
2254                 rx_adapter->num_intr_vec += num_intr_vec;
2255
2256                 if (dev_info->nb_dev_queues == 0) {
2257                         rte_free(dev_info->rx_queue);
2258                         dev_info->rx_queue = NULL;
2259                 }
2260 unlock_ret:
2261                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2262                 if (ret) {
2263                         rte_free(rx_poll);
2264                         rte_free(rx_wrr);
2265                         return ret;
2266                 }
2267
2268                 rte_service_component_runstate_set(rx_adapter->service_id,
2269                                 rxa_sw_adapter_queue_count(rx_adapter));
2270         }
2271
2272         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2273                 rx_queue_id, ret);
2274         return ret;
2275 }
2276
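     /* Start and stop are thin wrappers around rxa_ctrl() */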
2277 int
2278 rte_event_eth_rx_adapter_start(uint8_t id)
2279 {
2280         rte_eventdev_trace_eth_rx_adapter_start(id);
2281         return rxa_ctrl(id, 1);
2282 }
2283
2284 int
2285 rte_event_eth_rx_adapter_stop(uint8_t id)
2286 {
2287         rte_eventdev_trace_eth_rx_adapter_stop(id);
2288         return rxa_ctrl(id, 0);
2289 }
2290
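     /*
      * Sum the statistics reported by drivers with internal event ports
      * and add the counters maintained by the adapter's service function.
      */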
2291 int
2292 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2293                                struct rte_event_eth_rx_adapter_stats *stats)
2294 {
2295         struct rte_event_eth_rx_adapter *rx_adapter;
2296         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2297         struct rte_event_eth_rx_adapter_stats dev_stats;
2298         struct rte_eventdev *dev;
2299         struct eth_device_info *dev_info;
2300         uint32_t i;
2301         int ret;
2302
2303         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2304
2305         rx_adapter = rxa_id_to_adapter(id);
2306         if (rx_adapter == NULL || stats == NULL)
2307                 return -EINVAL;
2308
2309         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2310         memset(stats, 0, sizeof(*stats));
2311         RTE_ETH_FOREACH_DEV(i) {
2312                 dev_info = &rx_adapter->eth_devices[i];
2313                 if (dev_info->internal_event_port == 0 ||
2314                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2315                         continue;
2316                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2317                                                 &rte_eth_devices[i],
2318                                                 &dev_stats);
2319                 if (ret)
2320                         continue;
2321                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2322                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2323         }
2324
2325         if (rx_adapter->service_inited)
2326                 *stats = rx_adapter->stats;
2327
2328         stats->rx_packets += dev_stats_sum.rx_packets;
2329         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2330         return 0;
2331 }
2332
2333 int
2334 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2335 {
2336         struct rte_event_eth_rx_adapter *rx_adapter;
2337         struct rte_eventdev *dev;
2338         struct eth_device_info *dev_info;
2339         uint32_t i;
2340
2341         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2342
2343         rx_adapter = rxa_id_to_adapter(id);
2344         if (rx_adapter == NULL)
2345                 return -EINVAL;
2346
2347         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2348         RTE_ETH_FOREACH_DEV(i) {
2349                 dev_info = &rx_adapter->eth_devices[i];
2350                 if (dev_info->internal_event_port == 0 ||
2351                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2352                         continue;
2353                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2354                                                         &rte_eth_devices[i]);
2355         }
2356
2357         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2358         return 0;
2359 }
2360
2361 int
2362 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2363 {
2364         struct rte_event_eth_rx_adapter *rx_adapter;
2365
2366         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2367
2368         rx_adapter = rxa_id_to_adapter(id);
2369         if (rx_adapter == NULL || service_id == NULL)
2370                 return -EINVAL;
2371
2372         if (rx_adapter->service_inited)
2373                 *service_id = rx_adapter->service_id;
2374
2375         return rx_adapter->service_inited ? 0 : -ESRCH;
2376 }
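     /*
      * Register a per-device callback that the service function invokes on
      * each burst of received packets before they are enqueued to the
      * event device; not supported for devices using an internal event
      * port.
      */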
2377
2378 int
2379 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2380                                         uint16_t eth_dev_id,
2381                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2382                                         void *cb_arg)
2383 {
2384         struct rte_event_eth_rx_adapter *rx_adapter;
2385         struct eth_device_info *dev_info;
2386         uint32_t cap;
2387         int ret;
2388
2389         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2390         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2391
2392         rx_adapter = rxa_id_to_adapter(id);
2393         if (rx_adapter == NULL)
2394                 return -EINVAL;
2395
2396         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2397         if (dev_info->rx_queue == NULL)
2398                 return -EINVAL;
2399
2400         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2401                                                 eth_dev_id,
2402                                                 &cap);
2403         if (ret) {
2404                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2405                         " eth port %" PRIu16, id, eth_dev_id);
2406                 return ret;
2407         }
2408
2409         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2410                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2411                                 PRIu16, eth_dev_id);
2412                 return -EINVAL;
2413         }
2414
2415         rte_spinlock_lock(&rx_adapter->rx_lock);
2416         dev_info->cb_fn = cb_fn;
2417         dev_info->cb_arg = cb_arg;
2418         rte_spinlock_unlock(&rx_adapter->rx_lock);
2419
2420         return 0;
2421 }