eventdev: remove event copy in Rx adapter
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
35 /* Sentinel value marking an uninitialized file descriptor */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Num of interrupt driven interrupt queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_poll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if  default_cb is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Rx callback */
151         rte_event_eth_rx_adapter_cb_fn cb_fn;
152         /* Rx callback argument */
153         void *cb_arg;
154         /* Set if ethdev->eventdev packet transfer uses a
155          * hardware mechanism
156          */
157         uint8_t internal_event_port;
158         /* Set if the adapter is processing Rx queues for
159          * this eth device and packet processing has been
160          * started; this allows the code to know if the PMD
161          * rx_adapter_stop callback needs to be invoked
162          */
163         uint8_t dev_rx_started;
164         /* Number of queues added for this device */
165         uint16_t nb_dev_queues;
166         /* Number of poll based queues
167          * If nb_rx_poll > 0, the start callback will
168          * be invoked if not already invoked
169          */
170         uint16_t nb_rx_poll;
171         /* Number of interrupt based queues
172          * If nb_rx_intr > 0, the start callback will
173          * be invoked if not already invoked.
174          */
175         uint16_t nb_rx_intr;
176         /* Number of queues that use the shared interrupt */
177         uint16_t nb_shared_intr;
178         /* sum(wrr(q)) for all queues within the device;
179          * useful when deleting all device queues
180          */
181         uint32_t wrr_len;
182         /* Intr based queue index to start polling from; this is used
183          * if the number of shared interrupts is non-zero
184          */
185         uint16_t next_q_idx;
186         /* Intr based queue indices */
187         uint16_t *intr_queue;
188         /* Set if the device generates a per Rx queue interrupt
189          * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
190          */
191         int multi_intr_cap;
192         /* shared interrupt enabled */
193         int shared_intr_enabled;
194 };
195
196 /* Per Rx queue */
197 struct eth_rx_queue_info {
198         int queue_enabled;      /* True if added */
199         int intr_enabled;
200         uint16_t wt;            /* Polling weight */
201         uint8_t event_queue_id; /* Event queue to enqueue packets to */
202         uint8_t sched_type;     /* Sched type for events */
203         uint8_t priority;       /* Event priority */
204         uint32_t flow_id;       /* App provided flow identifier */
205         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
206 };
207
208 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
209
210 static inline int
211 rxa_validate_id(uint8_t id)
212 {
213         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
214 }
215
216 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
217         if (!rxa_validate_id(id)) { \
218                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
219                 return retval; \
220         } \
221 } while (0)
222
223 static inline int
224 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
225 {
226         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
227 }
228
229 /* Greatest common divisor */
230 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
231 {
232         uint16_t r = a % b;
233
234         return r ? rxa_gcd_u16(b, r) : b;
235 }
236
237 /* Returns the next queue in the polling sequence
238  *
239  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
240  */
241 static int
242 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
243          unsigned int n, int *cw,
244          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
245          uint16_t gcd, int prev)
246 {
247         int i = prev;
248         uint16_t w;
249
250         while (1) {
251                 uint16_t q;
252                 uint16_t d;
253
254                 i = (i + 1) % n;
255                 if (i == 0) {
256                         *cw = *cw - gcd;
257                         if (*cw <= 0)
258                                 *cw = max_wt;
259                 }
260
261                 q = eth_rx_poll[i].eth_rx_qid;
262                 d = eth_rx_poll[i].eth_dev_id;
263                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
264
265                 if ((int)w >= *cw)
266                         return i;
267         }
268 }
269
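/* Return true if the Rx queue must use the device's shared interrupt
 * vector, i.e., the device cannot provide one interrupt vector per
 * queue or the queue index is outside the per-queue vector range.
 */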
270 static inline int
271 rxa_shared_intr(struct eth_device_info *dev_info,
272         int rx_queue_id)
273 {
274         int multi_intr_cap;
275
276         if (dev_info->dev->intr_handle == NULL)
277                 return 0;
278
279         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
280         return !multi_intr_cap ||
281                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
282 }
283
284 static inline int
285 rxa_intr_queue(struct eth_device_info *dev_info,
286         int rx_queue_id)
287 {
288         struct eth_rx_queue_info *queue_info;
289
290         queue_info = &dev_info->rx_queue[rx_queue_id];
291         return dev_info->rx_queue &&
292                 !dev_info->internal_event_port &&
293                 queue_info->queue_enabled && queue_info->wt == 0;
294 }
295
296 static inline int
297 rxa_polled_queue(struct eth_device_info *dev_info,
298         int rx_queue_id)
299 {
300         struct eth_rx_queue_info *queue_info;
301
302         queue_info = &dev_info->rx_queue[rx_queue_id];
303         return !dev_info->internal_event_port &&
304                 dev_info->rx_queue &&
305                 queue_info->queue_enabled && queue_info->wt != 0;
306 }
307
308 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
309 static int
310 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
311 {
312         uint16_t i;
313         int n, s;
314         uint16_t nbq;
315
316         nbq = dev_info->dev->data->nb_rx_queues;
317         n = 0; /* non shared count */
318         s = 0; /* shared count */
319
320         if (rx_queue_id == -1) {
321                 for (i = 0; i < nbq; i++) {
322                         if (!rxa_shared_intr(dev_info, i))
323                                 n += add ? !rxa_intr_queue(dev_info, i) :
324                                         rxa_intr_queue(dev_info, i);
325                         else
326                                 s += add ? !rxa_intr_queue(dev_info, i) :
327                                         rxa_intr_queue(dev_info, i);
328                 }
329
330                 if (s > 0) {
331                         if ((add && dev_info->nb_shared_intr == 0) ||
332                                 (!add && dev_info->nb_shared_intr))
333                                 n += 1;
334                 }
335         } else {
336                 if (!rxa_shared_intr(dev_info, rx_queue_id))
337                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
338                                 rxa_intr_queue(dev_info, rx_queue_id);
339                 else
340                         n = add ? !dev_info->nb_shared_intr :
341                                 dev_info->nb_shared_intr == 1;
342         }
343
344         return add ? n : -n;
345 }
346
347 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
348  */
349 static void
350 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
351                         struct eth_device_info *dev_info,
352                         int rx_queue_id,
353                         uint32_t *nb_rx_intr)
354 {
355         uint32_t intr_diff;
356
357         if (rx_queue_id == -1)
358                 intr_diff = dev_info->nb_rx_intr;
359         else
360                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
361
362         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
363 }
364
365 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
366  * interrupt queues could currently be poll mode Rx queues
367  */
368 static void
369 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
370                         struct eth_device_info *dev_info,
371                         int rx_queue_id,
372                         uint32_t *nb_rx_poll,
373                         uint32_t *nb_rx_intr,
374                         uint32_t *nb_wrr)
375 {
376         uint32_t intr_diff;
377         uint32_t poll_diff;
378         uint32_t wrr_len_diff;
379
380         if (rx_queue_id == -1) {
381                 intr_diff = dev_info->dev->data->nb_rx_queues -
382                                                 dev_info->nb_rx_intr;
383                 poll_diff = dev_info->nb_rx_poll;
384                 wrr_len_diff = dev_info->wrr_len;
385         } else {
386                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
387                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
388                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
389                                         0;
390         }
391
392         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
393         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
394         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
395 }
396
397 /* Calculate size of the eth_rx_poll and wrr_sched arrays
398  * after deleting poll mode rx queues
399  */
400 static void
401 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
402                         struct eth_device_info *dev_info,
403                         int rx_queue_id,
404                         uint32_t *nb_rx_poll,
405                         uint32_t *nb_wrr)
406 {
407         uint32_t poll_diff;
408         uint32_t wrr_len_diff;
409
410         if (rx_queue_id == -1) {
411                 poll_diff = dev_info->nb_rx_poll;
412                 wrr_len_diff = dev_info->wrr_len;
413         } else {
414                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
415                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
416                                         0;
417         }
418
419         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 }
422
423 /* Calculate nb_rx_* after adding poll mode rx queues
424  */
425 static void
426 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
427                         struct eth_device_info *dev_info,
428                         int rx_queue_id,
429                         uint16_t wt,
430                         uint32_t *nb_rx_poll,
431                         uint32_t *nb_rx_intr,
432                         uint32_t *nb_wrr)
433 {
434         uint32_t intr_diff;
435         uint32_t poll_diff;
436         uint32_t wrr_len_diff;
437
438         if (rx_queue_id == -1) {
439                 intr_diff = dev_info->nb_rx_intr;
440                 poll_diff = dev_info->dev->data->nb_rx_queues -
441                                                 dev_info->nb_rx_poll;
442                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
443                                 - dev_info->wrr_len;
444         } else {
445                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
446                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
447                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
448                                 wt - dev_info->rx_queue[rx_queue_id].wt :
449                                 wt;
450         }
451
452         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
453         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
454         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
455 }
456
457 /* Calculate nb_rx_* after adding rx_queue_id */
458 static void
459 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
460                 struct eth_device_info *dev_info,
461                 int rx_queue_id,
462                 uint16_t wt,
463                 uint32_t *nb_rx_poll,
464                 uint32_t *nb_rx_intr,
465                 uint32_t *nb_wrr)
466 {
467         if (wt != 0)
468                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
469                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
470         else
471                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
472                                         nb_rx_poll, nb_rx_intr, nb_wrr);
473 }
474
475 /* Calculate nb_rx_* after deleting rx_queue_id */
476 static void
477 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
478                 struct eth_device_info *dev_info,
479                 int rx_queue_id,
480                 uint32_t *nb_rx_poll,
481                 uint32_t *nb_rx_intr,
482                 uint32_t *nb_wrr)
483 {
484         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
485                                 nb_wrr);
486         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
487                                 nb_rx_intr);
488 }
489
490 /*
491  * Allocate the rx_poll array
492  */
493 static struct eth_rx_poll_entry *
494 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
495         uint32_t num_rx_polled)
496 {
497         size_t len;
498
499         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
500                                                         RTE_CACHE_LINE_SIZE);
501         return  rte_zmalloc_socket(rx_adapter->mem_name,
502                                 len,
503                                 RTE_CACHE_LINE_SIZE,
504                                 rx_adapter->socket_id);
505 }
506
507 /*
508  * Allocate the WRR array
509  */
510 static uint32_t *
511 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
512 {
513         size_t len;
514
515         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
516                         RTE_CACHE_LINE_SIZE);
517         return  rte_zmalloc_socket(rx_adapter->mem_name,
518                                 len,
519                                 RTE_CACHE_LINE_SIZE,
520                                 rx_adapter->socket_id);
521 }
522
523 static int
524 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
525                 uint32_t nb_poll,
526                 uint32_t nb_wrr,
527                 struct eth_rx_poll_entry **rx_poll,
528                 uint32_t **wrr_sched)
529 {
530
531         if (nb_poll == 0) {
532                 *rx_poll = NULL;
533                 *wrr_sched = NULL;
534                 return 0;
535         }
536
537         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
538         if (*rx_poll == NULL) {
539                 *wrr_sched = NULL;
540                 return -ENOMEM;
541         }
542
543         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
544         if (*wrr_sched == NULL) {
545                 rte_free(*rx_poll);
546                 return -ENOMEM;
547         }
548         return 0;
549 }
550
551 /* Precalculate WRR polling sequence for all queues in rx_adapter */
552 static void
553 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
554                 struct eth_rx_poll_entry *rx_poll,
555                 uint32_t *rx_wrr)
556 {
557         uint16_t d;
558         uint16_t q;
559         unsigned int i;
560         int prev = -1;
561         int cw = -1;
562
563         /* Initialize variables for calculation of wrr schedule */
564         uint16_t max_wrr_pos = 0;
565         unsigned int poll_q = 0;
566         uint16_t max_wt = 0;
567         uint16_t gcd = 0;
568
569         if (rx_poll == NULL)
570                 return;
571
572         /* Generate array of all queues to poll, the size of this
573          * array is poll_q
574          */
575         RTE_ETH_FOREACH_DEV(d) {
576                 uint16_t nb_rx_queues;
577                 struct eth_device_info *dev_info =
578                                 &rx_adapter->eth_devices[d];
579                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
580                 if (dev_info->rx_queue == NULL)
581                         continue;
582                 if (dev_info->internal_event_port)
583                         continue;
584                 dev_info->wrr_len = 0;
585                 for (q = 0; q < nb_rx_queues; q++) {
586                         struct eth_rx_queue_info *queue_info =
587                                 &dev_info->rx_queue[q];
588                         uint16_t wt;
589
590                         if (!rxa_polled_queue(dev_info, q))
591                                 continue;
592                         wt = queue_info->wt;
593                         rx_poll[poll_q].eth_dev_id = d;
594                         rx_poll[poll_q].eth_rx_qid = q;
595                         max_wrr_pos += wt;
596                         dev_info->wrr_len += wt;
597                         max_wt = RTE_MAX(max_wt, wt);
598                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
599                         poll_q++;
600                 }
601         }
602
603         /* Generate polling sequence based on weights */
604         prev = -1;
605         cw = -1;
606         for (i = 0; i < max_wrr_pos; i++) {
607                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
608                                      rx_poll, max_wt, gcd, prev);
609                 prev = rx_wrr[i];
610         }
611 }
612
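/* Extract the IPv4/IPv6 header pointer from the mbuf, looking past a
 * single VLAN tag if present; both pointers are set to NULL for other
 * ether types.
 */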
613 static inline void
614 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
615         struct rte_ipv6_hdr **ipv6_hdr)
616 {
617         struct rte_ether_hdr *eth_hdr =
618                 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
619         struct rte_vlan_hdr *vlan_hdr;
620
621         *ipv4_hdr = NULL;
622         *ipv6_hdr = NULL;
623
624         switch (eth_hdr->ether_type) {
625         case RTE_BE16(RTE_ETHER_TYPE_IPV4):
626                 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
627                 break;
628
629         case RTE_BE16(RTE_ETHER_TYPE_IPV6):
630                 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
631                 break;
632
633         case RTE_BE16(RTE_ETHER_TYPE_VLAN):
634                 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
635                 switch (vlan_hdr->eth_proto) {
636                 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
637                         *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
638                         break;
639                 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
640                         *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
641                         break;
642                 default:
643                         break;
644                 }
645                 break;
646
647         default:
648                 break;
649         }
650 }
651
652 /* Calculate RSS hash for IPv4/6 */
653 static inline uint32_t
654 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
655 {
656         uint32_t input_len;
657         void *tuple;
658         struct rte_ipv4_tuple ipv4_tuple;
659         struct rte_ipv6_tuple ipv6_tuple;
660         struct rte_ipv4_hdr *ipv4_hdr;
661         struct rte_ipv6_hdr *ipv6_hdr;
662
663         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
664
665         if (ipv4_hdr) {
666                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
667                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
668                 tuple = &ipv4_tuple;
669                 input_len = RTE_THASH_V4_L3_LEN;
670         } else if (ipv6_hdr) {
671                 rte_thash_load_v6_addrs(ipv6_hdr,
672                                         (union rte_thash_tuple *)&ipv6_tuple);
673                 tuple = &ipv6_tuple;
674                 input_len = RTE_THASH_V6_L3_LEN;
675         } else
676                 return 0;
677
678         return rte_softrss_be(tuple, input_len, rss_key_be);
679 }
680
681 static inline int
682 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
683 {
684         return !!rx_adapter->enq_block_count;
685 }
686
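/* Record the start of a blocked enqueue period; the timestamp is taken
 * only after BLOCK_CNT_THRESHOLD consecutive blocked enqueue attempts.
 */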
687 static inline void
688 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
689 {
690         if (rx_adapter->rx_enq_block_start_ts)
691                 return;
692
693         rx_adapter->enq_block_count++;
694         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
695                 return;
696
697         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
698 }
699
700 static inline void
701 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
702                     struct rte_event_eth_rx_adapter_stats *stats)
703 {
704         if (unlikely(!stats->rx_enq_start_ts))
705                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
706
707         if (likely(!rxa_enq_blocked(rx_adapter)))
708                 return;
709
710         rx_adapter->enq_block_count = 0;
711         if (rx_adapter->rx_enq_block_start_ts) {
712                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
713                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
714                     rx_adapter->rx_enq_block_start_ts;
715                 rx_adapter->rx_enq_block_start_ts = 0;
716         }
717 }
718
719 /* Enqueue buffered events to event device */
720 static inline uint16_t
721 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
722 {
723         struct rte_eth_event_enqueue_buffer *buf =
724             &rx_adapter->event_enqueue_buffer;
725         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
726
727         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
728                                         rx_adapter->event_port_id,
729                                         buf->events,
730                                         buf->count);
731         if (n != buf->count) {
732                 memmove(buf->events,
733                         &buf->events[n],
734                         (buf->count - n) * sizeof(struct rte_event));
735                 stats->rx_enq_retry++;
736         }
737
738         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
739                 rxa_enq_block_start_ts(rx_adapter);
740
741         buf->count -= n;
742         stats->rx_enq_count += n;
743
744         return n;
745 }
746
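/* Fill events for the received mbufs into the adapter's enqueue buffer,
 * applying the application Rx callback (if any), computing a software
 * RSS hash when the mbuf carries no hash and no application flow id is
 * configured, and setting the per queue event fields.
 */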
747 static inline void
748 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
749                 uint16_t eth_dev_id,
750                 uint16_t rx_queue_id,
751                 struct rte_mbuf **mbufs,
752                 uint16_t num)
753 {
754         uint32_t i;
755         struct eth_device_info *dev_info =
756                                         &rx_adapter->eth_devices[eth_dev_id];
757         struct eth_rx_queue_info *eth_rx_queue_info =
758                                         &dev_info->rx_queue[rx_queue_id];
759         struct rte_eth_event_enqueue_buffer *buf =
760                                         &rx_adapter->event_enqueue_buffer;
761         struct rte_event *ev = &buf->events[buf->count];
762         int32_t qid = eth_rx_queue_info->event_queue_id;
763         uint8_t sched_type = eth_rx_queue_info->sched_type;
764         uint8_t priority = eth_rx_queue_info->priority;
765         uint32_t flow_id;
766         struct rte_mbuf *m = mbufs[0];
767         uint32_t rss_mask;
768         uint32_t rss;
769         int do_rss;
770         uint64_t ts;
771         struct rte_mbuf *cb_mbufs[BATCH_SIZE];
772         uint16_t nb_cb;
773
774         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
775         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
776         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
777
778         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
779                 ts = rte_get_tsc_cycles();
780                 for (i = 0; i < num; i++) {
781                         m = mbufs[i];
782
783                         m->timestamp = ts;
784                         m->ol_flags |= PKT_RX_TIMESTAMP;
785                 }
786         }
787
788
789         nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
790                                                 ETH_EVENT_BUFFER_SIZE,
791                                                 buf->count, mbufs,
792                                                 num,
793                                                 dev_info->cb_arg,
794                                                 cb_mbufs) :
795                                                 num;
796         if (nb_cb < num) {
797                 mbufs = cb_mbufs;
798                 num = nb_cb;
799         }
800
801         for (i = 0; i < num; i++) {
802                 m = mbufs[i];
803
804                 rss = do_rss ?
805                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
806                         m->hash.rss;
807                 flow_id =
808                     eth_rx_queue_info->flow_id &
809                                 eth_rx_queue_info->flow_id_mask;
810                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
811                 ev->flow_id = flow_id;
812                 ev->op = RTE_EVENT_OP_NEW;
813                 ev->sched_type = sched_type;
814                 ev->queue_id = qid;
815                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
816                 ev->sub_event_type = 0;
817                 ev->priority = priority;
818                 ev->mbuf = m;
819                 ev++;
820         }
821
822         buf->count += num;
823 }
824
825 /* Enqueue packets from <port, q> to event buffer */
826 static inline uint32_t
827 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
828         uint16_t port_id,
829         uint16_t queue_id,
830         uint32_t rx_count,
831         uint32_t max_rx,
832         int *rxq_empty)
833 {
834         struct rte_mbuf *mbufs[BATCH_SIZE];
835         struct rte_eth_event_enqueue_buffer *buf =
836                                         &rx_adapter->event_enqueue_buffer;
837         struct rte_event_eth_rx_adapter_stats *stats =
838                                         &rx_adapter->stats;
839         uint16_t n;
840         uint32_t nb_rx = 0;
841
842         if (rxq_empty)
843                 *rxq_empty = 0;
844         /* Don't do a batch dequeue from the rx queue if there isn't
845          * enough space in the enqueue buffer.
846          */
847         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
848                 if (buf->count >= BATCH_SIZE)
849                         rxa_flush_event_buffer(rx_adapter);
850
851                 stats->rx_poll_count++;
852                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
853                 if (unlikely(!n)) {
854                         if (rxq_empty)
855                                 *rxq_empty = 1;
856                         break;
857                 }
858                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
859                 nb_rx += n;
860                 if (rx_count + nb_rx > max_rx)
861                         break;
862         }
863
864         if (buf->count > 0)
865                 rxa_flush_event_buffer(rx_adapter);
866
867         return nb_rx;
868 }
869
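/* Called from the interrupt thread: queue the <port, queue> of an
 * interrupting Rx queue on the adapter's interrupt ring and disable
 * that queue's Rx interrupt until the service function re-enables it.
 */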
870 static inline void
871 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
872                 void *data)
873 {
874         uint16_t port_id;
875         uint16_t queue;
876         int err;
877         union queue_data qd;
878         struct eth_device_info *dev_info;
879         struct eth_rx_queue_info *queue_info;
880         int *intr_enabled;
881
882         qd.ptr = data;
883         port_id = qd.port;
884         queue = qd.queue;
885
886         dev_info = &rx_adapter->eth_devices[port_id];
887         queue_info = &dev_info->rx_queue[queue];
888         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
889         if (rxa_shared_intr(dev_info, queue))
890                 intr_enabled = &dev_info->shared_intr_enabled;
891         else
892                 intr_enabled = &queue_info->intr_enabled;
893
894         if (*intr_enabled) {
895                 *intr_enabled = 0;
896                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
897                 /* Entry should always be available.
898                  * The ring size equals the maximum number of interrupt
899                  * vectors supported (an interrupt vector is shared in
900                  * case of shared interrupts)
901                  */
902                 if (err)
903                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
904                                 " to ring: %s", strerror(-err));
905                 else
906                         rte_eth_dev_rx_intr_disable(port_id, queue);
907         }
908         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
909 }
910
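/* Check that the interrupt ring has room for num_intr_vec more vectors */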
911 static int
912 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
913                         uint32_t num_intr_vec)
914 {
915         if (rx_adapter->num_intr_vec + num_intr_vec >
916                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
917                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
918                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
919                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
920                 return -ENOSPC;
921         }
922
923         return 0;
924 }
925
926 /* Delete entries for (dev, queue) from the interrupt ring */
927 static void
928 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
929                         struct eth_device_info *dev_info,
930                         uint16_t rx_queue_id)
931 {
932         int i, n;
933         union queue_data qd;
934
935         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
936
937         n = rte_ring_count(rx_adapter->intr_ring);
938         for (i = 0; i < n; i++) {
939                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
940                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
941                         if (qd.port == dev_info->dev->data->port_id &&
942                                 qd.queue == rx_queue_id)
943                                 continue;
944                 } else {
945                         if (qd.port == dev_info->dev->data->port_id)
946                                 continue;
947                 }
948                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
949         }
950
951         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
952 }
953
954 /* pthread callback handling interrupt mode receive queues
955  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
956  * interrupting queue to the adapter's ring buffer for interrupt events.
957  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
958  * the adapter service function.
959  */
960 static void *
961 rxa_intr_thread(void *arg)
962 {
963         struct rte_event_eth_rx_adapter *rx_adapter = arg;
964         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
965         int n, i;
966
967         while (1) {
968                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
969                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
970                 if (unlikely(n < 0))
971                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
972                                         n);
973                 for (i = 0; i < n; i++) {
974                         rxa_intr_ring_enqueue(rx_adapter,
975                                         epoll_events[i].epdata.data);
976                 }
977         }
978
979         return NULL;
980 }
981
982 /* Dequeue <port, q> from interrupt ring and enqueue received
983  * mbufs to eventdev
984  */
985 static inline uint32_t
986 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
987 {
988         uint32_t n;
989         uint32_t nb_rx = 0;
990         int rxq_empty;
991         struct rte_eth_event_enqueue_buffer *buf;
992         rte_spinlock_t *ring_lock;
993         uint8_t max_done = 0;
994
995         if (rx_adapter->num_rx_intr == 0)
996                 return 0;
997
998         if (rte_ring_count(rx_adapter->intr_ring) == 0
999                 && !rx_adapter->qd_valid)
1000                 return 0;
1001
1002         buf = &rx_adapter->event_enqueue_buffer;
1003         ring_lock = &rx_adapter->intr_ring_lock;
1004
1005         if (buf->count >= BATCH_SIZE)
1006                 rxa_flush_event_buffer(rx_adapter);
1007
1008         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1009                 struct eth_device_info *dev_info;
1010                 uint16_t port;
1011                 uint16_t queue;
1012                 union queue_data qd  = rx_adapter->qd;
1013                 int err;
1014
1015                 if (!rx_adapter->qd_valid) {
1016                         struct eth_rx_queue_info *queue_info;
1017
1018                         rte_spinlock_lock(ring_lock);
1019                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1020                         if (err) {
1021                                 rte_spinlock_unlock(ring_lock);
1022                                 break;
1023                         }
1024
1025                         port = qd.port;
1026                         queue = qd.queue;
1027                         rx_adapter->qd = qd;
1028                         rx_adapter->qd_valid = 1;
1029                         dev_info = &rx_adapter->eth_devices[port];
1030                         if (rxa_shared_intr(dev_info, queue))
1031                                 dev_info->shared_intr_enabled = 1;
1032                         else {
1033                                 queue_info = &dev_info->rx_queue[queue];
1034                                 queue_info->intr_enabled = 1;
1035                         }
1036                         rte_eth_dev_rx_intr_enable(port, queue);
1037                         rte_spinlock_unlock(ring_lock);
1038                 } else {
1039                         port = qd.port;
1040                         queue = qd.queue;
1041
1042                         dev_info = &rx_adapter->eth_devices[port];
1043                 }
1044
1045                 if (rxa_shared_intr(dev_info, queue)) {
1046                         uint16_t i;
1047                         uint16_t nb_queues;
1048
1049                         nb_queues = dev_info->dev->data->nb_rx_queues;
1050                         n = 0;
1051                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1052                                 uint8_t enq_buffer_full;
1053
1054                                 if (!rxa_intr_queue(dev_info, i))
1055                                         continue;
1056                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1057                                         rx_adapter->max_nb_rx,
1058                                         &rxq_empty);
1059                                 nb_rx += n;
1060
1061                                 enq_buffer_full = !rxq_empty && n == 0;
1062                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1063
1064                                 if (enq_buffer_full || max_done) {
1065                                         dev_info->next_q_idx = i;
1066                                         goto done;
1067                                 }
1068                         }
1069
1070                         rx_adapter->qd_valid = 0;
1071
1072                         /* Reinitialize for next interrupt */
1073                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1074                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1075                                                 0;
1076                 } else {
1077                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1078                                 rx_adapter->max_nb_rx,
1079                                 &rxq_empty);
1080                         rx_adapter->qd_valid = !rxq_empty;
1081                         nb_rx += n;
1082                         if (nb_rx > rx_adapter->max_nb_rx)
1083                                 break;
1084                 }
1085         }
1086
1087 done:
1088         rx_adapter->stats.rx_intr_packets += nb_rx;
1089         return nb_rx;
1090 }
1091
1092 /*
1093  * Polls receive queues added to the event adapter and enqueues received
1094  * packets to the event device.
1095  *
1096  * The receive code enqueues initially to a temporary buffer; the
1097  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1098  *
1099  * If there isn't space available in the temporary buffer, packets from the
1100  * Rx queue aren't dequeued from the eth device; this back pressures the
1101  * eth device. In virtual device environments this back pressure is relayed
1102  * to the hypervisor's switching layer, where adjustments can be made to
1103  * deal with it.
1104  */
1105 static inline uint32_t
1106 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1107 {
1108         uint32_t num_queue;
1109         uint32_t nb_rx = 0;
1110         struct rte_eth_event_enqueue_buffer *buf;
1111         uint32_t wrr_pos;
1112         uint32_t max_nb_rx;
1113
1114         wrr_pos = rx_adapter->wrr_pos;
1115         max_nb_rx = rx_adapter->max_nb_rx;
1116         buf = &rx_adapter->event_enqueue_buffer;
1117
1118         /* Iterate through a WRR sequence */
1119         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1120                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1121                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1122                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1123
1124                 /* Don't do a batch dequeue from the rx queue if there isn't
1125                  * enough space in the enqueue buffer.
1126                  */
1127                 if (buf->count >= BATCH_SIZE)
1128                         rxa_flush_event_buffer(rx_adapter);
1129                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1130                         rx_adapter->wrr_pos = wrr_pos;
1131                         return nb_rx;
1132                 }
1133
1134                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1135                                 NULL);
1136                 if (nb_rx > max_nb_rx) {
1137                         rx_adapter->wrr_pos =
1138                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1139                         break;
1140                 }
1141
1142                 if (++wrr_pos == rx_adapter->wrr_len)
1143                         wrr_pos = 0;
1144         }
1145         return nb_rx;
1146 }
1147
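/* Adapter service function: drains interrupt mode queues and then polls
 * the WRR sequence; returns without doing work if a control path thread
 * holds rx_lock or the adapter has not been started.
 */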
1148 static int
1149 rxa_service_func(void *args)
1150 {
1151         struct rte_event_eth_rx_adapter *rx_adapter = args;
1152         struct rte_event_eth_rx_adapter_stats *stats;
1153
1154         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1155                 return 0;
1156         if (!rx_adapter->rxa_started) {
1157                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1158                 return 0;
1159         }
1160
1161         stats = &rx_adapter->stats;
1162         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1163         stats->rx_packets += rxa_poll(rx_adapter);
1164         rte_spinlock_unlock(&rx_adapter->rx_lock);
1165         return 0;
1166 }
1167
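/* Reserve (or look up) the memzone holding the array of adapter pointers */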
1168 static int
1169 rte_event_eth_rx_adapter_init(void)
1170 {
1171         const char *name = "rte_event_eth_rx_adapter_array";
1172         const struct rte_memzone *mz;
1173         unsigned int sz;
1174
1175         sz = sizeof(*event_eth_rx_adapter) *
1176             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1177         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1178
1179         mz = rte_memzone_lookup(name);
1180         if (mz == NULL) {
1181                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1182                                                  RTE_CACHE_LINE_SIZE);
1183                 if (mz == NULL) {
1184                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1185                                         PRId32, rte_errno);
1186                         return -rte_errno;
1187                 }
1188         }
1189
1190         event_eth_rx_adapter = mz->addr;
1191         return 0;
1192 }
1193
1194 static inline struct rte_event_eth_rx_adapter *
1195 rxa_id_to_adapter(uint8_t id)
1196 {
1197         return event_eth_rx_adapter ?
1198                 event_eth_rx_adapter[id] : NULL;
1199 }
1200
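/* Default adapter configuration callback: reconfigures the event device
 * with one additional event port and uses that port for the adapter.
 */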
1201 static int
1202 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1203                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1204 {
1205         int ret;
1206         struct rte_eventdev *dev;
1207         struct rte_event_dev_config dev_conf;
1208         int started;
1209         uint8_t port_id;
1210         struct rte_event_port_conf *port_conf = arg;
1211         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1212
1213         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1214         dev_conf = dev->data->dev_conf;
1215
1216         started = dev->data->dev_started;
1217         if (started)
1218                 rte_event_dev_stop(dev_id);
1219         port_id = dev_conf.nb_event_ports;
1220         dev_conf.nb_event_ports += 1;
1221         ret = rte_event_dev_configure(dev_id, &dev_conf);
1222         if (ret) {
1223                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1224                                                 dev_id);
1225                 if (started) {
1226                         if (rte_event_dev_start(dev_id))
1227                                 return -EIO;
1228                 }
1229                 return ret;
1230         }
1231
1232         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1233         if (ret) {
1234                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1235                                         port_id);
1236                 return ret;
1237         }
1238
1239         conf->event_port_id = port_id;
1240         conf->max_nb_rx = 128;
1241         if (started)
1242                 ret = rte_event_dev_start(dev_id);
1243         rx_adapter->default_cb_arg = 1;
1244         return ret;
1245 }
1246
1247 static int
1248 rxa_epoll_create1(void)
1249 {
1250 #if defined(LINUX)
1251         int fd;
1252         fd = epoll_create1(EPOLL_CLOEXEC);
1253         return fd < 0 ? -errno : fd;
1254 #elif defined(BSD)
1255         return -ENOTSUP;
1256 #endif
1257 }
1258
1259 static int
1260 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1261 {
1262         if (rx_adapter->epd != INIT_FD)
1263                 return 0;
1264
1265         rx_adapter->epd = rxa_epoll_create1();
1266         if (rx_adapter->epd < 0) {
1267                 int err = rx_adapter->epd;
1268                 rx_adapter->epd = INIT_FD;
1269                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1270                 return err;
1271         }
1272
1273         return 0;
1274 }
1275
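/* Create the interrupt ring, the epoll event array and the control
 * thread that waits on Rx interrupts; a no-op if the ring already exists.
 */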
1276 static int
1277 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1278 {
1279         int err;
1280         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1281
1282         if (rx_adapter->intr_ring)
1283                 return 0;
1284
1285         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1286                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1287                                         rte_socket_id(), 0);
1288         if (!rx_adapter->intr_ring)
1289                 return -ENOMEM;
1290
1291         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1292                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1293                                         sizeof(struct rte_epoll_event),
1294                                         RTE_CACHE_LINE_SIZE,
1295                                         rx_adapter->socket_id);
1296         if (!rx_adapter->epoll_events) {
1297                 err = -ENOMEM;
1298                 goto error;
1299         }
1300
1301         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1302
1303         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1304                         "rx-intr-thread-%d", rx_adapter->id);
1305
1306         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1307                                 NULL, rxa_intr_thread, rx_adapter);
1308         if (!err) {
1309                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1310                 return 0;
1311         }
1312
1313         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1314 error:
1315         rte_ring_free(rx_adapter->intr_ring);
1316         rx_adapter->intr_ring = NULL;
1317         rx_adapter->epoll_events = NULL;
1318         return err;
1319 }
1320
1321 static int
1322 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1323 {
1324         int err;
1325
1326         err = pthread_cancel(rx_adapter->rx_intr_thread);
1327         if (err)
1328                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1329                                 err);
1330
1331         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1332         if (err)
1333                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1334
1335         rte_free(rx_adapter->epoll_events);
1336         rte_ring_free(rx_adapter->intr_ring);
1337         rx_adapter->intr_ring = NULL;
1338         rx_adapter->epoll_events = NULL;
1339         return 0;
1340 }
1341
1342 static int
1343 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1344 {
1345         int ret;
1346
1347         if (rx_adapter->num_rx_intr == 0)
1348                 return 0;
1349
1350         ret = rxa_destroy_intr_thread(rx_adapter);
1351         if (ret)
1352                 return ret;
1353
1354         close(rx_adapter->epd);
1355         rx_adapter->epd = INIT_FD;
1356
1357         return ret;
1358 }
1359
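/* Disable the Rx interrupt for a queue and remove its epoll registration */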
1360 static int
1361 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1362         struct eth_device_info *dev_info,
1363         uint16_t rx_queue_id)
1364 {
1365         int err;
1366         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1367         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1368
1369         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1370         if (err) {
1371                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1372                         rx_queue_id);
1373                 return err;
1374         }
1375
1376         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1377                                         rx_adapter->epd,
1378                                         RTE_INTR_EVENT_DEL,
1379                                         0);
1380         if (err)
1381                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1382
1383         if (sintr)
1384                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1385         else
1386                 dev_info->shared_intr_enabled = 0;
1387         return err;
1388 }
1389
1390 static int
1391 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1392                 struct eth_device_info *dev_info,
1393                 int rx_queue_id)
1394 {
1395         int err;
1396         int i;
1397         int s;
1398
1399         if (dev_info->nb_rx_intr == 0)
1400                 return 0;
1401
1402         err = 0;
1403         if (rx_queue_id == -1) {
1404                 s = dev_info->nb_shared_intr;
1405                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1406                         int sintr;
1407                         uint16_t q;
1408
1409                         q = dev_info->intr_queue[i];
1410                         sintr = rxa_shared_intr(dev_info, q);
1411                         s -= sintr;
1412
1413                         if (!sintr || s == 0) {
1414
1415                                 err = rxa_disable_intr(rx_adapter, dev_info,
1416                                                 q);
1417                                 if (err)
1418                                         return err;
1419                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1420                                                         q);
1421                         }
1422                 }
1423         } else {
1424                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1425                         return 0;
1426                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1427                                 dev_info->nb_shared_intr == 1) {
1428                         err = rxa_disable_intr(rx_adapter, dev_info,
1429                                         rx_queue_id);
1430                         if (err)
1431                                 return err;
1432                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1433                                                 rx_queue_id);
1434                 }
1435
1436                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1437                         if (dev_info->intr_queue[i] == rx_queue_id) {
1438                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1439                                         dev_info->intr_queue[i] =
1440                                                 dev_info->intr_queue[i + 1];
1441                                 break;
1442                         }
1443                 }
1444         }
1445
1446         return err;
1447 }
1448
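/* Set up interrupt mode for a single Rx queue: register it with the
 * adapter's epoll fd, enable the device Rx interrupt and make sure the
 * interrupt thread is running; the steps are undone on failure.
 */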
1449 static int
1450 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1451         struct eth_device_info *dev_info,
1452         uint16_t rx_queue_id)
1453 {
1454         int err, err1;
1455         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1456         union queue_data qd;
1457         int init_fd;
1458         uint16_t *intr_queue;
1459         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1460
1461         if (rxa_intr_queue(dev_info, rx_queue_id))
1462                 return 0;
1463
1464         intr_queue = dev_info->intr_queue;
1465         if (dev_info->intr_queue == NULL) {
1466                 size_t len =
1467                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1468                 dev_info->intr_queue =
1469                         rte_zmalloc_socket(
1470                                 rx_adapter->mem_name,
1471                                 len,
1472                                 0,
1473                                 rx_adapter->socket_id);
1474                 if (dev_info->intr_queue == NULL)
1475                         return -ENOMEM;
1476         }
1477
1478         init_fd = rx_adapter->epd;
1479         err = rxa_init_epd(rx_adapter);
1480         if (err)
1481                 goto err_free_queue;
1482
1483         qd.port = eth_dev_id;
1484         qd.queue = rx_queue_id;
1485
1486         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1487                                         rx_adapter->epd,
1488                                         RTE_INTR_EVENT_ADD,
1489                                         qd.ptr);
1490         if (err) {
1491                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1492                         " Rx Queue %u err %d", rx_queue_id, err);
1493                 goto err_del_fd;
1494         }
1495
1496         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1497         if (err) {
1498                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1499                                 " Rx Queue %u err %d", rx_queue_id, err);
1500
1501                 goto err_del_event;
1502         }
1503
1504         err = rxa_create_intr_thread(rx_adapter);
1505         if (!err)  {
1506                 if (sintr)
1507                         dev_info->shared_intr_enabled = 1;
1508                 else
1509                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1510                 return 0;
1511         }
1512
1514         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1515         if (err)
1516                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1517                                 " Rx Queue %u err %d", rx_queue_id, err);
1518 err_del_event:
1519         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1520                                         rx_adapter->epd,
1521                                         RTE_INTR_EVENT_DEL,
1522                                         0);
1523         if (err1) {
1524                 RTE_EDEV_LOG_ERR("Could not delete event for"
1525                                 " Rx Queue %u err %d", rx_queue_id, err1);
1526         }
1527 err_del_fd:
1528         if (init_fd == INIT_FD) {
1529                 close(rx_adapter->epd);
1530                 rx_adapter->epd = INIT_FD;
1531         }
1532 err_free_queue:
1533         if (intr_queue == NULL) {
1534                 rte_free(dev_info->intr_queue);
                        dev_info->intr_queue = NULL;
                }
1535
1536         return err;
1537 }
1538
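/*
 * Switch a queue (or all queues of the device when rx_queue_id is -1)
 * to interrupt mode. Queues that share a single interrupt vector are
 * configured only once per device; on any failure the queues
 * configured by this call are rolled back.
 */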
1539 static int
1540 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1541         struct eth_device_info *dev_info,
1542         int rx_queue_id)
1543
1544 {
1545         int i, j, err;
1546         int si = -1;
1547         int shared_done = (dev_info->nb_shared_intr > 0);
1548
1549         if (rx_queue_id != -1) {
1550                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1551                         return 0;
1552                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1553         }
1554
1555         err = 0;
1556         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1557
1558                 if (rxa_shared_intr(dev_info, i) && shared_done)
1559                         continue;
1560
1561                 err = rxa_config_intr(rx_adapter, dev_info, i);
1562
1563                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1564                 if (shared_done) {
1565                         si = i;
1566                         dev_info->shared_intr_enabled = 1;
1567                 }
1568                 if (err)
1569                         break;
1570         }
1571
1572         if (err == 0)
1573                 return 0;
1574
1575         /* Roll back the interrupt configuration done so far, but
1576          * return the original error to the caller.
1577          */
1578         for (j = 0; j < i; j++) {
1579                 if (rxa_intr_queue(dev_info, j))
1580                         continue;
1581                 if (rxa_shared_intr(dev_info, j) && si != j)
1582                         continue;
1583                 if (rxa_disable_intr(rx_adapter, dev_info, j))
1584                         break;
1585         }
1586
1587         return err;
1588 }
1589
1590
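/*
 * One-time registration of the adapter service function. The service
 * is marked MT safe because rx_lock, not the service core framework,
 * serializes queue add/del against the service function. The user
 * supplied (or default) configuration callback provides the event
 * port and the Rx burst limit used by the service.
 */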
1591 static int
1592 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1593 {
1594         int ret;
1595         struct rte_service_spec service;
1596         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1597
1598         if (rx_adapter->service_inited)
1599                 return 0;
1600
1601         memset(&service, 0, sizeof(service));
1602         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1603                 "rte_event_eth_rx_adapter_%d", id);
1604         service.socket_id = rx_adapter->socket_id;
1605         service.callback = rxa_service_func;
1606         service.callback_userdata = rx_adapter;
1607         /* Service function handles locking for queue add/del updates */
1608         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1609         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1610         if (ret) {
1611                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1612                         service.name, ret);
1613                 return ret;
1614         }
1615
1616         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1617                 &rx_adapter_conf, rx_adapter->conf_arg);
1618         if (ret) {
1619                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1620                         ret);
1621                 goto err_done;
1622         }
1623         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1624         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1625         rx_adapter->service_inited = 1;
1626         rx_adapter->epd = INIT_FD;
1627         return 0;
1628
1629 err_done:
1630         rte_service_component_unregister(rx_adapter->service_id);
1631         return ret;
1632 }
1633
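/*
 * Mark a queue (or all queues when rx_queue_id is -1) as added to or
 * removed from the adapter and keep the per-adapter and per-device
 * queue counts consistent.
 */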
1634 static void
1635 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1636                 struct eth_device_info *dev_info,
1637                 int32_t rx_queue_id,
1638                 uint8_t add)
1639 {
1640         struct eth_rx_queue_info *queue_info;
1641         int enabled;
1642         uint16_t i;
1643
1644         if (dev_info->rx_queue == NULL)
1645                 return;
1646
1647         if (rx_queue_id == -1) {
1648                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1649                         rxa_update_queue(rx_adapter, dev_info, i, add);
1650         } else {
1651                 queue_info = &dev_info->rx_queue[rx_queue_id];
1652                 enabled = queue_info->queue_enabled;
1653                 if (add) {
1654                         rx_adapter->nb_queues += !enabled;
1655                         dev_info->nb_dev_queues += !enabled;
1656                 } else {
1657                         rx_adapter->nb_queues -= enabled;
1658                         dev_info->nb_dev_queues -= enabled;
1659                 }
1660                 queue_info->queue_enabled = !!add;
1661         }
1662 }
1663
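/*
 * Remove a queue from the software (service based) adapter and adjust
 * the polled, interrupt and shared interrupt queue counts.
 */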
1664 static void
1665 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1666         struct eth_device_info *dev_info,
1667         int32_t rx_queue_id)
1668 {
1669         int pollq;
1670         int intrq;
1671         int sintrq;
1672
1673
1675                 return;
1676
1677         if (rx_queue_id == -1) {
1678                 uint16_t nb_rx_queues;
1679                 uint16_t i;
1680
1681                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1682                 for (i = 0; i < nb_rx_queues; i++)
1683                         rxa_sw_del(rx_adapter, dev_info, i);
1684                 return;
1685         }
1686
1687         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1688         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1689         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1690         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1691         rx_adapter->num_rx_polled -= pollq;
1692         dev_info->nb_rx_poll -= pollq;
1693         rx_adapter->num_rx_intr -= intrq;
1694         dev_info->nb_rx_intr -= intrq;
1695         dev_info->nb_shared_intr -= intrq && sintrq;
1696 }
1697
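/*
 * Record the event fields (queue, sched type, priority and, when the
 * FLOW_ID_VALID flag is set, the flow id) that the service function
 * uses for packets received on this Rx queue, then update the
 * poll/interrupt accounting for the queue's new mode.
 */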
1698 static void
1699 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1700         struct eth_device_info *dev_info,
1701         int32_t rx_queue_id,
1702         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1703 {
1704         struct eth_rx_queue_info *queue_info;
1705         const struct rte_event *ev = &conf->ev;
1706         int pollq;
1707         int intrq;
1708         int sintrq;
1709
1710         if (rx_queue_id == -1) {
1711                 uint16_t nb_rx_queues;
1712                 uint16_t i;
1713
1714                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1715                 for (i = 0; i < nb_rx_queues; i++)
1716                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1717                 return;
1718         }
1719
1720         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1721         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1722         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1723
1724         queue_info = &dev_info->rx_queue[rx_queue_id];
1725         queue_info->event_queue_id = ev->queue_id;
1726         queue_info->sched_type = ev->sched_type;
1727         queue_info->priority = ev->priority;
1728         queue_info->wt = conf->servicing_weight;
1729
1730         if (conf->rx_queue_flags &
1731                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1732                 queue_info->flow_id = ev->flow_id;
1733                 queue_info->flow_id_mask = ~0;
1734         }
1735
1736         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1737         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1738                 rx_adapter->num_rx_polled += !pollq;
1739                 dev_info->nb_rx_poll += !pollq;
1740                 rx_adapter->num_rx_intr -= intrq;
1741                 dev_info->nb_rx_intr -= intrq;
1742                 dev_info->nb_shared_intr -= intrq && sintrq;
1743         }
1744
1745         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1746                 rx_adapter->num_rx_polled -= pollq;
1747                 dev_info->nb_rx_poll -= pollq;
1748                 rx_adapter->num_rx_intr += !intrq;
1749                 dev_info->nb_rx_intr += !intrq;
1750                 dev_info->nb_shared_intr += !intrq && sintrq;
1751                 if (dev_info->nb_shared_intr == 1) {
1752                         if (dev_info->multi_intr_cap)
1753                                 dev_info->next_q_idx =
1754                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1755                         else
1756                                 dev_info->next_q_idx = 0;
1757                 }
1758         }
1759 }
1760
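/*
 * Add a queue to the software adapter. A servicing weight of zero
 * selects interrupt mode when the ethdev has Rx interrupts enabled,
 * otherwise the weight is forced to one and the queue is polled. The
 * poll and WRR arrays are rebuilt to include the new queue.
 */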
1761 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1762                 uint16_t eth_dev_id,
1763                 int rx_queue_id,
1764                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1765 {
1766         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1767         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1768         int ret;
1769         struct eth_rx_poll_entry *rx_poll;
1770         struct eth_rx_queue_info *rx_queue;
1771         uint32_t *rx_wrr;
1772         uint16_t nb_rx_queues;
1773         uint32_t nb_rx_poll, nb_wrr;
1774         uint32_t nb_rx_intr;
1775         int num_intr_vec;
1776         uint16_t wt;
1777
1778         if (queue_conf->servicing_weight == 0) {
1779                 struct rte_eth_dev_data *data = dev_info->dev->data;
1780
1781                 temp_conf = *queue_conf;
1782                 if (!data->dev_conf.intr_conf.rxq) {
1783                         /* If Rx interrupts are disabled set wt = 1 */
1784                         temp_conf.servicing_weight = 1;
1785                 }
1786                 queue_conf = &temp_conf;
1787         }
1788
1789         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1790         rx_queue = dev_info->rx_queue;
1791         wt = queue_conf->servicing_weight;
1792
1793         if (dev_info->rx_queue == NULL) {
1794                 dev_info->rx_queue =
1795                     rte_zmalloc_socket(rx_adapter->mem_name,
1796                                        nb_rx_queues *
1797                                        sizeof(struct eth_rx_queue_info), 0,
1798                                        rx_adapter->socket_id);
1799                 if (dev_info->rx_queue == NULL)
1800                         return -ENOMEM;
1801         }
1802         rx_wrr = NULL;
1803         rx_poll = NULL;
1804
1805         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1806                         queue_conf->servicing_weight,
1807                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1808
1809         if (dev_info->dev->intr_handle)
1810                 dev_info->multi_intr_cap =
1811                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1812
1813         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1814                                 &rx_poll, &rx_wrr);
1815         if (ret)
1816                 goto err_free_rxqueue;
1817
1818         if (wt == 0) {
1819                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1820
1821                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1822                 if (ret)
1823                         goto err_free_rxqueue;
1824
1825                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1826                 if (ret)
1827                         goto err_free_rxqueue;
1828         } else {
1829
1830                 num_intr_vec = 0;
1831                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1832                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1833                                                 rx_queue_id, 0);
1834                         /* interrupt based queues are being converted to
1835                          * poll mode queues, delete the interrupt configuration
1836                          * for those.
1837                          */
1838                         ret = rxa_del_intr_queue(rx_adapter,
1839                                                 dev_info, rx_queue_id);
1840                         if (ret)
1841                                 goto err_free_rxqueue;
1842                 }
1843         }
1844
1845         if (nb_rx_intr == 0) {
1846                 ret = rxa_free_intr_resources(rx_adapter);
1847                 if (ret)
1848                         goto err_free_rxqueue;
1849         }
1850
1851         if (wt == 0) {
1852                 uint16_t i;
1853
1854                 if (rx_queue_id == -1) {
1855                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1856                                 dev_info->intr_queue[i] = i;
1857                 } else {
1858                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1859                                 dev_info->intr_queue[nb_rx_intr - 1] =
1860                                         rx_queue_id;
1861                 }
1862         }
1863
1866         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1867         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1868
1869         rte_free(rx_adapter->eth_rx_poll);
1870         rte_free(rx_adapter->wrr_sched);
1871
1872         rx_adapter->eth_rx_poll = rx_poll;
1873         rx_adapter->wrr_sched = rx_wrr;
1874         rx_adapter->wrr_len = nb_wrr;
1875         rx_adapter->num_intr_vec += num_intr_vec;
1876         return 0;
1877
1878 err_free_rxqueue:
1879         if (rx_queue == NULL) {
1880                 rte_free(dev_info->rx_queue);
1881                 dev_info->rx_queue = NULL;
1882         }
1883
1884         rte_free(rx_poll);
1885         rte_free(rx_wrr);
1886
1887         return ret;
1888 }
1889
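/*
 * Common start/stop handler: ports with an internal event port are
 * started/stopped through the eventdev PMD, all others are served by
 * the service function whose run state is updated here.
 */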
1890 static int
1891 rxa_ctrl(uint8_t id, int start)
1892 {
1893         struct rte_event_eth_rx_adapter *rx_adapter;
1894         struct rte_eventdev *dev;
1895         struct eth_device_info *dev_info;
1896         uint32_t i;
1897         int use_service = 0;
1898         int stop = !start;
1899
1900         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1901         rx_adapter = rxa_id_to_adapter(id);
1902         if (rx_adapter == NULL)
1903                 return -EINVAL;
1904
1905         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1906
1907         RTE_ETH_FOREACH_DEV(i) {
1908                 dev_info = &rx_adapter->eth_devices[i];
1909                 /* if start, check for num dev queues */
1910                 if (start && !dev_info->nb_dev_queues)
1911                         continue;
1912                 /* if stop check if dev has been started */
1913                 if (stop && !dev_info->dev_rx_started)
1914                         continue;
1915                 use_service |= !dev_info->internal_event_port;
1916                 dev_info->dev_rx_started = start;
1917                 if (dev_info->internal_event_port == 0)
1918                         continue;
1919                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1920                                                 &rte_eth_devices[i]) :
1921                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1922                                                 &rte_eth_devices[i]);
1923         }
1924
1925         if (use_service) {
1926                 rte_spinlock_lock(&rx_adapter->rx_lock);
1927                 rx_adapter->rxa_started = start;
1928                 rte_service_runstate_set(rx_adapter->service_id, start);
1929                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1930         }
1931
1932         return 0;
1933 }
1934
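/*
 * Illustrative sketch only (not part of this file): a minimal
 * configuration callback for rte_event_eth_rx_adapter_create_ext().
 * It assumes the application has already configured the event device
 * and reserved an event port for the adapter, passing the port id
 * through the callback argument. The names app_conf_cb and struct
 * app_arg are hypothetical.
 *
 *	struct app_arg {
 *		uint8_t event_port_id;
 *	};
 *
 *	static int
 *	app_conf_cb(uint8_t id, uint8_t dev_id,
 *		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
 *	{
 *		struct app_arg *aa = arg;
 *
 *		RTE_SET_USED(id);
 *		RTE_SET_USED(dev_id);
 *		conf->event_port_id = aa->event_port_id;
 *		conf->max_nb_rx = 128;
 *		return 0;
 *	}
 */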
1935 int
1936 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1937                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1938                                 void *conf_arg)
1939 {
1940         struct rte_event_eth_rx_adapter *rx_adapter;
1941         int ret;
1942         int socket_id;
1943         uint16_t i;
1944         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1945         const uint8_t default_rss_key[] = {
1946                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1947                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1948                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1949                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1950                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1951         };
1952
1953         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1954         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1955         if (conf_cb == NULL)
1956                 return -EINVAL;
1957
1958         if (event_eth_rx_adapter == NULL) {
1959                 ret = rte_event_eth_rx_adapter_init();
1960                 if (ret)
1961                         return ret;
1962         }
1963
1964         rx_adapter = rxa_id_to_adapter(id);
1965         if (rx_adapter != NULL) {
1966                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1967                 return -EEXIST;
1968         }
1969
1970         socket_id = rte_event_dev_socket_id(dev_id);
1971         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1972                 "rte_event_eth_rx_adapter_%d",
1973                 id);
1974
1975         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1976                         RTE_CACHE_LINE_SIZE, socket_id);
1977         if (rx_adapter == NULL) {
1978                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1979                 return -ENOMEM;
1980         }
1981
1982         rx_adapter->eventdev_id = dev_id;
1983         rx_adapter->socket_id = socket_id;
1984         rx_adapter->conf_cb = conf_cb;
1985         rx_adapter->conf_arg = conf_arg;
1986         rx_adapter->id = id;
1987         strcpy(rx_adapter->mem_name, mem_name);
1988         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1989                                         RTE_MAX_ETHPORTS *
1990                                         sizeof(struct eth_device_info), 0,
1991                                         socket_id);
1992         rte_convert_rss_key((const uint32_t *)default_rss_key,
1993                         (uint32_t *)rx_adapter->rss_key_be,
1994                             RTE_DIM(default_rss_key));
1995
1996         if (rx_adapter->eth_devices == NULL) {
1997                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1998                 rte_free(rx_adapter);
1999                 return -ENOMEM;
2000         }
2001         rte_spinlock_init(&rx_adapter->rx_lock);
2002         for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2003                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2004
2005         event_eth_rx_adapter[id] = rx_adapter;
2006         if (conf_cb == rxa_default_conf_cb)
2007                 rx_adapter->default_cb_arg = 1;
2008         return 0;
2009 }
2010
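/*
 * Illustrative usage sketch (adapter id 0, event device 0 and an
 * application-supplied event port configuration are placeholders):
 *
 *	struct rte_event_port_conf port_conf;
 *	int ret;
 *
 *	(application fills in port_conf, e.g. from the event device's
 *	 default port configuration)
 *
 *	ret = rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *	if (ret < 0)
 *		rte_exit(EXIT_FAILURE, "Rx adapter creation failed\n");
 */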
2011 int
2012 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2013                 struct rte_event_port_conf *port_config)
2014 {
2015         struct rte_event_port_conf *pc;
2016         int ret;
2017
2018         if (port_config == NULL)
2019                 return -EINVAL;
2020         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2021
2022         pc = rte_malloc(NULL, sizeof(*pc), 0);
2023         if (pc == NULL)
2024                 return -ENOMEM;
2025         *pc = *port_config;
2026         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2027                                         rxa_default_conf_cb,
2028                                         pc);
2029         if (ret)
2030                 rte_free(pc);
2031         return ret;
2032 }
2033
2034 int
2035 rte_event_eth_rx_adapter_free(uint8_t id)
2036 {
2037         struct rte_event_eth_rx_adapter *rx_adapter;
2038
2039         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2040
2041         rx_adapter = rxa_id_to_adapter(id);
2042         if (rx_adapter == NULL)
2043                 return -EINVAL;
2044
2045         if (rx_adapter->nb_queues) {
2046                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2047                                 rx_adapter->nb_queues);
2048                 return -EBUSY;
2049         }
2050
2051         if (rx_adapter->default_cb_arg)
2052                 rte_free(rx_adapter->conf_arg);
2053         rte_free(rx_adapter->eth_devices);
2054         rte_free(rx_adapter);
2055         event_eth_rx_adapter[id] = NULL;
2056
2057         return 0;
2058 }
2059
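/*
 * Illustrative sketch of adding Rx queue 0 of ethdev port 0 to
 * adapter 0 (all ids are placeholders). The event fields below are
 * the ones copied into each generated event; a non-zero
 * servicing_weight selects polled mode.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *	int ret;
 *
 *	memset(&qconf, 0, sizeof(qconf));
 *	qconf.ev.queue_id = 0;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;
 *	qconf.servicing_weight = 1;
 *	ret = rte_event_eth_rx_adapter_queue_add(0, 0, 0, &qconf);
 */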
2060 int
2061 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2062                 uint16_t eth_dev_id,
2063                 int32_t rx_queue_id,
2064                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2065 {
2066         int ret;
2067         uint32_t cap;
2068         struct rte_event_eth_rx_adapter *rx_adapter;
2069         struct rte_eventdev *dev;
2070         struct eth_device_info *dev_info;
2071
2072         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2073         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2074
2075         rx_adapter = rxa_id_to_adapter(id);
2076         if ((rx_adapter == NULL) || (queue_conf == NULL))
2077                 return -EINVAL;
2078
2079         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2080         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2081                                                 eth_dev_id,
2082                                                 &cap);
2083         if (ret) {
2084                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2085                         " eth port %" PRIu16, id, eth_dev_id);
2086                 return ret;
2087         }
2088
2089         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2090                 && (queue_conf->rx_queue_flags &
2091                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2092                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2093                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2094                                 eth_dev_id, id);
2095                 return -EINVAL;
2096         }
2097
2098         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2099                 (rx_queue_id != -1)) {
2100                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2101                         "event queue, eth port: %" PRIu16 " adapter id: %"
2102                         PRIu8, eth_dev_id, id);
2103                 return -EINVAL;
2104         }
2105
2106         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2107                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2108                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2109                          (uint16_t)rx_queue_id);
2110                 return -EINVAL;
2111         }
2112
2113         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2114
2115         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2116                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2117                                         -ENOTSUP);
2118                 if (dev_info->rx_queue == NULL) {
2119                         dev_info->rx_queue =
2120                             rte_zmalloc_socket(rx_adapter->mem_name,
2121                                         dev_info->dev->data->nb_rx_queues *
2122                                         sizeof(struct eth_rx_queue_info), 0,
2123                                         rx_adapter->socket_id);
2124                         if (dev_info->rx_queue == NULL)
2125                                 return -ENOMEM;
2126                 }
2127
2128                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2129                                 &rte_eth_devices[eth_dev_id],
2130                                 rx_queue_id, queue_conf);
2131                 if (ret == 0) {
2132                         dev_info->internal_event_port = 1;
2133                         rxa_update_queue(rx_adapter,
2134                                         &rx_adapter->eth_devices[eth_dev_id],
2135                                         rx_queue_id,
2136                                         1);
2137                 }
2138         } else {
2139                 rte_spinlock_lock(&rx_adapter->rx_lock);
2140                 dev_info->internal_event_port = 0;
2141                 ret = rxa_init_service(rx_adapter, id);
2142                 if (ret == 0) {
2143                         uint32_t service_id = rx_adapter->service_id;
2144                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2145                                         queue_conf);
2146                         rte_service_component_runstate_set(service_id,
2147                                 rxa_sw_adapter_queue_count(rx_adapter));
2148                 }
2149                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2150         }
2151
2152         return ret;
2156 }
2157
2158 int
2159 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2160                                 int32_t rx_queue_id)
2161 {
2162         int ret = 0;
2163         struct rte_eventdev *dev;
2164         struct rte_event_eth_rx_adapter *rx_adapter;
2165         struct eth_device_info *dev_info;
2166         uint32_t cap;
2167         uint32_t nb_rx_poll = 0;
2168         uint32_t nb_wrr = 0;
2169         uint32_t nb_rx_intr;
2170         struct eth_rx_poll_entry *rx_poll = NULL;
2171         uint32_t *rx_wrr = NULL;
2172         int num_intr_vec;
2173
2174         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2175         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2176
2177         rx_adapter = rxa_id_to_adapter(id);
2178         if (rx_adapter == NULL)
2179                 return -EINVAL;
2180
2181         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2182         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2183                                                 eth_dev_id,
2184                                                 &cap);
2185         if (ret)
2186                 return ret;
2187
2188         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2189                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2190                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2191                          (uint16_t)rx_queue_id);
2192                 return -EINVAL;
2193         }
2194
2195         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2196
2197         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2198                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2199                                  -ENOTSUP);
2200                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2201                                                 &rte_eth_devices[eth_dev_id],
2202                                                 rx_queue_id);
2203                 if (ret == 0) {
2204                         rxa_update_queue(rx_adapter,
2205                                         &rx_adapter->eth_devices[eth_dev_id],
2206                                         rx_queue_id,
2207                                         0);
2208                         if (dev_info->nb_dev_queues == 0) {
2209                                 rte_free(dev_info->rx_queue);
2210                                 dev_info->rx_queue = NULL;
2211                         }
2212                 }
2213         } else {
2214                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2215                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2216
2217                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2218                         &rx_poll, &rx_wrr);
2219                 if (ret)
2220                         return ret;
2221
2222                 rte_spinlock_lock(&rx_adapter->rx_lock);
2223
2224                 num_intr_vec = 0;
2225                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2226
2227                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2228                                                 rx_queue_id, 0);
2229                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2230                                         rx_queue_id);
2231                         if (ret)
2232                                 goto unlock_ret;
2233                 }
2234
2235                 if (nb_rx_intr == 0) {
2236                         ret = rxa_free_intr_resources(rx_adapter);
2237                         if (ret)
2238                                 goto unlock_ret;
2239                 }
2240
2241                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2242                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2243
2244                 rte_free(rx_adapter->eth_rx_poll);
2245                 rte_free(rx_adapter->wrr_sched);
2246
2247                 if (nb_rx_intr == 0) {
2248                         rte_free(dev_info->intr_queue);
2249                         dev_info->intr_queue = NULL;
2250                 }
2251
2252                 rx_adapter->eth_rx_poll = rx_poll;
2253                 rx_adapter->wrr_sched = rx_wrr;
2254                 rx_adapter->wrr_len = nb_wrr;
2255                 rx_adapter->num_intr_vec += num_intr_vec;
2256
2257                 if (dev_info->nb_dev_queues == 0) {
2258                         rte_free(dev_info->rx_queue);
2259                         dev_info->rx_queue = NULL;
2260                 }
2261 unlock_ret:
2262                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2263                 if (ret) {
2264                         rte_free(rx_poll);
2265                         rte_free(rx_wrr);
2266                         return ret;
2267                 }
2268
2269                 rte_service_component_runstate_set(rx_adapter->service_id,
2270                                 rxa_sw_adapter_queue_count(rx_adapter));
2271         }
2272
2273         return ret;
2274 }
2275
2276 int
2277 rte_event_eth_rx_adapter_start(uint8_t id)
2278 {
2279         return rxa_ctrl(id, 1);
2280 }
2281
2282 int
2283 rte_event_eth_rx_adapter_stop(uint8_t id)
2284 {
2285         return rxa_ctrl(id, 0);
2286 }
2287
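/*
 * Adapter statistics combine the counts kept by the service function
 * with the per-device counts reported by eventdev PMDs that implement
 * an internal event port.
 */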
2288 int
2289 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2290                                struct rte_event_eth_rx_adapter_stats *stats)
2291 {
2292         struct rte_event_eth_rx_adapter *rx_adapter;
2293         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2294         struct rte_event_eth_rx_adapter_stats dev_stats;
2295         struct rte_eventdev *dev;
2296         struct eth_device_info *dev_info;
2297         uint32_t i;
2298         int ret;
2299
2300         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2301
2302         rx_adapter = rxa_id_to_adapter(id);
2303         if (rx_adapter == NULL || stats == NULL)
2304                 return -EINVAL;
2305
2306         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2307         memset(stats, 0, sizeof(*stats));
2308         RTE_ETH_FOREACH_DEV(i) {
2309                 dev_info = &rx_adapter->eth_devices[i];
2310                 if (dev_info->internal_event_port == 0 ||
2311                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2312                         continue;
2313                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2314                                                 &rte_eth_devices[i],
2315                                                 &dev_stats);
2316                 if (ret)
2317                         continue;
2318                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2319                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2320         }
2321
2322         if (rx_adapter->service_inited)
2323                 *stats = rx_adapter->stats;
2324
2325         stats->rx_packets += dev_stats_sum.rx_packets;
2326         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2327         return 0;
2328 }
2329
2330 int
2331 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2332 {
2333         struct rte_event_eth_rx_adapter *rx_adapter;
2334         struct rte_eventdev *dev;
2335         struct eth_device_info *dev_info;
2336         uint32_t i;
2337
2338         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2339
2340         rx_adapter = rxa_id_to_adapter(id);
2341         if (rx_adapter == NULL)
2342                 return -EINVAL;
2343
2344         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2345         RTE_ETH_FOREACH_DEV(i) {
2346                 dev_info = &rx_adapter->eth_devices[i];
2347                 if (dev_info->internal_event_port == 0 ||
2348                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2349                         continue;
2350                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2351                                                         &rte_eth_devices[i]);
2352         }
2353
2354         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2355         return 0;
2356 }
2357
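/*
 * The software adapter runs as a DPDK service; an application that
 * does not rely on default service core handling can map it itself.
 * Illustrative sketch (lcore 1 is a placeholder and is assumed to
 * have been added as a service lcore beforehand):
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_map_lcore_set(service_id, 1, 1);
 *		rte_service_lcore_start(1);
 *	}
 */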
2358 int
2359 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2360 {
2361         struct rte_event_eth_rx_adapter *rx_adapter;
2362
2363         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2364
2365         rx_adapter = rxa_id_to_adapter(id);
2366         if (rx_adapter == NULL || service_id == NULL)
2367                 return -EINVAL;
2368
2369         if (rx_adapter->service_inited)
2370                 *service_id = rx_adapter->service_id;
2371
2372         return rx_adapter->service_inited ? 0 : -ESRCH;
2373 }
2374
2375 int
2376 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2377                                         uint16_t eth_dev_id,
2378                                         rte_event_eth_rx_adapter_cb_fn cb_fn,
2379                                         void *cb_arg)
2380 {
2381         struct rte_event_eth_rx_adapter *rx_adapter;
2382         struct eth_device_info *dev_info;
2383         uint32_t cap;
2384         int ret;
2385
2386         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2387         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2388
2389         rx_adapter = rxa_id_to_adapter(id);
2390         if (rx_adapter == NULL)
2391                 return -EINVAL;
2392
2393         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2394         if (dev_info->rx_queue == NULL)
2395                 return -EINVAL;
2396
2397         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2398                                                 eth_dev_id,
2399                                                 &cap);
2400         if (ret) {
2401                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2402                         " eth port %" PRIu16, id, eth_dev_id);
2403                 return ret;
2404         }
2405
2406         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2407                 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2408                                 PRIu16, eth_dev_id);
2409                 return -EINVAL;
2410         }
2411
2412         rte_spinlock_lock(&rx_adapter->rx_lock);
2413         dev_info->cb_fn = cb_fn;
2414         dev_info->cb_arg = cb_arg;
2415         rte_spinlock_unlock(&rx_adapter->rx_lock);
2416
2417         return 0;
2418 }