eventdev: standardize Rx adapter internal function names
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <rte_cycles.h>
6 #include <rte_common.h>
7 #include <rte_dev.h>
8 #include <rte_errno.h>
9 #include <rte_ethdev.h>
10 #include <rte_log.h>
11 #include <rte_malloc.h>
12 #include <rte_service_component.h>
13 #include <rte_thash.h>
14
15 #include "rte_eventdev.h"
16 #include "rte_eventdev_pmd.h"
17 #include "rte_event_eth_rx_adapter.h"
18
19 #define BATCH_SIZE              32
20 #define BLOCK_CNT_THRESHOLD     10
21 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
22
23 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
24 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
25
26 #define RSS_KEY_SIZE    40
27
28 /*
29  * There is an instance of this struct per polled Rx queue added to the
30  * adapter
31  */
32 struct eth_rx_poll_entry {
33         /* Eth port to poll */
34         uint16_t eth_dev_id;
35         /* Eth rx queue to poll */
36         uint16_t eth_rx_qid;
37 };
38
39 /* Instance per adapter */
40 struct rte_eth_event_enqueue_buffer {
41         /* Count of events in this buffer */
42         uint16_t count;
43         /* Array of events in this buffer */
44         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
45 };
46
47 struct rte_event_eth_rx_adapter {
48         /* RSS key */
49         uint8_t rss_key_be[RSS_KEY_SIZE];
50         /* Event device identifier */
51         uint8_t eventdev_id;
52         /* Per ethernet device structure */
53         struct eth_device_info *eth_devices;
54         /* Event port identifier */
55         uint8_t event_port_id;
56         /* Lock to serialize config updates with service function */
57         rte_spinlock_t rx_lock;
58         /* Max mbufs processed in any service function invocation */
59         uint32_t max_nb_rx;
60         /* Receive queues that need to be polled */
61         struct eth_rx_poll_entry *eth_rx_poll;
62         /* Size of the eth_rx_poll array */
63         uint16_t num_rx_polled;
64         /* Weighted round robin schedule */
65         uint32_t *wrr_sched;
66         /* wrr_sched[] size */
67         uint32_t wrr_len;
68         /* Next entry in wrr_sched[] to begin polling */
69         uint32_t wrr_pos;
70         /* Event burst buffer */
71         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
72         /* Per adapter stats */
73         struct rte_event_eth_rx_adapter_stats stats;
74         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
75         uint16_t enq_block_count;
76         /* Block start ts */
77         uint64_t rx_enq_block_start_ts;
78         /* Configuration callback for rte_service configuration */
79         rte_event_eth_rx_adapter_conf_cb conf_cb;
80         /* Configuration callback argument */
81         void *conf_arg;
82         /* Set if the default conf callback is being used */
83         int default_cb_arg;
84         /* Service initialization state */
85         uint8_t service_inited;
86         /* Total count of Rx queues in adapter */
87         uint32_t nb_queues;
88         /* Memory allocation name */
89         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
90         /* Socket identifier cached from eventdev */
91         int socket_id;
92         /* Per adapter EAL service */
93         uint32_t service_id;
94         /* Adapter started flag */
95         uint8_t rxa_started;
96 } __rte_cache_aligned;
97
98 /* Per eth device */
99 struct eth_device_info {
100         struct rte_eth_dev *dev;
101         struct eth_rx_queue_info *rx_queue;
102         /* Set if ethdev->eventdev packet transfer uses a
103          * hardware mechanism
104          */
105         uint8_t internal_event_port;
106         /* Set if the adapter is processing Rx queues for
107          * this eth device and packet processing has been
108          * started; this lets the code know whether the PMD
109          * rx_adapter_stop callback needs to be invoked
110          */
111         uint8_t dev_rx_started;
112         /* If nb_dev_queues > 0, the start callback will
113          * be invoked if not already invoked
114          */
115         uint16_t nb_dev_queues;
116 };
117
118 /* Per Rx queue */
119 struct eth_rx_queue_info {
120         int queue_enabled;      /* True if added */
121         uint16_t wt;            /* Polling weight */
122         uint8_t event_queue_id; /* Event queue to enqueue packets to */
123         uint8_t sched_type;     /* Sched type for events */
124         uint8_t priority;       /* Event priority */
125         uint32_t flow_id;       /* App provided flow identifier */
126         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
127 };
128
129 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
130
131 static inline int
132 rxa_validate_id(uint8_t id)
133 {
134         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
135 }
136
137 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
138         if (!rxa_validate_id(id)) { \
139                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
140                 return retval; \
141         } \
142 } while (0)
143
144 static inline int
145 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
146 {
147         return rx_adapter->num_rx_polled;
148 }
149
150 /* Greatest common divisor */
151 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
152 {
153         uint16_t r = a % b;
154
155         return r ? rxa_gcd_u16(b, r) : b;
156 }
157
158 /* Returns the next queue in the polling sequence
159  *
160  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
161  */
162 static int
163 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
164          unsigned int n, int *cw,
165          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
166          uint16_t gcd, int prev)
167 {
168         int i = prev;
169         uint16_t w;
170
171         while (1) {
172                 uint16_t q;
173                 uint16_t d;
174
175                 i = (i + 1) % n;
176                 if (i == 0) {
177                         *cw = *cw - gcd;
178                         if (*cw <= 0)
179                                 *cw = max_wt;
180                 }
181
182                 q = eth_rx_poll[i].eth_rx_qid;
183                 d = eth_rx_poll[i].eth_dev_id;
184                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
185
186                 if ((int)w >= *cw)
187                         return i;
188         }
189 }
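
/* Worked example of the WRR computation above: with three polled queues of
 * weights 4, 2 and 1 (gcd = 1, max_wt = 4, max_wrr_pos = 7), repeated calls
 * to rxa_wrr_next() starting from cw = -1, prev = -1 produce the index
 * sequence 0, 0, 0, 1, 0, 1, 2, i.e. each queue appears in wrr_sched[] in
 * proportion to its servicing weight.
 */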
190
191 /* Precalculate WRR polling sequence for all queues in rx_adapter */
192 static int
193 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
194 {
195         uint16_t d;
196         uint16_t q;
197         unsigned int i;
198
199         /* Initialize variables for calculation of wrr schedule */
200         uint16_t max_wrr_pos = 0;
201         unsigned int poll_q = 0;
202         uint16_t max_wt = 0;
203         uint16_t gcd = 0;
204
205         struct eth_rx_poll_entry *rx_poll = NULL;
206         uint32_t *rx_wrr = NULL;
207
208         if (rx_adapter->num_rx_polled) {
209                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
210                                 sizeof(*rx_adapter->eth_rx_poll),
211                                 RTE_CACHE_LINE_SIZE);
212                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
213                                              len,
214                                              RTE_CACHE_LINE_SIZE,
215                                              rx_adapter->socket_id);
216                 if (rx_poll == NULL)
217                         return -ENOMEM;
218
219                 /* Generate the array of all queues to poll; poll_q
220                  * holds the final size of this array
221                  */
222                 RTE_ETH_FOREACH_DEV(d) {
223                         uint16_t nb_rx_queues;
224                         struct eth_device_info *dev_info =
225                                         &rx_adapter->eth_devices[d];
226                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
227                         if (dev_info->rx_queue == NULL)
228                                 continue;
229                         if (dev_info->internal_event_port)
230                                 continue;
231                         for (q = 0; q < nb_rx_queues; q++) {
232                                 struct eth_rx_queue_info *queue_info =
233                                         &dev_info->rx_queue[q];
234                                 if (queue_info->queue_enabled == 0)
235                                         continue;
236
237                                 uint16_t wt = queue_info->wt;
238                                 rx_poll[poll_q].eth_dev_id = d;
239                                 rx_poll[poll_q].eth_rx_qid = q;
240                                 max_wrr_pos += wt;
241                                 max_wt = RTE_MAX(max_wt, wt);
242                                 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
243                                 poll_q++;
244                         }
245                 }
246
247                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
248                                 RTE_CACHE_LINE_SIZE);
249                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
250                                             len,
251                                             RTE_CACHE_LINE_SIZE,
252                                             rx_adapter->socket_id);
253                 if (rx_wrr == NULL) {
254                         rte_free(rx_poll);
255                         return -ENOMEM;
256                 }
257
258                 /* Generate polling sequence based on weights */
259                 int prev = -1;
260                 int cw = -1;
261                 for (i = 0; i < max_wrr_pos; i++) {
262                         rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
263                                              rx_poll, max_wt, gcd, prev);
264                         prev = rx_wrr[i];
265                 }
266         }
267
268         rte_free(rx_adapter->eth_rx_poll);
269         rte_free(rx_adapter->wrr_sched);
270
271         rx_adapter->eth_rx_poll = rx_poll;
272         rx_adapter->wrr_sched = rx_wrr;
273         rx_adapter->wrr_len = max_wrr_pos;
274
275         return 0;
276 }
277
278 static inline void
279 rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
280         struct ipv6_hdr **ipv6_hdr)
281 {
282         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
283         struct vlan_hdr *vlan_hdr;
284
285         *ipv4_hdr = NULL;
286         *ipv6_hdr = NULL;
287
288         switch (eth_hdr->ether_type) {
289         case RTE_BE16(ETHER_TYPE_IPv4):
290                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
291                 break;
292
293         case RTE_BE16(ETHER_TYPE_IPv6):
294                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
295                 break;
296
297         case RTE_BE16(ETHER_TYPE_VLAN):
298                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
299                 switch (vlan_hdr->eth_proto) {
300                 case RTE_BE16(ETHER_TYPE_IPv4):
301                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
302                         break;
303                 case RTE_BE16(ETHER_TYPE_IPv6):
304                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
305                         break;
306                 default:
307                         break;
308                 }
309                 break;
310
311         default:
312                 break;
313         }
314 }
315
316 /* Calculate RSS hash for IPv4/6 */
317 static inline uint32_t
318 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
319 {
320         uint32_t input_len;
321         void *tuple;
322         struct rte_ipv4_tuple ipv4_tuple;
323         struct rte_ipv6_tuple ipv6_tuple;
324         struct ipv4_hdr *ipv4_hdr;
325         struct ipv6_hdr *ipv6_hdr;
326
327         rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
328
329         if (ipv4_hdr) {
330                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
331                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
332                 tuple = &ipv4_tuple;
333                 input_len = RTE_THASH_V4_L3_LEN;
334         } else if (ipv6_hdr) {
335                 rte_thash_load_v6_addrs(ipv6_hdr,
336                                         (union rte_thash_tuple *)&ipv6_tuple);
337                 tuple = &ipv6_tuple;
338                 input_len = RTE_THASH_V6_L3_LEN;
339         } else
340                 return 0;
341
342         return rte_softrss_be(tuple, input_len, rss_key_be);
343 }
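
/* Note: rxa_do_softrss() computes a software Toeplitz hash over the IPv4/IPv6
 * source and destination addresses using the adapter's byte-swapped RSS key.
 * It is only invoked from rxa_buffer_mbufs() when the first mbuf of the burst
 * carries no PKT_RX_RSS_HASH from the PMD and the application did not supply
 * a flow id for the queue.
 */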
344
345 static inline int
346 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
347 {
348         return !!rx_adapter->enq_block_count;
349 }
350
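/* Enqueue "block" accounting: rxa_enq_block_start_ts() is called each time a
 * flush enqueues zero events; once BLOCK_CNT_THRESHOLD consecutive attempts
 * have failed, the TSC is latched as the block start. rxa_enq_block_end_ts()
 * clears the counter on the next successful enqueue and adds the elapsed
 * cycles to stats->rx_enq_block_cycles.
 */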
351 static inline void
352 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
353 {
354         if (rx_adapter->rx_enq_block_start_ts)
355                 return;
356
357         rx_adapter->enq_block_count++;
358         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
359                 return;
360
361         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
362 }
363
364 static inline void
365 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
366                     struct rte_event_eth_rx_adapter_stats *stats)
367 {
368         if (unlikely(!stats->rx_enq_start_ts))
369                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
370
371         if (likely(!rxa_enq_blocked(rx_adapter)))
372                 return;
373
374         rx_adapter->enq_block_count = 0;
375         if (rx_adapter->rx_enq_block_start_ts) {
376                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
377                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
378                     rx_adapter->rx_enq_block_start_ts;
379                 rx_adapter->rx_enq_block_start_ts = 0;
380         }
381 }
382
383 /* Add an event to the buffer; the free space check is done prior to
384  * calling this function
385  */
386 static inline void
387 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
388                 struct rte_event *ev)
389 {
390         struct rte_eth_event_enqueue_buffer *buf =
391             &rx_adapter->event_enqueue_buffer;
392         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
393 }
394
395 /* Enqueue buffered events to event device */
396 static inline uint16_t
397 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
398 {
399         struct rte_eth_event_enqueue_buffer *buf =
400             &rx_adapter->event_enqueue_buffer;
401         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
402
403         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
404                                         rx_adapter->event_port_id,
405                                         buf->events,
406                                         buf->count);
407         if (n != buf->count) {
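		/* Events not accepted by the event device stay buffered;
		 * move them to the front so the next flush retries them.
		 */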
408                 memmove(buf->events,
409                         &buf->events[n],
410                         (buf->count - n) * sizeof(struct rte_event));
411                 stats->rx_enq_retry++;
412         }
413
414         n ? rxa_enq_block_end_ts(rx_adapter, stats) :
415                 rxa_enq_block_start_ts(rx_adapter);
416
417         buf->count -= n;
418         stats->rx_enq_count += n;
419
420         return n;
421 }
422
423 static inline void
424 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
425                 uint16_t eth_dev_id,
426                 uint16_t rx_queue_id,
427                 struct rte_mbuf **mbufs,
428                 uint16_t num)
429 {
430         uint32_t i;
431         struct eth_device_info *eth_device_info =
432                                         &rx_adapter->eth_devices[eth_dev_id];
433         struct eth_rx_queue_info *eth_rx_queue_info =
434                                         &eth_device_info->rx_queue[rx_queue_id];
435
436         int32_t qid = eth_rx_queue_info->event_queue_id;
437         uint8_t sched_type = eth_rx_queue_info->sched_type;
438         uint8_t priority = eth_rx_queue_info->priority;
439         uint32_t flow_id;
440         struct rte_event events[BATCH_SIZE];
441         struct rte_mbuf *m = mbufs[0];
442         uint32_t rss_mask;
443         uint32_t rss;
444         int do_rss;
445         uint64_t ts;
446
447         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
448         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
449         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
450
451         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
452                 ts = rte_get_tsc_cycles();
453                 for (i = 0; i < num; i++) {
454                         m = mbufs[i];
455
456                         m->timestamp = ts;
457                         m->ol_flags |= PKT_RX_TIMESTAMP;
458                 }
459         }
460
461         for (i = 0; i < num; i++) {
462                 m = mbufs[i];
463                 struct rte_event *ev = &events[i];
464
465                 rss = do_rss ?
466                         rxa_do_softrss(m, rx_adapter->rss_key_be) :
467                         m->hash.rss;
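		/* Compose the event flow id: the bits covered by
		 * flow_id_mask come from the application-supplied flow id,
		 * the remaining bits come from the RSS hash.
		 */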
468                 flow_id =
469                     eth_rx_queue_info->flow_id &
470                                 eth_rx_queue_info->flow_id_mask;
471                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
472                 ev->flow_id = flow_id;
473                 ev->op = RTE_EVENT_OP_NEW;
474                 ev->sched_type = sched_type;
475                 ev->queue_id = qid;
476                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
477                 ev->sub_event_type = 0;
478                 ev->priority = priority;
479                 ev->mbuf = m;
480
481                 rxa_buffer_event(rx_adapter, ev);
482         }
483 }
484
485 /*
486  * Polls receive queues added to the event adapter and enqueues received
487  * packets to the event device.
488  *
489  * The receive code enqueues initially to a temporary buffer; the
490  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
491  *
492  * If there isn't space available in the temporary buffer, packets from the
493  * Rx queue aren't dequeued from the eth device; this back-pressures the
494  * eth device. In virtual device environments this back pressure is relayed
495  * to the hypervisor's switching layer, where adjustments can be made to
496  * deal with it.
497  */
498 static inline void
499 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
500 {
501         uint32_t num_queue;
502         uint16_t n;
503         uint32_t nb_rx = 0;
504         struct rte_mbuf *mbufs[BATCH_SIZE];
505         struct rte_eth_event_enqueue_buffer *buf;
506         uint32_t wrr_pos;
507         uint32_t max_nb_rx;
508
509         wrr_pos = rx_adapter->wrr_pos;
510         max_nb_rx = rx_adapter->max_nb_rx;
511         buf = &rx_adapter->event_enqueue_buffer;
512         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
513
514         /* Iterate through a WRR sequence */
515         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
516                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
517                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
518                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
519
520                 /* Don't do a batch dequeue from the rx queue if there isn't
521                  * enough space in the enqueue buffer.
522                  */
523                 if (buf->count >= BATCH_SIZE)
524                         rxa_flush_event_buffer(rx_adapter);
525                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
526                         rx_adapter->wrr_pos = wrr_pos;
527                         return;
528                 }
529
530                 stats->rx_poll_count++;
531                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
532
533                 if (n) {
534                         stats->rx_packets += n;
535                         /* The check before rte_eth_rx_burst() ensures that
536                          * all n mbufs can be buffered
537                          */
538                         rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
539                         nb_rx += n;
540                         if (nb_rx > max_nb_rx) {
541                                 rx_adapter->wrr_pos =
542                                     (wrr_pos + 1) % rx_adapter->wrr_len;
543                                 break;
544                         }
545                 }
546
547                 if (++wrr_pos == rx_adapter->wrr_len)
548                         wrr_pos = 0;
549         }
550
551         if (buf->count >= BATCH_SIZE)
552                 rxa_flush_event_buffer(rx_adapter);
553 }
554
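/* Adapter service function, run on a service lcore. The trylock keeps the
 * service from blocking behind a control-plane thread that holds rx_lock for
 * a queue add/del; the service simply returns and polls again on its next
 * invocation.
 */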
555 static int
556 rxa_service_func(void *args)
557 {
558         struct rte_event_eth_rx_adapter *rx_adapter = args;
559
560         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
561                 return 0;
562         if (!rx_adapter->rxa_started) {
563                 rte_spinlock_unlock(&rx_adapter->rx_lock);
564                 return 0;
565         }
566         rxa_poll(rx_adapter);
567         rte_spinlock_unlock(&rx_adapter->rx_lock);
568         return 0;
569 }
570
571 static int
572 rte_event_eth_rx_adapter_init(void)
573 {
574         const char *name = "rte_event_eth_rx_adapter_array";
575         const struct rte_memzone *mz;
576         unsigned int sz;
577
578         sz = sizeof(*event_eth_rx_adapter) *
579             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
580         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
581
582         mz = rte_memzone_lookup(name);
583         if (mz == NULL) {
584                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
585                                                  RTE_CACHE_LINE_SIZE);
586                 if (mz == NULL) {
587                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
588                                         PRId32, rte_errno);
589                         return -rte_errno;
590                 }
591         }
592
593         event_eth_rx_adapter = mz->addr;
594         return 0;
595 }
596
597 static inline struct rte_event_eth_rx_adapter *
598 rxa_id_to_adapter(uint8_t id)
599 {
600         return event_eth_rx_adapter ?
601                 event_eth_rx_adapter[id] : NULL;
602 }
603
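/* Default configuration callback used by rte_event_eth_rx_adapter_create().
 * It stops the event device if needed, reconfigures it with one additional
 * event port, sets that port up with the caller-supplied port configuration
 * and hands the new port id back to the adapter.
 */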
604 static int
605 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
606                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
607 {
608         int ret;
609         struct rte_eventdev *dev;
610         struct rte_event_dev_config dev_conf;
611         int started;
612         uint8_t port_id;
613         struct rte_event_port_conf *port_conf = arg;
614         struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
615
616         dev = &rte_eventdevs[rx_adapter->eventdev_id];
617         dev_conf = dev->data->dev_conf;
618
619         started = dev->data->dev_started;
620         if (started)
621                 rte_event_dev_stop(dev_id);
622         port_id = dev_conf.nb_event_ports;
623         dev_conf.nb_event_ports += 1;
624         ret = rte_event_dev_configure(dev_id, &dev_conf);
625         if (ret) {
626                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
627                                                 dev_id);
628                 if (started) {
629                         if (rte_event_dev_start(dev_id))
630                                 return -EIO;
631                 }
632                 return ret;
633         }
634
635         ret = rte_event_port_setup(dev_id, port_id, port_conf);
636         if (ret) {
637                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
638                                         port_id);
639                 return ret;
640         }
641
642         conf->event_port_id = port_id;
643         conf->max_nb_rx = 128;
644         if (started)
645                 ret = rte_event_dev_start(dev_id);
646         rx_adapter->default_cb_arg = 1;
647         return ret;
648 }
649
650 static int
651 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
652 {
653         int ret;
654         struct rte_service_spec service;
655         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
656
657         if (rx_adapter->service_inited)
658                 return 0;
659
660         memset(&service, 0, sizeof(service));
661         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
662                 "rte_event_eth_rx_adapter_%d", id);
663         service.socket_id = rx_adapter->socket_id;
664         service.callback = rxa_service_func;
665         service.callback_userdata = rx_adapter;
666         /* Service function handles locking for queue add/del updates */
667         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
668         ret = rte_service_component_register(&service, &rx_adapter->service_id);
669         if (ret) {
670                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
671                         service.name, ret);
672                 return ret;
673         }
674
675         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
676                 &rx_adapter_conf, rx_adapter->conf_arg);
677         if (ret) {
678                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
679                         ret);
680                 goto err_done;
681         }
682         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
683         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
684         rx_adapter->service_inited = 1;
685         return 0;
686
687 err_done:
688         rte_service_component_unregister(rx_adapter->service_id);
689         return ret;
690 }
691
692 static void
693 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
694                 struct eth_device_info *dev_info,
695                 int32_t rx_queue_id,
696                 uint8_t add)
697 {
698         struct eth_rx_queue_info *queue_info;
699         int enabled;
700         uint16_t i;
701
702         if (dev_info->rx_queue == NULL)
703                 return;
704
705         if (rx_queue_id == -1) {
706                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
707                         rxa_update_queue(rx_adapter, dev_info, i, add);
708         } else {
709                 queue_info = &dev_info->rx_queue[rx_queue_id];
710                 enabled = queue_info->queue_enabled;
711                 if (add) {
712                         rx_adapter->nb_queues += !enabled;
713                         dev_info->nb_dev_queues += !enabled;
714                 } else {
715                         rx_adapter->nb_queues -= enabled;
716                         dev_info->nb_dev_queues -= enabled;
717                 }
718                 queue_info->queue_enabled = !!add;
719         }
720 }
721
722 static int
723 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
724         struct eth_device_info *dev_info,
725         uint16_t rx_queue_id)
726 {
727         struct eth_rx_queue_info *queue_info;
728
729         if (rx_adapter->nb_queues == 0)
730                 return 0;
731
732         queue_info = &dev_info->rx_queue[rx_queue_id];
733         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
734         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
735         return 0;
736 }
737
738 static void
739 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
740         struct eth_device_info *dev_info,
741         uint16_t rx_queue_id,
742         const struct rte_event_eth_rx_adapter_queue_conf *conf)
743
744 {
745         struct eth_rx_queue_info *queue_info;
746         const struct rte_event *ev = &conf->ev;
747
748         queue_info = &dev_info->rx_queue[rx_queue_id];
749         queue_info->event_queue_id = ev->queue_id;
750         queue_info->sched_type = ev->sched_type;
751         queue_info->priority = ev->priority;
752         queue_info->wt = conf->servicing_weight;
753
754         if (conf->rx_queue_flags &
755                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
756                 queue_info->flow_id = ev->flow_id;
757                 queue_info->flow_id_mask = ~0;
758         }
759
760         /* The same queue can be added more than once */
761         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
762         rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
763 }
764
765 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
766                 uint16_t eth_dev_id,
767                 int rx_queue_id,
768                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
769 {
770         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
771         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
772         uint32_t i;
773         int ret;
774
775         if (queue_conf->servicing_weight == 0) {
776
777                 struct rte_eth_dev_data *data = dev_info->dev->data;
778                 if (data->dev_conf.intr_conf.rxq) {
779                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
780                                         " not supported");
781                         return -ENOTSUP;
782                 }
783                 temp_conf = *queue_conf;
784
785                 /* Rx interrupts are disabled, so default the servicing
786                  * weight to 1
787                  */
786                 temp_conf.servicing_weight = 1;
787                 queue_conf = &temp_conf;
788         }
789
790         if (dev_info->rx_queue == NULL) {
791                 dev_info->rx_queue =
792                     rte_zmalloc_socket(rx_adapter->mem_name,
793                                        dev_info->dev->data->nb_rx_queues *
794                                        sizeof(struct eth_rx_queue_info), 0,
795                                        rx_adapter->socket_id);
796                 if (dev_info->rx_queue == NULL)
797                         return -ENOMEM;
798         }
799
800         if (rx_queue_id == -1) {
801                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
802                         rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
803         } else {
804                 rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
805                         queue_conf);
806         }
807
808         ret = rxa_calc_wrr_sequence(rx_adapter);
809         if (ret) {
810                 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
811                 return ret;
812         }
813
814         return ret;
815 }
816
817 static int
818 rxa_ctrl(uint8_t id, int start)
819 {
820         struct rte_event_eth_rx_adapter *rx_adapter;
821         struct rte_eventdev *dev;
822         struct eth_device_info *dev_info;
823         uint32_t i;
824         int use_service = 0;
825         int stop = !start;
826
827         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
828         rx_adapter = rxa_id_to_adapter(id);
829         if (rx_adapter == NULL)
830                 return -EINVAL;
831
832         dev = &rte_eventdevs[rx_adapter->eventdev_id];
833
834         RTE_ETH_FOREACH_DEV(i) {
835                 dev_info = &rx_adapter->eth_devices[i];
836                 /* if start, check for num dev queues */
837                 if (start && !dev_info->nb_dev_queues)
838                         continue;
839                 /* if stop, check if dev has been started */
840                 if (stop && !dev_info->dev_rx_started)
841                         continue;
842                 use_service |= !dev_info->internal_event_port;
843                 dev_info->dev_rx_started = start;
844                 if (dev_info->internal_event_port == 0)
845                         continue;
846                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
847                                                 &rte_eth_devices[i]) :
848                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
849                                                 &rte_eth_devices[i]);
850         }
851
852         if (use_service) {
853                 rte_spinlock_lock(&rx_adapter->rx_lock);
854                 rx_adapter->rxa_started = start;
855                 rte_service_runstate_set(rx_adapter->service_id, start);
856                 rte_spinlock_unlock(&rx_adapter->rx_lock);
857         }
858
859         return 0;
860 }
861
862 int
863 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
864                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
865                                 void *conf_arg)
866 {
867         struct rte_event_eth_rx_adapter *rx_adapter;
868         int ret;
869         int socket_id;
870         uint16_t i;
871         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
872         const uint8_t default_rss_key[] = {
873                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
874                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
875                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
876                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
877                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
878         };
879
880         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
881         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
882         if (conf_cb == NULL)
883                 return -EINVAL;
884
885         if (event_eth_rx_adapter == NULL) {
886                 ret = rte_event_eth_rx_adapter_init();
887                 if (ret)
888                         return ret;
889         }
890
891         rx_adapter = rxa_id_to_adapter(id);
892         if (rx_adapter != NULL) {
893                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
894                 return -EEXIST;
895         }
896
897         socket_id = rte_event_dev_socket_id(dev_id);
898         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
899                 "rte_event_eth_rx_adapter_%d",
900                 id);
901
902         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
903                         RTE_CACHE_LINE_SIZE, socket_id);
904         if (rx_adapter == NULL) {
905                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
906                 return -ENOMEM;
907         }
908
909         rx_adapter->eventdev_id = dev_id;
910         rx_adapter->socket_id = socket_id;
911         rx_adapter->conf_cb = conf_cb;
912         rx_adapter->conf_arg = conf_arg;
913         strcpy(rx_adapter->mem_name, mem_name);
914         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
915                                         /* FIXME: incompatible with hotplug */
916                                         rte_eth_dev_count_total() *
917                                         sizeof(struct eth_device_info), 0,
918                                         socket_id);
919         rte_convert_rss_key((const uint32_t *)default_rss_key,
920                         (uint32_t *)rx_adapter->rss_key_be,
921                             RTE_DIM(default_rss_key));
922
923         if (rx_adapter->eth_devices == NULL) {
924                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
925                 rte_free(rx_adapter);
926                 return -ENOMEM;
927         }
928         rte_spinlock_init(&rx_adapter->rx_lock);
929         RTE_ETH_FOREACH_DEV(i)
930                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
931
932         event_eth_rx_adapter[id] = rx_adapter;
933         if (conf_cb == rxa_default_conf_cb)
934                 rx_adapter->default_cb_arg = 1;
935         return 0;
936 }
937
938 int
939 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
940                 struct rte_event_port_conf *port_config)
941 {
942         struct rte_event_port_conf *pc;
943         int ret;
944
945         if (port_config == NULL)
946                 return -EINVAL;
947         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
948
949         pc = rte_malloc(NULL, sizeof(*pc), 0);
950         if (pc == NULL)
951                 return -ENOMEM;
952         *pc = *port_config;
953         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
954                                         rxa_default_conf_cb,
955                                         pc);
956         if (ret)
957                 rte_free(pc);
958         return ret;
959 }
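
/* Illustrative usage sketch (not part of the library). The adapter id, event
 * device id, ethdev port, queue configuration and service lcore below are
 * assumptions for the example:
 *
 *	struct rte_event_port_conf port_conf = { .new_event_threshold = 1024,
 *						  .dequeue_depth = 16,
 *						  .enqueue_depth = 16 };
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore_id, 1);
 *	rte_event_eth_rx_adapter_start(0);
 */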
960
961 int
962 rte_event_eth_rx_adapter_free(uint8_t id)
963 {
964         struct rte_event_eth_rx_adapter *rx_adapter;
965
966         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
967
968         rx_adapter = rxa_id_to_adapter(id);
969         if (rx_adapter == NULL)
970                 return -EINVAL;
971
972         if (rx_adapter->nb_queues) {
973                 RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
974                                 rx_adapter->nb_queues);
975                 return -EBUSY;
976         }
977
978         if (rx_adapter->default_cb_arg)
979                 rte_free(rx_adapter->conf_arg);
980         rte_free(rx_adapter->eth_devices);
981         rte_free(rx_adapter);
982         event_eth_rx_adapter[id] = NULL;
983
984         return 0;
985 }
986
987 int
988 rte_event_eth_rx_adapter_queue_add(uint8_t id,
989                 uint16_t eth_dev_id,
990                 int32_t rx_queue_id,
991                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
992 {
993         int ret;
994         uint32_t cap;
995         struct rte_event_eth_rx_adapter *rx_adapter;
996         struct rte_eventdev *dev;
997         struct eth_device_info *dev_info;
998         int start_service;
999
1000         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1001         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1002
1003         rx_adapter = rxa_id_to_adapter(id);
1004         if ((rx_adapter == NULL) || (queue_conf == NULL))
1005                 return -EINVAL;
1006
1007         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1008         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1009                                                 eth_dev_id,
1010                                                 &cap);
1011         if (ret) {
1012                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1013                         " eth port %" PRIu16, id, eth_dev_id);
1014                 return ret;
1015         }
1016
1017         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1018                 && (queue_conf->rx_queue_flags &
1019                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1020                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1021                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
1022                                 eth_dev_id, id);
1023                 return -EINVAL;
1024         }
1025
1026         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1027                 (rx_queue_id != -1)) {
1028                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
1029                         "event queue, eth port: %" PRIu16 " adapter id: %"
1030                         PRIu8, eth_dev_id, id);
1031                 return -EINVAL;
1032         }
1033
1034         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1035                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1036                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1037                          (uint16_t)rx_queue_id);
1038                 return -EINVAL;
1039         }
1040
1041         start_service = 0;
1042         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1043
1044         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1045                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1046                                         -ENOTSUP);
1047                 if (dev_info->rx_queue == NULL) {
1048                         dev_info->rx_queue =
1049                             rte_zmalloc_socket(rx_adapter->mem_name,
1050                                         dev_info->dev->data->nb_rx_queues *
1051                                         sizeof(struct eth_rx_queue_info), 0,
1052                                         rx_adapter->socket_id);
1053                         if (dev_info->rx_queue == NULL)
1054                                 return -ENOMEM;
1055                 }
1056
1057                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1058                                 &rte_eth_devices[eth_dev_id],
1059                                 rx_queue_id, queue_conf);
1060                 if (ret == 0) {
1061                         dev_info->internal_event_port = 1;
1062                         rxa_update_queue(rx_adapter,
1063                                         &rx_adapter->eth_devices[eth_dev_id],
1064                                         rx_queue_id,
1065                                         1);
1066                 }
1067         } else {
1068                 rte_spinlock_lock(&rx_adapter->rx_lock);
1069                 dev_info->internal_event_port = 0;
1070                 ret = rxa_init_service(rx_adapter, id);
1071                 if (ret == 0)
1072                         ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
1073                                         queue_conf);
1074                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1075                 if (ret == 0)
1076                         start_service =
1077                                 !!rxa_sw_adapter_queue_count(rx_adapter);
1078         }
1079
1080         if (ret)
1081                 return ret;
1082
1083         if (start_service)
1084                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1085
1086         return 0;
1087 }
1088
1089 int
1090 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
1091                                 int32_t rx_queue_id)
1092 {
1093         int ret = 0;
1094         struct rte_eventdev *dev;
1095         struct rte_event_eth_rx_adapter *rx_adapter;
1096         struct eth_device_info *dev_info;
1097         uint32_t cap;
1098         uint16_t i;
1099
1100         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1101         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1102
1103         rx_adapter = rxa_id_to_adapter(id);
1104         if (rx_adapter == NULL)
1105                 return -EINVAL;
1106
1107         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1108         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1109                                                 eth_dev_id,
1110                                                 &cap);
1111         if (ret)
1112                 return ret;
1113
1114         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1115                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1116                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1117                          (uint16_t)rx_queue_id);
1118                 return -EINVAL;
1119         }
1120
1121         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1122
1123         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1124                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1125                                  -ENOTSUP);
1126                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1127                                                 &rte_eth_devices[eth_dev_id],
1128                                                 rx_queue_id);
1129                 if (ret == 0) {
1130                         rxa_update_queue(rx_adapter,
1131                                         &rx_adapter->eth_devices[eth_dev_id],
1132                                         rx_queue_id,
1133                                         0);
1134                         if (dev_info->nb_dev_queues == 0) {
1135                                 rte_free(dev_info->rx_queue);
1136                                 dev_info->rx_queue = NULL;
1137                         }
1138                 }
1139         } else {
1140                 int rc;
1141                 rte_spinlock_lock(&rx_adapter->rx_lock);
1142                 if (rx_queue_id == -1) {
1143                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1144                                 rxa_sw_del(rx_adapter, dev_info, i);
1145                 } else {
1146                         rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);
1147                 }
1148
1149                 rc = rxa_calc_wrr_sequence(rx_adapter);
1150                 if (rc)
1151                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1152                                         rc);
1153
1154                 if (dev_info->nb_dev_queues == 0) {
1155                         rte_free(dev_info->rx_queue);
1156                         dev_info->rx_queue = NULL;
1157                 }
1158
1159                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1160                 rte_service_component_runstate_set(rx_adapter->service_id,
1161                                 rxa_sw_adapter_queue_count(rx_adapter));
1162         }
1163
1164         return ret;
1165 }
1166
1167
1168 int
1169 rte_event_eth_rx_adapter_start(uint8_t id)
1170 {
1171         return rxa_ctrl(id, 1);
1172 }
1173
1174 int
1175 rte_event_eth_rx_adapter_stop(uint8_t id)
1176 {
1177         return rxa_ctrl(id, 0);
1178 }
1179
1180 int
1181 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1182                                struct rte_event_eth_rx_adapter_stats *stats)
1183 {
1184         struct rte_event_eth_rx_adapter *rx_adapter;
1185         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1186         struct rte_event_eth_rx_adapter_stats dev_stats;
1187         struct rte_eventdev *dev;
1188         struct eth_device_info *dev_info;
1189         uint32_t i;
1190         int ret;
1191
1192         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1193
1194         rx_adapter = rxa_id_to_adapter(id);
1195         if (rx_adapter  == NULL || stats == NULL)
1196                 return -EINVAL;
1197
1198         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1199         memset(stats, 0, sizeof(*stats));
1200         RTE_ETH_FOREACH_DEV(i) {
1201                 dev_info = &rx_adapter->eth_devices[i];
1202                 if (dev_info->internal_event_port == 0 ||
1203                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1204                         continue;
1205                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1206                                                 &rte_eth_devices[i],
1207                                                 &dev_stats);
1208                 if (ret)
1209                         continue;
1210                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1211                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1212         }
1213
1214         if (rx_adapter->service_inited)
1215                 *stats = rx_adapter->stats;
1216
1217         stats->rx_packets += dev_stats_sum.rx_packets;
1218         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1219         return 0;
1220 }
1221
1222 int
1223 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1224 {
1225         struct rte_event_eth_rx_adapter *rx_adapter;
1226         struct rte_eventdev *dev;
1227         struct eth_device_info *dev_info;
1228         uint32_t i;
1229
1230         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1231
1232         rx_adapter = rxa_id_to_adapter(id);
1233         if (rx_adapter == NULL)
1234                 return -EINVAL;
1235
1236         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1237         RTE_ETH_FOREACH_DEV(i) {
1238                 dev_info = &rx_adapter->eth_devices[i];
1239                 if (dev_info->internal_event_port == 0 ||
1240                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1241                         continue;
1242                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1243                                                         &rte_eth_devices[i]);
1244         }
1245
1246         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1247         return 0;
1248 }
1249
1250 int
1251 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1252 {
1253         struct rte_event_eth_rx_adapter *rx_adapter;
1254
1255         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1256
1257         rx_adapter = rxa_id_to_adapter(id);
1258         if (rx_adapter == NULL || service_id == NULL)
1259                 return -EINVAL;
1260
1261         if (rx_adapter->service_inited)
1262                 *service_id = rx_adapter->service_id;
1263
1264         return rx_adapter->service_inited ? 0 : -ESRCH;
1265 }