eventdev: move Rx adapter to separate function
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <rte_cycles.h>
6 #include <rte_common.h>
7 #include <rte_dev.h>
8 #include <rte_errno.h>
9 #include <rte_ethdev.h>
10 #include <rte_log.h>
11 #include <rte_malloc.h>
12 #include <rte_service_component.h>
13 #include <rte_thash.h>
14
15 #include "rte_eventdev.h"
16 #include "rte_eventdev_pmd.h"
17 #include "rte_event_eth_rx_adapter.h"
18
19 #define BATCH_SIZE              32
20 #define BLOCK_CNT_THRESHOLD     10
21 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
22
23 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
24 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
25
26 #define RSS_KEY_SIZE    40
27
28 /*
29  * There is an instance of this struct per polled Rx queue added to the
30  * adapter
31  */
32 struct eth_rx_poll_entry {
33         /* Eth port to poll */
34         uint16_t eth_dev_id;
35         /* Eth rx queue to poll */
36         uint16_t eth_rx_qid;
37 };
38
39 /* Instance per adapter */
40 struct rte_eth_event_enqueue_buffer {
41         /* Count of events in this buffer */
42         uint16_t count;
43         /* Array of events in this buffer */
44         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
45 };
46
47 struct rte_event_eth_rx_adapter {
48         /* RSS key */
49         uint8_t rss_key_be[RSS_KEY_SIZE];
50         /* Event device identifier */
51         uint8_t eventdev_id;
52         /* Per ethernet device structure */
53         struct eth_device_info *eth_devices;
54         /* Event port identifier */
55         uint8_t event_port_id;
56         /* Lock to serialize config updates with service function */
57         rte_spinlock_t rx_lock;
58         /* Max mbufs processed in any service function invocation */
59         uint32_t max_nb_rx;
60         /* Receive queues that need to be polled */
61         struct eth_rx_poll_entry *eth_rx_poll;
62         /* Size of the eth_rx_poll array */
63         uint16_t num_rx_polled;
64         /* Weighted round robin schedule */
65         uint32_t *wrr_sched;
66         /* wrr_sched[] size */
67         uint32_t wrr_len;
68         /* Next entry in wrr[] to begin polling */
69         uint32_t wrr_pos;
70         /* Event burst buffer */
71         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
72         /* Per adapter stats */
73         struct rte_event_eth_rx_adapter_stats stats;
74         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
75         uint16_t enq_block_count;
76         /* Block start ts */
77         uint64_t rx_enq_block_start_ts;
78         /* Configuration callback for rte_service configuration */
79         rte_event_eth_rx_adapter_conf_cb conf_cb;
80         /* Configuration callback argument */
81         void *conf_arg;
82         /* Set if default_cb is being used */
83         int default_cb_arg;
84         /* Service initialization state */
85         uint8_t service_inited;
86         /* Total count of Rx queues in adapter */
87         uint32_t nb_queues;
88         /* Memory allocation name */
89         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
90         /* Socket identifier cached from eventdev */
91         int socket_id;
92         /* Per adapter EAL service */
93         uint32_t service_id;
94         /* Adapter started flag */
95         uint8_t rxa_started;
96 } __rte_cache_aligned;
97
98 /* Per eth device */
99 struct eth_device_info {
100         struct rte_eth_dev *dev;
101         struct eth_rx_queue_info *rx_queue;
102         /* Set if ethdev->eventdev packet transfer uses a
103          * hardware mechanism
104          */
105         uint8_t internal_event_port;
106         /* Set if the adapter is processing rx queues for
107          * this eth device and packet processing has been
108          * started; this lets the code know whether the PMD
109          * rx_adapter_stop callback needs to be invoked
110          */
111         uint8_t dev_rx_started;
112         /* Number of queues added for this device */
113         uint16_t nb_dev_queues;
114         /* Number of poll mode Rx queues; if > 0, the start
115          * callback will be invoked if not already invoked
116          */
117         uint16_t nb_rx_poll;
118         /* sum(wrr(q)) for all queues within the device
119          * useful when deleting all device queues
120          */
121         uint32_t wrr_len;
122 };
123
124 /* Per Rx queue */
125 struct eth_rx_queue_info {
126         int queue_enabled;      /* True if added */
127         uint16_t wt;            /* Polling weight */
128         uint8_t event_queue_id; /* Event queue to enqueue packets to */
129         uint8_t sched_type;     /* Sched type for events */
130         uint8_t priority;       /* Event priority */
131         uint32_t flow_id;       /* App provided flow identifier */
132         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
133 };
134
135 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
136
137 static inline int
138 rxa_validate_id(uint8_t id)
139 {
140         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
141 }
142
143 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
144         if (!rxa_validate_id(id)) { \
145                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
146                 return retval; \
147         } \
148 } while (0)
149
150 static inline int
151 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
152 {
153         return rx_adapter->num_rx_polled;
154 }
155
156 /* Greatest common divisor */
157 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
158 {
159         uint16_t r = a % b;
160
161         return r ? rxa_gcd_u16(b, r) : b;
162 }
163
164 /* Returns the next queue in the polling sequence
165  *
166  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
167  */
168 static int
169 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
170          unsigned int n, int *cw,
171          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
172          uint16_t gcd, int prev)
173 {
174         int i = prev;
175         uint16_t w;
176
177         while (1) {
178                 uint16_t q;
179                 uint16_t d;
180
181                 i = (i + 1) % n;
182                 if (i == 0) {
183                         *cw = *cw - gcd;
184                         if (*cw <= 0)
185                                 *cw = max_wt;
186                 }
187
188                 q = eth_rx_poll[i].eth_rx_qid;
189                 d = eth_rx_poll[i].eth_dev_id;
190                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
191
192                 if ((int)w >= *cw)
193                         return i;
194         }
195 }
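/* Illustrative example (not from the original source): with three poll
 * entries of weights 4, 3 and 2 (max_wt = 4, gcd = 1), successive calls to
 * rxa_wrr_next() return the indices 0 0 1 0 1 2 0 1 2 and then repeat, i.e.
 * entry 0 is polled four times, entry 1 three times and entry 2 twice per
 * cycle, in proportion to the weights.
 */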
196
197 static inline int
198 rxa_polled_queue(struct eth_device_info *dev_info,
199         int rx_queue_id)
200 {
201         struct eth_rx_queue_info *queue_info;
202
203         queue_info = &dev_info->rx_queue[rx_queue_id];
204         return !dev_info->internal_event_port &&
205                 dev_info->rx_queue &&
206                 queue_info->queue_enabled && queue_info->wt != 0;
207 }
208
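/* Note: in the queue add/delete helpers below, an rx_queue_id of -1 refers
 * to all Rx queues of the ethernet device.
 */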
209 /* Calculate size of the eth_rx_poll and wrr_sched arrays
210  * after deleting poll mode rx queues
211  */
212 static void
213 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
214                         struct eth_device_info *dev_info,
215                         int rx_queue_id,
216                         uint32_t *nb_rx_poll,
217                         uint32_t *nb_wrr)
218 {
219         uint32_t poll_diff;
220         uint32_t wrr_len_diff;
221
222         if (rx_queue_id == -1) {
223                 poll_diff = dev_info->nb_rx_poll;
224                 wrr_len_diff = dev_info->wrr_len;
225         } else {
226                 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
227                 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
228                                         0;
229         }
230
231         *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
232         *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
233 }
234
235 /* Calculate nb_rx_* after adding poll mode rx queues
236  */
237 static void
238 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
239                         struct eth_device_info *dev_info,
240                         int rx_queue_id,
241                         uint16_t wt,
242                         uint32_t *nb_rx_poll,
243                         uint32_t *nb_wrr)
244 {
245         uint32_t poll_diff;
246         uint32_t wrr_len_diff;
247
248         if (rx_queue_id == -1) {
249                 poll_diff = dev_info->dev->data->nb_rx_queues -
250                                                 dev_info->nb_rx_poll;
251                 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
252                                 - dev_info->wrr_len;
253         } else {
254                 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
255                 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
256                                 wt - dev_info->rx_queue[rx_queue_id].wt :
257                                 wt;
258         }
259
260         *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
261         *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
262 }
263
264 /* Calculate nb_rx_* after adding rx_queue_id */
265 static void
266 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
267                 struct eth_device_info *dev_info,
268                 int rx_queue_id,
269                 uint16_t wt,
270                 uint32_t *nb_rx_poll,
271                 uint32_t *nb_wrr)
272 {
273         rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
274                                 wt, nb_rx_poll, nb_wrr);
275 }
276
277 /* Calculate nb_rx_* after deleting rx_queue_id */
278 static void
279 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
280                 struct eth_device_info *dev_info,
281                 int rx_queue_id,
282                 uint32_t *nb_rx_poll,
283                 uint32_t *nb_wrr)
284 {
285         rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
286                                 nb_wrr);
287 }
288
289 /*
290  * Allocate the rx_poll array
291  */
292 static struct eth_rx_poll_entry *
293 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
294         uint32_t num_rx_polled)
295 {
296         size_t len;
297
298         len  = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
299                                                         RTE_CACHE_LINE_SIZE);
300         return  rte_zmalloc_socket(rx_adapter->mem_name,
301                                 len,
302                                 RTE_CACHE_LINE_SIZE,
303                                 rx_adapter->socket_id);
304 }
305
306 /*
307  * Allocate the WRR array
308  */
309 static uint32_t *
310 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
311 {
312         size_t len;
313
314         len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
315                         RTE_CACHE_LINE_SIZE);
316         return  rte_zmalloc_socket(rx_adapter->mem_name,
317                                 len,
318                                 RTE_CACHE_LINE_SIZE,
319                                 rx_adapter->socket_id);
320 }
321
322 static int
323 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
324                 uint32_t nb_poll,
325                 uint32_t nb_wrr,
326                 struct eth_rx_poll_entry **rx_poll,
327                 uint32_t **wrr_sched)
328 {
329
330         if (nb_poll == 0) {
331                 *rx_poll = NULL;
332                 *wrr_sched = NULL;
333                 return 0;
334         }
335
336         *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
337         if (*rx_poll == NULL) {
338                 *wrr_sched = NULL;
339                 return -ENOMEM;
340         }
341
342         *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
343         if (*wrr_sched == NULL) {
344                 rte_free(*rx_poll);
345                 return -ENOMEM;
346         }
347         return 0;
348 }
349
350 /* Precalculate WRR polling sequence for all queues in rx_adapter */
351 static void
352 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
353                 struct eth_rx_poll_entry *rx_poll,
354                 uint32_t *rx_wrr)
355 {
356         uint16_t d;
357         uint16_t q;
358         unsigned int i;
359         int prev = -1;
360         int cw = -1;
361
362         /* Initialize variables for calculation of wrr schedule */
363         uint16_t max_wrr_pos = 0;
364         unsigned int poll_q = 0;
365         uint16_t max_wt = 0;
366         uint16_t gcd = 0;
367
368         if (rx_poll == NULL)
369                 return;
370
371         /* Generate array of all queues to poll, the size of this
372          * array is poll_q
373          */
374         RTE_ETH_FOREACH_DEV(d) {
375                 uint16_t nb_rx_queues;
376                 struct eth_device_info *dev_info =
377                                 &rx_adapter->eth_devices[d];
378                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
379                 if (dev_info->rx_queue == NULL)
380                         continue;
381                 if (dev_info->internal_event_port)
382                         continue;
383                 dev_info->wrr_len = 0;
384                 for (q = 0; q < nb_rx_queues; q++) {
385                         struct eth_rx_queue_info *queue_info =
386                                 &dev_info->rx_queue[q];
387                         uint16_t wt;
388
389                         if (!rxa_polled_queue(dev_info, q))
390                                 continue;
391                         wt = queue_info->wt;
392                         rx_poll[poll_q].eth_dev_id = d;
393                         rx_poll[poll_q].eth_rx_qid = q;
394                         max_wrr_pos += wt;
395                         dev_info->wrr_len += wt;
396                         max_wt = RTE_MAX(max_wt, wt);
397                         gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
398                         poll_q++;
399                 }
400         }
401
402         /* Generate polling sequence based on weights */
403         prev = -1;
404         cw = -1;
405         for (i = 0; i < max_wrr_pos; i++) {
406                 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
407                                      rx_poll, max_wt, gcd, prev);
408                 prev = rx_wrr[i];
409         }
410 }
411
412 static inline void
413 rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
414         struct ipv6_hdr **ipv6_hdr)
415 {
416         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
417         struct vlan_hdr *vlan_hdr;
418
419         *ipv4_hdr = NULL;
420         *ipv6_hdr = NULL;
421
422         switch (eth_hdr->ether_type) {
423         case RTE_BE16(ETHER_TYPE_IPv4):
424                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
425                 break;
426
427         case RTE_BE16(ETHER_TYPE_IPv6):
428                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
429                 break;
430
431         case RTE_BE16(ETHER_TYPE_VLAN):
432                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
433                 switch (vlan_hdr->eth_proto) {
434                 case RTE_BE16(ETHER_TYPE_IPv4):
435                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
436                         break;
437                 case RTE_BE16(ETHER_TYPE_IPv6):
438                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
439                         break;
440                 default:
441                         break;
442                 }
443                 break;
444
445         default:
446                 break;
447         }
448 }
449
450 /* Calculate RSS hash for IPv4/6 */
451 static inline uint32_t
452 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
453 {
454         uint32_t input_len;
455         void *tuple;
456         struct rte_ipv4_tuple ipv4_tuple;
457         struct rte_ipv6_tuple ipv6_tuple;
458         struct ipv4_hdr *ipv4_hdr;
459         struct ipv6_hdr *ipv6_hdr;
460
461         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
462
463         if (ipv4_hdr) {
464                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
465                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
466                 tuple = &ipv4_tuple;
467                 input_len = RTE_THASH_V4_L3_LEN;
468         } else if (ipv6_hdr) {
469                 rte_thash_load_v6_addrs(ipv6_hdr,
470                                         (union rte_thash_tuple *)&ipv6_tuple);
471                 tuple = &ipv6_tuple;
472                 input_len = RTE_THASH_V6_L3_LEN;
473         } else
474                 return 0;
475
476         return rte_softrss_be(tuple, input_len, rss_key_be);
477 }
478
479 static inline int
480 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
481 {
482         return !!rx_adapter->enq_block_count;
483 }
484
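/* The two helpers below track time spent blocked on event enqueue: after
 * BLOCK_CNT_THRESHOLD consecutive flushes that enqueue nothing, the TSC value
 * at which blocking began is recorded; the next successful enqueue ends the
 * block and the elapsed cycles are added to stats->rx_enq_block_cycles.
 */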
485 static inline void
486 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
487 {
488         if (rx_adapter->rx_enq_block_start_ts)
489                 return;
490
491         rx_adapter->enq_block_count++;
492         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
493                 return;
494
495         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
496 }
497
498 static inline void
499 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
500                     struct rte_event_eth_rx_adapter_stats *stats)
501 {
502         if (unlikely(!stats->rx_enq_start_ts))
503                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
504
505         if (likely(!rxa_enq_blocked(rx_adapter)))
506                 return;
507
508         rx_adapter->enq_block_count = 0;
509         if (rx_adapter->rx_enq_block_start_ts) {
510                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
511                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
512                     rx_adapter->rx_enq_block_start_ts;
513                 rx_adapter->rx_enq_block_start_ts = 0;
514         }
515 }
516
517 /* Add event to buffer, free space check is done prior to calling
518  * this function
519  */
520 static inline void
521 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
522                 struct rte_event *ev)
523 {
524         struct rte_eth_event_enqueue_buffer *buf =
525             &rx_adapter->event_enqueue_buffer;
526         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
527 }
528
529 /* Enqueue buffered events to event device */
530 static inline uint16_t
531 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
532 {
533         struct rte_eth_event_enqueue_buffer *buf =
534             &rx_adapter->event_enqueue_buffer;
535         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
536
537         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
538                                         rx_adapter->event_port_id,
539                                         buf->events,
540                                         buf->count);
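        /* On a partial enqueue, move the events that were not accepted to
         * the front of the buffer so the next flush retries them.
         */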
541         if (n != buf->count) {
542                 memmove(buf->events,
543                         &buf->events[n],
544                         (buf->count - n) * sizeof(struct rte_event));
545                 stats->rx_enq_retry++;
546         }
547
548         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
549                 rxa_enq_block_start_ts(rx_adapter);
550
551         buf->count -= n;
552         stats->rx_enq_count += n;
553
554         return n;
555 }
556
557 static inline void
558 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
559                 uint16_t eth_dev_id,
560                 uint16_t rx_queue_id,
561                 struct rte_mbuf **mbufs,
562                 uint16_t num)
563 {
564         uint32_t i;
565         struct eth_device_info *eth_device_info =
566                                         &rx_adapter->eth_devices[eth_dev_id];
567         struct eth_rx_queue_info *eth_rx_queue_info =
568                                         &eth_device_info->rx_queue[rx_queue_id];
569
570         int32_t qid = eth_rx_queue_info->event_queue_id;
571         uint8_t sched_type = eth_rx_queue_info->sched_type;
572         uint8_t priority = eth_rx_queue_info->priority;
573         uint32_t flow_id;
574         struct rte_event events[BATCH_SIZE];
575         struct rte_mbuf *m = mbufs[0];
576         uint32_t rss_mask;
577         uint32_t rss;
578         int do_rss;
579         uint64_t ts;
580
581         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
582         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
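        /* Compute a software RSS hash only when the first mbuf of the burst
         * carries no NIC provided hash and the application did not supply a
         * flow id for this queue.
         */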
583         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
584
585         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
586                 ts = rte_get_tsc_cycles();
587                 for (i = 0; i < num; i++) {
588                         m = mbufs[i];
589
590                         m->timestamp = ts;
591                         m->ol_flags |= PKT_RX_TIMESTAMP;
592                 }
593         }
594
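        /* Build one event per mbuf; the flow id is taken from the queue
         * configuration when the application supplied one (flow_id_mask is
         * ~0), otherwise from the RSS hash.
         */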
595         for (i = 0; i < num; i++) {
596                 m = mbufs[i];
597                 struct rte_event *ev = &events[i];
598
599                 rss = do_rss ?
600                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
601                         m->hash.rss;
602                 flow_id =
603                     eth_rx_queue_info->flow_id &
604                                 eth_rx_queue_info->flow_id_mask;
605                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
606                 ev->flow_id = flow_id;
607                 ev->op = RTE_EVENT_OP_NEW;
608                 ev->sched_type = sched_type;
609                 ev->queue_id = qid;
610                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
611                 ev->sub_event_type = 0;
612                 ev->priority = priority;
613                 ev->mbuf = m;
614
615                 rxa_buffer_event(rx_adapter, ev);
616         }
617 }
618
619 /* Enqueue packets from <port, q> to event buffer */
620 static inline uint32_t
621 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
622         uint16_t port_id,
623         uint16_t queue_id,
624         uint32_t rx_count,
625         uint32_t max_rx)
626 {
627         struct rte_mbuf *mbufs[BATCH_SIZE];
628         struct rte_eth_event_enqueue_buffer *buf =
629                                         &rx_adapter->event_enqueue_buffer;
630         struct rte_event_eth_rx_adapter_stats *stats =
631                                         &rx_adapter->stats;
632         uint16_t n;
633         uint32_t nb_rx = 0;
634
635         /* Don't do a batch dequeue from the rx queue if there isn't
636          * enough space in the enqueue buffer.
637          */
638         while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
639                 if (buf->count >= BATCH_SIZE)
640                         rxa_flush_event_buffer(rx_adapter);
641
642                 stats->rx_poll_count++;
643                 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
644                 if (unlikely(!n))
645                         break;
646                 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
647                 nb_rx += n;
648                 if (rx_count + nb_rx > max_rx)
649                         break;
650         }
651
652         if (buf->count >= BATCH_SIZE)
653                 rxa_flush_event_buffer(rx_adapter);
654
655         return nb_rx;
656 }
657
658 /*
659  * Polls receive queues added to the event adapter and enqueues received
660  * packets to the event device.
661  *
662  * The receive code enqueues initially to a temporary buffer; the
663  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
664  *
665  * If there isn't space available in the temporary buffer, packets from the
666  * Rx queue aren't dequeued from the eth device; this back pressures the
667  * eth device. In virtual device environments this back pressure is relayed
668  * to the hypervisor's switching layer where adjustments can be made to
669  * deal with it.
670  */
671 static inline void
672 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
673 {
674         uint32_t num_queue;
675         uint32_t nb_rx = 0;
676         struct rte_eth_event_enqueue_buffer *buf;
677         uint32_t wrr_pos;
678         uint32_t max_nb_rx;
679         struct rte_event_eth_rx_adapter_stats *stats;
680
681         wrr_pos = rx_adapter->wrr_pos;
682         max_nb_rx = rx_adapter->max_nb_rx;
683         buf = &rx_adapter->event_enqueue_buffer;
684         stats = &rx_adapter->stats;
685
686         /* Iterate through a WRR sequence */
687         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
688                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
689                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
690                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
691
692                 /* Don't do a batch dequeue from the rx queue if there isn't
693                  * enough space in the enqueue buffer.
694                  */
695                 if (buf->count >= BATCH_SIZE)
696                         rxa_flush_event_buffer(rx_adapter);
697                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
698                         rx_adapter->wrr_pos = wrr_pos;
699                         break;
700                 }
701
702                 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx);
703                 if (nb_rx > max_nb_rx) {
704                         rx_adapter->wrr_pos =
705                                     (wrr_pos + 1) % rx_adapter->wrr_len;
706                         break;
707                 }
708
709                 if (++wrr_pos == rx_adapter->wrr_len)
710                         wrr_pos = 0;
711         }
712
713         stats->rx_packets += nb_rx;
714 }
715
716 static int
717 rxa_service_func(void *args)
718 {
719         struct rte_event_eth_rx_adapter *rx_adapter = args;
720
721         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
722                 return 0;
723         if (!rx_adapter->rxa_started) {
724                 rte_spinlock_unlock(&rx_adapter->rx_lock);
725                 return 0;
726         }
727         rxa_poll(rx_adapter);
728         rte_spinlock_unlock(&rx_adapter->rx_lock);
729         return 0;
730 }
731
732 static int
733 rte_event_eth_rx_adapter_init(void)
734 {
735         const char *name = "rte_event_eth_rx_adapter_array";
736         const struct rte_memzone *mz;
737         unsigned int sz;
738
739         sz = sizeof(*event_eth_rx_adapter) *
740             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
741         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
742
743         mz = rte_memzone_lookup(name);
744         if (mz == NULL) {
745                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
746                                                  RTE_CACHE_LINE_SIZE);
747                 if (mz == NULL) {
748                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
749                                         PRId32, rte_errno);
750                         return -rte_errno;
751                 }
752         }
753
754         event_eth_rx_adapter = mz->addr;
755         return 0;
756 }
757
758 static inline struct rte_event_eth_rx_adapter *
759 rxa_id_to_adapter(uint8_t id)
760 {
761         return event_eth_rx_adapter ?
762                 event_eth_rx_adapter[id] : NULL;
763 }
764
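/* Default configuration callback used by rte_event_eth_rx_adapter_create():
 * it stops the event device if it is running, reconfigures it with one
 * additional event port for the adapter, sets up that port with the supplied
 * port configuration and restarts the device.
 */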
765 static int
766 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
767                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
768 {
769         int ret;
770         struct rte_eventdev *dev;
771         struct rte_event_dev_config dev_conf;
772         int started;
773         uint8_t port_id;
774         struct rte_event_port_conf *port_conf = arg;
775         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
776
777         dev = &rte_eventdevs[rx_adapter->eventdev_id];
778         dev_conf = dev->data->dev_conf;
779
780         started = dev->data->dev_started;
781         if (started)
782                 rte_event_dev_stop(dev_id);
783         port_id = dev_conf.nb_event_ports;
784         dev_conf.nb_event_ports += 1;
785         ret = rte_event_dev_configure(dev_id, &dev_conf);
786         if (ret) {
787                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
788                                                 dev_id);
789                 if (started) {
790                         if (rte_event_dev_start(dev_id))
791                                 return -EIO;
792                 }
793                 return ret;
794         }
795
796         ret = rte_event_port_setup(dev_id, port_id, port_conf);
797         if (ret) {
798                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
799                                         port_id);
800                 return ret;
801         }
802
803         conf->event_port_id = port_id;
804         conf->max_nb_rx = 128;
805         if (started)
806                 ret = rte_event_dev_start(dev_id);
807         rx_adapter->default_cb_arg = 1;
808         return ret;
809 }
810
811 static int
812 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
813 {
814         int ret;
815         struct rte_service_spec service;
816         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
817
818         if (rx_adapter->service_inited)
819                 return 0;
820
821         memset(&service, 0, sizeof(service));
822         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
823                 "rte_event_eth_rx_adapter_%d", id);
824         service.socket_id = rx_adapter->socket_id;
825         service.callback = rxa_service_func;
826         service.callback_userdata = rx_adapter;
827         /* Service function handles locking for queue add/del updates */
828         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
829         ret = rte_service_component_register(&service, &rx_adapter->service_id);
830         if (ret) {
831                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
832                         service.name, ret);
833                 return ret;
834         }
835
836         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
837                 &rx_adapter_conf, rx_adapter->conf_arg);
838         if (ret) {
839                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
840                         ret);
841                 goto err_done;
842         }
843         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
844         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
845         rx_adapter->service_inited = 1;
846         return 0;
847
848 err_done:
849         rte_service_component_unregister(rx_adapter->service_id);
850         return ret;
851 }
852
853 static void
854 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
855                 struct eth_device_info *dev_info,
856                 int32_t rx_queue_id,
857                 uint8_t add)
858 {
859         struct eth_rx_queue_info *queue_info;
860         int enabled;
861         uint16_t i;
862
863         if (dev_info->rx_queue == NULL)
864                 return;
865
866         if (rx_queue_id == -1) {
867                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
868                         rxa_update_queue(rx_adapter, dev_info, i, add);
869         } else {
870                 queue_info = &dev_info->rx_queue[rx_queue_id];
871                 enabled = queue_info->queue_enabled;
872                 if (add) {
873                         rx_adapter->nb_queues += !enabled;
874                         dev_info->nb_dev_queues += !enabled;
875                 } else {
876                         rx_adapter->nb_queues -= enabled;
877                         dev_info->nb_dev_queues -= enabled;
878                 }
879                 queue_info->queue_enabled = !!add;
880         }
881 }
882
883 static void
884 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
885         struct eth_device_info *dev_info,
886         int32_t rx_queue_id)
887 {
888         int pollq;
889
890         if (rx_adapter->nb_queues == 0)
891                 return;
892
893         if (rx_queue_id == -1) {
894                 uint16_t nb_rx_queues;
895                 uint16_t i;
896
897                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
898                 for (i = 0; i < nb_rx_queues; i++)
899                         rxa_sw_del(rx_adapter, dev_info, i);
900                 return;
901         }
902
903         pollq = rxa_polled_queue(dev_info, rx_queue_id);
904         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
905         rx_adapter->num_rx_polled -= pollq;
906         dev_info->nb_rx_poll -= pollq;
907 }
908
909 static void
910 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
911         struct eth_device_info *dev_info,
912         int32_t rx_queue_id,
913         const struct rte_event_eth_rx_adapter_queue_conf *conf)
914 {
915         struct eth_rx_queue_info *queue_info;
916         const struct rte_event *ev = &conf->ev;
917         int pollq;
918
919         if (rx_queue_id == -1) {
920                 uint16_t nb_rx_queues;
921                 uint16_t i;
922
923                 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
924                 for (i = 0; i < nb_rx_queues; i++)
925                         rxa_add_queue(rx_adapter, dev_info, i, conf);
926                 return;
927         }
928
929         pollq = rxa_polled_queue(dev_info, rx_queue_id);
930
931         queue_info = &dev_info->rx_queue[rx_queue_id];
932         queue_info->event_queue_id = ev->queue_id;
933         queue_info->sched_type = ev->sched_type;
934         queue_info->priority = ev->priority;
935         queue_info->wt = conf->servicing_weight;
936
937         if (conf->rx_queue_flags &
938                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
939                 queue_info->flow_id = ev->flow_id;
940                 queue_info->flow_id_mask = ~0;
941         }
942
943         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
944         if (rxa_polled_queue(dev_info, rx_queue_id)) {
945                 rx_adapter->num_rx_polled += !pollq;
946                 dev_info->nb_rx_poll += !pollq;
947         }
948 }
949
950 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
951                 uint16_t eth_dev_id,
952                 int rx_queue_id,
953                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
954 {
955         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
956         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
957         int ret;
958         struct eth_rx_poll_entry *rx_poll;
959         struct eth_rx_queue_info *rx_queue;
960         uint32_t *rx_wrr;
961         uint16_t nb_rx_queues;
962         uint32_t nb_rx_poll, nb_wrr;
963
964         if (queue_conf->servicing_weight == 0) {
965
966                 struct rte_eth_dev_data *data = dev_info->dev->data;
967                 if (data->dev_conf.intr_conf.rxq) {
968                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
969                                         " not supported");
970                         return -ENOTSUP;
971                 }
972                 temp_conf = *queue_conf;
973
974                 /* If Rx interrupts are disabled set wt = 1 */
975                 temp_conf.servicing_weight = 1;
976                 queue_conf = &temp_conf;
977         }
978
979         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
980         rx_queue = dev_info->rx_queue;
981
982         if (dev_info->rx_queue == NULL) {
983                 dev_info->rx_queue =
984                     rte_zmalloc_socket(rx_adapter->mem_name,
985                                        nb_rx_queues *
986                                        sizeof(struct eth_rx_queue_info), 0,
987                                        rx_adapter->socket_id);
988                 if (dev_info->rx_queue == NULL)
989                         return -ENOMEM;
990         }
991         rx_wrr = NULL;
992         rx_poll = NULL;
993
994         rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
995                         queue_conf->servicing_weight,
996                         &nb_rx_poll, &nb_wrr);
997
998         ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
999                                 &rx_poll, &rx_wrr);
1000         if (ret)
1001                 goto err_free_rxqueue;
1002
1003         rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1004         rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1005
1006         rte_free(rx_adapter->eth_rx_poll);
1007         rte_free(rx_adapter->wrr_sched);
1008
1009         rx_adapter->eth_rx_poll = rx_poll;
1010         rx_adapter->wrr_sched = rx_wrr;
1011         rx_adapter->wrr_len = nb_wrr;
1012         return 0;
1013
1014 err_free_rxqueue:
1015         if (rx_queue == NULL) {
1016                 rte_free(dev_info->rx_queue);
1017                 dev_info->rx_queue = NULL;
1018         }
1019
1020         rte_free(rx_poll);
1021         rte_free(rx_wrr);
1022
1023         return ret;
1024 }
1025
1026 static int
1027 rxa_ctrl(uint8_t id, int start)
1028 {
1029         struct rte_event_eth_rx_adapter *rx_adapter;
1030         struct rte_eventdev *dev;
1031         struct eth_device_info *dev_info;
1032         uint32_t i;
1033         int use_service = 0;
1034         int stop = !start;
1035
1036         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1037         rx_adapter = rxa_id_to_adapter(id);
1038         if (rx_adapter == NULL)
1039                 return -EINVAL;
1040
1041         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1042
1043         RTE_ETH_FOREACH_DEV(i) {
1044                 dev_info = &rx_adapter->eth_devices[i];
1045                 /* if start, check for num dev queues */
1046                 if (start && !dev_info->nb_dev_queues)
1047                         continue;
1048                 /* if stop, check if dev has been started */
1049                 if (stop && !dev_info->dev_rx_started)
1050                         continue;
1051                 use_service |= !dev_info->internal_event_port;
1052                 dev_info->dev_rx_started = start;
1053                 if (dev_info->internal_event_port == 0)
1054                         continue;
1055                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1056                                                 &rte_eth_devices[i]) :
1057                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1058                                                 &rte_eth_devices[i]);
1059         }
1060
1061         if (use_service) {
1062                 rte_spinlock_lock(&rx_adapter->rx_lock);
1063                 rx_adapter->rxa_started = start;
1064                 rte_service_runstate_set(rx_adapter->service_id, start);
1065                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1066         }
1067
1068         return 0;
1069 }
1070
1071 int
1072 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1073                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
1074                                 void *conf_arg)
1075 {
1076         struct rte_event_eth_rx_adapter *rx_adapter;
1077         int ret;
1078         int socket_id;
1079         uint16_t i;
1080         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
1081         const uint8_t default_rss_key[] = {
1082                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1083                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1084                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1085                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1086                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1087         };
1088
1089         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1090         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1091         if (conf_cb == NULL)
1092                 return -EINVAL;
1093
1094         if (event_eth_rx_adapter == NULL) {
1095                 ret = rte_event_eth_rx_adapter_init();
1096                 if (ret)
1097                         return ret;
1098         }
1099
1100         rx_adapter = rxa_id_to_adapter(id);
1101         if (rx_adapter != NULL) {
1102                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1103                 return -EEXIST;
1104         }
1105
1106         socket_id = rte_event_dev_socket_id(dev_id);
1107         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1108                 "rte_event_eth_rx_adapter_%d",
1109                 id);
1110
1111         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1112                         RTE_CACHE_LINE_SIZE, socket_id);
1113         if (rx_adapter == NULL) {
1114                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1115                 return -ENOMEM;
1116         }
1117
1118         rx_adapter->eventdev_id = dev_id;
1119         rx_adapter->socket_id = socket_id;
1120         rx_adapter->conf_cb = conf_cb;
1121         rx_adapter->conf_arg = conf_arg;
1122         strcpy(rx_adapter->mem_name, mem_name);
1123         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1124                                         /* FIXME: incompatible with hotplug */
1125                                         rte_eth_dev_count_total() *
1126                                         sizeof(struct eth_device_info), 0,
1127                                         socket_id);
1128         rte_convert_rss_key((const uint32_t *)default_rss_key,
1129                         (uint32_t *)rx_adapter->rss_key_be,
1130                             RTE_DIM(default_rss_key));
1131
1132         if (rx_adapter->eth_devices == NULL) {
1133                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1134                 rte_free(rx_adapter);
1135                 return -ENOMEM;
1136         }
1137         rte_spinlock_init(&rx_adapter->rx_lock);
1138         RTE_ETH_FOREACH_DEV(i)
1139                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1140
1141         event_eth_rx_adapter[id] = rx_adapter;
1142         if (conf_cb == rxa_default_conf_cb)
1143                 rx_adapter->default_cb_arg = 1;
1144         return 0;
1145 }
1146
1147 int
1148 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
1149                 struct rte_event_port_conf *port_config)
1150 {
1151         struct rte_event_port_conf *pc;
1152         int ret;
1153
1154         if (port_config == NULL)
1155                 return -EINVAL;
1156         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1157
1158         pc = rte_malloc(NULL, sizeof(*pc), 0);
1159         if (pc == NULL)
1160                 return -ENOMEM;
1161         *pc = *port_config;
1162         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
1163                                         rxa_default_conf_cb,
1164                                         pc);
1165         if (ret)
1166                 rte_free(pc);
1167         return ret;
1168 }
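/* Minimal usage sketch (not part of this file; ADAPTER_ID, EV_DEV_ID,
 * ETH_PORT_ID and SERVICE_LCORE_ID are illustrative placeholders, and the
 * port_conf values are arbitrary). An application typically creates the
 * adapter, adds Rx queues, maps the adapter's service to a running service
 * lcore when the eventdev has no internal port capability, and starts it:
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(ADAPTER_ID, EV_DEV_ID, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(ADAPTER_ID, ETH_PORT_ID, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(ADAPTER_ID, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, SERVICE_LCORE_ID, 1);
 *	rte_event_eth_rx_adapter_start(ADAPTER_ID);
 */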
1169
1170 int
1171 rte_event_eth_rx_adapter_free(uint8_t id)
1172 {
1173         struct rte_event_eth_rx_adapter *rx_adapter;
1174
1175         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1176
1177         rx_adapter = rxa_id_to_adapter(id);
1178         if (rx_adapter == NULL)
1179                 return -EINVAL;
1180
1181         if (rx_adapter->nb_queues) {
1182                 RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
1183                                 rx_adapter->nb_queues);
1184                 return -EBUSY;
1185         }
1186
1187         if (rx_adapter->default_cb_arg)
1188                 rte_free(rx_adapter->conf_arg);
1189         rte_free(rx_adapter->eth_devices);
1190         rte_free(rx_adapter);
1191         event_eth_rx_adapter[id] = NULL;
1192
1193         return 0;
1194 }
1195
1196 int
1197 rte_event_eth_rx_adapter_queue_add(uint8_t id,
1198                 uint16_t eth_dev_id,
1199                 int32_t rx_queue_id,
1200                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1201 {
1202         int ret;
1203         uint32_t cap;
1204         struct rte_event_eth_rx_adapter *rx_adapter;
1205         struct rte_eventdev *dev;
1206         struct eth_device_info *dev_info;
1207
1208         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1209         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1210
1211         rx_adapter = rxa_id_to_adapter(id);
1212         if ((rx_adapter == NULL) || (queue_conf == NULL))
1213                 return -EINVAL;
1214
1215         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1216         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1217                                                 eth_dev_id,
1218                                                 &cap);
1219         if (ret) {
1220                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1221                         "eth port %" PRIu16, id, eth_dev_id);
1222                 return ret;
1223         }
1224
1225         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1226                 && (queue_conf->rx_queue_flags &
1227                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1228                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1229                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
1230                                 eth_dev_id, id);
1231                 return -EINVAL;
1232         }
1233
1234         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1235                 (rx_queue_id != -1)) {
1236                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
1237                         "event queue, eth port: %" PRIu16 " adapter id: %"
1238                         PRIu8, eth_dev_id, id);
1239                 return -EINVAL;
1240         }
1241
1242         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1243                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1244                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1245                          (uint16_t)rx_queue_id);
1246                 return -EINVAL;
1247         }
1248
1249         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1250
1251         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1252                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1253                                         -ENOTSUP);
1254                 if (dev_info->rx_queue == NULL) {
1255                         dev_info->rx_queue =
1256                             rte_zmalloc_socket(rx_adapter->mem_name,
1257                                         dev_info->dev->data->nb_rx_queues *
1258                                         sizeof(struct eth_rx_queue_info), 0,
1259                                         rx_adapter->socket_id);
1260                         if (dev_info->rx_queue == NULL)
1261                                 return -ENOMEM;
1262                 }
1263
1264                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1265                                 &rte_eth_devices[eth_dev_id],
1266                                 rx_queue_id, queue_conf);
1267                 if (ret == 0) {
1268                         dev_info->internal_event_port = 1;
1269                         rxa_update_queue(rx_adapter,
1270                                         &rx_adapter->eth_devices[eth_dev_id],
1271                                         rx_queue_id,
1272                                         1);
1273                 }
1274         } else {
1275                 rte_spinlock_lock(&rx_adapter->rx_lock);
1276                 dev_info->internal_event_port = 0;
1277                 ret = rxa_init_service(rx_adapter, id);
1278                 if (ret == 0) {
1279                         uint32_t service_id = rx_adapter->service_id;
1280                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
1281                                         queue_conf);
1282                         rte_service_component_runstate_set(service_id,
1283                                 rxa_sw_adapter_queue_count(rx_adapter));
1284                 }
1285                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1286         }
1287
1288         if (ret)
1289                 return ret;
1290
1291         return 0;
1292 }
1293
1294 int
1295 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
1296                                 int32_t rx_queue_id)
1297 {
1298         int ret = 0;
1299         struct rte_eventdev *dev;
1300         struct rte_event_eth_rx_adapter *rx_adapter;
1301         struct eth_device_info *dev_info;
1302         uint32_t cap;
1303         uint32_t nb_rx_poll = 0;
1304         uint32_t nb_wrr = 0;
1305         struct eth_rx_poll_entry *rx_poll = NULL;
1306         uint32_t *rx_wrr = NULL;
1307
1308         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1309         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1310
1311         rx_adapter = rxa_id_to_adapter(id);
1312         if (rx_adapter == NULL)
1313                 return -EINVAL;
1314
1315         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1316         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1317                                                 eth_dev_id,
1318                                                 &cap);
1319         if (ret)
1320                 return ret;
1321
1322         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1323                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1324                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1325                          (uint16_t)rx_queue_id);
1326                 return -EINVAL;
1327         }
1328
1329         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1330
1331         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1332                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1333                                  -ENOTSUP);
1334                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1335                                                 &rte_eth_devices[eth_dev_id],
1336                                                 rx_queue_id);
1337                 if (ret == 0) {
1338                         rxa_update_queue(rx_adapter,
1339                                         &rx_adapter->eth_devices[eth_dev_id],
1340                                         rx_queue_id,
1341                                         0);
1342                         if (dev_info->nb_dev_queues == 0) {
1343                                 rte_free(dev_info->rx_queue);
1344                                 dev_info->rx_queue = NULL;
1345                         }
1346                 }
1347         } else {
1348                 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
1349                         &nb_rx_poll, &nb_wrr);
1350                 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1351                         &rx_poll, &rx_wrr);
1352                 if (ret)
1353                         return ret;
1354
1355                 rte_spinlock_lock(&rx_adapter->rx_lock);
1356                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
1357                 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1358
1359                 rte_free(rx_adapter->eth_rx_poll);
1360                 rte_free(rx_adapter->wrr_sched);
1361
1362                 rx_adapter->eth_rx_poll = rx_poll;
1363                 rx_adapter->num_rx_polled = nb_rx_poll;
1364                 rx_adapter->wrr_sched = rx_wrr;
1365                 rx_adapter->wrr_len = nb_wrr;
1366
1367                 if (dev_info->nb_dev_queues == 0) {
1368                         rte_free(dev_info->rx_queue);
1369                         dev_info->rx_queue = NULL;
1370                 }
1371                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1372
1373                 rte_service_component_runstate_set(rx_adapter->service_id,
1374                                 rxa_sw_adapter_queue_count(rx_adapter));
1375         }
1376
1377         return ret;
1378 }
1379
1380
1381 int
1382 rte_event_eth_rx_adapter_start(uint8_t id)
1383 {
1384         return rxa_ctrl(id, 1);
1385 }
1386
1387 int
1388 rte_event_eth_rx_adapter_stop(uint8_t id)
1389 {
1390         return rxa_ctrl(id, 0);
1391 }
1392
1393 int
1394 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1395                                struct rte_event_eth_rx_adapter_stats *stats)
1396 {
1397         struct rte_event_eth_rx_adapter *rx_adapter;
1398         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1399         struct rte_event_eth_rx_adapter_stats dev_stats;
1400         struct rte_eventdev *dev;
1401         struct eth_device_info *dev_info;
1402         uint32_t i;
1403         int ret;
1404
1405         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1406
1407         rx_adapter = rxa_id_to_adapter(id);
1408         if (rx_adapter  == NULL || stats == NULL)
1409                 return -EINVAL;
1410
1411         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1412         memset(stats, 0, sizeof(*stats));
1413         RTE_ETH_FOREACH_DEV(i) {
1414                 dev_info = &rx_adapter->eth_devices[i];
1415                 if (dev_info->internal_event_port == 0 ||
1416                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1417                         continue;
1418                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1419                                                 &rte_eth_devices[i],
1420                                                 &dev_stats);
1421                 if (ret)
1422                         continue;
1423                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1424                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1425         }
1426
1427         if (rx_adapter->service_inited)
1428                 *stats = rx_adapter->stats;
1429
1430         stats->rx_packets += dev_stats_sum.rx_packets;
1431         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1432         return 0;
1433 }
1434
1435 int
1436 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1437 {
1438         struct rte_event_eth_rx_adapter *rx_adapter;
1439         struct rte_eventdev *dev;
1440         struct eth_device_info *dev_info;
1441         uint32_t i;
1442
1443         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1444
1445         rx_adapter = rxa_id_to_adapter(id);
1446         if (rx_adapter == NULL)
1447                 return -EINVAL;
1448
1449         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1450         RTE_ETH_FOREACH_DEV(i) {
1451                 dev_info = &rx_adapter->eth_devices[i];
1452                 if (dev_info->internal_event_port == 0 ||
1453                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1454                         continue;
1455                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1456                                                         &rte_eth_devices[i]);
1457         }
1458
1459         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1460         return 0;
1461 }
1462
1463 int
1464 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1465 {
1466         struct rte_event_eth_rx_adapter *rx_adapter;
1467
1468         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1469
1470         rx_adapter = rxa_id_to_adapter(id);
1471         if (rx_adapter == NULL || service_id == NULL)
1472                 return -EINVAL;
1473
1474         if (rx_adapter->service_inited)
1475                 *service_id = rx_adapter->service_id;
1476
1477         return rx_adapter->service_inited ? 0 : -ESRCH;
1478 }