eventdev: add interrupt driven queues to Rx adapter
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #if defined(LINUX)
6 #include <sys/epoll.h>
7 #endif
8 #include <unistd.h>
9
10 #include <rte_cycles.h>
11 #include <rte_common.h>
12 #include <rte_dev.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
15 #include <rte_log.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
24
25 #define BATCH_SIZE              32
26 #define BLOCK_CNT_THRESHOLD     10
27 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
28
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
31
32 #define RSS_KEY_SIZE    40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT     1
36 /* Sentinel value to detect an uninitialized file handle */
36 #define INIT_FD         -1
37
38 /*
39  * Used to store port and queue ID of interrupting Rx queue
40  */
41 union queue_data {
42         RTE_STD_C11
43         void *ptr;
44         struct {
45                 uint16_t port;
46                 uint16_t queue;
47         };
48 };
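/* Editorial note: the union lets a <port, queue> pair travel through the
 * epoll user data pointer without a separate allocation; rxa_config_intr()
 * stores qd.port/qd.queue and registers qd.ptr with the interrupt event,
 * and rxa_intr_ring_enqueue() recovers the pair from qd.ptr.
 */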
49
50 /*
51  * There is an instance of this struct per polled Rx queue added to the
52  * adapter
53  */
54 struct eth_rx_poll_entry {
55         /* Eth port to poll */
56         uint16_t eth_dev_id;
57         /* Eth rx queue to poll */
58         uint16_t eth_rx_qid;
59 };
60
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63         /* Count of events in this buffer */
64         uint16_t count;
65         /* Array of events in this buffer */
66         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
67 };
68
69 struct rte_event_eth_rx_adapter {
70         /* RSS key */
71         uint8_t rss_key_be[RSS_KEY_SIZE];
72         /* Event device identifier */
73         uint8_t eventdev_id;
74         /* Per ethernet device structure */
75         struct eth_device_info *eth_devices;
76         /* Event port identifier */
77         uint8_t event_port_id;
78         /* Lock to serialize config updates with service function */
79         rte_spinlock_t rx_lock;
80         /* Max mbufs processed in any service function invocation */
81         uint32_t max_nb_rx;
82         /* Receive queues that need to be polled */
83         struct eth_rx_poll_entry *eth_rx_poll;
84         /* Size of the eth_rx_poll array */
85         uint16_t num_rx_polled;
86         /* Weighted round robin schedule */
87         uint32_t *wrr_sched;
88         /* wrr_sched[] size */
89         uint32_t wrr_len;
90         /* Next entry in wrr[] to begin polling */
91         uint32_t wrr_pos;
92         /* Event burst buffer */
93         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94         /* Per adapter stats */
95         struct rte_event_eth_rx_adapter_stats stats;
96         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97         uint16_t enq_block_count;
98         /* Block start ts */
99         uint64_t rx_enq_block_start_ts;
100         /* epoll fd used to wait for Rx interrupts */
101         int epd;
102         /* Num of interrupt driven interrupt queues */
103         uint32_t num_rx_intr;
104         /* Used to send <dev id, queue id> of interrupting Rx queues from
105          * the interrupt thread to the Rx thread
106          */
107         struct rte_ring *intr_ring;
108         /* Rx Queue data (dev id, queue id) for the last non-empty
109          * queue polled
110          */
111         union queue_data qd;
112         /* queue_data is valid */
113         int qd_valid;
114         /* Interrupt ring lock, synchronizes Rx thread
115          * and interrupt thread
116          */
117         rte_spinlock_t intr_ring_lock;
118         /* event array passed to rte_epoll_wait */
119         struct rte_epoll_event *epoll_events;
120         /* Count of interrupt vectors in use */
121         uint32_t num_intr_vec;
122         /* Thread blocked on Rx interrupts */
123         pthread_t rx_intr_thread;
124         /* Configuration callback for rte_service configuration */
125         rte_event_eth_rx_adapter_conf_cb conf_cb;
126         /* Configuration callback argument */
127         void *conf_arg;
128         /* Set if  default_cb is being used */
129         int default_cb_arg;
130         /* Service initialization state */
131         uint8_t service_inited;
132         /* Total count of Rx queues in adapter */
133         uint32_t nb_queues;
134         /* Memory allocation name */
135         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136         /* Socket identifier cached from eventdev */
137         int socket_id;
138         /* Per adapter EAL service */
139         uint32_t service_id;
140         /* Adapter started flag */
141         uint8_t rxa_started;
142         /* Adapter ID */
143         uint8_t id;
144 } __rte_cache_aligned;
145
146 /* Per eth device */
147 struct eth_device_info {
148         struct rte_eth_dev *dev;
149         struct eth_rx_queue_info *rx_queue;
150         /* Set if ethdev->eventdev packet transfer uses a
151          * hardware mechanism
152          */
153         uint8_t internal_event_port;
154         /* Set if the adapter is processing Rx queues for
155          * this eth device and packet processing has been
156          * started; allows the code to know whether the PMD
157          * rx_adapter_stop callback needs to be invoked
158          */
159         uint8_t dev_rx_started;
160         /* Number of queues added for this device */
161         uint16_t nb_dev_queues;
162         /* Number of poll based queues
163          * If nb_rx_poll > 0, the start callback will
164          * be invoked if not already invoked
165          */
166         uint16_t nb_rx_poll;
167         /* Number of interrupt based queues
168          * If nb_rx_intr > 0, the start callback will
169          * be invoked if not already invoked.
170          */
171         uint16_t nb_rx_intr;
172         /* Number of queues that use the shared interrupt */
173         uint16_t nb_shared_intr;
174         /* sum(wrr(q)) for all queues within the device
175          * useful when deleting all device queues
176          */
177         uint32_t wrr_len;
178         /* Intr based queue index to start polling from; this is used
179          * if the number of shared interrupts is non-zero
180          */
181         uint16_t next_q_idx;
182         /* Intr based queue indices */
183         uint16_t *intr_queue;
184         /* Device generates a dedicated interrupt per Rx queue for
185          * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
186          */
187         int multi_intr_cap;
188         /* shared interrupt enabled */
189         int shared_intr_enabled;
190 };
191
192 /* Per Rx queue */
193 struct eth_rx_queue_info {
194         int queue_enabled;      /* True if added */
195         int intr_enabled;
196         uint16_t wt;            /* Polling weight */
197         uint8_t event_queue_id; /* Event queue to enqueue packets to */
198         uint8_t sched_type;     /* Sched type for events */
199         uint8_t priority;       /* Event priority */
200         uint32_t flow_id;       /* App provided flow identifier */
201         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
202 };
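/* Editorial note: when events are constructed (see rxa_buffer_mbufs()),
 * flow_id_mask selects between the application flow id and the RSS hash:
 *
 *     ev->flow_id = (flow_id & flow_id_mask) | (rss & ~flow_id_mask);
 *
 * so a mask of ~0 uses the application supplied flow id and a mask of 0
 * uses the (hardware or software) RSS hash.
 */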
203
204 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
205
206 static inline int
207 rxa_validate_id(uint8_t id)
208 {
209         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
210 }
211
212 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
213         if (!rxa_validate_id(id)) { \
214                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
215                 return retval; \
216         } \
217 } while (0)
218
219 static inline int
220 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
221 {
222         return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
223 }
224
225 /* Greatest common divisor */
226 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
227 {
228         uint16_t r = a % b;
229
230         return r ? rxa_gcd_u16(b, r) : b;
231 }
232
233 /* Returns the next queue in the polling sequence
234  *
235  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
236  */
237 static int
238 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
239          unsigned int n, int *cw,
240          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
241          uint16_t gcd, int prev)
242 {
243         int i = prev;
244         uint16_t w;
245
246         while (1) {
247                 uint16_t q;
248                 uint16_t d;
249
250                 i = (i + 1) % n;
251                 if (i == 0) {
252                         *cw = *cw - gcd;
253                         if (*cw <= 0)
254                                 *cw = max_wt;
255                 }
256
257                 q = eth_rx_poll[i].eth_rx_qid;
258                 d = eth_rx_poll[i].eth_dev_id;
259                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
260
261                 if ((int)w >= *cw)
262                         return i;
263         }
264 }
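/* Editorial example (illustrative only): three polled queues q0, q1, q2
 * with weights 4, 2 and 1 give max_wt = 4, gcd = 1 and a schedule of
 * length 4 + 2 + 1 = 7. Starting from cw = -1, prev = -1, successive
 * calls to rxa_wrr_next() return the indices
 *
 *     q0 q0 q0 q1 q0 q1 q2
 *
 * so higher weight queues appear proportionally more often in wrr_sched[].
 */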
265
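/* Returns true if the queue must use the device's shared interrupt, i.e.
 * the device cannot provide one interrupt vector per queue or the queue
 * index lies beyond the last dedicated vector.
 */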
266 static inline int
267 rxa_shared_intr(struct eth_device_info *dev_info,
268         int rx_queue_id)
269 {
270         int multi_intr_cap;
271
272         if (dev_info->dev->intr_handle == NULL)
273                 return 0;
274
275         multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
276         return !multi_intr_cap ||
277                 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
278 }
279
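/* Returns true if the queue was added to the adapter in interrupt mode,
 * i.e. it is enabled with a zero polling weight.
 */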
280 static inline int
281 rxa_intr_queue(struct eth_device_info *dev_info,
282         int rx_queue_id)
283 {
284         struct eth_rx_queue_info *queue_info;
285
286         queue_info = &dev_info->rx_queue[rx_queue_id];
287         return dev_info->rx_queue &&
288                 !dev_info->internal_event_port &&
289                 queue_info->queue_enabled && queue_info->wt == 0;
290 }
291
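/* Returns true if the queue was added to the adapter in poll mode,
 * i.e. it is enabled with a non-zero polling weight.
 */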
292 static inline int
293 rxa_polled_queue(struct eth_device_info *dev_info,
294         int rx_queue_id)
295 {
296         struct eth_rx_queue_info *queue_info;
297
298         queue_info = &dev_info->rx_queue[rx_queue_id];
299         return !dev_info->internal_event_port &&
300                 dev_info->rx_queue &&
301                 queue_info->queue_enabled && queue_info->wt != 0;
302 }
303
304 /* Calculate change in number of interrupt vectors after an Rx queue ID is added/deleted */
305 static int
306 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
307 {
308         uint16_t i;
309         int n, s;
310         uint16_t nbq;
311
312         nbq = dev_info->dev->data->nb_rx_queues;
313         n = 0; /* non shared count */
314         s = 0; /* shared count */
315
316         if (rx_queue_id == -1) {
317                 for (i = 0; i < nbq; i++) {
318                         if (!rxa_shared_intr(dev_info, i))
319                                 n += add ? !rxa_intr_queue(dev_info, i) :
320                                         rxa_intr_queue(dev_info, i);
321                         else
322                                 s += add ? !rxa_intr_queue(dev_info, i) :
323                                         rxa_intr_queue(dev_info, i);
324                 }
325
326                 if (s > 0) {
327                         if ((add && dev_info->nb_shared_intr == 0) ||
328                                 (!add && dev_info->nb_shared_intr))
329                                 n += 1;
330                 }
331         } else {
332                 if (!rxa_shared_intr(dev_info, rx_queue_id))
333                         n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
334                                 rxa_intr_queue(dev_info, rx_queue_id);
335                 else
336                         n = add ? !dev_info->nb_shared_intr :
337                                 dev_info->nb_shared_intr == 1;
338         }
339
340         return add ? n : -n;
341 }
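/* Editorial example (illustrative only): adding all queues of a device
 * with 4 Rx queues, none currently in interrupt mode, yields +4 when each
 * queue gets its own vector, but only +1 when the queues share a single
 * interrupt vector; deleting them again yields -4 or -1 respectively.
 */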
342
343 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
344  */
345 static void
346 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
347                         struct eth_device_info *dev_info,
348                         int rx_queue_id,
349                         uint32_t *nb_rx_intr)
350 {
351         uint32_t intr_diff;
352
353         if (rx_queue_id == -1)
354                 intr_diff = dev_info->nb_rx_intr;
355         else
356                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
357
358         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
359 }
360
361 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
362  * interrupt queues may currently be poll mode Rx queues
363  */
364 static void
365 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
366                         struct eth_device_info *dev_info,
367                         int rx_queue_id,
368                         uint32_t *nb_rx_poll,
369                         uint32_t *nb_rx_intr,
370                         uint32_t *nb_wrr)
371 {
372         uint32_t intr_diff;
373         uint32_t poll_diff;
374         uint32_t wrr_len_diff;
375
376         if (rx_queue_id == -1) {
377                 intr_diff = dev_info->dev->data->nb_rx_queues -
378                                                 dev_info->nb_rx_intr;
379                 poll_diff = dev_info->nb_rx_poll;
380                 wrr_len_diff = dev_info->wrr_len;
381         } else {
382                 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
383                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
384                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
385                                         0;
386         }
387
388         *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
389         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
390         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
391 }
392
393 /* Calculate size of the eth_rx_poll and wrr_sched arrays
394  * after deleting poll mode rx queues
395  */
396 static void
397 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
398                         struct eth_device_info *dev_info,
399                         int rx_queue_id,
400                         uint32_t *nb_rx_poll,
401                         uint32_t *nb_wrr)
402 {
403         uint32_t poll_diff;
404         uint32_t wrr_len_diff;
405
406         if (rx_queue_id == -1) {
407                 poll_diff = dev_info->nb_rx_poll;
408                 wrr_len_diff = dev_info->wrr_len;
409         } else {
410                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
411                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
412                                         0;
413         }
414
415         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
416         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
417 }
418
419 /* Calculate nb_rx_* after adding poll mode rx queues
420  */
421 static void
422 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
423                         struct eth_device_info *dev_info,
424                         int rx_queue_id,
425                         uint16_t wt,
426                         uint32_t *nb_rx_poll,
427                         uint32_t *nb_rx_intr,
428                         uint32_t *nb_wrr)
429 {
430         uint32_t intr_diff;
431         uint32_t poll_diff;
432         uint32_t wrr_len_diff;
433
434         if (rx_queue_id == -1) {
435                 intr_diff = dev_info->nb_rx_intr;
436                 poll_diff = dev_info->dev->data->nb_rx_queues -
437                                                 dev_info->nb_rx_poll;
438                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
439                                 - dev_info->wrr_len;
440         } else {
441                 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
442                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
443                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
444                                 wt - dev_info->rx_queue[rx_queue_id].wt :
445                                 wt;
446         }
447
448         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
449         *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
450         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
451 }
452
453 /* Calculate nb_rx_* after adding rx_queue_id */
454 static void
455 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
456                 struct eth_device_info *dev_info,
457                 int rx_queue_id,
458                 uint16_t wt,
459                 uint32_t *nb_rx_poll,
460                 uint32_t *nb_rx_intr,
461                 uint32_t *nb_wrr)
462 {
463         if (wt != 0)
464                 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
465                                         wt, nb_rx_poll, nb_rx_intr, nb_wrr);
466         else
467                 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
468                                         nb_rx_poll, nb_rx_intr, nb_wrr);
469 }
470
471 /* Calculate nb_rx_* after deleting rx_queue_id */
472 static void
473 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
474                 struct eth_device_info *dev_info,
475                 int rx_queue_id,
476                 uint32_t *nb_rx_poll,
477                 uint32_t *nb_rx_intr,
478                 uint32_t *nb_wrr)
479 {
480         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
481                                 nb_wrr);
482         rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
483                                 nb_rx_intr);
484 }
485
486 /*
487  * Allocate the rx_poll array
488  */
489 static struct eth_rx_poll_entry *
490 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
491         uint32_t num_rx_polled)
492 {
493         size_t len;
494
495         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
496                                                         RTE_CACHE_LINE_SIZE);
497         return  rte_zmalloc_socket(rx_adapter->mem_name,
498                                 len,
499                                 RTE_CACHE_LINE_SIZE,
500                                 rx_adapter->socket_id);
501 }
502
503 /*
504  * Allocate the WRR array
505  */
506 static uint32_t *
507 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
508 {
509         size_t len;
510
511         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
512                         RTE_CACHE_LINE_SIZE);
513         return  rte_zmalloc_socket(rx_adapter->mem_name,
514                                 len,
515                                 RTE_CACHE_LINE_SIZE,
516                                 rx_adapter->socket_id);
517 }
518
519 static int
520 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
521                 uint32_t nb_poll,
522                 uint32_t nb_wrr,
523                 struct eth_rx_poll_entry **rx_poll,
524                 uint32_t **wrr_sched)
525 {
526
527         if (nb_poll == 0) {
528                 *rx_poll = NULL;
529                 *wrr_sched = NULL;
530                 return 0;
531         }
532
533         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
534         if (*rx_poll == NULL) {
535                 *wrr_sched = NULL;
536                 return -ENOMEM;
537         }
538
539         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
540         if (*wrr_sched == NULL) {
541                 rte_free(*rx_poll);
542                 return -ENOMEM;
543         }
544         return 0;
545 }
546
547 /* Precalculate WRR polling sequence for all queues in rx_adapter */
548 static void
549 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
550                 struct eth_rx_poll_entry *rx_poll,
551                 uint32_t *rx_wrr)
552 {
553         uint16_t d;
554         uint16_t q;
555         unsigned int i;
556         int prev = -1;
557         int cw = -1;
558
559         /* Initialize variables for calculation of wrr schedule */
560         uint16_t max_wrr_pos = 0;
561         unsigned int poll_q = 0;
562         uint16_t max_wt = 0;
563         uint16_t gcd = 0;
564
565         if (rx_poll == NULL)
566                 return;
567
568         /* Generate array of all queues to poll, the size of this
569          * array is poll_q
570          */
571         RTE_ETH_FOREACH_DEV(d) {
572                 uint16_t nb_rx_queues;
573                 struct eth_device_info *dev_info =
574                                 &rx_adapter->eth_devices[d];
575                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
576                 if (dev_info->rx_queue == NULL)
577                         continue;
578                 if (dev_info->internal_event_port)
579                         continue;
580                 dev_info->wrr_len = 0;
581                 for (q = 0; q < nb_rx_queues; q++) {
582                         struct eth_rx_queue_info *queue_info =
583                                 &dev_info->rx_queue[q];
584                         uint16_t wt;
585
586                         if (!rxa_polled_queue(dev_info, q))
587                                 continue;
588                         wt = queue_info->wt;
589                         rx_poll[poll_q].eth_dev_id = d;
590                         rx_poll[poll_q].eth_rx_qid = q;
591                         max_wrr_pos += wt;
592                         dev_info->wrr_len += wt;
593                         max_wt = RTE_MAX(max_wt, wt);
594                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
595                         poll_q++;
596                 }
597         }
598
599         /* Generate polling sequence based on weights */
600         prev = -1;
601         cw = -1;
602         for (i = 0; i < max_wrr_pos; i++) {
603                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
604                                      rx_poll, max_wt, gcd, prev);
605                 prev = rx_wrr[i];
606         }
607 }
608
609 static inline void
610 rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
611         struct ipv6_hdr **ipv6_hdr)
612 {
613         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
614         struct vlan_hdr *vlan_hdr;
615
616         *ipv4_hdr = NULL;
617         *ipv6_hdr = NULL;
618
619         switch (eth_hdr->ether_type) {
620         case RTE_BE16(ETHER_TYPE_IPv4):
621                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
622                 break;
623
624         case RTE_BE16(ETHER_TYPE_IPv6):
625                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
626                 break;
627
628         case RTE_BE16(ETHER_TYPE_VLAN):
629                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
630                 switch (vlan_hdr->eth_proto) {
631                 case RTE_BE16(ETHER_TYPE_IPv4):
632                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
633                         break;
634                 case RTE_BE16(ETHER_TYPE_IPv6):
635                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
636                         break;
637                 default:
638                         break;
639                 }
640                 break;
641
642         default:
643                 break;
644         }
645 }
646
647 /* Calculate RSS hash for IPv4/6 */
648 static inline uint32_t
649 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
650 {
651         uint32_t input_len;
652         void *tuple;
653         struct rte_ipv4_tuple ipv4_tuple;
654         struct rte_ipv6_tuple ipv6_tuple;
655         struct ipv4_hdr *ipv4_hdr;
656         struct ipv6_hdr *ipv6_hdr;
657
658         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
659
660         if (ipv4_hdr) {
661                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
662                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
663                 tuple = &ipv4_tuple;
664                 input_len = RTE_THASH_V4_L3_LEN;
665         } else if (ipv6_hdr) {
666                 rte_thash_load_v6_addrs(ipv6_hdr,
667                                         (union rte_thash_tuple *)&ipv6_tuple);
668                 tuple = &ipv6_tuple;
669                 input_len = RTE_THASH_V6_L3_LEN;
670         } else
671                 return 0;
672
673         return rte_softrss_be(tuple, input_len, rss_key_be);
674 }
675
676 static inline int
677 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
678 {
679         return !!rx_adapter->enq_block_count;
680 }
681
682 static inline void
683 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
684 {
685         if (rx_adapter->rx_enq_block_start_ts)
686                 return;
687
688         rx_adapter->enq_block_count++;
689         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
690                 return;
691
692         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
693 }
694
695 static inline void
696 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
697                     struct rte_event_eth_rx_adapter_stats *stats)
698 {
699         if (unlikely(!stats->rx_enq_start_ts))
700                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
701
702         if (likely(!rxa_enq_blocked(rx_adapter)))
703                 return;
704
705         rx_adapter->enq_block_count = 0;
706         if (rx_adapter->rx_enq_block_start_ts) {
707                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
708                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
709                     rx_adapter->rx_enq_block_start_ts;
710                 rx_adapter->rx_enq_block_start_ts = 0;
711         }
712 }
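/* Editorial note: the two helpers above implement the enqueue blocked
 * cycles statistic. rxa_enq_block_start_ts() records a start timestamp
 * only after BLOCK_CNT_THRESHOLD consecutive zero-enqueue invocations,
 * and rxa_enq_block_end_ts() accumulates the elapsed cycles into
 * stats->rx_enq_block_cycles once events are accepted again.
 */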
713
714 /* Add event to buffer; the free space check is done prior to calling
715  * this function
716  */
717 static inline void
718 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
719                 struct rte_event *ev)
720 {
721         struct rte_eth_event_enqueue_buffer *buf =
722             &rx_adapter->event_enqueue_buffer;
723         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
724 }
725
726 /* Enqueue buffered events to event device */
727 static inline uint16_t
728 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
729 {
730         struct rte_eth_event_enqueue_buffer *buf =
731             &rx_adapter->event_enqueue_buffer;
732         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
733
734         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
735                                         rx_adapter->event_port_id,
736                                         buf->events,
737                                         buf->count);
738         if (n != buf->count) {
739                 memmove(buf->events,
740                         &buf->events[n],
741                         (buf->count - n) * sizeof(struct rte_event));
742                 stats->rx_enq_retry++;
743         }
744
745         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
746                 rxa_enq_block_start_ts(rx_adapter);
747
748         buf->count -= n;
749         stats->rx_enq_count += n;
750
751         return n;
752 }
753
754 static inline void
755 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
756                 uint16_t eth_dev_id,
757                 uint16_t rx_queue_id,
758                 struct rte_mbuf **mbufs,
759                 uint16_t num)
760 {
761         uint32_t i;
762         struct eth_device_info *eth_device_info =
763                                         &rx_adapter->eth_devices[eth_dev_id];
764         struct eth_rx_queue_info *eth_rx_queue_info =
765                                         &eth_device_info->rx_queue[rx_queue_id];
766
767         int32_t qid = eth_rx_queue_info->event_queue_id;
768         uint8_t sched_type = eth_rx_queue_info->sched_type;
769         uint8_t priority = eth_rx_queue_info->priority;
770         uint32_t flow_id;
771         struct rte_event events[BATCH_SIZE];
772         struct rte_mbuf *m = mbufs[0];
773         uint32_t rss_mask;
774         uint32_t rss;
775         int do_rss;
776         uint64_t ts;
777
778         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
779         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
780         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
781
782         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
783                 ts = rte_get_tsc_cycles();
784                 for (i = 0; i < num; i++) {
785                         m = mbufs[i];
786
787                         m->timestamp = ts;
788                         m->ol_flags |= PKT_RX_TIMESTAMP;
789                 }
790         }
791
792         for (i = 0; i < num; i++) {
793                 m = mbufs[i];
794                 struct rte_event *ev = &events[i];
795
796                 rss = do_rss ?
797                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
798                         m->hash.rss;
799                 flow_id =
800                     eth_rx_queue_info->flow_id &
801                                 eth_rx_queue_info->flow_id_mask;
802                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
803                 ev->flow_id = flow_id;
804                 ev->op = RTE_EVENT_OP_NEW;
805                 ev->sched_type = sched_type;
806                 ev->queue_id = qid;
807                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
808                 ev->sub_event_type = 0;
809                 ev->priority = priority;
810                 ev->mbuf = m;
811
812                 rxa_buffer_event(rx_adapter, ev);
813         }
814 }
815
816 /* Enqueue packets from <port, q> to the event buffer */
817 static inline uint32_t
818 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
819         uint16_t port_id,
820         uint16_t queue_id,
821         uint32_t rx_count,
822         uint32_t max_rx,
823         int *rxq_empty)
824 {
825         struct rte_mbuf *mbufs[BATCH_SIZE];
826         struct rte_eth_event_enqueue_buffer *buf =
827                                         &rx_adapter->event_enqueue_buffer;
828         struct rte_event_eth_rx_adapter_stats *stats =
829                                         &rx_adapter->stats;
830         uint16_t n;
831         uint32_t nb_rx = 0;
832
833         if (rxq_empty)
834                 *rxq_empty = 0;
835         /* Don't do a batch dequeue from the rx queue if there isn't
836          * enough space in the enqueue buffer.
837          */
838         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
839                 if (buf->count >= BATCH_SIZE)
840                         rxa_flush_event_buffer(rx_adapter);
841
842                 stats->rx_poll_count++;
843                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
844                 if (unlikely(!n)) {
845                         if (rxq_empty)
846                                 *rxq_empty = 1;
847                         break;
848                 }
849                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
850                 nb_rx += n;
851                 if (rx_count + nb_rx > max_rx)
852                         break;
853         }
854
855         if (buf->count >= BATCH_SIZE)
856                 rxa_flush_event_buffer(rx_adapter);
857
858         return nb_rx;
859 }
860
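/* Called from the interrupt thread: enqueue the <port, queue> of an
 * interrupting Rx queue on intr_ring and disable that interrupt until
 * the service function has processed the queue and re-enables it.
 */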
861 static inline void
862 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
863                 void *data)
864 {
865         uint16_t port_id;
866         uint16_t queue;
867         int err;
868         union queue_data qd;
869         struct eth_device_info *dev_info;
870         struct eth_rx_queue_info *queue_info;
871         int *intr_enabled;
872
873         qd.ptr = data;
874         port_id = qd.port;
875         queue = qd.queue;
876
877         dev_info = &rx_adapter->eth_devices[port_id];
878         queue_info = &dev_info->rx_queue[queue];
879         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
880         if (rxa_shared_intr(dev_info, queue))
881                 intr_enabled = &dev_info->shared_intr_enabled;
882         else
883                 intr_enabled = &queue_info->intr_enabled;
884
885         if (*intr_enabled) {
886                 *intr_enabled = 0;
887                 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
888                 /* Entry should always be available.
889                  * The ring size equals the maximum number of interrupt
890                  * vectors supported (an interrupt vector is shared in
891                  * case of shared interrupts)
892                  */
893                 if (err)
894                         RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
895                                 " to ring: %s", strerror(-err));
896                 else
897                         rte_eth_dev_rx_intr_disable(port_id, queue);
898         }
899         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
900 }
901
902 static int
903 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
904                         uint32_t num_intr_vec)
905 {
906         if (rx_adapter->num_intr_vec + num_intr_vec >
907                                 RTE_EVENT_ETH_INTR_RING_SIZE) {
908                 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
909                 " %d needed %d limit %d", rx_adapter->num_intr_vec,
910                 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
911                 return -ENOSPC;
912         }
913
914         return 0;
915 }
916
917 /* Delete entries for (dev, queue) from the interrupt ring */
918 static void
919 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
920                         struct eth_device_info *dev_info,
921                         uint16_t rx_queue_id)
922 {
923         int i, n;
924         union queue_data qd;
925
926         rte_spinlock_lock(&rx_adapter->intr_ring_lock);
927
928         n = rte_ring_count(rx_adapter->intr_ring);
929         for (i = 0; i < n; i++) {
930                 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
931                 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
932                         if (qd.port == dev_info->dev->data->port_id &&
933                                 qd.queue == rx_queue_id)
934                                 continue;
935                 } else {
936                         if (qd.port == dev_info->dev->data->port_id)
937                                 continue;
938                 }
939                 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
940         }
941
942         rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
943 }
944
945 /* pthread callback handling interrupt mode receive queues
946  * After receiving an Rx interrupt, it enqueues the port id and queue id of the
947  * interrupting queue to the adapter's ring buffer for interrupt events.
948  * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
949  * the adapter service function.
950  */
951 static void *
952 rxa_intr_thread(void *arg)
953 {
954         struct rte_event_eth_rx_adapter *rx_adapter = arg;
955         struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
956         int n, i;
957
958         while (1) {
959                 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
960                                 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
961                 if (unlikely(n < 0))
962                         RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
963                                         n);
964                 for (i = 0; i < n; i++) {
965                         rxa_intr_ring_enqueue(rx_adapter,
966                                         epoll_events[i].epdata.data);
967                 }
968         }
969
970         return NULL;
971 }
972
973 /* Dequeue <port, q> from interrupt ring and enqueue received
974  * mbufs to eventdev
975  */
976 static inline uint32_t
977 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
978 {
979         uint32_t n;
980         uint32_t nb_rx = 0;
981         int rxq_empty;
982         struct rte_eth_event_enqueue_buffer *buf;
983         rte_spinlock_t *ring_lock;
984         uint8_t max_done = 0;
985
986         if (rx_adapter->num_rx_intr == 0)
987                 return 0;
988
989         if (rte_ring_count(rx_adapter->intr_ring) == 0
990                 && !rx_adapter->qd_valid)
991                 return 0;
992
993         buf = &rx_adapter->event_enqueue_buffer;
994         ring_lock = &rx_adapter->intr_ring_lock;
995
996         if (buf->count >= BATCH_SIZE)
997                 rxa_flush_event_buffer(rx_adapter);
998
999         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1000                 struct eth_device_info *dev_info;
1001                 uint16_t port;
1002                 uint16_t queue;
1003                 union queue_data qd  = rx_adapter->qd;
1004                 int err;
1005
1006                 if (!rx_adapter->qd_valid) {
1007                         struct eth_rx_queue_info *queue_info;
1008
1009                         rte_spinlock_lock(ring_lock);
1010                         err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1011                         if (err) {
1012                                 rte_spinlock_unlock(ring_lock);
1013                                 break;
1014                         }
1015
1016                         port = qd.port;
1017                         queue = qd.queue;
1018                         rx_adapter->qd = qd;
1019                         rx_adapter->qd_valid = 1;
1020                         dev_info = &rx_adapter->eth_devices[port];
1021                         if (rxa_shared_intr(dev_info, queue))
1022                                 dev_info->shared_intr_enabled = 1;
1023                         else {
1024                                 queue_info = &dev_info->rx_queue[queue];
1025                                 queue_info->intr_enabled = 1;
1026                         }
1027                         rte_eth_dev_rx_intr_enable(port, queue);
1028                         rte_spinlock_unlock(ring_lock);
1029                 } else {
1030                         port = qd.port;
1031                         queue = qd.queue;
1032
1033                         dev_info = &rx_adapter->eth_devices[port];
1034                 }
1035
1036                 if (rxa_shared_intr(dev_info, queue)) {
1037                         uint16_t i;
1038                         uint16_t nb_queues;
1039
1040                         nb_queues = dev_info->dev->data->nb_rx_queues;
1041                         n = 0;
1042                         for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1043                                 uint8_t enq_buffer_full;
1044
1045                                 if (!rxa_intr_queue(dev_info, i))
1046                                         continue;
1047                                 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1048                                         rx_adapter->max_nb_rx,
1049                                         &rxq_empty);
1050                                 nb_rx += n;
1051
1052                                 enq_buffer_full = !rxq_empty && n == 0;
1053                                 max_done = nb_rx > rx_adapter->max_nb_rx;
1054
1055                                 if (enq_buffer_full || max_done) {
1056                                         dev_info->next_q_idx = i;
1057                                         goto done;
1058                                 }
1059                         }
1060
1061                         rx_adapter->qd_valid = 0;
1062
1063                         /* Reinitialize for next interrupt */
1064                         dev_info->next_q_idx = dev_info->multi_intr_cap ?
1065                                                 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1066                                                 0;
1067                 } else {
1068                         n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1069                                 rx_adapter->max_nb_rx,
1070                                 &rxq_empty);
1071                         rx_adapter->qd_valid = !rxq_empty;
1072                         nb_rx += n;
1073                         if (nb_rx > rx_adapter->max_nb_rx)
1074                                 break;
1075                 }
1076         }
1077
1078 done:
1079         rx_adapter->stats.rx_intr_packets += nb_rx;
1080         return nb_rx;
1081 }
1082
1083 /*
1084  * Polls receive queues added to the event adapter and enqueues received
1085  * packets to the event device.
1086  *
1087  * The receive code enqueues initially to a temporary buffer; the
1088  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1089  *
1090  * If there isn't space available in the temporary buffer, packets from the
1091  * Rx queue aren't dequeued from the eth device; this back pressures the
1092  * eth device. In virtual device environments this back pressure is relayed
1093  * to the hypervisor's switching layer where adjustments can be made to
1094  * deal with it.
1095  */
1096 static inline uint32_t
1097 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1098 {
1099         uint32_t num_queue;
1100         uint32_t nb_rx = 0;
1101         struct rte_eth_event_enqueue_buffer *buf;
1102         uint32_t wrr_pos;
1103         uint32_t max_nb_rx;
1104
1105         wrr_pos = rx_adapter->wrr_pos;
1106         max_nb_rx = rx_adapter->max_nb_rx;
1107         buf = &rx_adapter->event_enqueue_buffer;
1109
1110         /* Iterate through a WRR sequence */
1111         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1112                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1113                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1114                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1115
1116                 /* Don't do a batch dequeue from the rx queue if there isn't
1117                  * enough space in the enqueue buffer.
1118                  */
1119                 if (buf->count >= BATCH_SIZE)
1120                         rxa_flush_event_buffer(rx_adapter);
1121                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1122                         rx_adapter->wrr_pos = wrr_pos;
1123                         return nb_rx;
1124                 }
1125
1126                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1127                                 NULL);
1128                 if (nb_rx > max_nb_rx) {
1129                         rx_adapter->wrr_pos =
1130                                     (wrr_pos + 1) % rx_adapter->wrr_len;
1131                         break;
1132                 }
1133
1134                 if (++wrr_pos == rx_adapter->wrr_len)
1135                         wrr_pos = 0;
1136         }
1137         return nb_rx;
1138 }
1139
1140 static int
1141 rxa_service_func(void *args)
1142 {
1143         struct rte_event_eth_rx_adapter *rx_adapter = args;
1144         struct rte_event_eth_rx_adapter_stats *stats;
1145
1146         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1147                 return 0;
1148         if (!rx_adapter->rxa_started) {
1149                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1150                 return 0;
1151         }
1152
1153         stats = &rx_adapter->stats;
1154         stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1155         stats->rx_packets += rxa_poll(rx_adapter);
1156         rte_spinlock_unlock(&rx_adapter->rx_lock);
1157         return 0;
1158 }
1159
1160 static int
1161 rte_event_eth_rx_adapter_init(void)
1162 {
1163         const char *name = "rte_event_eth_rx_adapter_array";
1164         const struct rte_memzone *mz;
1165         unsigned int sz;
1166
1167         sz = sizeof(*event_eth_rx_adapter) *
1168             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1169         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1170
1171         mz = rte_memzone_lookup(name);
1172         if (mz == NULL) {
1173                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1174                                                  RTE_CACHE_LINE_SIZE);
1175                 if (mz == NULL) {
1176                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1177                                         PRId32, rte_errno);
1178                         return -rte_errno;
1179                 }
1180         }
1181
1182         event_eth_rx_adapter = mz->addr;
1183         return 0;
1184 }
1185
1186 static inline struct rte_event_eth_rx_adapter *
1187 rxa_id_to_adapter(uint8_t id)
1188 {
1189         return event_eth_rx_adapter ?
1190                 event_eth_rx_adapter[id] : NULL;
1191 }
1192
1193 static int
1194 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1195                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1196 {
1197         int ret;
1198         struct rte_eventdev *dev;
1199         struct rte_event_dev_config dev_conf;
1200         int started;
1201         uint8_t port_id;
1202         struct rte_event_port_conf *port_conf = arg;
1203         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1204
1205         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1206         dev_conf = dev->data->dev_conf;
1207
1208         started = dev->data->dev_started;
1209         if (started)
1210                 rte_event_dev_stop(dev_id);
1211         port_id = dev_conf.nb_event_ports;
1212         dev_conf.nb_event_ports += 1;
1213         ret = rte_event_dev_configure(dev_id, &dev_conf);
1214         if (ret) {
1215                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1216                                                 dev_id);
1217                 if (started) {
1218                         if (rte_event_dev_start(dev_id))
1219                                 return -EIO;
1220                 }
1221                 return ret;
1222         }
1223
1224         ret = rte_event_port_setup(dev_id, port_id, port_conf);
1225         if (ret) {
1226                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1227                                         port_id);
1228                 return ret;
1229         }
1230
1231         conf->event_port_id = port_id;
1232         conf->max_nb_rx = 128;
1233         if (started)
1234                 ret = rte_event_dev_start(dev_id);
1235         rx_adapter->default_cb_arg = 1;
1236         return ret;
1237 }
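/* Editorial note: rxa_default_conf_cb() backs the simple adapter create
 * path (rte_event_eth_rx_adapter_create()), which passes the application
 * supplied rte_event_port_conf as the callback argument; the callback
 * grows the event device configuration by one port and reports it back
 * in conf->event_port_id.
 */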
1238
1239 static int
1240 rxa_epoll_create1(void)
1241 {
1242 #if defined(LINUX)
1243         int fd;
1244         fd = epoll_create1(EPOLL_CLOEXEC);
1245         return fd < 0 ? -errno : fd;
1246 #elif defined(BSD)
1247         return -ENOTSUP;
1248 #endif
1249 }
1250
1251 static int
1252 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1253 {
1254         if (rx_adapter->epd != INIT_FD)
1255                 return 0;
1256
1257         rx_adapter->epd = rxa_epoll_create1();
1258         if (rx_adapter->epd < 0) {
1259                 int err = rx_adapter->epd;
1260                 rx_adapter->epd = INIT_FD;
1261                 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1262                 return err;
1263         }
1264
1265         return 0;
1266 }
1267
1268 static int
1269 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1270 {
1271         int err;
1272         char thread_name[RTE_MAX_THREAD_NAME_LEN];
1273
1274         if (rx_adapter->intr_ring)
1275                 return 0;
1276
1277         rx_adapter->intr_ring = rte_ring_create("intr_ring",
1278                                         RTE_EVENT_ETH_INTR_RING_SIZE,
1279                                         rte_socket_id(), 0);
1280         if (!rx_adapter->intr_ring)
1281                 return -ENOMEM;
1282
1283         rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1284                                         RTE_EVENT_ETH_INTR_RING_SIZE *
1285                                         sizeof(struct rte_epoll_event),
1286                                         RTE_CACHE_LINE_SIZE,
1287                                         rx_adapter->socket_id);
1288         if (!rx_adapter->epoll_events) {
1289                 err = -ENOMEM;
1290                 goto error;
1291         }
1292
1293         rte_spinlock_init(&rx_adapter->intr_ring_lock);
1294
1295         snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1296                         "rx-intr-thread-%d", rx_adapter->id);
1297
1298         err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1299                                 NULL, rxa_intr_thread, rx_adapter);
1300         if (!err) {
1301                 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1302                 return 0;
1303         }
1304
1305         RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1306 error:
1307         rte_ring_free(rx_adapter->intr_ring);
1308         rx_adapter->intr_ring = NULL;
1309         rx_adapter->epoll_events = NULL;
1310         return err;
1311 }
1312
1313 static int
1314 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1315 {
1316         int err;
1317
1318         err = pthread_cancel(rx_adapter->rx_intr_thread);
1319         if (err)
1320                 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1321                                 err);
1322
1323         err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1324         if (err)
1325                 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1326
1327         rte_free(rx_adapter->epoll_events);
1328         rte_ring_free(rx_adapter->intr_ring);
1329         rx_adapter->intr_ring = NULL;
1330         rx_adapter->epoll_events = NULL;
1331         return 0;
1332 }
1333
1334 static int
1335 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1336 {
1337         int ret;
1338
1339         if (rx_adapter->num_rx_intr == 0)
1340                 return 0;
1341
1342         ret = rxa_destroy_intr_thread(rx_adapter);
1343         if (ret)
1344                 return ret;
1345
1346         close(rx_adapter->epd);
1347         rx_adapter->epd = INIT_FD;
1348
1349         return ret;
1350 }
1351
1352 static int
1353 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1354         struct eth_device_info *dev_info,
1355         uint16_t rx_queue_id)
1356 {
1357         int err;
1358         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1359         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1360
1361         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1362         if (err) {
1363                 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1364                         rx_queue_id);
1365                 return err;
1366         }
1367
1368         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1369                                         rx_adapter->epd,
1370                                         RTE_INTR_EVENT_DEL,
1371                                         0);
1372         if (err)
1373                 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1374
1375         if (sintr)
1376                 dev_info->shared_intr_enabled = 0;
1377         else
1378                 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1379         return err;
1380 }
1381
1382 static int
1383 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1384                 struct eth_device_info *dev_info,
1385                 int rx_queue_id)
1386 {
1387         int err;
1388         int i;
1389         int s;
1390
1391         if (dev_info->nb_rx_intr == 0)
1392                 return 0;
1393
1394         err = 0;
1395         if (rx_queue_id == -1) {
1396                 s = dev_info->nb_shared_intr;
1397                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1398                         int sintr;
1399                         uint16_t q;
1400
1401                         q = dev_info->intr_queue[i];
1402                         sintr = rxa_shared_intr(dev_info, q);
1403                         s -= sintr;
1404
1405                         if (!sintr || s == 0) {
1406
1407                                 err = rxa_disable_intr(rx_adapter, dev_info,
1408                                                 q);
1409                                 if (err)
1410                                         return err;
1411                                 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1412                                                         q);
1413                         }
1414                 }
1415         } else {
1416                 if (!rxa_intr_queue(dev_info, rx_queue_id))
1417                         return 0;
1418                 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1419                                 dev_info->nb_shared_intr == 1) {
1420                         err = rxa_disable_intr(rx_adapter, dev_info,
1421                                         rx_queue_id);
1422                         if (err)
1423                                 return err;
1424                         rxa_intr_ring_del_entries(rx_adapter, dev_info,
1425                                                 rx_queue_id);
1426                 }
1427
1428                 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1429                         if (dev_info->intr_queue[i] == rx_queue_id) {
1430                                 for (; i < dev_info->nb_rx_intr - 1; i++)
1431                                         dev_info->intr_queue[i] =
1432                                                 dev_info->intr_queue[i + 1];
1433                                 break;
1434                         }
1435                 }
1436         }
1437
1438         return err;
1439 }
1440
1441 static int
1442 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1443         struct eth_device_info *dev_info,
1444         uint16_t rx_queue_id)
1445 {
1446         int err, err1;
1447         uint16_t eth_dev_id = dev_info->dev->data->port_id;
1448         union queue_data qd;
1449         int init_fd;
1450         uint16_t *intr_queue;
1451         int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1452
1453         if (rxa_intr_queue(dev_info, rx_queue_id))
1454                 return 0;
1455
1456         intr_queue = dev_info->intr_queue;
1457         if (dev_info->intr_queue == NULL) {
1458                 size_t len =
1459                         dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1460                 dev_info->intr_queue =
1461                         rte_zmalloc_socket(
1462                                 rx_adapter->mem_name,
1463                                 len,
1464                                 0,
1465                                 rx_adapter->socket_id);
1466                 if (dev_info->intr_queue == NULL)
1467                         return -ENOMEM;
1468         }
1469
1470         init_fd = rx_adapter->epd;
1471         err = rxa_init_epd(rx_adapter);
1472         if (err)
1473                 goto err_free_queue;
1474
1475         qd.port = eth_dev_id;
1476         qd.queue = rx_queue_id;
1477
1478         err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1479                                         rx_adapter->epd,
1480                                         RTE_INTR_EVENT_ADD,
1481                                         qd.ptr);
1482         if (err) {
1483                 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1484                         " Rx Queue %u err %d", rx_queue_id, err);
1485                 goto err_del_fd;
1486         }
1487
1488         err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1489         if (err) {
1490                 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1491                                 " Rx Queue %u err %d", rx_queue_id, err);
1492
1493                 goto err_del_event;
1494         }
1495
1496         err = rxa_create_intr_thread(rx_adapter);
1497         if (!err) {
1498                 if (sintr)
1499                         dev_info->shared_intr_enabled = 1;
1500                 else
1501                         dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1502                 return 0;
1503         }
1504
1506         err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1507         if (err)
1508                 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1509                                 " Rx Queue %u err %d", rx_queue_id, err);
1510 err_del_event:
1511         err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1512                                         rx_adapter->epd,
1513                                         RTE_INTR_EVENT_DEL,
1514                                         0);
1515         if (err1) {
1516                 RTE_EDEV_LOG_ERR("Could not delete event for"
1517                                 " Rx Queue %u err %d", rx_queue_id, err1);
1518         }
1519 err_del_fd:
1520         if (init_fd == INIT_FD) {
1521                 close(rx_adapter->epd);
1522                 rx_adapter->epd = INIT_FD;
1523         }
1524 err_free_queue:
1525         if (intr_queue == NULL) {
1526                 rte_free(dev_info->intr_queue);
                     dev_info->intr_queue = NULL;
             }
1527
1528         return err;
1529 }
1530
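/*
 * Move a queue, or all queues of the device if rx_queue_id is -1, to
 * interrupt mode. Queues that share an interrupt vector only need to be
 * configured once. If a later queue fails, interrupts enabled earlier in
 * this call are disabled again before returning the error.
 */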
1531 static int
1532 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1533         struct eth_device_info *dev_info,
1534         int rx_queue_id)
1536 {
1537         int i, j, err;
1538         int si = -1;
1539         int shared_done = (dev_info->nb_shared_intr > 0);
1540
1541         if (rx_queue_id != -1) {
1542                 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1543                         return 0;
1544                 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1545         }
1546
1547         err = 0;
1548         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1550                 if (rxa_shared_intr(dev_info, i) && shared_done)
1551                         continue;
1552
1553                 err = rxa_config_intr(rx_adapter, dev_info, i);
1554
1555                 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1556                 if (shared_done) {
1557                         si = i;
1558                         dev_info->shared_intr_enabled = 1;
1559                 }
1560                 if (err)
1561                         break;
1562         }
1563
1564         if (err == 0)
1565                 return 0;
1566
1567         shared_done = (dev_info->nb_shared_intr > 0);
1568         for (j = 0; j < i; j++) {
1569                 if (rxa_intr_queue(dev_info, j))
1570                         continue;
1571                 if (rxa_shared_intr(dev_info, j) && si != j)
1572                         continue;
1573                 err = rxa_disable_intr(rx_adapter, dev_info, j);
1574                 if (err)
1575                         break;
1577         }
1578
1579         return err;
1580 }
1581
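/*
 * One time setup of the adapter's service function: register the service
 * component and retrieve the event port and config limits through the
 * application supplied callback. The epoll fd is only marked as
 * uninitialized (INIT_FD) here; it is created lazily when the first
 * interrupt mode queue is added.
 */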
1583 static int
1584 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1585 {
1586         int ret;
1587         struct rte_service_spec service;
1588         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1589
1590         if (rx_adapter->service_inited)
1591                 return 0;
1592
1593         memset(&service, 0, sizeof(service));
1594         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1595                 "rte_event_eth_rx_adapter_%d", id);
1596         service.socket_id = rx_adapter->socket_id;
1597         service.callback = rxa_service_func;
1598         service.callback_userdata = rx_adapter;
1599         /* Service function handles locking for queue add/del updates */
1600         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1601         ret = rte_service_component_register(&service, &rx_adapter->service_id);
1602         if (ret) {
1603                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1604                         service.name, ret);
1605                 return ret;
1606         }
1607
1608         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1609                 &rx_adapter_conf, rx_adapter->conf_arg);
1610         if (ret) {
1611                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1612                         ret);
1613                 goto err_done;
1614         }
1615         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1616         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1617         rx_adapter->service_inited = 1;
1618         rx_adapter->epd = INIT_FD;
1619         return 0;
1620
1621 err_done:
1622         rte_service_component_unregister(rx_adapter->service_id);
1623         return ret;
1624 }
1625
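/*
 * Set or clear the queue_enabled flag for a queue, or for all queues of
 * the device if rx_queue_id is -1, and keep the per adapter and per
 * device queue counts consistent.
 */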
1626 static void
1627 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1628                 struct eth_device_info *dev_info,
1629                 int32_t rx_queue_id,
1630                 uint8_t add)
1631 {
1632         struct eth_rx_queue_info *queue_info;
1633         int enabled;
1634         uint16_t i;
1635
1636         if (dev_info->rx_queue == NULL)
1637                 return;
1638
1639         if (rx_queue_id == -1) {
1640                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1641                         rxa_update_queue(rx_adapter, dev_info, i, add);
1642         } else {
1643                 queue_info = &dev_info->rx_queue[rx_queue_id];
1644                 enabled = queue_info->queue_enabled;
1645                 if (add) {
1646                         rx_adapter->nb_queues += !enabled;
1647                         dev_info->nb_dev_queues += !enabled;
1648                 } else {
1649                         rx_adapter->nb_queues -= enabled;
1650                         dev_info->nb_dev_queues -= enabled;
1651                 }
1652                 queue_info->queue_enabled = !!add;
1653         }
1654 }
1655
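/*
 * Remove a queue, or all queues of the device if rx_queue_id is -1, from
 * the adapter's software state and adjust the polled, interrupt and
 * shared interrupt queue counts.
 */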
1656 static void
1657 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1658         struct eth_device_info *dev_info,
1659         int32_t rx_queue_id)
1660 {
1661         int pollq;
1662         int intrq;
1663         int sintrq;
1664
1665
1666         if (rx_adapter->nb_queues == 0)
1667                 return;
1668
1669         if (rx_queue_id == -1) {
1670                 uint16_t nb_rx_queues;
1671                 uint16_t i;
1672
1673                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1674                 for (i = 0; i < nb_rx_queues; i++)
1675                         rxa_sw_del(rx_adapter, dev_info, i);
1676                 return;
1677         }
1678
1679         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1680         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1681         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1682         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1683         rx_adapter->num_rx_polled -= pollq;
1684         dev_info->nb_rx_poll -= pollq;
1685         rx_adapter->num_rx_intr -= intrq;
1686         dev_info->nb_rx_intr -= intrq;
1687         dev_info->nb_shared_intr -= intrq && sintrq;
1688 }
1689
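/*
 * Record the event fields and servicing weight for a queue, or for all
 * queues of the device if rx_queue_id is -1, and update the polled and
 * interrupt queue counts to match the queue's new mode.
 */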
1690 static void
1691 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1692         struct eth_device_info *dev_info,
1693         int32_t rx_queue_id,
1694         const struct rte_event_eth_rx_adapter_queue_conf *conf)
1695 {
1696         struct eth_rx_queue_info *queue_info;
1697         const struct rte_event *ev = &conf->ev;
1698         int pollq;
1699         int intrq;
1700         int sintrq;
1701
1702         if (rx_queue_id == -1) {
1703                 uint16_t nb_rx_queues;
1704                 uint16_t i;
1705
1706                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1707                 for (i = 0; i < nb_rx_queues; i++)
1708                         rxa_add_queue(rx_adapter, dev_info, i, conf);
1709                 return;
1710         }
1711
1712         pollq = rxa_polled_queue(dev_info, rx_queue_id);
1713         intrq = rxa_intr_queue(dev_info, rx_queue_id);
1714         sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1715
1716         queue_info = &dev_info->rx_queue[rx_queue_id];
1717         queue_info->event_queue_id = ev->queue_id;
1718         queue_info->sched_type = ev->sched_type;
1719         queue_info->priority = ev->priority;
1720         queue_info->wt = conf->servicing_weight;
1721
1722         if (conf->rx_queue_flags &
1723                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1724                 queue_info->flow_id = ev->flow_id;
1725                 queue_info->flow_id_mask = ~0;
1726         }
1727
1728         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1729         if (rxa_polled_queue(dev_info, rx_queue_id)) {
1730                 rx_adapter->num_rx_polled += !pollq;
1731                 dev_info->nb_rx_poll += !pollq;
1732                 rx_adapter->num_rx_intr -= intrq;
1733                 dev_info->nb_rx_intr -= intrq;
1734                 dev_info->nb_shared_intr -= intrq && sintrq;
1735         }
1736
1737         if (rxa_intr_queue(dev_info, rx_queue_id)) {
1738                 rx_adapter->num_rx_polled -= pollq;
1739                 dev_info->nb_rx_poll -= pollq;
1740                 rx_adapter->num_rx_intr += !intrq;
1741                 dev_info->nb_rx_intr += !intrq;
1742                 dev_info->nb_shared_intr += !intrq && sintrq;
1743                 if (dev_info->nb_shared_intr == 1) {
1744                         if (dev_info->multi_intr_cap)
1745                                 dev_info->next_q_idx =
1746                                         RTE_MAX_RXTX_INTR_VEC_ID - 1;
1747                         else
1748                                 dev_info->next_q_idx = 0;
1749                 }
1750         }
1751 }
1752
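/*
 * Add a queue, or all queues of the device if rx_queue_id is -1, to the
 * service function based adapter. A servicing weight of zero selects
 * interrupt mode if the ethdev has Rx interrupts enabled; otherwise the
 * weight is forced to one and the queue is polled. The poll and WRR
 * arrays are reallocated and recomputed to include the new queue(s).
 */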
1753 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1754                 uint16_t eth_dev_id,
1755                 int rx_queue_id,
1756                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1757 {
1758         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1759         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1760         int ret;
1761         struct eth_rx_poll_entry *rx_poll;
1762         struct eth_rx_queue_info *rx_queue;
1763         uint32_t *rx_wrr;
1764         uint16_t nb_rx_queues;
1765         uint32_t nb_rx_poll, nb_wrr;
1766         uint32_t nb_rx_intr;
1767         int num_intr_vec;
1768         uint16_t wt;
1769
1770         if (queue_conf->servicing_weight == 0) {
1771                 struct rte_eth_dev_data *data = dev_info->dev->data;
1772
1773                 temp_conf = *queue_conf;
1774                 if (!data->dev_conf.intr_conf.rxq) {
1775                         /* If Rx interrupts are disabled set wt = 1 */
1776                         temp_conf.servicing_weight = 1;
1777                 }
1778                 queue_conf = &temp_conf;
1779         }
1780
1781         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1782         rx_queue = dev_info->rx_queue;
1783         wt = queue_conf->servicing_weight;
1784
1785         if (dev_info->rx_queue == NULL) {
1786                 dev_info->rx_queue =
1787                     rte_zmalloc_socket(rx_adapter->mem_name,
1788                                        nb_rx_queues *
1789                                        sizeof(struct eth_rx_queue_info), 0,
1790                                        rx_adapter->socket_id);
1791                 if (dev_info->rx_queue == NULL)
1792                         return -ENOMEM;
1793         }
1794         rx_wrr = NULL;
1795         rx_poll = NULL;
1796
1797         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1798                         queue_conf->servicing_weight,
1799                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1800
1801         if (dev_info->dev->intr_handle)
1802                 dev_info->multi_intr_cap =
1803                         rte_intr_cap_multiple(dev_info->dev->intr_handle);
1804
1805         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1806                                 &rx_poll, &rx_wrr);
1807         if (ret)
1808                 goto err_free_rxqueue;
1809
1810         if (wt == 0) {
1811                 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1812
1813                 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1814                 if (ret)
1815                         goto err_free_rxqueue;
1816
1817                 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1818                 if (ret)
1819                         goto err_free_rxqueue;
1820         } else {
1822                 num_intr_vec = 0;
1823                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1824                         num_intr_vec = rxa_nb_intr_vect(dev_info,
1825                                                 rx_queue_id, 0);
1826                         /* interrupt based queues are being converted to
1827                          * poll mode queues, delete the interrupt configuration
1828                          * for those.
1829                          */
1830                         ret = rxa_del_intr_queue(rx_adapter,
1831                                                 dev_info, rx_queue_id);
1832                         if (ret)
1833                                 goto err_free_rxqueue;
1834                 }
1835         }
1836
1837         if (nb_rx_intr == 0) {
1838                 ret = rxa_free_intr_resources(rx_adapter);
1839                 if (ret)
1840                         goto err_free_rxqueue;
1841         }
1842
1843         if (wt == 0) {
1844                 uint16_t i;
1845
1846                 if (rx_queue_id == -1) {
1847                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1848                                 dev_info->intr_queue[i] = i;
1849                 } else {
1850                         if (!rxa_intr_queue(dev_info, rx_queue_id))
1851                                 dev_info->intr_queue[nb_rx_intr - 1] =
1852                                         rx_queue_id;
1853                 }
1854         }
1855
1858         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1859         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1860
1861         rte_free(rx_adapter->eth_rx_poll);
1862         rte_free(rx_adapter->wrr_sched);
1863
1864         rx_adapter->eth_rx_poll = rx_poll;
1865         rx_adapter->wrr_sched = rx_wrr;
1866         rx_adapter->wrr_len = nb_wrr;
1867         rx_adapter->num_intr_vec += num_intr_vec;
1868         return 0;
1869
1870 err_free_rxqueue:
1871         if (rx_queue == NULL) {
1872                 rte_free(dev_info->rx_queue);
1873                 dev_info->rx_queue = NULL;
1874         }
1875
1876         rte_free(rx_poll);
1877         rte_free(rx_wrr);
1878
1879         return ret;
1880 }
1881
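/*
 * Common start/stop handler: devices with an internal event port are
 * started or stopped through the eventdev PMD callbacks, the remaining
 * devices are handled by updating the run state of the adapter's
 * service function.
 */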
1882 static int
1883 rxa_ctrl(uint8_t id, int start)
1884 {
1885         struct rte_event_eth_rx_adapter *rx_adapter;
1886         struct rte_eventdev *dev;
1887         struct eth_device_info *dev_info;
1888         uint32_t i;
1889         int use_service = 0;
1890         int stop = !start;
1891
1892         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1893         rx_adapter = rxa_id_to_adapter(id);
1894         if (rx_adapter == NULL)
1895                 return -EINVAL;
1896
1897         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1898
1899         RTE_ETH_FOREACH_DEV(i) {
1900                 dev_info = &rx_adapter->eth_devices[i];
1901                 /* if starting, skip devices with no queues added */
1902                 if (start && !dev_info->nb_dev_queues)
1903                         continue;
1904                 /* if stopping, skip devices that were not started */
1905                 if (stop && !dev_info->dev_rx_started)
1906                         continue;
1907                 use_service |= !dev_info->internal_event_port;
1908                 dev_info->dev_rx_started = start;
1909                 if (dev_info->internal_event_port == 0)
1910                         continue;
1911                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1912                                                 &rte_eth_devices[i]) :
1913                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1914                                                 &rte_eth_devices[i]);
1915         }
1916
1917         if (use_service) {
1918                 rte_spinlock_lock(&rx_adapter->rx_lock);
1919                 rx_adapter->rxa_started = start;
1920                 rte_service_runstate_set(rx_adapter->service_id, start);
1921                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1922         }
1923
1924         return 0;
1925 }
1926
1927 int
1928 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1929                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1930                                 void *conf_arg)
1931 {
1932         struct rte_event_eth_rx_adapter *rx_adapter;
1933         int ret;
1934         int socket_id;
1935         uint16_t i;
1936         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1937         const uint8_t default_rss_key[] = {
1938                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1939                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1940                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1941                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1942                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1943         };
1944
1945         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1946         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1947         if (conf_cb == NULL)
1948                 return -EINVAL;
1949
1950         if (event_eth_rx_adapter == NULL) {
1951                 ret = rte_event_eth_rx_adapter_init();
1952                 if (ret)
1953                         return ret;
1954         }
1955
1956         rx_adapter = rxa_id_to_adapter(id);
1957         if (rx_adapter != NULL) {
1958                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1959                 return -EEXIST;
1960         }
1961
1962         socket_id = rte_event_dev_socket_id(dev_id);
1963         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1964                 "rte_event_eth_rx_adapter_%d",
1965                 id);
1966
1967         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1968                         RTE_CACHE_LINE_SIZE, socket_id);
1969         if (rx_adapter == NULL) {
1970                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1971                 return -ENOMEM;
1972         }
1973
1974         rx_adapter->eventdev_id = dev_id;
1975         rx_adapter->socket_id = socket_id;
1976         rx_adapter->conf_cb = conf_cb;
1977         rx_adapter->conf_arg = conf_arg;
1978         rx_adapter->id = id;
1979         strcpy(rx_adapter->mem_name, mem_name);
1980         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1981                                         /* FIXME: incompatible with hotplug */
1982                                         rte_eth_dev_count_total() *
1983                                         sizeof(struct eth_device_info), 0,
1984                                         socket_id);
1985         rte_convert_rss_key((const uint32_t *)default_rss_key,
1986                         (uint32_t *)rx_adapter->rss_key_be,
1987                             RTE_DIM(default_rss_key));
1988
1989         if (rx_adapter->eth_devices == NULL) {
1990                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
1991                 rte_free(rx_adapter);
1992                 return -ENOMEM;
1993         }
1994         rte_spinlock_init(&rx_adapter->rx_lock);
1995         RTE_ETH_FOREACH_DEV(i)
1996                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1997
1998         event_eth_rx_adapter[id] = rx_adapter;
1999         if (conf_cb == rxa_default_conf_cb)
2000                 rx_adapter->default_cb_arg = 1;
2001         return 0;
2002 }
2003
2004 int
2005 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2006                 struct rte_event_port_conf *port_config)
2007 {
2008         struct rte_event_port_conf *pc;
2009         int ret;
2010
2011         if (port_config == NULL)
2012                 return -EINVAL;
2013         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2014
2015         pc = rte_malloc(NULL, sizeof(*pc), 0);
2016         if (pc == NULL)
2017                 return -ENOMEM;
2018         *pc = *port_config;
2019         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2020                                         rxa_default_conf_cb,
2021                                         pc);
2022         if (ret)
2023                 rte_free(pc);
2024         return ret;
2025 }
2026
2027 int
2028 rte_event_eth_rx_adapter_free(uint8_t id)
2029 {
2030         struct rte_event_eth_rx_adapter *rx_adapter;
2031
2032         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2033
2034         rx_adapter = rxa_id_to_adapter(id);
2035         if (rx_adapter == NULL)
2036                 return -EINVAL;
2037
2038         if (rx_adapter->nb_queues) {
2039                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2040                                 rx_adapter->nb_queues);
2041                 return -EBUSY;
2042         }
2043
2044         if (rx_adapter->default_cb_arg)
2045                 rte_free(rx_adapter->conf_arg);
2046         rte_free(rx_adapter->eth_devices);
2047         rte_free(rx_adapter);
2048         event_eth_rx_adapter[id] = NULL;
2049
2050         return 0;
2051 }
2052
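/*
 * Minimal usage sketch (application side) for the service function based
 * adapter, assuming the event device, a struct rte_event_port_conf and
 * the ethdev port are set up elsewhere; "port_conf" and "ev_qid" below
 * are placeholders, not names defined by this library:
 *
 *     struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *             .ev.queue_id = ev_qid,
 *             .ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *             .servicing_weight = 1,
 *     };
 *
 *     rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *     rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *     rte_event_eth_rx_adapter_start(0);
 *
 * A servicing weight of zero instead selects interrupt mode when the
 * ethdev has Rx interrupts enabled.
 */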
2053 int
2054 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2055                 uint16_t eth_dev_id,
2056                 int32_t rx_queue_id,
2057                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2058 {
2059         int ret;
2060         uint32_t cap;
2061         struct rte_event_eth_rx_adapter *rx_adapter;
2062         struct rte_eventdev *dev;
2063         struct eth_device_info *dev_info;
2064
2065         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2066         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2067
2068         rx_adapter = rxa_id_to_adapter(id);
2069         if ((rx_adapter == NULL) || (queue_conf == NULL))
2070                 return -EINVAL;
2071
2072         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2073         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2074                                                 eth_dev_id,
2075                                                 &cap);
2076         if (ret) {
2077                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2078                         " eth port %" PRIu16, id, eth_dev_id);
2079                 return ret;
2080         }
2081
2082         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2083                 && (queue_conf->rx_queue_flags &
2084                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2085                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2086                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2087                                 eth_dev_id, id);
2088                 return -EINVAL;
2089         }
2090
2091         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2092                 (rx_queue_id != -1)) {
2093                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2094                         "event queue, eth port: %" PRIu16 " adapter id: %"
2095                         PRIu8, eth_dev_id, id);
2096                 return -EINVAL;
2097         }
2098
2099         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2100                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2101                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2102                          (uint16_t)rx_queue_id);
2103                 return -EINVAL;
2104         }
2105
2106         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2107
2108         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2109                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2110                                         -ENOTSUP);
2111                 if (dev_info->rx_queue == NULL) {
2112                         dev_info->rx_queue =
2113                             rte_zmalloc_socket(rx_adapter->mem_name,
2114                                         dev_info->dev->data->nb_rx_queues *
2115                                         sizeof(struct eth_rx_queue_info), 0,
2116                                         rx_adapter->socket_id);
2117                         if (dev_info->rx_queue == NULL)
2118                                 return -ENOMEM;
2119                 }
2120
2121                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2122                                 &rte_eth_devices[eth_dev_id],
2123                                 rx_queue_id, queue_conf);
2124                 if (ret == 0) {
2125                         dev_info->internal_event_port = 1;
2126                         rxa_update_queue(rx_adapter,
2127                                         &rx_adapter->eth_devices[eth_dev_id],
2128                                         rx_queue_id,
2129                                         1);
2130                 }
2131         } else {
2132                 rte_spinlock_lock(&rx_adapter->rx_lock);
2133                 dev_info->internal_event_port = 0;
2134                 ret = rxa_init_service(rx_adapter, id);
2135                 if (ret == 0) {
2136                         uint32_t service_id = rx_adapter->service_id;
2137                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2138                                         queue_conf);
2139                         rte_service_component_runstate_set(service_id,
2140                                 rxa_sw_adapter_queue_count(rx_adapter));
2141                 }
2142                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2143         }
2144
2145         return ret;
2149 }
2150
2151 int
2152 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2153                                 int32_t rx_queue_id)
2154 {
2155         int ret = 0;
2156         struct rte_eventdev *dev;
2157         struct rte_event_eth_rx_adapter *rx_adapter;
2158         struct eth_device_info *dev_info;
2159         uint32_t cap;
2160         uint32_t nb_rx_poll = 0;
2161         uint32_t nb_wrr = 0;
2162         uint32_t nb_rx_intr;
2163         struct eth_rx_poll_entry *rx_poll = NULL;
2164         uint32_t *rx_wrr = NULL;
2165         int num_intr_vec;
2166
2167         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2168         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2169
2170         rx_adapter = rxa_id_to_adapter(id);
2171         if (rx_adapter == NULL)
2172                 return -EINVAL;
2173
2174         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2175         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2176                                                 eth_dev_id,
2177                                                 &cap);
2178         if (ret)
2179                 return ret;
2180
2181         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2182                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2183                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2184                          (uint16_t)rx_queue_id);
2185                 return -EINVAL;
2186         }
2187
2188         dev_info = &rx_adapter->eth_devices[eth_dev_id];
2189
2190         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2191                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2192                                  -ENOTSUP);
2193                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2194                                                 &rte_eth_devices[eth_dev_id],
2195                                                 rx_queue_id);
2196                 if (ret == 0) {
2197                         rxa_update_queue(rx_adapter,
2198                                         &rx_adapter->eth_devices[eth_dev_id],
2199                                         rx_queue_id,
2200                                         0);
2201                         if (dev_info->nb_dev_queues == 0) {
2202                                 rte_free(dev_info->rx_queue);
2203                                 dev_info->rx_queue = NULL;
2204                         }
2205                 }
2206         } else {
2207                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2208                         &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2209
2210                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2211                         &rx_poll, &rx_wrr);
2212                 if (ret)
2213                         return ret;
2214
2215                 rte_spinlock_lock(&rx_adapter->rx_lock);
2216
2217                 num_intr_vec = 0;
2218                 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2220                         num_intr_vec = rxa_nb_intr_vect(dev_info,
2221                                                 rx_queue_id, 0);
2222                         ret = rxa_del_intr_queue(rx_adapter, dev_info,
2223                                         rx_queue_id);
2224                         if (ret)
2225                                 goto unlock_ret;
2226                 }
2227
2228                 if (nb_rx_intr == 0) {
2229                         ret = rxa_free_intr_resources(rx_adapter);
2230                         if (ret)
2231                                 goto unlock_ret;
2232                 }
2233
2234                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2235                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2236
2237                 rte_free(rx_adapter->eth_rx_poll);
2238                 rte_free(rx_adapter->wrr_sched);
2239
2240                 if (nb_rx_intr == 0) {
2241                         rte_free(dev_info->intr_queue);
2242                         dev_info->intr_queue = NULL;
2243                 }
2244
2245                 rx_adapter->eth_rx_poll = rx_poll;
2246                 rx_adapter->wrr_sched = rx_wrr;
2247                 rx_adapter->wrr_len = nb_wrr;
2248                 rx_adapter->num_intr_vec += num_intr_vec;
2249
2250                 if (dev_info->nb_dev_queues == 0) {
2251                         rte_free(dev_info->rx_queue);
2252                         dev_info->rx_queue = NULL;
2253                 }
2254 unlock_ret:
2255                 rte_spinlock_unlock(&rx_adapter->rx_lock);
2256                 if (ret) {
2257                         rte_free(rx_poll);
2258                         rte_free(rx_wrr);
2259                         return ret;
2260                 }
2261
2262                 rte_service_component_runstate_set(rx_adapter->service_id,
2263                                 rxa_sw_adapter_queue_count(rx_adapter));
2264         }
2265
2266         return ret;
2267 }
2268
2269 int
2270 rte_event_eth_rx_adapter_start(uint8_t id)
2271 {
2272         return rxa_ctrl(id, 1);
2273 }
2274
2275 int
2276 rte_event_eth_rx_adapter_stop(uint8_t id)
2277 {
2278         return rxa_ctrl(id, 0);
2279 }
2280
2281 int
2282 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2283                                struct rte_event_eth_rx_adapter_stats *stats)
2284 {
2285         struct rte_event_eth_rx_adapter *rx_adapter;
2286         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2287         struct rte_event_eth_rx_adapter_stats dev_stats;
2288         struct rte_eventdev *dev;
2289         struct eth_device_info *dev_info;
2290         uint32_t i;
2291         int ret;
2292
2293         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2294
2295         rx_adapter = rxa_id_to_adapter(id);
2296         if (rx_adapter == NULL || stats == NULL)
2297                 return -EINVAL;
2298
2299         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2300         memset(stats, 0, sizeof(*stats));
2301         RTE_ETH_FOREACH_DEV(i) {
2302                 dev_info = &rx_adapter->eth_devices[i];
2303                 if (dev_info->internal_event_port == 0 ||
2304                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2305                         continue;
2306                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2307                                                 &rte_eth_devices[i],
2308                                                 &dev_stats);
2309                 if (ret)
2310                         continue;
2311                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2312                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2313         }
2314
2315         if (rx_adapter->service_inited)
2316                 *stats = rx_adapter->stats;
2317
2318         stats->rx_packets += dev_stats_sum.rx_packets;
2319         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2320         return 0;
2321 }
2322
2323 int
2324 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2325 {
2326         struct rte_event_eth_rx_adapter *rx_adapter;
2327         struct rte_eventdev *dev;
2328         struct eth_device_info *dev_info;
2329         uint32_t i;
2330
2331         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2332
2333         rx_adapter = rxa_id_to_adapter(id);
2334         if (rx_adapter == NULL)
2335                 return -EINVAL;
2336
2337         dev = &rte_eventdevs[rx_adapter->eventdev_id];
2338         RTE_ETH_FOREACH_DEV(i) {
2339                 dev_info = &rx_adapter->eth_devices[i];
2340                 if (dev_info->internal_event_port == 0 ||
2341                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2342                         continue;
2343                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2344                                                         &rte_eth_devices[i]);
2345         }
2346
2347         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2348         return 0;
2349 }
2350
2351 int
2352 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2353 {
2354         struct rte_event_eth_rx_adapter *rx_adapter;
2355
2356         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2357
2358         rx_adapter = rxa_id_to_adapter(id);
2359         if (rx_adapter == NULL || service_id == NULL)
2360                 return -EINVAL;
2361
2362         if (rx_adapter->service_inited)
2363                 *service_id = rx_adapter->service_id;
2364
2365         return rx_adapter->service_inited ? 0 : -ESRCH;
2366 }