lib/librte_eventdev/rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
25
26 #define BATCH_SIZE              32
27 #define BLOCK_CNT_THRESHOLD     10
28 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
29
30 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
31 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
32
33 #define RSS_KEY_SIZE    40
34 /* value written to intr thread pipe to signal thread exit */
35 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
36 /* Sentinel value for an uninitialized file handle */
37 #define INIT_FD         -1
38
39 /*
40  * Used to store port and queue ID of interrupting Rx queue
41  */
42 union queue_data {
43         RTE_STD_C11
44         void *ptr;
45         struct {
46                 uint16_t port;
47                 uint16_t queue;
48         };
49 };
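/*
 * Illustrative sketch (not part of the library): the union lets a
 * <port, queue> pair travel through the void * data field of
 * rte_epoll_event.epdata without a per-queue allocation. Packing and
 * unpacking is a matter of choosing the view; function names here are
 * hypothetical.
 */
#ifdef RXA_DOC_EXAMPLES
static void *
example_pack_queue_data(uint16_t port, uint16_t queue)
{
	union queue_data qd;

	qd.ptr = NULL;	/* zero any pad bits in the pointer view */
	qd.port = port;
	qd.queue = queue;
	return qd.ptr;	/* stored in epdata.data at RTE_INTR_EVENT_ADD time */
}

static void
example_unpack_queue_data(void *data, uint16_t *port, uint16_t *queue)
{
	union queue_data qd;

	qd.ptr = data;	/* as returned by rte_epoll_wait() */
	*port = qd.port;
	*queue = qd.queue;
}
#endif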
50
51 /*
52  * There is an instance of this struct per polled Rx queue added to the
53  * adapter
54  */
55 struct eth_rx_poll_entry {
56         /* Eth port to poll */
57         uint16_t eth_dev_id;
58         /* Eth rx queue to poll */
59         uint16_t eth_rx_qid;
60 };
61
62 /* Instance per adapter */
63 struct rte_eth_event_enqueue_buffer {
64         /* Count of events in this buffer */
65         uint16_t count;
66         /* Array of events in this buffer */
67         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
68 };
69
70 struct rte_event_eth_rx_adapter {
71         /* RSS key */
72         uint8_t rss_key_be[RSS_KEY_SIZE];
73         /* Event device identifier */
74         uint8_t eventdev_id;
75         /* Per ethernet device structure */
76         struct eth_device_info *eth_devices;
77         /* Event port identifier */
78         uint8_t event_port_id;
79         /* Lock to serialize config updates with service function */
80         rte_spinlock_t rx_lock;
81         /* Max mbufs processed in any service function invocation */
82         uint32_t max_nb_rx;
83         /* Receive queues that need to be polled */
84         struct eth_rx_poll_entry *eth_rx_poll;
85         /* Size of the eth_rx_poll array */
86         uint16_t num_rx_polled;
87         /* Weighted round robin schedule */
88         uint32_t *wrr_sched;
89         /* wrr_sched[] size */
90         uint32_t wrr_len;
91         /* Next entry in wrr[] to begin polling */
92         uint32_t wrr_pos;
93         /* Event burst buffer */
94         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
95         /* Per adapter stats */
96         struct rte_event_eth_rx_adapter_stats stats;
97         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
98         uint16_t enq_block_count;
99         /* Block start ts */
100         uint64_t rx_enq_block_start_ts;
101         /* epoll fd used to wait for Rx interrupts */
102         int epd;
103         /* Number of interrupt driven Rx queues */
104         uint32_t num_rx_intr;
105         /* Used to send <dev id, queue id> of interrupting Rx queues from
106          * the interrupt thread to the Rx thread
107          */
108         struct rte_ring *intr_ring;
109         /* Rx Queue data (dev id, queue id) for the last non-empty
110          * queue polled
111          */
112         union queue_data qd;
113         /* queue_data is valid */
114         int qd_valid;
115         /* Interrupt ring lock, synchronizes Rx thread
116          * and interrupt thread
117          */
118         rte_spinlock_t intr_ring_lock;
119         /* event array passed to rte_epoll_wait */
120         struct rte_epoll_event *epoll_events;
121         /* Count of interrupt vectors in use */
122         uint32_t num_intr_vec;
123         /* Thread blocked on Rx interrupts */
124         pthread_t rx_intr_thread;
125         /* Configuration callback for rte_service configuration */
126         rte_event_eth_rx_adapter_conf_cb conf_cb;
127         /* Configuration callback argument */
128         void *conf_arg;
129         /* Set if the default conf callback is being used */
130         int default_cb_arg;
131         /* Service initialization state */
132         uint8_t service_inited;
133         /* Total count of Rx queues in adapter */
134         uint32_t nb_queues;
135         /* Memory allocation name */
136         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
137         /* Socket identifier cached from eventdev */
138         int socket_id;
139         /* Per adapter EAL service */
140         uint32_t service_id;
141         /* Adapter started flag */
142         uint8_t rxa_started;
143         /* Adapter ID */
144         uint8_t id;
145 } __rte_cache_aligned;
146
147 /* Per eth device */
148 struct eth_device_info {
149         struct rte_eth_dev *dev;
150         struct eth_rx_queue_info *rx_queue;
151         /* Rx callback */
152         rte_event_eth_rx_adapter_cb_fn cb_fn;
153         /* Rx callback argument */
154         void *cb_arg;
155         /* Set if ethdev->eventdev packet transfer uses a
156          * hardware mechanism
157          */
158         uint8_t internal_event_port;
159         /* Set if the adapter is processing rx queues for
160          * this eth device and packet processing has been
161          * started, allows for the code to know if the PMD
162          * rx_adapter_stop callback needs to be invoked
163          */
164         uint8_t dev_rx_started;
165         /* Number of queues added for this device */
166         uint16_t nb_dev_queues;
167         /* Number of poll based queues
168          * If nb_rx_poll > 0, the start callback will
169          * be invoked if not already invoked
170          */
171         uint16_t nb_rx_poll;
172         /* Number of interrupt based queues
173          * If nb_rx_intr > 0, the start callback will
174          * be invoked if not already invoked.
175          */
176         uint16_t nb_rx_intr;
177         /* Number of queues that use the shared interrupt */
178         uint16_t nb_shared_intr;
179         /* sum(wrr(q)) for all queues within the device
180          * useful when deleting all device queues
181          */
182         uint32_t wrr_len;
183         /* Intr based queue index to start polling from; used when
184          * the number of shared interrupts is non-zero
185          */
186         uint16_t next_q_idx;
187         /* Intr based queue indices */
188         uint16_t *intr_queue;
189         /* Set if the device generates a per Rx queue interrupt for
190          * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
191          */
192         int multi_intr_cap;
193         /* shared interrupt enabled */
194         int shared_intr_enabled;
195 };
196
197 /* Per Rx queue */
198 struct eth_rx_queue_info {
199         int queue_enabled;      /* True if added */
200         int intr_enabled;
201         uint16_t wt;            /* Polling weight */
202         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
203         uint64_t event;
204 };
205
206 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
207
208 static inline int
209 rxa_validate_id(uint8_t id)
210 {
211         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
212 }
213
214 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
215         if (!rxa_validate_id(id)) { \
216                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
217                 return retval; \
218         } \
219 } while (0)
220
221 static inline int
222 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
223 {
224         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
225 }
226
227 /* Greatest common divisor */
228 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
229 {
230         uint16_t r = a % b;
231
232         return r ? rxa_gcd_u16(b, r) : b;
233 }
234
235 /* Returns the next queue in the polling sequence
236  *
237  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
238  */
239 static int
240 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
241          unsigned int n, int *cw,
242          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
243          uint16_t gcd, int prev)
244 {
245         int i = prev;
246         uint16_t w;
247
248         while (1) {
249                 uint16_t q;
250                 uint16_t d;
251
252                 i = (i + 1) % n;
253                 if (i == 0) {
254                         *cw = *cw - gcd;
255                         if (*cw <= 0)
256                                 *cw = max_wt;
257                 }
258
259                 q = eth_rx_poll[i].eth_rx_qid;
260                 d = eth_rx_poll[i].eth_dev_id;
261                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
262
263                 if ((int)w >= *cw)
264                         return i;
265         }
266 }
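/*
 * Worked example (illustrative): three polled queues q0/q1/q2 with
 * weights 4/3/2 give max_wt = 4 and gcd = 1. Starting from cw = -1,
 * prev = -1, nine successive calls yield the per-cycle sequence
 *
 *	q0 q0 q1 q0 q1 q2 q0 q1 q2
 *
 * i.e. q0 is visited 4 times, q1 3 times and q2 twice, with the
 * lower-weight queues interleaved through the cycle rather than
 * appended at the end. rxa_calc_wrr_sequence() precomputes exactly
 * this sequence into wrr_sched[].
 */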
267
268 static inline int
269 rxa_shared_intr(struct eth_device_info *dev_info,
270         int rx_queue_id)
271 {
272         int multi_intr_cap;
273
274         if (dev_info->dev->intr_handle == NULL)
275                 return 0;
276
277         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
278         return !multi_intr_cap ||
279                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
280 }
281
282 static inline int
283 rxa_intr_queue(struct eth_device_info *dev_info,
284         int rx_queue_id)
285 {
286         struct eth_rx_queue_info *queue_info;
287
288         queue_info = &dev_info->rx_queue[rx_queue_id];
289         return dev_info->rx_queue &&
290                 !dev_info->internal_event_port &&
291                 queue_info->queue_enabled && queue_info->wt == 0;
292 }
293
294 static inline int
295 rxa_polled_queue(struct eth_device_info *dev_info,
296         int rx_queue_id)
297 {
298         struct eth_rx_queue_info *queue_info;
299
300         queue_info = &dev_info->rx_queue[rx_queue_id];
301         return !dev_info->internal_event_port &&
302                 dev_info->rx_queue &&
303                 queue_info->queue_enabled && queue_info->wt != 0;
304 }
305
306 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
307 static int
308 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
309 {
310         uint16_t i;
311         int n, s;
312         uint16_t nbq;
313
314         nbq = dev_info->dev->data->nb_rx_queues;
315         n = 0; /* non shared count */
316         s = 0; /* shared count */
317
318         if (rx_queue_id == -1) {
319                 for (i = 0; i < nbq; i++) {
320                         if (!rxa_shared_intr(dev_info, i))
321                                 n += add ? !rxa_intr_queue(dev_info, i) :
322                                         rxa_intr_queue(dev_info, i);
323                         else
324                                 s += add ? !rxa_intr_queue(dev_info, i) :
325                                         rxa_intr_queue(dev_info, i);
326                 }
327
328                 if (s > 0) {
329                         if ((add && dev_info->nb_shared_intr == 0) ||
330                                 (!add && dev_info->nb_shared_intr))
331                                 n += 1;
332                 }
333         } else {
334                 if (!rxa_shared_intr(dev_info, rx_queue_id))
335                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
336                                 rxa_intr_queue(dev_info, rx_queue_id);
337                 else
338                         n = add ? !dev_info->nb_shared_intr :
339                                 dev_info->nb_shared_intr == 1;
340         }
341
342         return add ? n : -n;
343 }
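/*
 * Worked example (illustrative): a device with 3 Rx queues and no
 * multi-interrupt capability, so every queue maps to the shared
 * vector. Adding all queues (rx_queue_id == -1, add == 1) when none
 * is currently in interrupt mode gives n = 0, s = 3; since
 * nb_shared_intr == 0 the shared vector itself is counted once and
 * the function returns 1. Deleting them all again returns -1.
 */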
344
345 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
346  */
347 static void
348 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
349                         struct eth_device_info *dev_info,
350                         int rx_queue_id,
351                         uint32_t *nb_rx_intr)
352 {
353         uint32_t intr_diff;
354
355         if (rx_queue_id == -1)
356                 intr_diff = dev_info->nb_rx_intr;
357         else
358                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
359
360         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
361 }
362
363 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly
364  * added interrupt queues may currently be poll mode Rx queues
365  */
366 static void
367 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
368                         struct eth_device_info *dev_info,
369                         int rx_queue_id,
370                         uint32_t *nb_rx_poll,
371                         uint32_t *nb_rx_intr,
372                         uint32_t *nb_wrr)
373 {
374         uint32_t intr_diff;
375         uint32_t poll_diff;
376         uint32_t wrr_len_diff;
377
378         if (rx_queue_id == -1) {
379                 intr_diff = dev_info->dev->data->nb_rx_queues -
380                                                 dev_info->nb_rx_intr;
381                 poll_diff = dev_info->nb_rx_poll;
382                 wrr_len_diff = dev_info->wrr_len;
383         } else {
384                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
385                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
386                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
387                                         0;
388         }
389
390         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
391         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
392         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
393 }
394
395 /* Calculate size of the eth_rx_poll and wrr_sched arrays
396  * after deleting poll mode rx queues
397  */
398 static void
399 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
400                         struct eth_device_info *dev_info,
401                         int rx_queue_id,
402                         uint32_t *nb_rx_poll,
403                         uint32_t *nb_wrr)
404 {
405         uint32_t poll_diff;
406         uint32_t wrr_len_diff;
407
408         if (rx_queue_id == -1) {
409                 poll_diff = dev_info->nb_rx_poll;
410                 wrr_len_diff = dev_info->wrr_len;
411         } else {
412                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
413                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
414                                         0;
415         }
416
417         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
418         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
419 }
420
421 /* Calculate nb_rx_* after adding poll mode rx queues
422  */
423 static void
424 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
425                         struct eth_device_info *dev_info,
426                         int rx_queue_id,
427                         uint16_t wt,
428                         uint32_t *nb_rx_poll,
429                         uint32_t *nb_rx_intr,
430                         uint32_t *nb_wrr)
431 {
432         uint32_t intr_diff;
433         uint32_t poll_diff;
434         uint32_t wrr_len_diff;
435
436         if (rx_queue_id == -1) {
437                 intr_diff = dev_info->nb_rx_intr;
438                 poll_diff = dev_info->dev->data->nb_rx_queues -
439                                                 dev_info->nb_rx_poll;
440                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
441                                 - dev_info->wrr_len;
442         } else {
443                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
444                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
445                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
446                                 wt - dev_info->rx_queue[rx_queue_id].wt :
447                                 wt;
448         }
449
450         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
451         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
452         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
453 }
454
455 /* Calculate nb_rx_* after adding rx_queue_id */
456 static void
457 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
458                 struct eth_device_info *dev_info,
459                 int rx_queue_id,
460                 uint16_t wt,
461                 uint32_t *nb_rx_poll,
462                 uint32_t *nb_rx_intr,
463                 uint32_t *nb_wrr)
464 {
465         if (wt != 0)
466                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
467                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
468         else
469                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
470                                         nb_rx_poll, nb_rx_intr, nb_wrr);
471 }
472
473 /* Calculate nb_rx_* after deleting rx_queue_id */
474 static void
475 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
476                 struct eth_device_info *dev_info,
477                 int rx_queue_id,
478                 uint32_t *nb_rx_poll,
479                 uint32_t *nb_rx_intr,
480                 uint32_t *nb_wrr)
481 {
482         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
483                                 nb_wrr);
484         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
485                                 nb_rx_intr);
486 }
487
488 /*
489  * Allocate the rx_poll array
490  */
491 static struct eth_rx_poll_entry *
492 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
493         uint32_t num_rx_polled)
494 {
495         size_t len;
496
497         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
498                                                         RTE_CACHE_LINE_SIZE);
499         return  rte_zmalloc_socket(rx_adapter->mem_name,
500                                 len,
501                                 RTE_CACHE_LINE_SIZE,
502                                 rx_adapter->socket_id);
503 }
504
505 /*
506  * Allocate the WRR array
507  */
508 static uint32_t *
509 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
510 {
511         size_t len;
512
513         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
514                         RTE_CACHE_LINE_SIZE);
515         return  rte_zmalloc_socket(rx_adapter->mem_name,
516                                 len,
517                                 RTE_CACHE_LINE_SIZE,
518                                 rx_adapter->socket_id);
519 }
520
521 static int
522 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
523                 uint32_t nb_poll,
524                 uint32_t nb_wrr,
525                 struct eth_rx_poll_entry **rx_poll,
526                 uint32_t **wrr_sched)
527 {
528
529         if (nb_poll == 0) {
530                 *rx_poll = NULL;
531                 *wrr_sched = NULL;
532                 return 0;
533         }
534
535         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
536         if (*rx_poll == NULL) {
537                 *wrr_sched = NULL;
538                 return -ENOMEM;
539         }
540
541         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
542         if (*wrr_sched == NULL) {
543                 rte_free(*rx_poll);
544                 return -ENOMEM;
545         }
546         return 0;
547 }
548
549 /* Precalculate WRR polling sequence for all queues in rx_adapter */
550 static void
551 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
552                 struct eth_rx_poll_entry *rx_poll,
553                 uint32_t *rx_wrr)
554 {
555         uint16_t d;
556         uint16_t q;
557         unsigned int i;
558         int prev = -1;
559         int cw = -1;
560
561         /* Initialize variables for calculation of wrr schedule */
562         uint16_t max_wrr_pos = 0;
563         unsigned int poll_q = 0;
564         uint16_t max_wt = 0;
565         uint16_t gcd = 0;
566
567         if (rx_poll == NULL)
568                 return;
569
570         /* Generate the array of all queues to poll; the size of this
571          * array is poll_q
572          */
573         RTE_ETH_FOREACH_DEV(d) {
574                 uint16_t nb_rx_queues;
575                 struct eth_device_info *dev_info =
576                                 &rx_adapter->eth_devices[d];
577                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
578                 if (dev_info->rx_queue == NULL)
579                         continue;
580                 if (dev_info->internal_event_port)
581                         continue;
582                 dev_info->wrr_len = 0;
583                 for (q = 0; q < nb_rx_queues; q++) {
584                         struct eth_rx_queue_info *queue_info =
585                                 &dev_info->rx_queue[q];
586                         uint16_t wt;
587
588                         if (!rxa_polled_queue(dev_info, q))
589                                 continue;
590                         wt = queue_info->wt;
591                         rx_poll[poll_q].eth_dev_id = d;
592                         rx_poll[poll_q].eth_rx_qid = q;
593                         max_wrr_pos += wt;
594                         dev_info->wrr_len += wt;
595                         max_wt = RTE_MAX(max_wt, wt);
596                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
597                         poll_q++;
598                 }
599         }
600
601         /* Generate polling sequence based on weights */
602         prev = -1;
603         cw = -1;
604         for (i = 0; i < max_wrr_pos; i++) {
605                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
606                                      rx_poll, max_wt, gcd, prev);
607                 prev = rx_wrr[i];
608         }
609 }
610
611 static inline void
612 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
613         struct rte_ipv6_hdr **ipv6_hdr)
614 {
615         struct rte_ether_hdr *eth_hdr =
616                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
617         struct rte_vlan_hdr *vlan_hdr;
618
619         *ipv4_hdr = NULL;
620         *ipv6_hdr = NULL;
621
622         switch (eth_hdr->ether_type) {
623         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
624                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
625                 break;
626
627         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
628                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
629                 break;
630
631         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
632                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
633                 switch (vlan_hdr->eth_proto) {
634                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
635                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
636                         break;
637                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
638                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
639                         break;
640                 default:
641                         break;
642                 }
643                 break;
644
645         default:
646                 break;
647         }
648 }
649
650 /* Calculate RSS hash for IPv4/6 */
651 static inline uint32_t
652 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
653 {
654         uint32_t input_len;
655         void *tuple;
656         struct rte_ipv4_tuple ipv4_tuple;
657         struct rte_ipv6_tuple ipv6_tuple;
658         struct rte_ipv4_hdr *ipv4_hdr;
659         struct rte_ipv6_hdr *ipv6_hdr;
660
661         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
662
663         if (ipv4_hdr) {
664                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
665                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
666                 tuple = &ipv4_tuple;
667                 input_len = RTE_THASH_V4_L3_LEN;
668         } else if (ipv6_hdr) {
669                 rte_thash_load_v6_addrs(ipv6_hdr,
670                                         (union rte_thash_tuple *)&ipv6_tuple);
671                 tuple = &ipv6_tuple;
672                 input_len = RTE_THASH_V6_L3_LEN;
673         } else
674                 return 0;
675
676         return rte_softrss_be(tuple, input_len, rss_key_be);
677 }
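/*
 * Illustrative sketch (not part of the library): computing the same
 * L3-only Toeplitz hash for an IPv4 address pair directly. The
 * rss_key_be argument must already be in the byte-swapped form
 * rte_softrss_be() expects; the adapter prepares its key once with
 * rte_convert_rss_key() at create time. Names are hypothetical.
 */
#ifdef RXA_DOC_EXAMPLES
static uint32_t
example_softrss_v4(rte_be32_t src_addr, rte_be32_t dst_addr,
		const uint8_t *rss_key_be)
{
	struct rte_ipv4_tuple tuple;

	/* tuple fields are host order, as in rxa_do_softrss() */
	tuple.src_addr = rte_be_to_cpu_32(src_addr);
	tuple.dst_addr = rte_be_to_cpu_32(dst_addr);
	return rte_softrss_be((uint32_t *)&tuple, RTE_THASH_V4_L3_LEN,
			rss_key_be);
}
#endif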
678
679 static inline int
680 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
681 {
682         return !!rx_adapter->enq_block_count;
683 }
684
685 static inline void
686 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
687 {
688         if (rx_adapter->rx_enq_block_start_ts)
689                 return;
690
691         rx_adapter->enq_block_count++;
692         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
693                 return;
694
695         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
696 }
697
698 static inline void
699 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
700                     struct rte_event_eth_rx_adapter_stats *stats)
701 {
702         if (unlikely(!stats->rx_enq_start_ts))
703                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
704
705         if (likely(!rxa_enq_blocked(rx_adapter)))
706                 return;
707
708         rx_adapter->enq_block_count = 0;
709         if (rx_adapter->rx_enq_block_start_ts) {
710                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
711                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
712                     rx_adapter->rx_enq_block_start_ts;
713                 rx_adapter->rx_enq_block_start_ts = 0;
714         }
715 }
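/*
 * Worked example (illustrative): if BLOCK_CNT_THRESHOLD (10)
 * consecutive flushes enqueue nothing, rxa_enq_block_start_ts()
 * latches the TSC. The first flush that then enqueues at least one
 * event reaches rxa_enq_block_end_ts(), which adds the elapsed cycles
 * to stats->rx_enq_block_cycles and re-arms the counter;
 * rte_get_tsc_hz() converts the cycle count to wall-clock time.
 */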
716
717 /* Enqueue buffered events to event device */
718 static inline uint16_t
719 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
720 {
721         struct rte_eth_event_enqueue_buffer *buf =
722             &rx_adapter->event_enqueue_buffer;
723         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
724
725         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
726                                         rx_adapter->event_port_id,
727                                         buf->events,
728                                         buf->count);
729         if (n != buf->count) {
730                 memmove(buf->events,
731                         &buf->events[n],
732                         (buf->count - n) * sizeof(struct rte_event));
733                 stats->rx_enq_retry++;
734         }
735
736         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
737                 rxa_enq_block_start_ts(rx_adapter);
738
739         buf->count -= n;
740         stats->rx_enq_count += n;
741
742         return n;
743 }
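/*
 * Illustrative usage sketch (application side): the retry counter
 * updated above surfaces through the public stats API and can be used
 * to detect a congested event device. The function name is
 * hypothetical; the rte_* call is the public one.
 */
#ifdef RXA_DOC_EXAMPLES
static uint64_t
example_enqueue_retries(uint8_t adapter_id)
{
	struct rte_event_eth_rx_adapter_stats stats;

	return rte_event_eth_rx_adapter_stats_get(adapter_id, &stats) == 0 ?
		stats.rx_enq_retry : 0;
}
#endif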
744
745 static inline void
746 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
747                 uint16_t eth_dev_id,
748                 uint16_t rx_queue_id,
749                 struct rte_mbuf **mbufs,
750                 uint16_t num)
751 {
752         uint32_t i;
753         struct eth_device_info *dev_info =
754                                         &rx_adapter->eth_devices[eth_dev_id];
755         struct eth_rx_queue_info *eth_rx_queue_info =
756                                         &dev_info->rx_queue[rx_queue_id];
757         struct rte_eth_event_enqueue_buffer *buf =
758                                         &rx_adapter->event_enqueue_buffer;
759         struct rte_event *ev = &buf->events[buf->count];
760         uint64_t event = eth_rx_queue_info->event;
761         uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
762         struct rte_mbuf *m = mbufs[0];
763         uint32_t rss_mask;
764         uint32_t rss;
765         int do_rss;
766         uint16_t nb_cb;
767         uint16_t dropped;
768
769         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
772
773         for (i = 0; i < num; i++) {
774                 m = mbufs[i];
775
776                 rss = do_rss ?
777                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
778                         m->hash.rss;
779                 ev->event = event;
780                 ev->flow_id = (rss & ~flow_id_mask) |
781                                 (ev->flow_id & flow_id_mask);
782                 ev->mbuf = m;
783                 ev++;
784         }
785
786         if (dev_info->cb_fn) {
787
788                 dropped = 0;
789                 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
790                                         ETH_EVENT_BUFFER_SIZE, buf->count, ev,
791                                         num, dev_info->cb_arg, &dropped);
792                 if (unlikely(nb_cb > num))
793                         RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
794                                 nb_cb, num);
795                 else
796                         num = nb_cb;
797                 if (dropped)
798                         rx_adapter->stats.rx_dropped += dropped;
799         }
800
801         buf->count += num;
802 }
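/*
 * Illustrative note: the branchless selection above relies on
 * ((x & f) != 0) evaluating to 0 or 1, so subtracting 1 yields 0 or
 * 0xffffffff and the complement gives an all-ones mask exactly when
 * the flag is set:
 *
 *	flag set:   ~((1) - 1) = ~0x00000000 = 0xffffffff
 *	flag clear: ~((0) - 1) = ~0xffffffff = 0x00000000
 *
 * The same trick drives flow_id_mask: ev->flow_id keeps the
 * app-supplied id when the mask is ~0 and takes the RSS hash when the
 * mask is 0.
 */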
803
804 /* Enqueue packets from <port, q> to event buffer */
805 static inline uint32_t
806 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
807         uint16_t port_id,
808         uint16_t queue_id,
809         uint32_t rx_count,
810         uint32_t max_rx,
811         int *rxq_empty)
812 {
813         struct rte_mbuf *mbufs[BATCH_SIZE];
814         struct rte_eth_event_enqueue_buffer *buf =
815                                         &rx_adapter->event_enqueue_buffer;
816         struct rte_event_eth_rx_adapter_stats *stats =
817                                         &rx_adapter->stats;
818         uint16_t n;
819         uint32_t nb_rx = 0;
820
821         if (rxq_empty)
822                 *rxq_empty = 0;
823         /* Don't do a batch dequeue from the rx queue if there isn't
824          * enough space in the enqueue buffer.
825          */
826         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
827                 if (buf->count >= BATCH_SIZE)
828                         rxa_flush_event_buffer(rx_adapter);
829
830                 stats->rx_poll_count++;
831                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
832                 if (unlikely(!n)) {
833                         if (rxq_empty)
834                                 *rxq_empty = 1;
835                         break;
836                 }
837                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
838                 nb_rx += n;
839                 if (rx_count + nb_rx > max_rx)
840                         break;
841         }
842
843         if (buf->count > 0)
844                 rxa_flush_event_buffer(rx_adapter);
845
846         return nb_rx;
847 }
848
849 static inline void
850 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
851                 void *data)
852 {
853         uint16_t port_id;
854         uint16_t queue;
855         int err;
856         union queue_data qd;
857         struct eth_device_info *dev_info;
858         struct eth_rx_queue_info *queue_info;
859         int *intr_enabled;
860
861         qd.ptr = data;
862         port_id = qd.port;
863         queue = qd.queue;
864
865         dev_info = &rx_adapter->eth_devices[port_id];
866         queue_info = &dev_info->rx_queue[queue];
867         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
868         if (rxa_shared_intr(dev_info, queue))
869                 intr_enabled = &dev_info->shared_intr_enabled;
870         else
871                 intr_enabled = &queue_info->intr_enabled;
872
873         if (*intr_enabled) {
874                 *intr_enabled = 0;
875                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
876                 /* Entry should always be available.
877                  * The ring size equals the maximum number of interrupt
878                  * vectors supported (an interrupt vector is shared in
879                  * case of shared interrupts)
880                  */
881                 if (err)
882                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
883                                 " to ring: %s", strerror(-err));
884                 else
885                         rte_eth_dev_rx_intr_disable(port_id, queue);
886         }
887         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
888 }
889
890 static int
891 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
892                         uint32_t num_intr_vec)
893 {
894         if (rx_adapter->num_intr_vec + num_intr_vec >
895                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
896                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
897                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
898                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
899                 return -ENOSPC;
900         }
901
902         return 0;
903 }
904
905 /* Delete entries for (dev, queue) from the interrupt ring */
906 static void
907 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
908                         struct eth_device_info *dev_info,
909                         uint16_t rx_queue_id)
910 {
911         int i, n;
912         union queue_data qd;
913
914         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
915
916         n = rte_ring_count(rx_adapter->intr_ring);
917         for (i = 0; i < n; i++) {
918                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
919                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
920                         if (qd.port == dev_info->dev->data->port_id &&
921                                 qd.queue == rx_queue_id)
922                                 continue;
923                 } else {
924                         if (qd.port == dev_info->dev->data->port_id)
925                                 continue;
926                 }
927                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
928         }
929
930         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
931 }
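/*
 * Illustrative note: the loop above filters the ring in place by
 * rotating it exactly once. Every element is dequeued, and only the
 * entries that do NOT match the deleted <port, queue> (or <port>, for
 * shared interrupts) are enqueued again, so the relative order of the
 * surviving entries is preserved.
 */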
932
933 /* pthread callback handling interrupt mode receive queues
934  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
935  * interrupting queue to the adapter's ring buffer for interrupt events.
936  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
937  * the adapter service function.
938  */
939 static void *
940 rxa_intr_thread(void *arg)
941 {
942         struct rte_event_eth_rx_adapter *rx_adapter = arg;
943         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
944         int n, i;
945
946         while (1) {
947                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
948                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
949                 if (unlikely(n < 0))
950                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
951                                         n);
952                 for (i = 0; i < n; i++) {
953                         rxa_intr_ring_enqueue(rx_adapter,
954                                         epoll_events[i].epdata.data);
955                 }
956         }
957
958         return NULL;
959 }
960
961 /* Dequeue <port, q> from interrupt ring and enqueue received
962  * mbufs to eventdev
963  */
964 static inline uint32_t
965 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
966 {
967         uint32_t n;
968         uint32_t nb_rx = 0;
969         int rxq_empty;
970         struct rte_eth_event_enqueue_buffer *buf;
971         rte_spinlock_t *ring_lock;
972         uint8_t max_done = 0;
973
974         if (rx_adapter->num_rx_intr == 0)
975                 return 0;
976
977         if (rte_ring_count(rx_adapter->intr_ring) == 0
978                 && !rx_adapter->qd_valid)
979                 return 0;
980
981         buf = &rx_adapter->event_enqueue_buffer;
982         ring_lock = &rx_adapter->intr_ring_lock;
983
984         if (buf->count >= BATCH_SIZE)
985                 rxa_flush_event_buffer(rx_adapter);
986
987         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
988                 struct eth_device_info *dev_info;
989                 uint16_t port;
990                 uint16_t queue;
991                 union queue_data qd  = rx_adapter->qd;
992                 int err;
993
994                 if (!rx_adapter->qd_valid) {
995                         struct eth_rx_queue_info *queue_info;
996
997                         rte_spinlock_lock(ring_lock);
998                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
999                         if (err) {
1000                                 rte_spinlock_unlock(ring_lock);
1001                                 break;
1002                         }
1003
1004                         port = qd.port;
1005                         queue = qd.queue;
1006                         rx_adapter->qd = qd;
1007                         rx_adapter->qd_valid = 1;
1008                         dev_info = &rx_adapter->eth_devices[port];
1009                         if (rxa_shared_intr(dev_info, queue))
1010                                 dev_info->shared_intr_enabled = 1;
1011                         else {
1012                                 queue_info = &dev_info->rx_queue[queue];
1013                                 queue_info->intr_enabled = 1;
1014                         }
1015                         rte_eth_dev_rx_intr_enable(port, queue);
1016                         rte_spinlock_unlock(ring_lock);
1017                 } else {
1018                         port = qd.port;
1019                         queue = qd.queue;
1020
1021                         dev_info = &rx_adapter->eth_devices[port];
1022                 }
1023
1024                 if (rxa_shared_intr(dev_info, queue)) {
1025                         uint16_t i;
1026                         uint16_t nb_queues;
1027
1028                         nb_queues = dev_info->dev->data->nb_rx_queues;
1029                         n = 0;
1030                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1031                                 uint8_t enq_buffer_full;
1032
1033                                 if (!rxa_intr_queue(dev_info, i))
1034                                         continue;
1035                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1036                                         rx_adapter->max_nb_rx,
1037                                         &rxq_empty);
1038                                 nb_rx += n;
1039
1040                                 enq_buffer_full = !rxq_empty && n == 0;
1041                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1042
1043                                 if (enq_buffer_full || max_done) {
1044                                         dev_info->next_q_idx = i;
1045                                         goto done;
1046                                 }
1047                         }
1048
1049                         rx_adapter->qd_valid = 0;
1050
1051                         /* Reinitialize for next interrupt */
1052                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1053                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1054                                                 0;
1055                 } else {
1056                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1057                                 rx_adapter->max_nb_rx,
1058                                 &rxq_empty);
1059                         rx_adapter->qd_valid = !rxq_empty;
1060                         nb_rx += n;
1061                         if (nb_rx > rx_adapter->max_nb_rx)
1062                                 break;
1063                 }
1064         }
1065
1066 done:
1067         rx_adapter->stats.rx_intr_packets += nb_rx;
1068         return nb_rx;
1069 }
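/*
 * Illustrative note on the shared-interrupt walk above: next_q_idx
 * remembers where the scan stopped when the enqueue buffer filled or
 * max_nb_rx was reached, so the next service invocation resumes there
 * instead of restarting at the first queue and starving later ones.
 * Once the walk completes, it is reset to the first possibly-shared
 * index: 0, or RTE_MAX_RXTX_INTR_VEC_ID - 1 when the device supports
 * per-queue vectors and only the tail queues share one.
 */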
1070
1071 /*
1072  * Polls receive queues added to the event adapter and enqueues received
1073  * packets to the event device.
1074  *
1075  * The receive code enqueues initially to a temporary buffer; the
1076  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1077  *
1078  * If there isn't space available in the temporary buffer, packets from the
1079  * Rx queue aren't dequeued from the eth device; this back pressures the
1080  * eth device. In virtual device environments, the back pressure is relayed
1081  * to the hypervisor's switching layer, where adjustments can be made to
1082  * deal with it.
1083  */
1084 static inline uint32_t
1085 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1086 {
1087         uint32_t num_queue;
1088         uint32_t nb_rx = 0;
1089         struct rte_eth_event_enqueue_buffer *buf;
1090         uint32_t wrr_pos;
1091         uint32_t max_nb_rx;
1092
1093         wrr_pos = rx_adapter->wrr_pos;
1094         max_nb_rx = rx_adapter->max_nb_rx;
1095         buf = &rx_adapter->event_enqueue_buffer;
1096
1097         /* Iterate through a WRR sequence */
1098         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1099                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1100                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1101                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1102
1103                 /* Don't do a batch dequeue from the rx queue if there isn't
1104                  * enough space in the enqueue buffer.
1105                  */
1106                 if (buf->count >= BATCH_SIZE)
1107                         rxa_flush_event_buffer(rx_adapter);
1108                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1109                         rx_adapter->wrr_pos = wrr_pos;
1110                         return nb_rx;
1111                 }
1112
1113                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1114                                 NULL);
1115                 if (nb_rx > max_nb_rx) {
1116                         rx_adapter->wrr_pos =
1117                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1118                         break;
1119                 }
1120
1121                 if (++wrr_pos == rx_adapter->wrr_len)
1122                         wrr_pos = 0;
1123         }
1124         return nb_rx;
1125 }
1126
1127 static int
1128 rxa_service_func(void *args)
1129 {
1130         struct rte_event_eth_rx_adapter *rx_adapter = args;
1131         struct rte_event_eth_rx_adapter_stats *stats;
1132
1133         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1134                 return 0;
1135         if (!rx_adapter->rxa_started) {
1136                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1137                 return 0;
1138         }
1139
1140         stats = &rx_adapter->stats;
1141         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1142         stats->rx_packets += rxa_poll(rx_adapter);
1143         rte_spinlock_unlock(&rx_adapter->rx_lock);
1144         return 0;
1145 }
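/*
 * Illustrative usage sketch (application side): with a software
 * eventdev the adapter runs as the service registered below, so the
 * application must map that service to a service lcore. Error
 * handling is elided, the function name is hypothetical, and lcore_id
 * is assumed to have been added with rte_service_lcore_add().
 */
#ifdef RXA_DOC_EXAMPLES
static int
example_run_adapter_service(uint8_t adapter_id, uint32_t lcore_id)
{
	uint32_t service_id;

	if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
						&service_id) != 0)
		return 0;	/* internal event port, no service needed */

	rte_service_map_lcore_set(service_id, lcore_id, 1);
	rte_service_lcore_start(lcore_id);
	return rte_service_runstate_set(service_id, 1);
}
#endif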
1146
1147 static int
1148 rte_event_eth_rx_adapter_init(void)
1149 {
1150         const char *name = "rte_event_eth_rx_adapter_array";
1151         const struct rte_memzone *mz;
1152         unsigned int sz;
1153
1154         sz = sizeof(*event_eth_rx_adapter) *
1155             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1156         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1157
1158         mz = rte_memzone_lookup(name);
1159         if (mz == NULL) {
1160                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1161                                                  RTE_CACHE_LINE_SIZE);
1162                 if (mz == NULL) {
1163                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1164                                         PRId32, rte_errno);
1165                         return -rte_errno;
1166                 }
1167         }
1168
1169         event_eth_rx_adapter = mz->addr;
1170         return 0;
1171 }
1172
1173 static inline struct rte_event_eth_rx_adapter *
1174 rxa_id_to_adapter(uint8_t id)
1175 {
1176         return event_eth_rx_adapter ?
1177                 event_eth_rx_adapter[id] : NULL;
1178 }
1179
1180 static int
1181 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1182                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1183 {
1184         int ret;
1185         struct rte_eventdev *dev;
1186         struct rte_event_dev_config dev_conf;
1187         int started;
1188         uint8_t port_id;
1189         struct rte_event_port_conf *port_conf = arg;
1190         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1191
1192         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1193         dev_conf = dev->data->dev_conf;
1194
1195         started = dev->data->dev_started;
1196         if (started)
1197                 rte_event_dev_stop(dev_id);
1198         port_id = dev_conf.nb_event_ports;
1199         dev_conf.nb_event_ports += 1;
1200         ret = rte_event_dev_configure(dev_id, &dev_conf);
1201         if (ret) {
1202                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1203                                                 dev_id);
1204                 if (started) {
1205                         if (rte_event_dev_start(dev_id))
1206                                 return -EIO;
1207                 }
1208                 return ret;
1209         }
1210
1211         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1212         if (ret) {
1213                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1214                                         port_id);
1215                 return ret;
1216         }
1217
1218         conf->event_port_id = port_id;
1219         conf->max_nb_rx = 128;
1220         if (started)
1221                 ret = rte_event_dev_start(dev_id);
1222         rx_adapter->default_cb_arg = 1;
1223         return ret;
1224 }
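/*
 * Illustrative usage sketch (application side): this default callback
 * is the one rte_event_eth_rx_adapter_create() installs together with
 * a copy of the application's port configuration; the eventdev is then
 * grown by one port the first time the adapter needs one. The
 * threshold/depth values below are arbitrary placeholders.
 */
#ifdef RXA_DOC_EXAMPLES
static int
example_create_adapter(uint8_t adapter_id, uint8_t evdev_id)
{
	struct rte_event_port_conf port_conf = {
		.new_event_threshold = 4096,
		.dequeue_depth = 128,
		.enqueue_depth = 128,
	};

	return rte_event_eth_rx_adapter_create(adapter_id, evdev_id,
					&port_conf);
}
#endif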
1225
1226 static int
1227 rxa_epoll_create1(void)
1228 {
1229 #if defined(LINUX)
1230         int fd;
1231         fd = epoll_create1(EPOLL_CLOEXEC);
1232         return fd < 0 ? -errno : fd;
1233 #elif defined(BSD)
1234         return -ENOTSUP;
1235 #endif
1236 }
1237
1238 static int
1239 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1240 {
1241         if (rx_adapter->epd != INIT_FD)
1242                 return 0;
1243
1244         rx_adapter->epd = rxa_epoll_create1();
1245         if (rx_adapter->epd < 0) {
1246                 int err = rx_adapter->epd;
1247                 rx_adapter->epd = INIT_FD;
1248                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1249                 return err;
1250         }
1251
1252         return 0;
1253 }
1254
1255 static int
1256 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1257 {
1258         int err;
1259         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1260
1261         if (rx_adapter->intr_ring)
1262                 return 0;
1263
1264         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1265                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1266                                         rte_socket_id(), 0);
1267         if (!rx_adapter->intr_ring)
1268                 return -ENOMEM;
1269
1270         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1271                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1272                                         sizeof(struct rte_epoll_event),
1273                                         RTE_CACHE_LINE_SIZE,
1274                                         rx_adapter->socket_id);
1275         if (!rx_adapter->epoll_events) {
1276                 err = -ENOMEM;
1277                 goto error;
1278         }
1279
1280         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1281
1282         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1283                         "rx-intr-thread-%d", rx_adapter->id);
1284
1285         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1286                                 NULL, rxa_intr_thread, rx_adapter);
1287         if (!err) {
1288                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1289                 return 0;
1290         }
1291
1292         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1293 error:
1294         rte_ring_free(rx_adapter->intr_ring);
1295         rx_adapter->intr_ring = NULL;
1296         rx_adapter->epoll_events = NULL;
1297         return err;
1298 }
1299
1300 static int
1301 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1302 {
1303         int err;
1304
1305         err = pthread_cancel(rx_adapter->rx_intr_thread);
1306         if (err)
1307                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1308                                 err);
1309
1310         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1311         if (err)
1312                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1313
1314         rte_free(rx_adapter->epoll_events);
1315         rte_ring_free(rx_adapter->intr_ring);
1316         rx_adapter->intr_ring = NULL;
1317         rx_adapter->epoll_events = NULL;
1318         return 0;
1319 }
1320
1321 static int
1322 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1323 {
1324         int ret;
1325
1326         if (rx_adapter->num_rx_intr == 0)
1327                 return 0;
1328
1329         ret = rxa_destroy_intr_thread(rx_adapter);
1330         if (ret)
1331                 return ret;
1332
1333         close(rx_adapter->epd);
1334         rx_adapter->epd = INIT_FD;
1335
1336         return ret;
1337 }
1338
1339 static int
1340 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1341         struct eth_device_info *dev_info,
1342         uint16_t rx_queue_id)
1343 {
1344         int err;
1345         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1346         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1347
1348         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1349         if (err) {
1350                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1351                         rx_queue_id);
1352                 return err;
1353         }
1354
1355         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1356                                         rx_adapter->epd,
1357                                         RTE_INTR_EVENT_DEL,
1358                                         0);
1359         if (err)
1360                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1361
1362         if (sintr)
1363                 dev_info->shared_intr_enabled = 0;
1364         else
1365                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1366         return err;
1367 }
1368
1369 static int
1370 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1371                 struct eth_device_info *dev_info,
1372                 int rx_queue_id)
1373 {
1374         int err;
1375         int i;
1376         int s;
1377
1378         if (dev_info->nb_rx_intr == 0)
1379                 return 0;
1380
1381         err = 0;
1382         if (rx_queue_id == -1) {
1383                 s = dev_info->nb_shared_intr;
1384                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1385                         int sintr;
1386                         uint16_t q;
1387
1388                         q = dev_info->intr_queue[i];
1389                         sintr = rxa_shared_intr(dev_info, q);
1390                         s -= sintr;
1391
1392                         if (!sintr || s == 0) {
1393
1394                                 err = rxa_disable_intr(rx_adapter, dev_info,
1395                                                 q);
1396                                 if (err)
1397                                         return err;
1398                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1399                                                         q);
1400                         }
1401                 }
1402         } else {
1403                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1404                         return 0;
1405                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1406                                 dev_info->nb_shared_intr == 1) {
1407                         err = rxa_disable_intr(rx_adapter, dev_info,
1408                                         rx_queue_id);
1409                         if (err)
1410                                 return err;
1411                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1412                                                 rx_queue_id);
1413                 }
1414
1415                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1416                         if (dev_info->intr_queue[i] == rx_queue_id) {
1417                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1418                                         dev_info->intr_queue[i] =
1419                                                 dev_info->intr_queue[i + 1];
1420                                 break;
1421                         }
1422                 }
1423         }
1424
1425         return err;
1426 }
1427
1428 static int
1429 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1430         struct eth_device_info *dev_info,
1431         uint16_t rx_queue_id)
1432 {
1433         int err, err1;
1434         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1435         union queue_data qd;
1436         int init_fd;
1437         uint16_t *intr_queue;
1438         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1439
1440         if (rxa_intr_queue(dev_info, rx_queue_id))
1441                 return 0;
1442
1443         intr_queue = dev_info->intr_queue;
1444         if (dev_info->intr_queue == NULL) {
1445                 size_t len =
1446                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1447                 dev_info->intr_queue =
1448                         rte_zmalloc_socket(
1449                                 rx_adapter->mem_name,
1450                                 len,
1451                                 0,
1452                                 rx_adapter->socket_id);
1453                 if (dev_info->intr_queue == NULL)
1454                         return -ENOMEM;
1455         }
1456
1457         init_fd = rx_adapter->epd;
1458         err = rxa_init_epd(rx_adapter);
1459         if (err)
1460                 goto err_free_queue;
1461
1462         qd.port = eth_dev_id;
1463         qd.queue = rx_queue_id;
1464
1465         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1466                                         rx_adapter->epd,
1467                                         RTE_INTR_EVENT_ADD,
1468                                         qd.ptr);
1469         if (err) {
1470                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1471                         " Rx Queue %u err %d", rx_queue_id, err);
1472                 goto err_del_fd;
1473         }
1474
1475         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1476         if (err) {
1477                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1478                                 " Rx Queue %u err %d", rx_queue_id, err);
1479
1480                 goto err_del_event;
1481         }
1482
1483         err = rxa_create_intr_thread(rx_adapter);
1484         if (!err)  {
1485                 if (sintr)
1486                         dev_info->shared_intr_enabled = 1;
1487                 else
1488                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1489                 return 0;
1490         }
1491
1492
1493         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1494         if (err)
1495                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1496                                 " Rx Queue %u err %d", rx_queue_id, err);
1497 err_del_event:
1498         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1499                                         rx_adapter->epd,
1500                                         RTE_INTR_EVENT_DEL,
1501                                         NULL);
1502         if (err1) {
1503                 RTE_EDEV_LOG_ERR("Could not delete event for"
1504                                 " Rx Queue %u err %d", rx_queue_id, err1);
1505         }
1506 err_del_fd:
1507         if (init_fd == INIT_FD) {
1508                 close(rx_adapter->epd);
1509                 rx_adapter->epd = INIT_FD;
1510         }
1511 err_free_queue:
1512         if (intr_queue == NULL) {
1513                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1514
1515         return err;
1516 }
1517
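/*
 * Enable interrupts for one Rx queue, or for all queues when rx_queue_id
 * is -1. Devices that share a single interrupt vector across queues need
 * only one registration; on error, the queues configured by this call are
 * disabled again before returning.
 */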
1518 static int
1519 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1520         struct eth_device_info *dev_info,
1521         int rx_queue_id)
1523 {
1524         int i, j, err;
1525         int si = -1;
1526         int shared_done = (dev_info->nb_shared_intr > 0);
1527
1528         if (rx_queue_id != -1) {
1529                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1530                         return 0;
1531                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1532         }
1533
1534         err = 0;
1535         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1536
1537                 if (rxa_shared_intr(dev_info, i) && shared_done)
1538                         continue;
1539
1540                 err = rxa_config_intr(rx_adapter, dev_info, i);
1541
1542                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1543                 if (shared_done) {
1544                         si = i;
1545                         dev_info->shared_intr_enabled = 1;
1546                 }
1547                 if (err)
1548                         break;
1549         }
1550
1551         if (err == 0)
1552                 return 0;
1553
1554         /* Roll back the interrupt configuration done in this call */
1555         for (j = 0; j < i; j++) {
1556                 if (rxa_intr_queue(dev_info, j))
1557                         continue;
1558                 if (rxa_shared_intr(dev_info, j) && si != j)
1559                         continue;
1560                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1561                 if (err)
1562                         break;
1563
1564         }
1565
1566         return err;
1567 }
1568
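/*
 * One-time setup of the adapter's service function. The service is
 * registered as MT safe because the service function itself takes
 * rx_adapter->rx_lock to serialize against queue add/del. The
 * application-supplied conf_cb provides the event port id and the
 * max_nb_rx limit used by the service loop.
 */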
1570 static int
1571 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1572 {
1573         int ret;
1574         struct rte_service_spec service;
1575         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1576
1577         if (rx_adapter->service_inited)
1578                 return 0;
1579
1580         memset(&service, 0, sizeof(service));
1581         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1582                 "rte_event_eth_rx_adapter_%d", id);
1583         service.socket_id = rx_adapter->socket_id;
1584         service.callback = rxa_service_func;
1585         service.callback_userdata = rx_adapter;
1586         /* Service function handles locking for queue add/del updates */
1587         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1588         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1589         if (ret) {
1590                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1591                         service.name, ret);
1592                 return ret;
1593         }
1594
1595         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1596                 &rx_adapter_conf, rx_adapter->conf_arg);
1597         if (ret) {
1598                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1599                         ret);
1600                 goto err_done;
1601         }
1602         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1603         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1604         rx_adapter->service_inited = 1;
1605         rx_adapter->epd = INIT_FD;
1606         return 0;
1607
1608 err_done:
1609         rte_service_component_unregister(rx_adapter->service_id);
1610         return ret;
1611 }
1612
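/*
 * Update a queue's enabled state and keep the adapter and per-device
 * queue counts consistent; counts change only on an actual state
 * transition. An rx_queue_id of -1 applies the update to every Rx queue
 * of the device.
 */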
1613 static void
1614 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1615                 struct eth_device_info *dev_info,
1616                 int32_t rx_queue_id,
1617                 uint8_t add)
1618 {
1619         struct eth_rx_queue_info *queue_info;
1620         int enabled;
1621         uint16_t i;
1622
1623         if (dev_info->rx_queue == NULL)
1624                 return;
1625
1626         if (rx_queue_id == -1) {
1627                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1628                         rxa_update_queue(rx_adapter, dev_info, i, add);
1629         } else {
1630                 queue_info = &dev_info->rx_queue[rx_queue_id];
1631                 enabled = queue_info->queue_enabled;
1632                 if (add) {
1633                         rx_adapter->nb_queues += !enabled;
1634                         dev_info->nb_dev_queues += !enabled;
1635                 } else {
1636                         rx_adapter->nb_queues -= enabled;
1637                         dev_info->nb_dev_queues -= enabled;
1638                 }
1639                 queue_info->queue_enabled = !!add;
1640         }
1641 }
1642
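/*
 * Remove a queue (or all queues for rx_queue_id == -1) from the
 * service-based adapter, subtracting it from the polled, interrupt and
 * shared-interrupt queue counts it contributed to.
 */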
1643 static void
1644 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1645         struct eth_device_info *dev_info,
1646         int32_t rx_queue_id)
1647 {
1648         int pollq;
1649         int intrq;
1650         int sintrq;
1651
1652
1653         if (rx_adapter->nb_queues == 0)
1654                 return;
1655
1656         if (rx_queue_id == -1) {
1657                 uint16_t nb_rx_queues;
1658                 uint16_t i;
1659
1660                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1661                 for (i = 0; i < nb_rx_queues; i++)
1662                         rxa_sw_del(rx_adapter, dev_info, i);
1663                 return;
1664         }
1665
1666         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1667         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1668         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1669         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1670         rx_adapter->num_rx_polled -= pollq;
1671         dev_info->nb_rx_poll -= pollq;
1672         rx_adapter->num_rx_intr -= intrq;
1673         dev_info->nb_rx_intr -= intrq;
1674         dev_info->nb_shared_intr -= intrq && sintrq;
1675 }
1676
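/*
 * Record a queue's servicing weight and prefill the event template used
 * when its packets are enqueued (RTE_EVENT_OP_NEW, event type
 * RTE_EVENT_TYPE_ETH_RX_ADAPTER), then migrate the poll/interrupt
 * counters to whichever mode the queue ends up in, counting a queue that
 * changes mode exactly once.
 */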
1677 static void
1678 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1679         struct eth_device_info *dev_info,
1680         int32_t rx_queue_id,
1681         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1682 {
1683         struct eth_rx_queue_info *queue_info;
1684         const struct rte_event *ev = &conf->ev;
1685         int pollq;
1686         int intrq;
1687         int sintrq;
1688         struct rte_event *qi_ev;
1689
1690         if (rx_queue_id == -1) {
1691                 uint16_t nb_rx_queues;
1692                 uint16_t i;
1693
1694                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1695                 for (i = 0; i < nb_rx_queues; i++)
1696                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1697                 return;
1698         }
1699
1700         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1701         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1702         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1703
1704         queue_info = &dev_info->rx_queue[rx_queue_id];
1705         queue_info->wt = conf->servicing_weight;
1706
1707         qi_ev = (struct rte_event *)&queue_info->event;
1708         qi_ev->event = ev->event;
1709         qi_ev->op = RTE_EVENT_OP_NEW;
1710         qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1711         qi_ev->sub_event_type = 0;
1712
1713         if (conf->rx_queue_flags &
1714                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)
1715                 queue_info->flow_id_mask = ~0;
1716         else
1717                 qi_ev->flow_id = 0;
1718
1719         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1720         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1721                 rx_adapter->num_rx_polled += !pollq;
1722                 dev_info->nb_rx_poll += !pollq;
1723                 rx_adapter->num_rx_intr -= intrq;
1724                 dev_info->nb_rx_intr -= intrq;
1725                 dev_info->nb_shared_intr -= intrq && sintrq;
1726         }
1727
1728         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1729                 rx_adapter->num_rx_polled -= pollq;
1730                 dev_info->nb_rx_poll -= pollq;
1731                 rx_adapter->num_rx_intr += !intrq;
1732                 dev_info->nb_rx_intr += !intrq;
1733                 dev_info->nb_shared_intr += !intrq && sintrq;
1734                 if (dev_info->nb_shared_intr == 1) {
1735                         if (dev_info->multi_intr_cap)
1736                                 dev_info->next_q_idx =
1737                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1738                         else
1739                                 dev_info->next_q_idx = 0;
1740                 }
1741         }
1742 }
1743
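/*
 * Add one queue (or all queues for rx_queue_id == -1) to the service-based
 * adapter. A servicing weight of zero selects interrupt mode if Rx
 * interrupts are enabled on the device, otherwise the weight is forced to
 * one and the queue is polled. The poll and WRR arrays are recomputed for
 * the new queue set and swapped in; the caller holds rx_lock.
 */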
1744 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1745                 uint16_t eth_dev_id,
1746                 int rx_queue_id,
1747                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1748 {
1749         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1750         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1751         int ret;
1752         struct eth_rx_poll_entry *rx_poll;
1753         struct eth_rx_queue_info *rx_queue;
1754         uint32_t *rx_wrr;
1755         uint16_t nb_rx_queues;
1756         uint32_t nb_rx_poll, nb_wrr;
1757         uint32_t nb_rx_intr;
1758         int num_intr_vec;
1759         uint16_t wt;
1760
1761         if (queue_conf->servicing_weight == 0) {
1762                 struct rte_eth_dev_data *data = dev_info->dev->data;
1763
1764                 temp_conf = *queue_conf;
1765                 if (!data->dev_conf.intr_conf.rxq) {
1766                         /* If Rx interrupts are disabled set wt = 1 */
1767                         temp_conf.servicing_weight = 1;
1768                 }
1769                 queue_conf = &temp_conf;
1770         }
1771
1772         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1773         rx_queue = dev_info->rx_queue;
1774         wt = queue_conf->servicing_weight;
1775
1776         if (dev_info->rx_queue == NULL) {
1777                 dev_info->rx_queue =
1778                     rte_zmalloc_socket(rx_adapter->mem_name,
1779                                        nb_rx_queues *
1780                                        sizeof(struct eth_rx_queue_info), 0,
1781                                        rx_adapter->socket_id);
1782                 if (dev_info->rx_queue == NULL)
1783                         return -ENOMEM;
1784         }
1785         rx_wrr = NULL;
1786         rx_poll = NULL;
1787
1788         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1789                         queue_conf->servicing_weight,
1790                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1791
1792         if (dev_info->dev->intr_handle)
1793                 dev_info->multi_intr_cap =
1794                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1795
1796         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1797                                 &rx_poll, &rx_wrr);
1798         if (ret)
1799                 goto err_free_rxqueue;
1800
1801         if (wt == 0) {
1802                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1803
1804                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1805                 if (ret)
1806                         goto err_free_rxqueue;
1807
1808                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1809                 if (ret)
1810                         goto err_free_rxqueue;
1811         } else {
1812
1813                 num_intr_vec = 0;
1814                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1815                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1816                                                 rx_queue_id, 0);
1817                         /* interrupt based queues are being converted to
1818                          * poll mode queues, delete the interrupt configuration
1819                          * for those.
1820                          */
1821                         ret = rxa_del_intr_queue(rx_adapter,
1822                                                 dev_info, rx_queue_id);
1823                         if (ret)
1824                                 goto err_free_rxqueue;
1825                 }
1826         }
1827
1828         if (nb_rx_intr == 0) {
1829                 ret = rxa_free_intr_resources(rx_adapter);
1830                 if (ret)
1831                         goto err_free_rxqueue;
1832         }
1833
1834         if (wt == 0) {
1835                 uint16_t i;
1836
1837                 if (rx_queue_id == -1) {
1838                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1839                                 dev_info->intr_queue[i] = i;
1840                 } else {
1841                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1842                                 dev_info->intr_queue[nb_rx_intr - 1] =
1843                                         rx_queue_id;
1844                 }
1845         }
1846
1849         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1850         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1851
1852         rte_free(rx_adapter->eth_rx_poll);
1853         rte_free(rx_adapter->wrr_sched);
1854
1855         rx_adapter->eth_rx_poll = rx_poll;
1856         rx_adapter->wrr_sched = rx_wrr;
1857         rx_adapter->wrr_len = nb_wrr;
1858         rx_adapter->num_intr_vec += num_intr_vec;
1859         return 0;
1860
1861 err_free_rxqueue:
1862         if (rx_queue == NULL) {
1863                 rte_free(dev_info->rx_queue);
1864                 dev_info->rx_queue = NULL;
1865         }
1866
1867         rte_free(rx_poll);
1868         rte_free(rx_wrr);
1869
1870         return ret;
1871 }
1872
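/*
 * Start/stop handler shared by the start and stop APIs: devices with an
 * internal event port are controlled through the eventdev PMD ops, and
 * the service run state is toggled for everything that relies on the
 * service function.
 */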
1873 static int
1874 rxa_ctrl(uint8_t id, int start)
1875 {
1876         struct rte_event_eth_rx_adapter *rx_adapter;
1877         struct rte_eventdev *dev;
1878         struct eth_device_info *dev_info;
1879         uint32_t i;
1880         int use_service = 0;
1881         int stop = !start;
1882
1883         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1884         rx_adapter = rxa_id_to_adapter(id);
1885         if (rx_adapter == NULL)
1886                 return -EINVAL;
1887
1888         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1889
1890         RTE_ETH_FOREACH_DEV(i) {
1891                 dev_info = &rx_adapter->eth_devices[i];
1892                 /* if starting, check for added dev queues */
1893                 if (start && !dev_info->nb_dev_queues)
1894                         continue;
1895                 /* if stopping, check whether the dev has been started */
1896                 if (stop && !dev_info->dev_rx_started)
1897                         continue;
1898                 use_service |= !dev_info->internal_event_port;
1899                 dev_info->dev_rx_started = start;
1900                 if (dev_info->internal_event_port == 0)
1901                         continue;
1902                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1903                                                 &rte_eth_devices[i]) :
1904                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1905                                                 &rte_eth_devices[i]);
1906         }
1907
1908         if (use_service) {
1909                 rte_spinlock_lock(&rx_adapter->rx_lock);
1910                 rx_adapter->rxa_started = start;
1911                 rte_service_runstate_set(rx_adapter->service_id, start);
1912                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1913         }
1914
1915         return 0;
1916 }
1917
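/*
 * The default_rss_key below seeds the software RSS hash; it is converted
 * to big endian form once at create time and used to derive flow ids for
 * received packets.
 */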
1918 int
1919 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1920                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1921                                 void *conf_arg)
1922 {
1923         struct rte_event_eth_rx_adapter *rx_adapter;
1924         int ret;
1925         int socket_id;
1926         uint16_t i;
1927         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1928         const uint8_t default_rss_key[] = {
1929                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1930                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1931                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1932                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1933                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1934         };
1935
1936         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1937         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1938         if (conf_cb == NULL)
1939                 return -EINVAL;
1940
1941         if (event_eth_rx_adapter == NULL) {
1942                 ret = rte_event_eth_rx_adapter_init();
1943                 if (ret)
1944                         return ret;
1945         }
1946
1947         rx_adapter = rxa_id_to_adapter(id);
1948         if (rx_adapter != NULL) {
1949                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1950                 return -EEXIST;
1951         }
1952
1953         socket_id = rte_event_dev_socket_id(dev_id);
1954         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1955                 "rte_event_eth_rx_adapter_%d",
1956                 id);
1957
1958         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1959                         RTE_CACHE_LINE_SIZE, socket_id);
1960         if (rx_adapter == NULL) {
1961                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1962                 return -ENOMEM;
1963         }
1964
1965         rx_adapter->eventdev_id = dev_id;
1966         rx_adapter->socket_id = socket_id;
1967         rx_adapter->conf_cb = conf_cb;
1968         rx_adapter->conf_arg = conf_arg;
1969         rx_adapter->id = id;
1970         strcpy(rx_adapter->mem_name, mem_name);
1971         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1972                                         RTE_MAX_ETHPORTS *
1973                                         sizeof(struct eth_device_info), 0,
1974                                         socket_id);
1975         rte_convert_rss_key((const uint32_t *)default_rss_key,
1976                         (uint32_t *)rx_adapter->rss_key_be,
1977                             RTE_DIM(default_rss_key));
1978
1979         if (rx_adapter->eth_devices == NULL) {
1980                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1981                 rte_free(rx_adapter);
1982                 return -ENOMEM;
1983         }
1984         rte_spinlock_init(&rx_adapter->rx_lock);
1985         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1986                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1987
1988         event_eth_rx_adapter[id] = rx_adapter;
1989         if (conf_cb == rxa_default_conf_cb)
1990                 rx_adapter->default_cb_arg = 1;
1991         rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
1992                 conf_arg);
1993         return 0;
1994 }
1995
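/*
 * Wrapper around rte_event_eth_rx_adapter_create_ext() that copies the
 * caller's port config and registers the default conf callback.
 *
 * Illustrative usage; names are application defined and error handling
 * is omitted:
 *
 *        struct rte_event_port_conf pconf;
 *
 *        rte_event_port_default_conf_get(dev_id, 0, &pconf);
 *        rte_event_eth_rx_adapter_create(id, dev_id, &pconf);
 */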
1996 int
1997 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
1998                 struct rte_event_port_conf *port_config)
1999 {
2000         struct rte_event_port_conf *pc;
2001         int ret;
2002
2003         if (port_config == NULL)
2004                 return -EINVAL;
2005         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2006
2007         pc = rte_malloc(NULL, sizeof(*pc), 0);
2008         if (pc == NULL)
2009                 return -ENOMEM;
2010         *pc = *port_config;
2011         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2012                                         rxa_default_conf_cb,
2013                                         pc);
2014         if (ret)
2015                 rte_free(pc);
2016         return ret;
2017 }
2018
2019 int
2020 rte_event_eth_rx_adapter_free(uint8_t id)
2021 {
2022         struct rte_event_eth_rx_adapter *rx_adapter;
2023
2024         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2025
2026         rx_adapter = rxa_id_to_adapter(id);
2027         if (rx_adapter == NULL)
2028                 return -EINVAL;
2029
2030         if (rx_adapter->nb_queues) {
2031                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2032                                 rx_adapter->nb_queues);
2033                 return -EBUSY;
2034         }
2035
2036         if (rx_adapter->default_cb_arg)
2037                 rte_free(rx_adapter->conf_arg);
2038         rte_free(rx_adapter->eth_devices);
2039         rte_free(rx_adapter);
2040         event_eth_rx_adapter[id] = NULL;
2041
2042         rte_eventdev_trace_eth_rx_adapter_free(id);
2043         return 0;
2044 }
2045
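/*
 * Typical usage sketch; identifiers are application defined, the ethdev
 * and event device are assumed configured, and error handling is omitted:
 *
 *        struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *                .servicing_weight = 1,
 *                .ev = {
 *                        .queue_id = ev_qid,
 *                        .sched_type = RTE_SCHED_TYPE_ATOMIC,
 *                        .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *                },
 *        };
 *
 *        rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &qconf);
 */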
2046 int
2047 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2048                 uint16_t eth_dev_id,
2049                 int32_t rx_queue_id,
2050                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2051 {
2052         int ret;
2053         uint32_t cap;
2054         struct rte_event_eth_rx_adapter *rx_adapter;
2055         struct rte_eventdev *dev;
2056         struct eth_device_info *dev_info;
2057
2058         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2059         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2060
2061         rx_adapter = rxa_id_to_adapter(id);
2062         if ((rx_adapter == NULL) || (queue_conf == NULL))
2063                 return -EINVAL;
2064
2065         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2066         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2067                                                 eth_dev_id,
2068                                                 &cap);
2069         if (ret) {
2070                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2071                         " eth port %" PRIu16, id, eth_dev_id);
2072                 return ret;
2073         }
2074
2075         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2076                 && (queue_conf->rx_queue_flags &
2077                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2078                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2079                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2080                                 eth_dev_id, id);
2081                 return -EINVAL;
2082         }
2083
2084         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2085                 (rx_queue_id != -1)) {
2086                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2087                         "event queue, eth port: %" PRIu16 " adapter id: %"
2088                         PRIu8, eth_dev_id, id);
2089                 return -EINVAL;
2090         }
2091
2092         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2093                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2094                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2095                          (uint16_t)rx_queue_id);
2096                 return -EINVAL;
2097         }
2098
2099         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2100
2101         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2102                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2103                                         -ENOTSUP);
2104                 if (dev_info->rx_queue == NULL) {
2105                         dev_info->rx_queue =
2106                             rte_zmalloc_socket(rx_adapter->mem_name,
2107                                         dev_info->dev->data->nb_rx_queues *
2108                                         sizeof(struct eth_rx_queue_info), 0,
2109                                         rx_adapter->socket_id);
2110                         if (dev_info->rx_queue == NULL)
2111                                 return -ENOMEM;
2112                 }
2113
2114                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2115                                 &rte_eth_devices[eth_dev_id],
2116                                 rx_queue_id, queue_conf);
2117                 if (ret == 0) {
2118                         dev_info->internal_event_port = 1;
2119                         rxa_update_queue(rx_adapter,
2120                                         &rx_adapter->eth_devices[eth_dev_id],
2121                                         rx_queue_id,
2122                                         1);
2123                 }
2124         } else {
2125                 rte_spinlock_lock(&rx_adapter->rx_lock);
2126                 dev_info->internal_event_port = 0;
2127                 ret = rxa_init_service(rx_adapter, id);
2128                 if (ret == 0) {
2129                         uint32_t service_id = rx_adapter->service_id;
2130                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2131                                         queue_conf);
2132                         rte_service_component_runstate_set(service_id,
2133                                 rxa_sw_adapter_queue_count(rx_adapter));
2134                 }
2135                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2136         }
2137
2138         rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2139                 rx_queue_id, queue_conf, ret);
2140         return ret;
2144 }
2145
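/*
 * Queue delete mirrors queue add: internal event port devices go through
 * the PMD op, while for the service-based adapter the poll and WRR arrays
 * are rebuilt without the deleted queue(s) and swapped in under rx_lock.
 */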
2146 int
2147 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2148                                 int32_t rx_queue_id)
2149 {
2150         int ret = 0;
2151         struct rte_eventdev *dev;
2152         struct rte_event_eth_rx_adapter *rx_adapter;
2153         struct eth_device_info *dev_info;
2154         uint32_t cap;
2155         uint32_t nb_rx_poll = 0;
2156         uint32_t nb_wrr = 0;
2157         uint32_t nb_rx_intr;
2158         struct eth_rx_poll_entry *rx_poll = NULL;
2159         uint32_t *rx_wrr = NULL;
2160         int num_intr_vec;
2161
2162         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2163         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2164
2165         rx_adapter = rxa_id_to_adapter(id);
2166         if (rx_adapter == NULL)
2167                 return -EINVAL;
2168
2169         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2170         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2171                                                 eth_dev_id,
2172                                                 &cap);
2173         if (ret)
2174                 return ret;
2175
2176         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2177                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2178                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2179                          (uint16_t)rx_queue_id);
2180                 return -EINVAL;
2181         }
2182
2183         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2184
2185         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2186                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2187                                  -ENOTSUP);
2188                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2189                                                 &rte_eth_devices[eth_dev_id],
2190                                                 rx_queue_id);
2191                 if (ret == 0) {
2192                         rxa_update_queue(rx_adapter,
2193                                         &rx_adapter->eth_devices[eth_dev_id],
2194                                         rx_queue_id,
2195                                         0);
2196                         if (dev_info->nb_dev_queues == 0) {
2197                                 rte_free(dev_info->rx_queue);
2198                                 dev_info->rx_queue = NULL;
2199                         }
2200                 }
2201         } else {
2202                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2203                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2204
2205                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2206                         &rx_poll, &rx_wrr);
2207                 if (ret)
2208                         return ret;
2209
2210                 rte_spinlock_lock(&rx_adapter->rx_lock);
2211
2212                 num_intr_vec = 0;
2213                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2214
2215                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2216                                                 rx_queue_id, 0);
2217                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2218                                         rx_queue_id);
2219                         if (ret)
2220                                 goto unlock_ret;
2221                 }
2222
2223                 if (nb_rx_intr == 0) {
2224                         ret = rxa_free_intr_resources(rx_adapter);
2225                         if (ret)
2226                                 goto unlock_ret;
2227                 }
2228
2229                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2230                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2231
2232                 rte_free(rx_adapter->eth_rx_poll);
2233                 rte_free(rx_adapter->wrr_sched);
2234
2235                 if (nb_rx_intr == 0) {
2236                         rte_free(dev_info->intr_queue);
2237                         dev_info->intr_queue = NULL;
2238                 }
2239
2240                 rx_adapter->eth_rx_poll = rx_poll;
2241                 rx_adapter->wrr_sched = rx_wrr;
2242                 rx_adapter->wrr_len = nb_wrr;
2243                 rx_adapter->num_intr_vec += num_intr_vec;
2244
2245                 if (dev_info->nb_dev_queues == 0) {
2246                         rte_free(dev_info->rx_queue);
2247                         dev_info->rx_queue = NULL;
2248                 }
2249 unlock_ret:
2250                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2251                 if (ret) {
2252                         rte_free(rx_poll);
2253                         rte_free(rx_wrr);
2254                         return ret;
2255                 }
2256
2257                 rte_service_component_runstate_set(rx_adapter->service_id,
2258                                 rxa_sw_adapter_queue_count(rx_adapter));
2259         }
2260
2261         rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2262                 rx_queue_id, ret);
2263         return ret;
2264 }
2265
2266 int
2267 rte_event_eth_rx_adapter_start(uint8_t id)
2268 {
2269         rte_eventdev_trace_eth_rx_adapter_start(id);
2270         return rxa_ctrl(id, 1);
2271 }
2272
2273 int
2274 rte_event_eth_rx_adapter_stop(uint8_t id)
2275 {
2276         rte_eventdev_trace_eth_rx_adapter_stop(id);
2277         return rxa_ctrl(id, 0);
2278 }
2279
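/*
 * Reported statistics are the service-based adapter's counters plus, for
 * devices with an internal event port, whatever the eventdev PMD reports
 * through its stats op.
 */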
2280 int
2281 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2282                                struct rte_event_eth_rx_adapter_stats *stats)
2283 {
2284         struct rte_event_eth_rx_adapter *rx_adapter;
2285         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2286         struct rte_event_eth_rx_adapter_stats dev_stats;
2287         struct rte_eventdev *dev;
2288         struct eth_device_info *dev_info;
2289         uint32_t i;
2290         int ret;
2291
2292         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2293
2294         rx_adapter = rxa_id_to_adapter(id);
2295         if (rx_adapter  == NULL || stats == NULL)
2296                 return -EINVAL;
2297
2298         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2299         memset(stats, 0, sizeof(*stats));
2300         RTE_ETH_FOREACH_DEV(i) {
2301                 dev_info = &rx_adapter->eth_devices[i];
2302                 if (dev_info->internal_event_port == 0 ||
2303                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2304                         continue;
2305                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2306                                                 &rte_eth_devices[i],
2307                                                 &dev_stats);
2308                 if (ret)
2309                         continue;
2310                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2311                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2312         }
2313
2314         if (rx_adapter->service_inited)
2315                 *stats = rx_adapter->stats;
2316
2317         stats->rx_packets += dev_stats_sum.rx_packets;
2318         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2319         return 0;
2320 }
2321
2322 int
2323 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2324 {
2325         struct rte_event_eth_rx_adapter *rx_adapter;
2326         struct rte_eventdev *dev;
2327         struct eth_device_info *dev_info;
2328         uint32_t i;
2329
2330         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2331
2332         rx_adapter = rxa_id_to_adapter(id);
2333         if (rx_adapter == NULL)
2334                 return -EINVAL;
2335
2336         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2337         RTE_ETH_FOREACH_DEV(i) {
2338                 dev_info = &rx_adapter->eth_devices[i];
2339                 if (dev_info->internal_event_port == 0 ||
2340                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2341                         continue;
2342                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2343                                                         &rte_eth_devices[i]);
2344         }
2345
2346         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2347         return 0;
2348 }
2349
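/*
 * The service-based adapter does no work unless its service is mapped to
 * a service core. Illustrative mapping, assuming an application chosen
 * lcore_id:
 *
 *        uint32_t service_id;
 *
 *        if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0)
 *                rte_service_map_lcore_set(service_id, lcore_id, 1);
 */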
2350 int
2351 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2352 {
2353         struct rte_event_eth_rx_adapter *rx_adapter;
2354
2355         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2356
2357         rx_adapter = rxa_id_to_adapter(id);
2358         if (rx_adapter == NULL || service_id == NULL)
2359                 return -EINVAL;
2360
2361         if (rx_adapter->service_inited)
2362                 *service_id = rx_adapter->service_id;
2363
2364         return rx_adapter->service_inited ? 0 : -ESRCH;
2365 }
2366
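/*
 * The registered callback runs in the service function's Rx path; it is
 * rejected for devices with an internal event port since their packets
 * never pass through the service function.
 */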
2367 int
2368 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2369                                         uint16_t eth_dev_id,
2370                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2371                                         void *cb_arg)
2372 {
2373         struct rte_event_eth_rx_adapter *rx_adapter;
2374         struct eth_device_info *dev_info;
2375         uint32_t cap;
2376         int ret;
2377
2378         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2379         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2380
2381         rx_adapter = rxa_id_to_adapter(id);
2382         if (rx_adapter == NULL)
2383                 return -EINVAL;
2384
2385         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2386         if (dev_info->rx_queue == NULL)
2387                 return -EINVAL;
2388
2389         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2390                                                 eth_dev_id,
2391                                                 &cap);
2392         if (ret) {
2393                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2394                         " eth port %" PRIu16, id, eth_dev_id);
2395                 return ret;
2396         }
2397
2398         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2399                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2400                                 PRIu16, eth_dev_id);
2401                 return -EINVAL;
2402         }
2403
2404         rte_spinlock_lock(&rx_adapter->rx_lock);
2405         dev_info->cb_fn = cb_fn;
2406         dev_info->cb_arg = cb_arg;
2407         rte_spinlock_unlock(&rx_adapter->rx_lock);
2408
2409         return 0;
2410 }