net: add rte prefix to ether defines
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
35 /* Sentinel value to detect an uninitialized file handle */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
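/* The <port, queue> pair is overlaid on a void * so it can be passed as
 * the epoll event data and carried through the interrupt ring (which
 * stores pointer-sized entries) without any extra allocation.
 */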
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Num of interrupt driven interrupt queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_epoll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if the default conf callback is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Rx callback */
151         rte_event_eth_rx_adapter_cb_fn cb_fn;
152         /* Rx callback argument */
153         void *cb_arg;
154         /* Set if ethdev->eventdev packet transfer uses a
155          * hardware mechanism
156          */
157         uint8_t internal_event_port;
158         /* Set if the adapter is processing rx queues for
159          * this eth device and packet processing has been
160          * started; lets the code know if the PMD
161          * rx_adapter_stop callback needs to be invoked
162          */
163         uint8_t dev_rx_started;
164         /* Number of queues added for this device */
165         uint16_t nb_dev_queues;
166         /* Number of poll based queues
167          * If nb_rx_poll > 0, the start callback will
168          * be invoked if not already invoked
169          */
170         uint16_t nb_rx_poll;
171         /* Number of interrupt based queues
172          * If nb_rx_intr > 0, the start callback will
173          * be invoked if not already invoked.
174          */
175         uint16_t nb_rx_intr;
176         /* Number of queues that use the shared interrupt */
177         uint16_t nb_shared_intr;
178         /* sum(wrr(q)) for all queues within the device;
179          * useful when deleting all device queues
180          */
181         uint32_t wrr_len;
182         /* Intr based queue index to start polling from; used when
183          * the number of shared interrupts is non-zero
184          */
185         uint16_t next_q_idx;
186         /* Intr based queue indices */
187         uint16_t *intr_queue;
188         /* device generates per Rx queue interrupts for queue
189          * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
190          */
191         int multi_intr_cap;
192         /* shared interrupt enabled */
193         int shared_intr_enabled;
194 };
195
196 /* Per Rx queue */
197 struct eth_rx_queue_info {
198         int queue_enabled;      /* True if added */
199         int intr_enabled;
200         uint16_t wt;            /* Polling weight */
201         uint8_t event_queue_id; /* Event queue to enqueue packets to */
202         uint8_t sched_type;     /* Sched type for events */
203         uint8_t priority;       /* Event priority */
204         uint32_t flow_id;       /* App provided flow identifier */
205         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
206 };
207
208 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
209
210 static inline int
211 rxa_validate_id(uint8_t id)
212 {
213         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
214 }
215
216 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
217         if (!rxa_validate_id(id)) { \
218                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
219                 return retval; \
220         } \
221 } while (0)
222
223 static inline int
224 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
225 {
226         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
227 }
228
229 /* Greatest common divisor */
230 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
231 {
232         uint16_t r = a % b;
233
234         return r ? rxa_gcd_u16(b, r) : b;
235 }
236
237 /* Returns the next queue in the polling sequence
238  *
239  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
240  */
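/* For illustration: with three polled queues of weights {4, 3, 2},
 * max_wt = 4 and gcd = 1, repeated calls to rxa_wrr_next() produce the
 * sequence q0 q0 q1 q0 q1 q2 q0 q1 q2, i.e. one cycle of
 * sum(weights) = 9 slots that favours the heaviest queue while still
 * visiting every queue in each cycle.
 */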
241 static int
242 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
243          unsigned int n, int *cw,
244          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
245          uint16_t gcd, int prev)
246 {
247         int i = prev;
248         uint16_t w;
249
250         while (1) {
251                 uint16_t q;
252                 uint16_t d;
253
254                 i = (i + 1) % n;
255                 if (i == 0) {
256                         *cw = *cw - gcd;
257                         if (*cw <= 0)
258                                 *cw = max_wt;
259                 }
260
261                 q = eth_rx_poll[i].eth_rx_qid;
262                 d = eth_rx_poll[i].eth_dev_id;
263                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
264
265                 if ((int)w >= *cw)
266                         return i;
267         }
268 }
269
270 static inline int
271 rxa_shared_intr(struct eth_device_info *dev_info,
272         int rx_queue_id)
273 {
274         int multi_intr_cap;
275
276         if (dev_info->dev->intr_handle == NULL)
277                 return 0;
278
279         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
280         return !multi_intr_cap ||
281                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
282 }
283
284 static inline int
285 rxa_intr_queue(struct eth_device_info *dev_info,
286         int rx_queue_id)
287 {
288         struct eth_rx_queue_info *queue_info;
289
290         queue_info = &dev_info->rx_queue[rx_queue_id];
291         return dev_info->rx_queue &&
292                 !dev_info->internal_event_port &&
293                 queue_info->queue_enabled && queue_info->wt == 0;
294 }
295
296 static inline int
297 rxa_polled_queue(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         struct eth_rx_queue_info *queue_info;
301
302         queue_info = &dev_info->rx_queue[rx_queue_id];
303         return !dev_info->internal_event_port &&
304                 dev_info->rx_queue &&
305                 queue_info->queue_enabled && queue_info->wt != 0;
306 }
307
308 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
309 static int
310 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
311 {
312         uint16_t i;
313         int n, s;
314         uint16_t nbq;
315
316         nbq = dev_info->dev->data->nb_rx_queues;
317         n = 0; /* non shared count */
318         s = 0; /* shared count */
319
320         if (rx_queue_id == -1) {
321                 for (i = 0; i < nbq; i++) {
322                         if (!rxa_shared_intr(dev_info, i))
323                                 n += add ? !rxa_intr_queue(dev_info, i) :
324                                         rxa_intr_queue(dev_info, i);
325                         else
326                                 s += add ? !rxa_intr_queue(dev_info, i) :
327                                         rxa_intr_queue(dev_info, i);
328                 }
329
330                 if (s > 0) {
331                         if ((add && dev_info->nb_shared_intr == 0) ||
332                                 (!add && dev_info->nb_shared_intr))
333                                 n += 1;
334                 }
335         } else {
336                 if (!rxa_shared_intr(dev_info, rx_queue_id))
337                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
338                                 rxa_intr_queue(dev_info, rx_queue_id);
339                 else
340                         n = add ? !dev_info->nb_shared_intr :
341                                 dev_info->nb_shared_intr == 1;
342         }
343
344         return add ? n : -n;
345 }
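/* For illustration: adding a queue that has its own interrupt vector and
 * is not already in interrupt mode yields +1; deleting the last queue
 * that uses the shared interrupt yields -1, since the shared vector is
 * counted only once per device.
 */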
346
347 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
348  */
349 static void
350 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
351                         struct eth_device_info *dev_info,
352                         int rx_queue_id,
353                         uint32_t *nb_rx_intr)
354 {
355         uint32_t intr_diff;
356
357         if (rx_queue_id == -1)
358                 intr_diff = dev_info->nb_rx_intr;
359         else
360                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
361
362         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
363 }
364
365 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly
366  * added interrupt queues could currently be poll mode Rx queues
367  */
368 static void
369 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
370                         struct eth_device_info *dev_info,
371                         int rx_queue_id,
372                         uint32_t *nb_rx_poll,
373                         uint32_t *nb_rx_intr,
374                         uint32_t *nb_wrr)
375 {
376         uint32_t intr_diff;
377         uint32_t poll_diff;
378         uint32_t wrr_len_diff;
379
380         if (rx_queue_id == -1) {
381                 intr_diff = dev_info->dev->data->nb_rx_queues -
382                                                 dev_info->nb_rx_intr;
383                 poll_diff = dev_info->nb_rx_poll;
384                 wrr_len_diff = dev_info->wrr_len;
385         } else {
386                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
387                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
388                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
389                                         0;
390         }
391
392         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
393         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
394         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
395 }
396
397 /* Calculate size of the eth_rx_poll and wrr_sched arrays
398  * after deleting poll mode rx queues
399  */
400 static void
401 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
402                         struct eth_device_info *dev_info,
403                         int rx_queue_id,
404                         uint32_t *nb_rx_poll,
405                         uint32_t *nb_wrr)
406 {
407         uint32_t poll_diff;
408         uint32_t wrr_len_diff;
409
410         if (rx_queue_id == -1) {
411                 poll_diff = dev_info->nb_rx_poll;
412                 wrr_len_diff = dev_info->wrr_len;
413         } else {
414                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
415                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
416                                         0;
417         }
418
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate nb_rx_* after adding poll mode rx queues
424  */
425 static void
426 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
427                         struct eth_device_info *dev_info,
428                         int rx_queue_id,
429                         uint16_t wt,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_rx_intr,
432                         uint32_t *nb_wrr)
433 {
434         uint32_t intr_diff;
435         uint32_t poll_diff;
436         uint32_t wrr_len_diff;
437
438         if (rx_queue_id == -1) {
439                 intr_diff = dev_info->nb_rx_intr;
440                 poll_diff = dev_info->dev->data->nb_rx_queues -
441                                                 dev_info->nb_rx_poll;
442                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
443                                 - dev_info->wrr_len;
444         } else {
445                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
446                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
447                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
448                                 wt - dev_info->rx_queue[rx_queue_id].wt :
449                                 wt;
450         }
451
452         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
453         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
454         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
455 }
456
457 /* Calculate nb_rx_* after adding rx_queue_id */
458 static void
459 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
460                 struct eth_device_info *dev_info,
461                 int rx_queue_id,
462                 uint16_t wt,
463                 uint32_t *nb_rx_poll,
464                 uint32_t *nb_rx_intr,
465                 uint32_t *nb_wrr)
466 {
467         if (wt != 0)
468                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
469                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
470         else
471                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
472                                         nb_rx_poll, nb_rx_intr, nb_wrr);
473 }
474
475 /* Calculate nb_rx_* after deleting rx_queue_id */
476 static void
477 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
478                 struct eth_device_info *dev_info,
479                 int rx_queue_id,
480                 uint32_t *nb_rx_poll,
481                 uint32_t *nb_rx_intr,
482                 uint32_t *nb_wrr)
483 {
484         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
485                                 nb_wrr);
486         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
487                                 nb_rx_intr);
488 }
489
490 /*
491  * Allocate the rx_poll array
492  */
493 static struct eth_rx_poll_entry *
494 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
495         uint32_t num_rx_polled)
496 {
497         size_t len;
498
499         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
500                                                         RTE_CACHE_LINE_SIZE);
501         return  rte_zmalloc_socket(rx_adapter->mem_name,
502                                 len,
503                                 RTE_CACHE_LINE_SIZE,
504                                 rx_adapter->socket_id);
505 }
506
507 /*
508  * Allocate the WRR array
509  */
510 static uint32_t *
511 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
512 {
513         size_t len;
514
515         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
516                         RTE_CACHE_LINE_SIZE);
517         return  rte_zmalloc_socket(rx_adapter->mem_name,
518                                 len,
519                                 RTE_CACHE_LINE_SIZE,
520                                 rx_adapter->socket_id);
521 }
522
523 static int
524 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
525                 uint32_t nb_poll,
526                 uint32_t nb_wrr,
527                 struct eth_rx_poll_entry **rx_poll,
528                 uint32_t **wrr_sched)
529 {
530
531         if (nb_poll == 0) {
532                 *rx_poll = NULL;
533                 *wrr_sched = NULL;
534                 return 0;
535         }
536
537         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
538         if (*rx_poll == NULL) {
539                 *wrr_sched = NULL;
540                 return -ENOMEM;
541         }
542
543         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
544         if (*wrr_sched == NULL) {
545                 rte_free(*rx_poll);
546                 return -ENOMEM;
547         }
548         return 0;
549 }
550
551 /* Precalculate WRR polling sequence for all queues in rx_adapter */
552 static void
553 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
554                 struct eth_rx_poll_entry *rx_poll,
555                 uint32_t *rx_wrr)
556 {
557         uint16_t d;
558         uint16_t q;
559         unsigned int i;
560         int prev = -1;
561         int cw = -1;
562
563         /* Initialize variables for calculation of wrr schedule */
564         uint16_t max_wrr_pos = 0;
565         unsigned int poll_q = 0;
566         uint16_t max_wt = 0;
567         uint16_t gcd = 0;
568
569         if (rx_poll == NULL)
570                 return;
571
572         /* Generate array of all queues to poll, the size of this
573          * array is poll_q
574          */
575         RTE_ETH_FOREACH_DEV(d) {
576                 uint16_t nb_rx_queues;
577                 struct eth_device_info *dev_info =
578                                 &rx_adapter->eth_devices[d];
579                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
580                 if (dev_info->rx_queue == NULL)
581                         continue;
582                 if (dev_info->internal_event_port)
583                         continue;
584                 dev_info->wrr_len = 0;
585                 for (q = 0; q < nb_rx_queues; q++) {
586                         struct eth_rx_queue_info *queue_info =
587                                 &dev_info->rx_queue[q];
588                         uint16_t wt;
589
590                         if (!rxa_polled_queue(dev_info, q))
591                                 continue;
592                         wt = queue_info->wt;
593                         rx_poll[poll_q].eth_dev_id = d;
594                         rx_poll[poll_q].eth_rx_qid = q;
595                         max_wrr_pos += wt;
596                         dev_info->wrr_len += wt;
597                         max_wt = RTE_MAX(max_wt, wt);
598                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
599                         poll_q++;
600                 }
601         }
602
603         /* Generate polling sequence based on weights */
604         prev = -1;
605         cw = -1;
606         for (i = 0; i < max_wrr_pos; i++) {
607                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
608                                      rx_poll, max_wt, gcd, prev);
609                 prev = rx_wrr[i];
610         }
611 }
612
613 static inline void
614 rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
615         struct ipv6_hdr **ipv6_hdr)
616 {
617         struct rte_ether_hdr *eth_hdr =
618                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
619         struct rte_vlan_hdr *vlan_hdr;
620
621         *ipv4_hdr = NULL;
622         *ipv6_hdr = NULL;
623
624         switch (eth_hdr->ether_type) {
625         case RTE_BE16(RTE_ETHER_TYPE_IPv4):
626                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
627                 break;
628
629         case RTE_BE16(RTE_ETHER_TYPE_IPv6):
630                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
631                 break;
632
633         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
634                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
635                 switch (vlan_hdr->eth_proto) {
636                 case RTE_BE16(RTE_ETHER_TYPE_IPv4):
637                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
638                         break;
639                 case RTE_BE16(RTE_ETHER_TYPE_IPv6):
640                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
641                         break;
642                 default:
643                         break;
644                 }
645                 break;
646
647         default:
648                 break;
649         }
650 }
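/* Note: rxa_mtoip() only recognizes untagged and single VLAN tagged
 * IPv4/IPv6 frames; for other encapsulations both header pointers stay
 * NULL and rxa_do_softrss() below returns a hash of 0.
 */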
651
652 /* Calculate RSS hash for IPv4/6 */
653 static inline uint32_t
654 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
655 {
656         uint32_t input_len;
657         void *tuple;
658         struct rte_ipv4_tuple ipv4_tuple;
659         struct rte_ipv6_tuple ipv6_tuple;
660         struct ipv4_hdr *ipv4_hdr;
661         struct ipv6_hdr *ipv6_hdr;
662
663         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
664
665         if (ipv4_hdr) {
666                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
667                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
668                 tuple = &ipv4_tuple;
669                 input_len = RTE_THASH_V4_L3_LEN;
670         } else if (ipv6_hdr) {
671                 rte_thash_load_v6_addrs(ipv6_hdr,
672                                         (union rte_thash_tuple *)&ipv6_tuple);
673                 tuple = &ipv6_tuple;
674                 input_len = RTE_THASH_V6_L3_LEN;
675         } else
676                 return 0;
677
678         return rte_softrss_be(tuple, input_len, rss_key_be);
679 }
680
681 static inline int
682 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
683 {
684         return !!rx_adapter->enq_block_count;
685 }
686
687 static inline void
688 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
689 {
690         if (rx_adapter->rx_enq_block_start_ts)
691                 return;
692
693         rx_adapter->enq_block_count++;
694         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
695                 return;
696
697         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
698 }
699
700 static inline void
701 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
702                     struct rte_event_eth_rx_adapter_stats *stats)
703 {
704         if (unlikely(!stats->rx_enq_start_ts))
705                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
706
707         if (likely(!rxa_enq_blocked(rx_adapter)))
708                 return;
709
710         rx_adapter->enq_block_count = 0;
711         if (rx_adapter->rx_enq_block_start_ts) {
712                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
713                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
714                     rx_adapter->rx_enq_block_start_ts;
715                 rx_adapter->rx_enq_block_start_ts = 0;
716         }
717 }
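/* rxa_enq_block_start_ts() and rxa_enq_block_end_ts() together measure how
 * long event enqueues stay blocked: after BLOCK_CNT_THRESHOLD consecutive
 * zero-event flushes a start timestamp is recorded, and the elapsed cycles
 * are added to stats->rx_enq_block_cycles once an enqueue succeeds again.
 */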
718
719 /* Add event to buffer; the free space check is done prior to calling
720  * this function
721  */
722 static inline void
723 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
724                 struct rte_event *ev)
725 {
726         struct rte_eth_event_enqueue_buffer *buf =
727             &rx_adapter->event_enqueue_buffer;
728         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
729 }
730
731 /* Enqueue buffered events to event device */
732 static inline uint16_t
733 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
734 {
735         struct rte_eth_event_enqueue_buffer *buf =
736             &rx_adapter->event_enqueue_buffer;
737         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
738
739         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
740                                         rx_adapter->event_port_id,
741                                         buf->events,
742                                         buf->count);
743         if (n != buf->count) {
744                 memmove(buf->events,
745                         &buf->events[n],
746                         (buf->count - n) * sizeof(struct rte_event));
747                 stats->rx_enq_retry++;
748         }
749
750         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
751                 rxa_enq_block_start_ts(rx_adapter);
752
753         buf->count -= n;
754         stats->rx_enq_count += n;
755
756         return n;
757 }
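/* Note: events not accepted by rte_event_enqueue_new_burst() are moved to
 * the front of the buffer and retried on the next flush; each such partial
 * flush is counted in stats->rx_enq_retry.
 */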
758
759 static inline void
760 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
761                 uint16_t eth_dev_id,
762                 uint16_t rx_queue_id,
763                 struct rte_mbuf **mbufs,
764                 uint16_t num)
765 {
766         uint32_t i;
767         struct eth_device_info *dev_info =
768                                         &rx_adapter->eth_devices[eth_dev_id];
769         struct eth_rx_queue_info *eth_rx_queue_info =
770                                         &dev_info->rx_queue[rx_queue_id];
771         struct rte_eth_event_enqueue_buffer *buf =
772                                         &rx_adapter->event_enqueue_buffer;
773         int32_t qid = eth_rx_queue_info->event_queue_id;
774         uint8_t sched_type = eth_rx_queue_info->sched_type;
775         uint8_t priority = eth_rx_queue_info->priority;
776         uint32_t flow_id;
777         struct rte_event events[BATCH_SIZE];
778         struct rte_mbuf *m = mbufs[0];
779         uint32_t rss_mask;
780         uint32_t rss;
781         int do_rss;
782         uint64_t ts;
783         struct rte_mbuf *cb_mbufs[BATCH_SIZE];
784         uint16_t nb_cb;
785
786         /* 0xffffffff if PKT_RX_RSS_HASH is set, otherwise 0 */
787         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
788         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
789
790         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
791                 ts = rte_get_tsc_cycles();
792                 for (i = 0; i < num; i++) {
793                         m = mbufs[i];
794
795                         m->timestamp = ts;
796                         m->ol_flags |= PKT_RX_TIMESTAMP;
797                 }
798         }
799
800
801         nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
802                                                 ETH_EVENT_BUFFER_SIZE,
803                                                 buf->count, mbufs,
804                                                 num,
805                                                 dev_info->cb_arg,
806                                                 cb_mbufs) :
807                                                 num;
808         if (nb_cb < num) {
809                 mbufs = cb_mbufs;
810                 num = nb_cb;
811         }
812
813         for (i = 0; i < num; i++) {
814                 m = mbufs[i];
815                 struct rte_event *ev = &events[i];
816
817                 rss = do_rss ?
818                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
819                         m->hash.rss;
820                 flow_id =
821                     eth_rx_queue_info->flow_id &
822                                 eth_rx_queue_info->flow_id_mask;
823                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
824                 ev->flow_id = flow_id;
825                 ev->op = RTE_EVENT_OP_NEW;
826                 ev->sched_type = sched_type;
827                 ev->queue_id = qid;
828                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
829                 ev->sub_event_type = 0;
830                 ev->priority = priority;
831                 ev->mbuf = m;
832
833                 rxa_buffer_event(rx_adapter, ev);
834         }
835 }
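/* Note: the event flow_id combines the application supplied flow id
 * (selected by flow_id_mask) with the RSS hash for the remaining bits;
 * when the mbuf has no PKT_RX_RSS_HASH and no flow id was configured, a
 * software Toeplitz hash from rxa_do_softrss() is used instead.
 */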
836
837 /* Enqueue packets from <port, q> to event buffer */
838 static inline uint32_t
839 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
840         uint16_t port_id,
841         uint16_t queue_id,
842         uint32_t rx_count,
843         uint32_t max_rx,
844         int *rxq_empty)
845 {
846         struct rte_mbuf *mbufs[BATCH_SIZE];
847         struct rte_eth_event_enqueue_buffer *buf =
848                                         &rx_adapter->event_enqueue_buffer;
849         struct rte_event_eth_rx_adapter_stats *stats =
850                                         &rx_adapter->stats;
851         uint16_t n;
852         uint32_t nb_rx = 0;
853
854         if (rxq_empty)
855                 *rxq_empty = 0;
856         /* Don't do a batch dequeue from the rx queue if there isn't
857          * enough space in the enqueue buffer.
858          */
859         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
860                 if (buf->count >= BATCH_SIZE)
861                         rxa_flush_event_buffer(rx_adapter);
862
863                 stats->rx_poll_count++;
864                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
865                 if (unlikely(!n)) {
866                         if (rxq_empty)
867                                 *rxq_empty = 1;
868                         break;
869                 }
870                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
871                 nb_rx += n;
872                 if (rx_count + nb_rx > max_rx)
873                         break;
874         }
875
876         if (buf->count > 0)
877                 rxa_flush_event_buffer(rx_adapter);
878
879         return nb_rx;
880 }
881
882 static inline void
883 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
884                 void *data)
885 {
886         uint16_t port_id;
887         uint16_t queue;
888         int err;
889         union queue_data qd;
890         struct eth_device_info *dev_info;
891         struct eth_rx_queue_info *queue_info;
892         int *intr_enabled;
893
894         qd.ptr = data;
895         port_id = qd.port;
896         queue = qd.queue;
897
898         dev_info = &rx_adapter->eth_devices[port_id];
899         queue_info = &dev_info->rx_queue[queue];
900         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
901         if (rxa_shared_intr(dev_info, queue))
902                 intr_enabled = &dev_info->shared_intr_enabled;
903         else
904                 intr_enabled = &queue_info->intr_enabled;
905
906         if (*intr_enabled) {
907                 *intr_enabled = 0;
908                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
909                 /* Entry should always be available.
910                  * The ring size equals the maximum number of interrupt
911                  * vectors supported (an interrupt vector is shared in
912                  * case of shared interrupts)
913                  */
914                 if (err)
915                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
916                                 " to ring: %s", strerror(-err));
917                 else
918                         rte_eth_dev_rx_intr_disable(port_id, queue);
919         }
920         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
921 }
922
923 static int
924 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
925                         uint32_t num_intr_vec)
926 {
927         if (rx_adapter->num_intr_vec + num_intr_vec >
928                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
929                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
930                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
931                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
932                 return -ENOSPC;
933         }
934
935         return 0;
936 }
937
938 /* Delete entries for (dev, queue) from the interrupt ring */
939 static void
940 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
941                         struct eth_device_info *dev_info,
942                         uint16_t rx_queue_id)
943 {
944         int i, n;
945         union queue_data qd;
946
947         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
948
949         n = rte_ring_count(rx_adapter->intr_ring);
950         for (i = 0; i < n; i++) {
951                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
952                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
953                         if (qd.port == dev_info->dev->data->port_id &&
954                                 qd.queue == rx_queue_id)
955                                 continue;
956                 } else {
957                         if (qd.port == dev_info->dev->data->port_id)
958                                 continue;
959                 }
960                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
961         }
962
963         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
964 }
965
966 /* pthread callback handling interrupt mode receive queues
967  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
968  * interrupting queue to the adapter's ring buffer for interrupt events.
969  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
970  * the adapter service function.
971  */
972 static void *
973 rxa_intr_thread(void *arg)
974 {
975         struct rte_event_eth_rx_adapter *rx_adapter = arg;
976         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
977         int n, i;
978
979         while (1) {
980                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
981                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
982                 if (unlikely(n < 0))
983                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
984                                         n);
985                 for (i = 0; i < n; i++) {
986                         rxa_intr_ring_enqueue(rx_adapter,
987                                         epoll_events[i].epdata.data);
988                 }
989         }
990
991         return NULL;
992 }
993
994 /* Dequeue <port, q> from interrupt ring and enqueue received
995  * mbufs to eventdev
996  */
997 static inline uint32_t
998 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
999 {
1000         uint32_t n;
1001         uint32_t nb_rx = 0;
1002         int rxq_empty;
1003         struct rte_eth_event_enqueue_buffer *buf;
1004         rte_spinlock_t *ring_lock;
1005         uint8_t max_done = 0;
1006
1007         if (rx_adapter->num_rx_intr == 0)
1008                 return 0;
1009
1010         if (rte_ring_count(rx_adapter->intr_ring) == 0
1011                 && !rx_adapter->qd_valid)
1012                 return 0;
1013
1014         buf = &rx_adapter->event_enqueue_buffer;
1015         ring_lock = &rx_adapter->intr_ring_lock;
1016
1017         if (buf->count >= BATCH_SIZE)
1018                 rxa_flush_event_buffer(rx_adapter);
1019
1020         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1021                 struct eth_device_info *dev_info;
1022                 uint16_t port;
1023                 uint16_t queue;
1024                 union queue_data qd  = rx_adapter->qd;
1025                 int err;
1026
1027                 if (!rx_adapter->qd_valid) {
1028                         struct eth_rx_queue_info *queue_info;
1029
1030                         rte_spinlock_lock(ring_lock);
1031                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1032                         if (err) {
1033                                 rte_spinlock_unlock(ring_lock);
1034                                 break;
1035                         }
1036
1037                         port = qd.port;
1038                         queue = qd.queue;
1039                         rx_adapter->qd = qd;
1040                         rx_adapter->qd_valid = 1;
1041                         dev_info = &rx_adapter->eth_devices[port];
1042                         if (rxa_shared_intr(dev_info, queue))
1043                                 dev_info->shared_intr_enabled = 1;
1044                         else {
1045                                 queue_info = &dev_info->rx_queue[queue];
1046                                 queue_info->intr_enabled = 1;
1047                         }
1048                         rte_eth_dev_rx_intr_enable(port, queue);
1049                         rte_spinlock_unlock(ring_lock);
1050                 } else {
1051                         port = qd.port;
1052                         queue = qd.queue;
1053
1054                         dev_info = &rx_adapter->eth_devices[port];
1055                 }
1056
1057                 if (rxa_shared_intr(dev_info, queue)) {
1058                         uint16_t i;
1059                         uint16_t nb_queues;
1060
1061                         nb_queues = dev_info->dev->data->nb_rx_queues;
1062                         n = 0;
1063                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1064                                 uint8_t enq_buffer_full;
1065
1066                                 if (!rxa_intr_queue(dev_info, i))
1067                                         continue;
1068                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1069                                         rx_adapter->max_nb_rx,
1070                                         &rxq_empty);
1071                                 nb_rx += n;
1072
1073                                 enq_buffer_full = !rxq_empty && n == 0;
1074                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1075
1076                                 if (enq_buffer_full || max_done) {
1077                                         dev_info->next_q_idx = i;
1078                                         goto done;
1079                                 }
1080                         }
1081
1082                         rx_adapter->qd_valid = 0;
1083
1084                         /* Reinitialize for next interrupt */
1085                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1086                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1087                                                 0;
1088                 } else {
1089                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1090                                 rx_adapter->max_nb_rx,
1091                                 &rxq_empty);
1092                         rx_adapter->qd_valid = !rxq_empty;
1093                         nb_rx += n;
1094                         if (nb_rx > rx_adapter->max_nb_rx)
1095                                 break;
1096                 }
1097         }
1098
1099 done:
1100         rx_adapter->stats.rx_intr_packets += nb_rx;
1101         return nb_rx;
1102 }
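/* Note: for devices using a shared interrupt, polling resumes at
 * dev_info->next_q_idx, so hitting the enqueue buffer limit or max_nb_rx
 * does not starve the device's remaining interrupt mode queues on the
 * next service invocation.
 */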
1103
1104 /*
1105  * Polls receive queues added to the event adapter and enqueues received
1106  * packets to the event device.
1107  *
1108  * The receive code enqueues initially to a temporary buffer; the
1109  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1110  *
1111  * If there isn't space available in the temporary buffer, packets are not
1112  * dequeued from the Rx queue of the eth device; this back pressures the
1113  * eth device. In virtual device environments the back pressure is relayed
1114  * to the hypervisor's switching layer where adjustments can be made to
1115  * deal with it.
1116  */
1117 static inline uint32_t
1118 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1119 {
1120         uint32_t num_queue;
1121         uint32_t nb_rx = 0;
1122         struct rte_eth_event_enqueue_buffer *buf;
1123         uint32_t wrr_pos;
1124         uint32_t max_nb_rx;
1125
1126         wrr_pos = rx_adapter->wrr_pos;
1127         max_nb_rx = rx_adapter->max_nb_rx;
1128         buf = &rx_adapter->event_enqueue_buffer;
1129
1130         /* Iterate through a WRR sequence */
1131         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1132                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1133                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1134                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1135
1136                 /* Don't do a batch dequeue from the rx queue if there isn't
1137                  * enough space in the enqueue buffer.
1138                  */
1139                 if (buf->count >= BATCH_SIZE)
1140                         rxa_flush_event_buffer(rx_adapter);
1141                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1142                         rx_adapter->wrr_pos = wrr_pos;
1143                         return nb_rx;
1144                 }
1145
1146                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1147                                 NULL);
1148                 if (nb_rx > max_nb_rx) {
1149                         rx_adapter->wrr_pos =
1150                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1151                         break;
1152                 }
1153
1154                 if (++wrr_pos == rx_adapter->wrr_len)
1155                         wrr_pos = 0;
1156         }
1157         return nb_rx;
1158 }
1159
1160 static int
1161 rxa_service_func(void *args)
1162 {
1163         struct rte_event_eth_rx_adapter *rx_adapter = args;
1164         struct rte_event_eth_rx_adapter_stats *stats;
1165
1166         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1167                 return 0;
1168         if (!rx_adapter->rxa_started) {
1169                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1170                 return 0;
1171         }
1172
1173         stats = &rx_adapter->stats;
1174         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1175         stats->rx_packets += rxa_poll(rx_adapter);
1176         rte_spinlock_unlock(&rx_adapter->rx_lock);
1177         return 0;
1178 }
1179
1180 static int
1181 rte_event_eth_rx_adapter_init(void)
1182 {
1183         const char *name = "rte_event_eth_rx_adapter_array";
1184         const struct rte_memzone *mz;
1185         unsigned int sz;
1186
1187         sz = sizeof(*event_eth_rx_adapter) *
1188             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1189         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1190
1191         mz = rte_memzone_lookup(name);
1192         if (mz == NULL) {
1193                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1194                                                  RTE_CACHE_LINE_SIZE);
1195                 if (mz == NULL) {
1196                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1197                                         PRId32, rte_errno);
1198                         return -rte_errno;
1199                 }
1200         }
1201
1202         event_eth_rx_adapter = mz->addr;
1203         return 0;
1204 }
1205
1206 static inline struct rte_event_eth_rx_adapter *
1207 rxa_id_to_adapter(uint8_t id)
1208 {
1209         return event_eth_rx_adapter ?
1210                 event_eth_rx_adapter[id] : NULL;
1211 }
1212
1213 static int
1214 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1215                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1216 {
1217         int ret;
1218         struct rte_eventdev *dev;
1219         struct rte_event_dev_config dev_conf;
1220         int started;
1221         uint8_t port_id;
1222         struct rte_event_port_conf *port_conf = arg;
1223         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1224
1225         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1226         dev_conf = dev->data->dev_conf;
1227
1228         started = dev->data->dev_started;
1229         if (started)
1230                 rte_event_dev_stop(dev_id);
1231         port_id = dev_conf.nb_event_ports;
1232         dev_conf.nb_event_ports += 1;
1233         ret = rte_event_dev_configure(dev_id, &dev_conf);
1234         if (ret) {
1235                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1236                                                 dev_id);
1237                 if (started) {
1238                         if (rte_event_dev_start(dev_id))
1239                                 return -EIO;
1240                 }
1241                 return ret;
1242         }
1243
1244         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1245         if (ret) {
1246                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1247                                         port_id);
1248                 return ret;
1249         }
1250
1251         conf->event_port_id = port_id;
1252         conf->max_nb_rx = 128;
1253         if (started)
1254                 ret = rte_event_dev_start(dev_id);
1255         rx_adapter->default_cb_arg = 1;
1256         return ret;
1257 }
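/* Note: the default callback grows the event device configuration by one
 * event port and dedicates that port to the adapter; if the event device
 * was already started it is stopped for the reconfiguration and restarted
 * afterwards.
 */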
1258
1259 static int
1260 rxa_epoll_create1(void)
1261 {
1262 #if defined(LINUX)
1263         int fd;
1264         fd = epoll_create1(EPOLL_CLOEXEC);
1265         return fd < 0 ? -errno : fd;
1266 #elif defined(BSD)
1267         return -ENOTSUP;
1268 #endif
1269 }
1270
1271 static int
1272 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1273 {
1274         if (rx_adapter->epd != INIT_FD)
1275                 return 0;
1276
1277         rx_adapter->epd = rxa_epoll_create1();
1278         if (rx_adapter->epd < 0) {
1279                 int err = rx_adapter->epd;
1280                 rx_adapter->epd = INIT_FD;
1281                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1282                 return err;
1283         }
1284
1285         return 0;
1286 }
1287
1288 static int
1289 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1290 {
1291         int err;
1292         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1293
1294         if (rx_adapter->intr_ring)
1295                 return 0;
1296
1297         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1298                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1299                                         rte_socket_id(), 0);
1300         if (!rx_adapter->intr_ring)
1301                 return -ENOMEM;
1302
1303         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1304                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1305                                         sizeof(struct rte_epoll_event),
1306                                         RTE_CACHE_LINE_SIZE,
1307                                         rx_adapter->socket_id);
1308         if (!rx_adapter->epoll_events) {
1309                 err = -ENOMEM;
1310                 goto error;
1311         }
1312
1313         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1314
1315         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1316                         "rx-intr-thread-%d", rx_adapter->id);
1317
1318         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1319                                 NULL, rxa_intr_thread, rx_adapter);
1320         if (!err) {
1321                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1322                 return 0;
1323         }
1324
1325         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1326 error:
1327         rte_ring_free(rx_adapter->intr_ring);
1328         rx_adapter->intr_ring = NULL;
1329         rx_adapter->epoll_events = NULL;
1330         return err;
1331 }
1332
1333 static int
1334 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1335 {
1336         int err;
1337
1338         err = pthread_cancel(rx_adapter->rx_intr_thread);
1339         if (err)
1340                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1341                                 err);
1342
1343         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1344         if (err)
1345                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1346
1347         rte_free(rx_adapter->epoll_events);
1348         rte_ring_free(rx_adapter->intr_ring);
1349         rx_adapter->intr_ring = NULL;
1350         rx_adapter->epoll_events = NULL;
1351         return 0;
1352 }
1353
1354 static int
1355 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1356 {
1357         int ret;
1358
1359         if (rx_adapter->num_rx_intr == 0)
1360                 return 0;
1361
1362         ret = rxa_destroy_intr_thread(rx_adapter);
1363         if (ret)
1364                 return ret;
1365
1366         close(rx_adapter->epd);
1367         rx_adapter->epd = INIT_FD;
1368
1369         return ret;
1370 }
1371
1372 static int
1373 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1374         struct eth_device_info *dev_info,
1375         uint16_t rx_queue_id)
1376 {
1377         int err;
1378         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1379         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1380
1381         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1382         if (err) {
1383                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1384                         rx_queue_id);
1385                 return err;
1386         }
1387
1388         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1389                                         rx_adapter->epd,
1390                                         RTE_INTR_EVENT_DEL,
1391                                         0);
1392         if (err)
1393                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1394
1395         if (sintr)
1396                 dev_info->shared_intr_enabled = 0;
1397         else
1398                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1399         return err;
1400 }
1401
1402 static int
1403 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1404                 struct eth_device_info *dev_info,
1405                 int rx_queue_id)
1406 {
1407         int err;
1408         int i;
1409         int s;
1410
1411         if (dev_info->nb_rx_intr == 0)
1412                 return 0;
1413
1414         err = 0;
1415         if (rx_queue_id == -1) {
1416                 s = dev_info->nb_shared_intr;
1417                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1418                         int sintr;
1419                         uint16_t q;
1420
1421                         q = dev_info->intr_queue[i];
1422                         sintr = rxa_shared_intr(dev_info, q);
1423                         s -= sintr;
1424
1425                         if (!sintr || s == 0) {
1426
1427                                 err = rxa_disable_intr(rx_adapter, dev_info,
1428                                                 q);
1429                                 if (err)
1430                                         return err;
1431                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1432                                                         q);
1433                         }
1434                 }
1435         } else {
1436                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1437                         return 0;
1438                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1439                                 dev_info->nb_shared_intr == 1) {
1440                         err = rxa_disable_intr(rx_adapter, dev_info,
1441                                         rx_queue_id);
1442                         if (err)
1443                                 return err;
1444                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1445                                                 rx_queue_id);
1446                 }
1447
1448                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1449                         if (dev_info->intr_queue[i] == rx_queue_id) {
1450                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1451                                         dev_info->intr_queue[i] =
1452                                                 dev_info->intr_queue[i + 1];
1453                                 break;
1454                         }
1455                 }
1456         }
1457
1458         return err;
1459 }
1460
1461 static int
1462 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1463         struct eth_device_info *dev_info,
1464         uint16_t rx_queue_id)
1465 {
1466         int err, err1;
1467         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1468         union queue_data qd;
1469         int init_fd;
1470         uint16_t *intr_queue;
1471         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1472
1473         if (rxa_intr_queue(dev_info, rx_queue_id))
1474                 return 0;
1475
1476         intr_queue = dev_info->intr_queue;
1477         if (dev_info->intr_queue == NULL) {
1478                 size_t len =
1479                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1480                 dev_info->intr_queue =
1481                         rte_zmalloc_socket(
1482                                 rx_adapter->mem_name,
1483                                 len,
1484                                 0,
1485                                 rx_adapter->socket_id);
1486                 if (dev_info->intr_queue == NULL)
1487                         return -ENOMEM;
1488         }
1489
1490         init_fd = rx_adapter->epd;
1491         err = rxa_init_epd(rx_adapter);
1492         if (err)
1493                 goto err_free_queue;
1494
1495         qd.port = eth_dev_id;
1496         qd.queue = rx_queue_id;
1497
1498         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1499                                         rx_adapter->epd,
1500                                         RTE_INTR_EVENT_ADD,
1501                                         qd.ptr);
1502         if (err) {
1503                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1504                         " Rx Queue %u err %d", rx_queue_id, err);
1505                 goto err_del_fd;
1506         }
1507
1508         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1509         if (err) {
1510                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1511                                 " Rx Queue %u err %d", rx_queue_id, err);
1512
1513                 goto err_del_event;
1514         }
1515
1516         err = rxa_create_intr_thread(rx_adapter);
1517         if (!err) {
1518                 if (sintr)
1519                         dev_info->shared_intr_enabled = 1;
1520                 else
1521                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1522                 return 0;
1523         }
1524
1526         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1527         if (err)
1528                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1529                                 " Rx Queue %u err %d", rx_queue_id, err);
1530 err_del_event:
1531         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1532                                         rx_adapter->epd,
1533                                         RTE_INTR_EVENT_DEL,
1534                                         0);
1535         if (err1) {
1536                 RTE_EDEV_LOG_ERR("Could not delete event for"
1537                                 " Rx Queue %u err %d", rx_queue_id, err1);
1538         }
1539 err_del_fd:
1540         if (init_fd == INIT_FD) {
1541                 close(rx_adapter->epd);
1542                 rx_adapter->epd = -1;
1543         }
1544 err_free_queue:
1545         if (intr_queue == NULL) {
1546                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1547
1548         return err;
1549 }
1550
1551 static int
1552 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1553         struct eth_device_info *dev_info,
1554         int rx_queue_id)
1556 {
1557         int i, j, err;
1558         int si = -1;
1559         int shared_done = (dev_info->nb_shared_intr > 0);
1560
1561         if (rx_queue_id != -1) {
1562                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1563                         return 0;
1564                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1565         }
1566
1567         err = 0;
1568         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1569
1570                 if (rxa_shared_intr(dev_info, i) && shared_done)
1571                         continue;
1572
1573                 err = rxa_config_intr(rx_adapter, dev_info, i);
1574
1575                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1576                 if (shared_done) {
1577                         si = i;
1578                         dev_info->shared_intr_enabled = 1;
1579                 }
1580                 if (err)
1581                         break;
1582         }
1583
1584         if (err == 0)
1585                 return 0;
1586
1587         shared_done = (dev_info->nb_shared_intr > 0);
1588         for (j = 0; j < i; j++) {
1589                 if (rxa_intr_queue(dev_info, j))
1590                         continue;
1591                 if (rxa_shared_intr(dev_info, j) && si != j)
1592                         continue;
1593                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1594                 if (err)
1595                         break;
1596
1597         }
1598
1599         return err;
1600 }
1601
1603 static int
1604 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1605 {
1606         int ret;
1607         struct rte_service_spec service;
1608         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1609
1610         if (rx_adapter->service_inited)
1611                 return 0;
1612
1613         memset(&service, 0, sizeof(service));
1614         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1615                 "rte_event_eth_rx_adapter_%d", id);
1616         service.socket_id = rx_adapter->socket_id;
1617         service.callback = rxa_service_func;
1618         service.callback_userdata = rx_adapter;
1619         /* Service function handles locking for queue add/del updates */
1620         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1621         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1622         if (ret) {
1623                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1624                         service.name, ret);
1625                 return ret;
1626         }
1627
1628         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1629                 &rx_adapter_conf, rx_adapter->conf_arg);
1630         if (ret) {
1631                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1632                         ret);
1633                 goto err_done;
1634         }
1635         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1636         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1637         rx_adapter->service_inited = 1;
1638         rx_adapter->epd = INIT_FD;
1639         return 0;
1640
1641 err_done:
1642         rte_service_component_unregister(rx_adapter->service_id);
1643         return ret;
1644 }
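
/*
 * A minimal sketch (not part of this file) of an application-supplied
 * rte_event_eth_rx_adapter_conf_cb as invoked above. It assumes the
 * application has already set up event port 0 on the event device and
 * simply hands that port to the adapter, capping the mbufs processed
 * per service call via max_nb_rx:
 *
 *	static int
 *	app_rxa_conf_cb(uint8_t id, uint8_t dev_id,
 *			struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 *	{
 *		RTE_SET_USED(id);
 *		RTE_SET_USED(dev_id);
 *		RTE_SET_USED(arg);
 *		conf->event_port_id = 0;
 *		conf->max_nb_rx = 128;
 *		return 0;
 *	}
 */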
1645
1646 static void
1647 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1648                 struct eth_device_info *dev_info,
1649                 int32_t rx_queue_id,
1650                 uint8_t add)
1651 {
1652         struct eth_rx_queue_info *queue_info;
1653         int enabled;
1654         uint16_t i;
1655
1656         if (dev_info->rx_queue == NULL)
1657                 return;
1658
1659         if (rx_queue_id == -1) {
1660                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1661                         rxa_update_queue(rx_adapter, dev_info, i, add);
1662         } else {
1663                 queue_info = &dev_info->rx_queue[rx_queue_id];
1664                 enabled = queue_info->queue_enabled;
1665                 if (add) {
1666                         rx_adapter->nb_queues += !enabled;
1667                         dev_info->nb_dev_queues += !enabled;
1668                 } else {
1669                         rx_adapter->nb_queues -= enabled;
1670                         dev_info->nb_dev_queues -= enabled;
1671                 }
1672                 queue_info->queue_enabled = !!add;
1673         }
1674 }
1675
1676 static void
1677 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1678         struct eth_device_info *dev_info,
1679         int32_t rx_queue_id)
1680 {
1681         int pollq;
1682         int intrq;
1683         int sintrq;
1684
1685
1686         if (rx_adapter->nb_queues == 0)
1687                 return;
1688
1689         if (rx_queue_id == -1) {
1690                 uint16_t nb_rx_queues;
1691                 uint16_t i;
1692
1693                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1694                 for (i = 0; i < nb_rx_queues; i++)
1695                         rxa_sw_del(rx_adapter, dev_info, i);
1696                 return;
1697         }
1698
1699         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1700         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1701         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1702         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1703         rx_adapter->num_rx_polled -= pollq;
1704         dev_info->nb_rx_poll -= pollq;
1705         rx_adapter->num_rx_intr -= intrq;
1706         dev_info->nb_rx_intr -= intrq;
1707         dev_info->nb_shared_intr -= intrq && sintrq;
1708 }
1709
1710 static void
1711 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1712         struct eth_device_info *dev_info,
1713         int32_t rx_queue_id,
1714         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1715 {
1716         struct eth_rx_queue_info *queue_info;
1717         const struct rte_event *ev = &conf->ev;
1718         int pollq;
1719         int intrq;
1720         int sintrq;
1721
1722         if (rx_queue_id == -1) {
1723                 uint16_t nb_rx_queues;
1724                 uint16_t i;
1725
1726                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1727                 for (i = 0; i < nb_rx_queues; i++)
1728                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1729                 return;
1730         }
1731
1732         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1733         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1734         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1735
1736         queue_info = &dev_info->rx_queue[rx_queue_id];
1737         queue_info->event_queue_id = ev->queue_id;
1738         queue_info->sched_type = ev->sched_type;
1739         queue_info->priority = ev->priority;
1740         queue_info->wt = conf->servicing_weight;
1741
1742         if (conf->rx_queue_flags &
1743                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1744                 queue_info->flow_id = ev->flow_id;
1745                 queue_info->flow_id_mask = ~0;
1746         }
1747
1748         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1749         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1750                 rx_adapter->num_rx_polled += !pollq;
1751                 dev_info->nb_rx_poll += !pollq;
1752                 rx_adapter->num_rx_intr -= intrq;
1753                 dev_info->nb_rx_intr -= intrq;
1754                 dev_info->nb_shared_intr -= intrq && sintrq;
1755         }
1756
1757         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1758                 rx_adapter->num_rx_polled -= pollq;
1759                 dev_info->nb_rx_poll -= pollq;
1760                 rx_adapter->num_rx_intr += !intrq;
1761                 dev_info->nb_rx_intr += !intrq;
1762                 dev_info->nb_shared_intr += !intrq && sintrq;
1763                 if (dev_info->nb_shared_intr == 1) {
1764                         if (dev_info->multi_intr_cap)
1765                                 dev_info->next_q_idx =
1766                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1767                         else
1768                                 dev_info->next_q_idx = 0;
1769                 }
1770         }
1771 }
1772
1773 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1774                 uint16_t eth_dev_id,
1775                 int rx_queue_id,
1776                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1777 {
1778         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1779         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1780         int ret;
1781         struct eth_rx_poll_entry *rx_poll;
1782         struct eth_rx_queue_info *rx_queue;
1783         uint32_t *rx_wrr;
1784         uint16_t nb_rx_queues;
1785         uint32_t nb_rx_poll, nb_wrr;
1786         uint32_t nb_rx_intr;
1787         int num_intr_vec;
1788         uint16_t wt;
1789
1790         if (queue_conf->servicing_weight == 0) {
1791                 struct rte_eth_dev_data *data = dev_info->dev->data;
1792
1793                 temp_conf = *queue_conf;
1794                 if (!data->dev_conf.intr_conf.rxq) {
1795                         /* If Rx interrupts are disabled set wt = 1 */
1796                         temp_conf.servicing_weight = 1;
1797                 }
1798                 queue_conf = &temp_conf;
1799         }
1800
1801         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1802         rx_queue = dev_info->rx_queue;
1803         wt = queue_conf->servicing_weight;
1804
1805         if (dev_info->rx_queue == NULL) {
1806                 dev_info->rx_queue =
1807                     rte_zmalloc_socket(rx_adapter->mem_name,
1808                                        nb_rx_queues *
1809                                        sizeof(struct eth_rx_queue_info), 0,
1810                                        rx_adapter->socket_id);
1811                 if (dev_info->rx_queue == NULL)
1812                         return -ENOMEM;
1813         }
1814         rx_wrr = NULL;
1815         rx_poll = NULL;
1816
1817         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1818                         queue_conf->servicing_weight,
1819                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1820
1821         if (dev_info->dev->intr_handle)
1822                 dev_info->multi_intr_cap =
1823                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1824
1825         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1826                                 &rx_poll, &rx_wrr);
1827         if (ret)
1828                 goto err_free_rxqueue;
1829
1830         if (wt == 0) {
1831                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1832
1833                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1834                 if (ret)
1835                         goto err_free_rxqueue;
1836
1837                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1838                 if (ret)
1839                         goto err_free_rxqueue;
1840         } else {
1841
1842                 num_intr_vec = 0;
1843                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1844                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1845                                                 rx_queue_id, 0);
1846                         /* interrupt based queues are being converted to
1847                          * poll mode queues, delete the interrupt configuration
1848                          * for those.
1849                          */
1850                         ret = rxa_del_intr_queue(rx_adapter,
1851                                                 dev_info, rx_queue_id);
1852                         if (ret)
1853                                 goto err_free_rxqueue;
1854                 }
1855         }
1856
1857         if (nb_rx_intr == 0) {
1858                 ret = rxa_free_intr_resources(rx_adapter);
1859                 if (ret)
1860                         goto err_free_rxqueue;
1861         }
1862
1863         if (wt == 0) {
1864                 uint16_t i;
1865
1866                 if (rx_queue_id == -1) {
1867                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1868                                 dev_info->intr_queue[i] = i;
1869                 } else {
1870                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1871                                 dev_info->intr_queue[nb_rx_intr - 1] =
1872                                         rx_queue_id;
1873                 }
1874         }
1875
1878         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1879         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1880
1881         rte_free(rx_adapter->eth_rx_poll);
1882         rte_free(rx_adapter->wrr_sched);
1883
1884         rx_adapter->eth_rx_poll = rx_poll;
1885         rx_adapter->wrr_sched = rx_wrr;
1886         rx_adapter->wrr_len = nb_wrr;
1887         rx_adapter->num_intr_vec += num_intr_vec;
1888         return 0;
1889
1890 err_free_rxqueue:
1891         if (rx_queue == NULL) {
1892                 rte_free(dev_info->rx_queue);
1893                 dev_info->rx_queue = NULL;
1894         }
1895
1896         rte_free(rx_poll);
1897         rte_free(rx_wrr);
1898
1899         return ret;
1900 }
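
/*
 * Note: the wt == 0 (interrupt mode) path above only takes effect when
 * per-queue Rx interrupts were enabled at ethdev configuration time. A
 * rough application-side sketch; port_id, qconf and the elided queue
 * setup are placeholders, not taken from this file:
 *
 *	struct rte_eth_conf dev_conf = {
 *		.intr_conf = { .rxq = 1 },
 *	};
 *
 *	rte_eth_dev_configure(port_id, 1, 1, &dev_conf);
 *	... Rx/Tx queue setup, rte_eth_dev_start(port_id) ...
 *	qconf.servicing_weight = 0;
 *	rte_event_eth_rx_adapter_queue_add(id, port_id, 0, &qconf);
 */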
1901
1902 static int
1903 rxa_ctrl(uint8_t id, int start)
1904 {
1905         struct rte_event_eth_rx_adapter *rx_adapter;
1906         struct rte_eventdev *dev;
1907         struct eth_device_info *dev_info;
1908         uint32_t i;
1909         int use_service = 0;
1910         int stop = !start;
1911
1912         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1913         rx_adapter = rxa_id_to_adapter(id);
1914         if (rx_adapter == NULL)
1915                 return -EINVAL;
1916
1917         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1918
1919         RTE_ETH_FOREACH_DEV(i) {
1920                 dev_info = &rx_adapter->eth_devices[i];
1921                 /* if start, check for num dev queues */
1922                 if (start && !dev_info->nb_dev_queues)
1923                         continue;
1924                 /* if stop check if dev has been started */
1925                 /* if stop, check if dev has been started */
1926                         continue;
1927                 use_service |= !dev_info->internal_event_port;
1928                 dev_info->dev_rx_started = start;
1929                 if (dev_info->internal_event_port == 0)
1930                         continue;
1931                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1932                                                 &rte_eth_devices[i]) :
1933                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1934                                                 &rte_eth_devices[i]);
1935         }
1936
1937         if (use_service) {
1938                 rte_spinlock_lock(&rx_adapter->rx_lock);
1939                 rx_adapter->rxa_started = start;
1940                 rte_service_runstate_set(rx_adapter->service_id, start);
1941                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1942         }
1943
1944         return 0;
1945 }
1946
1947 int
1948 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1949                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1950                                 void *conf_arg)
1951 {
1952         struct rte_event_eth_rx_adapter *rx_adapter;
1953         int ret;
1954         int socket_id;
1955         uint16_t i;
1956         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1957         const uint8_t default_rss_key[] = {
1958                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1959                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1960                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1961                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1962                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1963         };
1964
1965         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1966         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1967         if (conf_cb == NULL)
1968                 return -EINVAL;
1969
1970         if (event_eth_rx_adapter == NULL) {
1971                 ret = rte_event_eth_rx_adapter_init();
1972                 if (ret)
1973                         return ret;
1974         }
1975
1976         rx_adapter = rxa_id_to_adapter(id);
1977         if (rx_adapter != NULL) {
1978                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1979                 return -EEXIST;
1980         }
1981
1982         socket_id = rte_event_dev_socket_id(dev_id);
1983         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1984                 "rte_event_eth_rx_adapter_%d",
1985                 id);
1986
1987         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1988                         RTE_CACHE_LINE_SIZE, socket_id);
1989         if (rx_adapter == NULL) {
1990                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1991                 return -ENOMEM;
1992         }
1993
1994         rx_adapter->eventdev_id = dev_id;
1995         rx_adapter->socket_id = socket_id;
1996         rx_adapter->conf_cb = conf_cb;
1997         rx_adapter->conf_arg = conf_arg;
1998         rx_adapter->id = id;
1999         strcpy(rx_adapter->mem_name, mem_name);
2000         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2001                                         RTE_MAX_ETHPORTS *
2002                                         sizeof(struct eth_device_info), 0,
2003                                         socket_id);
2004         rte_convert_rss_key((const uint32_t *)default_rss_key,
2005                         (uint32_t *)rx_adapter->rss_key_be,
2006                         RTE_DIM(default_rss_key));
2007
2008         if (rx_adapter->eth_devices == NULL) {
2009                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
2010                 rte_free(rx_adapter);
2011                 return -ENOMEM;
2012         }
2013         rte_spinlock_init(&rx_adapter->rx_lock);
2014         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2015                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2016
2017         event_eth_rx_adapter[id] = rx_adapter;
2018         if (conf_cb == rxa_default_conf_cb)
2019                 rx_adapter->default_cb_arg = 1;
2020         return 0;
2021 }
2022
2023 int
2024 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2025                 struct rte_event_port_conf *port_config)
2026 {
2027         struct rte_event_port_conf *pc;
2028         int ret;
2029
2030         if (port_config == NULL)
2031                 return -EINVAL;
2032         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2033
2034         pc = rte_malloc(NULL, sizeof(*pc), 0);
2035         if (pc == NULL)
2036                 return -ENOMEM;
2037         *pc = *port_config;
2038         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2039                                         rxa_default_conf_cb,
2040                                         pc);
2041         if (ret)
2042                 rte_free(pc);
2043         return ret;
2044 }
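
/*
 * Typical usage sketch (placeholder ids, not taken from this file):
 * create adapter 0 on event device 0; the copied port configuration is
 * later used by the default configuration callback to set up the
 * adapter's event port:
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *	int ret = rte_event_eth_rx_adapter_create(0, 0, &pconf);
 */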
2045
2046 int
2047 rte_event_eth_rx_adapter_free(uint8_t id)
2048 {
2049         struct rte_event_eth_rx_adapter *rx_adapter;
2050
2051         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2052
2053         rx_adapter = rxa_id_to_adapter(id);
2054         if (rx_adapter == NULL)
2055                 return -EINVAL;
2056
2057         if (rx_adapter->nb_queues) {
2058                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2059                                 rx_adapter->nb_queues);
2060                 return -EBUSY;
2061         }
2062
2063         if (rx_adapter->default_cb_arg)
2064                 rte_free(rx_adapter->conf_arg);
2065         rte_free(rx_adapter->eth_devices);
2066         rte_free(rx_adapter);
2067         event_eth_rx_adapter[id] = NULL;
2068
2069         return 0;
2070 }
2071
2072 int
2073 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2074                 uint16_t eth_dev_id,
2075                 int32_t rx_queue_id,
2076                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2077 {
2078         int ret;
2079         uint32_t cap;
2080         struct rte_event_eth_rx_adapter *rx_adapter;
2081         struct rte_eventdev *dev;
2082         struct eth_device_info *dev_info;
2083
2084         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2085         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2086
2087         rx_adapter = rxa_id_to_adapter(id);
2088         if ((rx_adapter == NULL) || (queue_conf == NULL))
2089                 return -EINVAL;
2090
2091         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2092         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2093                                                 eth_dev_id,
2094                                                 &cap);
2095         if (ret) {
2096                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2097                         " eth port %" PRIu16, id, eth_dev_id);
2098                 return ret;
2099         }
2100
2101         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2102                 && (queue_conf->rx_queue_flags &
2103                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2104                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2105                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2106                                 eth_dev_id, id);
2107                 return -EINVAL;
2108         }
2109
2110         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2111                 (rx_queue_id != -1)) {
2112                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2113                         "event queue, eth port: %" PRIu16 " adapter id: %"
2114                         PRIu8, eth_dev_id, id);
2115                 return -EINVAL;
2116         }
2117
2118         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2119                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2120                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2121                          (uint16_t)rx_queue_id);
2122                 return -EINVAL;
2123         }
2124
2125         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2126
2127         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2128                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2129                                         -ENOTSUP);
2130                 if (dev_info->rx_queue == NULL) {
2131                         dev_info->rx_queue =
2132                             rte_zmalloc_socket(rx_adapter->mem_name,
2133                                         dev_info->dev->data->nb_rx_queues *
2134                                         sizeof(struct eth_rx_queue_info), 0,
2135                                         rx_adapter->socket_id);
2136                         if (dev_info->rx_queue == NULL)
2137                                 return -ENOMEM;
2138                 }
2139
2140                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2141                                 &rte_eth_devices[eth_dev_id],
2142                                 rx_queue_id, queue_conf);
2143                 if (ret == 0) {
2144                         dev_info->internal_event_port = 1;
2145                         rxa_update_queue(rx_adapter,
2146                                         &rx_adapter->eth_devices[eth_dev_id],
2147                                         rx_queue_id,
2148                                         1);
2149                 }
2150         } else {
2151                 rte_spinlock_lock(&rx_adapter->rx_lock);
2152                 dev_info->internal_event_port = 0;
2153                 ret = rxa_init_service(rx_adapter, id);
2154                 if (ret == 0) {
2155                         uint32_t service_id = rx_adapter->service_id;
2156                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2157                                         queue_conf);
2158                         rte_service_component_runstate_set(service_id,
2159                                 rxa_sw_adapter_queue_count(rx_adapter));
2160                 }
2161                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2162         }
2163
2164         if (ret)
2165                 return ret;
2166
2167         return 0;
2168 }
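
/*
 * Usage sketch (placeholder ids): connect all Rx queues (-1) of ethernet
 * port 0 to event queue 0 of adapter 0, serviced in poll mode with equal
 * weight:
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.rx_queue_flags = 0,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = 0,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		},
 *	};
 *	int ret = rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 */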
2169
2170 int
2171 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2172                                 int32_t rx_queue_id)
2173 {
2174         int ret = 0;
2175         struct rte_eventdev *dev;
2176         struct rte_event_eth_rx_adapter *rx_adapter;
2177         struct eth_device_info *dev_info;
2178         uint32_t cap;
2179         uint32_t nb_rx_poll = 0;
2180         uint32_t nb_wrr = 0;
2181         uint32_t nb_rx_intr;
2182         struct eth_rx_poll_entry *rx_poll = NULL;
2183         uint32_t *rx_wrr = NULL;
2184         int num_intr_vec;
2185
2186         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2187         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2188
2189         rx_adapter = rxa_id_to_adapter(id);
2190         if (rx_adapter == NULL)
2191                 return -EINVAL;
2192
2193         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2194         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2195                                                 eth_dev_id,
2196                                                 &cap);
2197         if (ret)
2198                 return ret;
2199
2200         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2201                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2202                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2203                          (uint16_t)rx_queue_id);
2204                 return -EINVAL;
2205         }
2206
2207         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2208
2209         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2210                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2211                                  -ENOTSUP);
2212                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2213                                                 &rte_eth_devices[eth_dev_id],
2214                                                 rx_queue_id);
2215                 if (ret == 0) {
2216                         rxa_update_queue(rx_adapter,
2217                                         &rx_adapter->eth_devices[eth_dev_id],
2218                                         rx_queue_id,
2219                                         0);
2220                         if (dev_info->nb_dev_queues == 0) {
2221                                 rte_free(dev_info->rx_queue);
2222                                 dev_info->rx_queue = NULL;
2223                         }
2224                 }
2225         } else {
2226                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2227                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2228
2229                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2230                         &rx_poll, &rx_wrr);
2231                 if (ret)
2232                         return ret;
2233
2234                 rte_spinlock_lock(&rx_adapter->rx_lock);
2235
2236                 num_intr_vec = 0;
2237                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2238
2239                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2240                                                 rx_queue_id, 0);
2241                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2242                                         rx_queue_id);
2243                         if (ret)
2244                                 goto unlock_ret;
2245                 }
2246
2247                 if (nb_rx_intr == 0) {
2248                         ret = rxa_free_intr_resources(rx_adapter);
2249                         if (ret)
2250                                 goto unlock_ret;
2251                 }
2252
2253                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2254                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2255
2256                 rte_free(rx_adapter->eth_rx_poll);
2257                 rte_free(rx_adapter->wrr_sched);
2258
2259                 if (nb_rx_intr == 0) {
2260                         rte_free(dev_info->intr_queue);
2261                         dev_info->intr_queue = NULL;
2262                 }
2263
2264                 rx_adapter->eth_rx_poll = rx_poll;
2265                 rx_adapter->wrr_sched = rx_wrr;
2266                 rx_adapter->wrr_len = nb_wrr;
2267                 rx_adapter->num_intr_vec += num_intr_vec;
2268
2269                 if (dev_info->nb_dev_queues == 0) {
2270                         rte_free(dev_info->rx_queue);
2271                         dev_info->rx_queue = NULL;
2272                 }
2273 unlock_ret:
2274                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2275                 if (ret) {
2276                         rte_free(rx_poll);
2277                         rte_free(rx_wrr);
2278                         return ret;
2279                 }
2280
2281                 rte_service_component_runstate_set(rx_adapter->service_id,
2282                                 rxa_sw_adapter_queue_count(rx_adapter));
2283         }
2284
2285         return ret;
2286 }
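
/*
 * Teardown sketch (placeholder ids): stop the adapter, remove every Rx
 * queue of ethernet port 0 (rx_queue_id of -1 covers all queues of the
 * port, as handled above) and release the adapter:
 *
 *	rte_event_eth_rx_adapter_stop(0);
 *	rte_event_eth_rx_adapter_queue_del(0, 0, -1);
 *	rte_event_eth_rx_adapter_free(0);
 */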
2287
2288 int
2289 rte_event_eth_rx_adapter_start(uint8_t id)
2290 {
2291         return rxa_ctrl(id, 1);
2292 }
2293
2294 int
2295 rte_event_eth_rx_adapter_stop(uint8_t id)
2296 {
2297         return rxa_ctrl(id, 0);
2298 }
2299
2300 int __rte_experimental
2301 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2302                                struct rte_event_eth_rx_adapter_stats *stats)
2303 {
2304         struct rte_event_eth_rx_adapter *rx_adapter;
2305         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2306         struct rte_event_eth_rx_adapter_stats dev_stats;
2307         struct rte_eventdev *dev;
2308         struct eth_device_info *dev_info;
2309         uint32_t i;
2310         int ret;
2311
2312         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2313
2314         rx_adapter = rxa_id_to_adapter(id);
2315         if (rx_adapter == NULL || stats == NULL)
2316                 return -EINVAL;
2317
2318         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2319         memset(stats, 0, sizeof(*stats));
2320         RTE_ETH_FOREACH_DEV(i) {
2321                 dev_info = &rx_adapter->eth_devices[i];
2322                 if (dev_info->internal_event_port == 0 ||
2323                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2324                         continue;
2325                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2326                                                 &rte_eth_devices[i],
2327                                                 &dev_stats);
2328                 if (ret)
2329                         continue;
2330                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2331                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2332         }
2333
2334         if (rx_adapter->service_inited)
2335                 *stats = rx_adapter->stats;
2336
2337         stats->rx_packets += dev_stats_sum.rx_packets;
2338         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2339         return 0;
2340 }
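
/*
 * Reading the counters (sketch, placeholder adapter id): the sums above
 * already fold driver-reported stats into the SW adapter stats, so a
 * caller only needs:
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(0, &stats) == 0)
 *		printf("rx %" PRIu64 " enq %" PRIu64 "\n",
 *			stats.rx_packets, stats.rx_enq_count);
 */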
2341
2342 int
2343 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2344 {
2345         struct rte_event_eth_rx_adapter *rx_adapter;
2346         struct rte_eventdev *dev;
2347         struct eth_device_info *dev_info;
2348         uint32_t i;
2349
2350         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2351
2352         rx_adapter = rxa_id_to_adapter(id);
2353         if (rx_adapter == NULL)
2354                 return -EINVAL;
2355
2356         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2357         RTE_ETH_FOREACH_DEV(i) {
2358                 dev_info = &rx_adapter->eth_devices[i];
2359                 if (dev_info->internal_event_port == 0 ||
2360                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2361                         continue;
2362                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2363                                                         &rte_eth_devices[i]);
2364         }
2365
2366         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2367         return 0;
2368 }
2369
2370 int
2371 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2372 {
2373         struct rte_event_eth_rx_adapter *rx_adapter;
2374
2375         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2376
2377         rx_adapter = rxa_id_to_adapter(id);
2378         if (rx_adapter == NULL || service_id == NULL)
2379                 return -EINVAL;
2380
2381         if (rx_adapter->service_inited)
2382                 *service_id = rx_adapter->service_id;
2383
2384         return rx_adapter->service_inited ? 0 : -ESRCH;
2385 }
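
/*
 * When the adapter uses the SW service (no internal event port), the
 * application must map that service to a service lcore before the
 * adapter is started. A sketch where service_lcore_id is an
 * application-chosen core, not a value from this file:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0) {
 *		rte_service_lcore_add(service_lcore_id);
 *		rte_service_map_lcore_set(service_id, service_lcore_id, 1);
 *		rte_service_lcore_start(service_lcore_id);
 *	}
 *	rte_event_eth_rx_adapter_start(0);
 */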
2386
2387 int __rte_experimental
2388 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2389                                         uint16_t eth_dev_id,
2390                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2391                                         void *cb_arg)
2392 {
2393         struct rte_event_eth_rx_adapter *rx_adapter;
2394         struct eth_device_info *dev_info;
2395         uint32_t cap;
2396         int ret;
2397
2398         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2399         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2400
2401         rx_adapter = rxa_id_to_adapter(id);
2402         if (rx_adapter == NULL)
2403                 return -EINVAL;
2404
2405         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2406         if (dev_info->rx_queue == NULL)
2407                 return -EINVAL;
2408
2409         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2410                                                 eth_dev_id,
2411                                                 &cap);
2412         if (ret) {
2413                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2414                         " eth port %" PRIu16, id, eth_dev_id);
2415                 return ret;
2416         }
2417
2418         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2419                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2420                                 PRIu16, eth_dev_id);
2421                 return -EINVAL;
2422         }
2423
2424         rte_spinlock_lock(&rx_adapter->rx_lock);
2425         dev_info->cb_fn = cb_fn;
2426         dev_info->cb_arg = cb_arg;
2427         rte_spinlock_unlock(&rx_adapter->rx_lock);
2428
2429         return 0;
2430 }