eventdev: fix missing update to Rx adapter WRR position
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <rte_cycles.h>
6 #include <rte_common.h>
7 #include <rte_dev.h>
8 #include <rte_errno.h>
9 #include <rte_ethdev.h>
10 #include <rte_log.h>
11 #include <rte_malloc.h>
12 #include <rte_service_component.h>
13 #include <rte_thash.h>
14
15 #include "rte_eventdev.h"
16 #include "rte_eventdev_pmd.h"
17 #include "rte_event_eth_rx_adapter.h"
18
19 #define BATCH_SIZE              32
20 #define BLOCK_CNT_THRESHOLD     10
21 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
22
23 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
24 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
25
26 #define RSS_KEY_SIZE    40
27
28 /*
29  * There is an instance of this struct per polled Rx queue added to the
30  * adapter
31  */
32 struct eth_rx_poll_entry {
33         /* Eth port to poll */
34         uint16_t eth_dev_id;
35         /* Eth rx queue to poll */
36         uint16_t eth_rx_qid;
37 };
38
39 /* Instance per adapter */
40 struct rte_eth_event_enqueue_buffer {
41         /* Count of events in this buffer */
42         uint16_t count;
43         /* Array of events in this buffer */
44         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
45 };
46
47 struct rte_event_eth_rx_adapter {
48         /* RSS key */
49         uint8_t rss_key_be[RSS_KEY_SIZE];
50         /* Event device identifier */
51         uint8_t eventdev_id;
52         /* Per ethernet device structure */
53         struct eth_device_info *eth_devices;
54         /* Event port identifier */
55         uint8_t event_port_id;
56         /* Lock to serialize config updates with service function */
57         rte_spinlock_t rx_lock;
58         /* Max mbufs processed in any service function invocation */
59         uint32_t max_nb_rx;
60         /* Receive queues that need to be polled */
61         struct eth_rx_poll_entry *eth_rx_poll;
62         /* Size of the eth_rx_poll array */
63         uint16_t num_rx_polled;
64         /* Weighted round robin schedule */
65         uint32_t *wrr_sched;
66         /* wrr_sched[] size */
67         uint32_t wrr_len;
68         /* Next entry in wrr[] to begin polling */
69         uint32_t wrr_pos;
70         /* Event burst buffer */
71         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
72         /* Per adapter stats */
73         struct rte_event_eth_rx_adapter_stats stats;
74         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
75         uint16_t enq_block_count;
76         /* Block start ts */
77         uint64_t rx_enq_block_start_ts;
78         /* Configuration callback for rte_service configuration */
79         rte_event_eth_rx_adapter_conf_cb conf_cb;
80         /* Configuration callback argument */
81         void *conf_arg;
82         /* Set if  default_cb is being used */
83         int default_cb_arg;
84         /* Service initialization state */
85         uint8_t service_inited;
86         /* Total count of Rx queues in adapter */
87         uint32_t nb_queues;
88         /* Memory allocation name */
89         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
90         /* Socket identifier cached from eventdev */
91         int socket_id;
92         /* Per adapter EAL service */
93         uint32_t service_id;
94 } __rte_cache_aligned;
95
96 /* Per eth device */
97 struct eth_device_info {
98         struct rte_eth_dev *dev;
99         struct eth_rx_queue_info *rx_queue;
100         /* Set if ethdev->eventdev packet transfer uses a
101          * hardware mechanism
102          */
103         uint8_t internal_event_port;
104         /* Set if the adapter is processing rx queues for
105          * this eth device and packet processing has been
106          * started; this lets the code know whether the PMD
107          * rx_adapter_stop callback needs to be invoked
108          */
109         uint8_t dev_rx_started;
110         /* If nb_dev_queues > 0, the start callback will
111          * be invoked if not already invoked
112          */
113         uint16_t nb_dev_queues;
114 };
115
116 /* Per Rx queue */
117 struct eth_rx_queue_info {
118         int queue_enabled;      /* True if added */
119         uint16_t wt;            /* Polling weight */
120         uint8_t event_queue_id; /* Event queue to enqueue packets to */
121         uint8_t sched_type;     /* Sched type for events */
122         uint8_t priority;       /* Event priority */
123         uint32_t flow_id;       /* App provided flow identifier */
124         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
125 };
126
127 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
128
129 static inline int
130 valid_id(uint8_t id)
131 {
132         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
133 }
134
135 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
136         if (!valid_id(id)) { \
137                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
138                 return retval; \
139         } \
140 } while (0)
141
142 static inline int
143 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
144 {
145         return rx_adapter->num_rx_polled;
146 }
147
148 /* Greatest common divisor */
149 static uint16_t gcd_u16(uint16_t a, uint16_t b)
150 {
151         uint16_t r = a % b;
152
153         return r ? gcd_u16(b, r) : b;
154 }
155
156 /* Returns the next queue in the polling sequence
157  *
158  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
159  */
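/*
 * For illustration (not part of the original source): with three polled
 * queues q0, q1, q2 and servicing weights 3, 1, 1, max_wt = 3 and gcd = 1,
 * so eth_poll_wrr_calc() below generates a schedule of 3 + 1 + 1 = 5 slots,
 * { q0, q0, q0, q1, q2 }, which the service function cycles through.
 */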
160 static int
161 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
162          unsigned int n, int *cw,
163          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
164          uint16_t gcd, int prev)
165 {
166         int i = prev;
167         uint16_t w;
168
169         while (1) {
170                 uint16_t q;
171                 uint16_t d;
172
173                 i = (i + 1) % n;
174                 if (i == 0) {
175                         *cw = *cw - gcd;
176                         if (*cw <= 0)
177                                 *cw = max_wt;
178                 }
179
180                 q = eth_rx_poll[i].eth_rx_qid;
181                 d = eth_rx_poll[i].eth_dev_id;
182                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
183
184                 if ((int)w >= *cw)
185                         return i;
186         }
187 }
188
189 /* Precalculate WRR polling sequence for all queues in rx_adapter */
190 static int
191 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
192 {
193         uint16_t d;
194         uint16_t q;
195         unsigned int i;
196
197         /* Initialize variables for calculation of wrr schedule */
198         uint16_t max_wrr_pos = 0;
199         unsigned int poll_q = 0;
200         uint16_t max_wt = 0;
201         uint16_t gcd = 0;
202
203         struct eth_rx_poll_entry *rx_poll = NULL;
204         uint32_t *rx_wrr = NULL;
205
206         if (rx_adapter->num_rx_polled) {
207                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
208                                 sizeof(*rx_adapter->eth_rx_poll),
209                                 RTE_CACHE_LINE_SIZE);
210                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
211                                              len,
212                                              RTE_CACHE_LINE_SIZE,
213                                              rx_adapter->socket_id);
214                 if (rx_poll == NULL)
215                         return -ENOMEM;
216
217                 /* Generate array of all queues to poll; the size of this
218                  * array is poll_q
219                  */
220                 RTE_ETH_FOREACH_DEV(d) {
221                         uint16_t nb_rx_queues;
222                         struct eth_device_info *dev_info =
223                                         &rx_adapter->eth_devices[d];
224                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
225                         if (dev_info->rx_queue == NULL)
226                                 continue;
227                         for (q = 0; q < nb_rx_queues; q++) {
228                                 struct eth_rx_queue_info *queue_info =
229                                         &dev_info->rx_queue[q];
230                                 if (queue_info->queue_enabled == 0)
231                                         continue;
232
233                                 uint16_t wt = queue_info->wt;
234                                 rx_poll[poll_q].eth_dev_id = d;
235                                 rx_poll[poll_q].eth_rx_qid = q;
236                                 max_wrr_pos += wt;
237                                 max_wt = RTE_MAX(max_wt, wt);
238                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
239                                 poll_q++;
240                         }
241                 }
242
243                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
244                                 RTE_CACHE_LINE_SIZE);
245                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
246                                             len,
247                                             RTE_CACHE_LINE_SIZE,
248                                             rx_adapter->socket_id);
249                 if (rx_wrr == NULL) {
250                         rte_free(rx_poll);
251                         return -ENOMEM;
252                 }
253
254                 /* Generate polling sequence based on weights */
255                 int prev = -1;
256                 int cw = -1;
257                 for (i = 0; i < max_wrr_pos; i++) {
258                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
259                                              rx_poll, max_wt, gcd, prev);
260                         prev = rx_wrr[i];
261                 }
262         }
263
264         rte_free(rx_adapter->eth_rx_poll);
265         rte_free(rx_adapter->wrr_sched);
266
267         rx_adapter->eth_rx_poll = rx_poll;
268         rx_adapter->wrr_sched = rx_wrr;
269         rx_adapter->wrr_len = max_wrr_pos;
270
271         return 0;
272 }
273
274 static inline void
275 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
276         struct ipv6_hdr **ipv6_hdr)
277 {
278         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
279         struct vlan_hdr *vlan_hdr;
280
281         *ipv4_hdr = NULL;
282         *ipv6_hdr = NULL;
283
284         switch (eth_hdr->ether_type) {
285         case RTE_BE16(ETHER_TYPE_IPv4):
286                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
287                 break;
288
289         case RTE_BE16(ETHER_TYPE_IPv6):
290                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
291                 break;
292
293         case RTE_BE16(ETHER_TYPE_VLAN):
294                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
295                 switch (vlan_hdr->eth_proto) {
296                 case RTE_BE16(ETHER_TYPE_IPv4):
297                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
298                         break;
299                 case RTE_BE16(ETHER_TYPE_IPv6):
300                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
301                         break;
302                 default:
303                         break;
304                 }
305                 break;
306
307         default:
308                 break;
309         }
310 }
311
312 /* Calculate RSS hash for IPv4/6 */
313 static inline uint32_t
314 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
315 {
316         uint32_t input_len;
317         void *tuple;
318         struct rte_ipv4_tuple ipv4_tuple;
319         struct rte_ipv6_tuple ipv6_tuple;
320         struct ipv4_hdr *ipv4_hdr;
321         struct ipv6_hdr *ipv6_hdr;
322
323         mtoip(m, &ipv4_hdr, &ipv6_hdr);
324
325         if (ipv4_hdr) {
326                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
327                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
328                 tuple = &ipv4_tuple;
329                 input_len = RTE_THASH_V4_L3_LEN;
330         } else if (ipv6_hdr) {
331                 rte_thash_load_v6_addrs(ipv6_hdr,
332                                         (union rte_thash_tuple *)&ipv6_tuple);
333                 tuple = &ipv6_tuple;
334                 input_len = RTE_THASH_V6_L3_LEN;
335         } else
336                 return 0;
337
338         return rte_softrss_be(tuple, input_len, rss_key_be);
339 }
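/*
 * Note: rss_key_be holds the default RSS key already converted with
 * rte_convert_rss_key() at adapter creation time, i.e. it is in the
 * byte-swapped layout expected by rte_softrss_be() above.
 */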
340
341 static inline int
342 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
343 {
344         return !!rx_adapter->enq_block_count;
345 }
346
347 static inline void
348 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
349 {
350         if (rx_adapter->rx_enq_block_start_ts)
351                 return;
352
353         rx_adapter->enq_block_count++;
354         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
355                 return;
356
357         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
358 }
359
360 static inline void
361 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
362                     struct rte_event_eth_rx_adapter_stats *stats)
363 {
364         if (unlikely(!stats->rx_enq_start_ts))
365                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
366
367         if (likely(!rx_enq_blocked(rx_adapter)))
368                 return;
369
370         rx_adapter->enq_block_count = 0;
371         if (rx_adapter->rx_enq_block_start_ts) {
372                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
373                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
374                     rx_adapter->rx_enq_block_start_ts;
375                 rx_adapter->rx_enq_block_start_ts = 0;
376         }
377 }
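/*
 * Together, rx_enq_block_start_ts() and rx_enq_block_end_ts() account for
 * the time the event device back-pressures the adapter: after
 * BLOCK_CNT_THRESHOLD consecutive flushes that enqueue nothing, the start
 * timestamp is latched, and the cycles until the next successful enqueue
 * are added to stats->rx_enq_block_cycles.
 */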
378
379 /* Add event to buffer; the free space check is done prior to calling
380  * this function
381  */
382 static inline void
383 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
384                   struct rte_event *ev)
385 {
386         struct rte_eth_event_enqueue_buffer *buf =
387             &rx_adapter->event_enqueue_buffer;
388         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
389 }
390
391 /* Enqueue buffered events to event device */
392 static inline uint16_t
393 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
394 {
395         struct rte_eth_event_enqueue_buffer *buf =
396             &rx_adapter->event_enqueue_buffer;
397         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
398
399         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
400                                         rx_adapter->event_port_id,
401                                         buf->events,
402                                         buf->count);
403         if (n != buf->count) {
404                 memmove(buf->events,
405                         &buf->events[n],
406                         (buf->count - n) * sizeof(struct rte_event));
407                 stats->rx_enq_retry++;
408         }
409
410         n ? rx_enq_block_end_ts(rx_adapter, stats) :
411                 rx_enq_block_start_ts(rx_adapter);
412
413         buf->count -= n;
414         stats->rx_enq_count += n;
415
416         return n;
417 }
418
419 static inline void
420 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
421         uint16_t eth_dev_id,
422         uint16_t rx_queue_id,
423         struct rte_mbuf **mbufs,
424         uint16_t num)
425 {
426         uint32_t i;
427         struct eth_device_info *eth_device_info =
428                                         &rx_adapter->eth_devices[eth_dev_id];
429         struct eth_rx_queue_info *eth_rx_queue_info =
430                                         &eth_device_info->rx_queue[rx_queue_id];
431
432         int32_t qid = eth_rx_queue_info->event_queue_id;
433         uint8_t sched_type = eth_rx_queue_info->sched_type;
434         uint8_t priority = eth_rx_queue_info->priority;
435         uint32_t flow_id;
436         struct rte_event events[BATCH_SIZE];
437         struct rte_mbuf *m = mbufs[0];
438         uint32_t rss_mask;
439         uint32_t rss;
440         int do_rss;
441         uint64_t ts;
442
443         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
444         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
445         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
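        /*
         * The event flow id is composed below as
         * (queue flow_id & flow_id_mask) | (rss & ~flow_id_mask): if the
         * application supplied a flow id at queue add time the mask is ~0
         * and that value is used as-is, otherwise the mask is 0 and the
         * RSS hash (from hardware, or do_softrss() when PKT_RX_RSS_HASH
         * is not set) provides the flow id.
         */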
446
447         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
448                 ts = rte_get_tsc_cycles();
449                 for (i = 0; i < num; i++) {
450                         m = mbufs[i];
451
452                         m->timestamp = ts;
453                         m->ol_flags |= PKT_RX_TIMESTAMP;
454                 }
455         }
456
457         for (i = 0; i < num; i++) {
458                 m = mbufs[i];
459                 struct rte_event *ev = &events[i];
460
461                 rss = do_rss ?
462                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
463                 flow_id =
464                     eth_rx_queue_info->flow_id &
465                                 eth_rx_queue_info->flow_id_mask;
466                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
467                 ev->flow_id = flow_id;
468                 ev->op = RTE_EVENT_OP_NEW;
469                 ev->sched_type = sched_type;
470                 ev->queue_id = qid;
471                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
472                 ev->sub_event_type = 0;
473                 ev->priority = priority;
474                 ev->mbuf = m;
475
476                 buf_event_enqueue(rx_adapter, ev);
477         }
478 }
479
480 /*
481  * Polls receive queues added to the event adapter and enqueues received
482  * packets to the event device.
483  *
484  * The receive code enqueues initially to a temporary buffer; the
485  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
486  *
487  * If there isn't space available in the temporary buffer, packets from the
488  * Rx queue aren't dequeued from the eth device; this back pressures the
489  * eth device. In virtual device environments this back pressure is relayed
490  * to the hypervisor's switching layer, where adjustments can be made to
491  * deal with it.
492  */
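/*
 * With the sizes defined above, the buffer is flushed once it holds at
 * least BATCH_SIZE (32) events, and the poll loop stops early (saving
 * wrr_pos) whenever fewer than BATCH_SIZE free slots remain out of
 * ETH_EVENT_BUFFER_SIZE (128).
 */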
493 static inline uint32_t
494 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
495 {
496         uint32_t num_queue;
497         uint16_t n;
498         uint32_t nb_rx = 0;
499         struct rte_mbuf *mbufs[BATCH_SIZE];
500         struct rte_eth_event_enqueue_buffer *buf;
501         uint32_t wrr_pos;
502         uint32_t max_nb_rx;
503
504         wrr_pos = rx_adapter->wrr_pos;
505         max_nb_rx = rx_adapter->max_nb_rx;
506         buf = &rx_adapter->event_enqueue_buffer;
507         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
508
509         /* Iterate through a WRR sequence */
510         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
511                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
512                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
513                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
514
515                 /* Don't do a batch dequeue from the rx queue if there isn't
516                  * enough space in the enqueue buffer.
517                  */
518                 if (buf->count >= BATCH_SIZE)
519                         flush_event_buffer(rx_adapter);
520                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
521                         rx_adapter->wrr_pos = wrr_pos;
522                         break;
523                 }
524
525                 stats->rx_poll_count++;
526                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
527
528                 if (n) {
529                         stats->rx_packets += n;
530                         /* The check before rte_eth_rx_burst() ensures that
531                          * all n mbufs can be buffered
532                          */
533                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
534                         nb_rx += n;
535                         if (nb_rx > max_nb_rx) {
536                                 rx_adapter->wrr_pos =
537                                     (wrr_pos + 1) % rx_adapter->wrr_len;
538                                 return nb_rx;
539                         }
540                 }
541
542                 if (++wrr_pos == rx_adapter->wrr_len)
543                         wrr_pos = 0;
544         }
545
546         return nb_rx;
547 }
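/*
 * Note that rx_adapter->wrr_pos is written back on the early exits above
 * (enqueue buffer full, or max_nb_rx exceeded) so that the next service
 * invocation resumes the WRR sequence where this one stopped; after a
 * complete pass the local wrr_pos has wrapped back to its starting value,
 * so no update is needed in that case.
 */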
548
549 static int
550 event_eth_rx_adapter_service_func(void *args)
551 {
552         struct rte_event_eth_rx_adapter *rx_adapter = args;
553         struct rte_eth_event_enqueue_buffer *buf;
554
555         buf = &rx_adapter->event_enqueue_buffer;
556         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
557                 return 0;
558         if (eth_rx_poll(rx_adapter) == 0 && buf->count)
559                 flush_event_buffer(rx_adapter);
560         rte_spinlock_unlock(&rx_adapter->rx_lock);
561         return 0;
562 }
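/*
 * The trylock means a service invocation simply skips a cycle while a
 * control path operation (queue add/del) holds rx_lock, instead of
 * blocking the service core.
 */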
563
564 static int
565 rte_event_eth_rx_adapter_init(void)
566 {
567         const char *name = "rte_event_eth_rx_adapter_array";
568         const struct rte_memzone *mz;
569         unsigned int sz;
570
571         sz = sizeof(*event_eth_rx_adapter) *
572             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
573         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
574
575         mz = rte_memzone_lookup(name);
576         if (mz == NULL) {
577                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
578                                                  RTE_CACHE_LINE_SIZE);
579                 if (mz == NULL) {
580                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
581                                         PRId32, rte_errno);
582                         return -rte_errno;
583                 }
584         }
585
586         event_eth_rx_adapter = mz->addr;
587         return 0;
588 }
589
590 static inline struct rte_event_eth_rx_adapter *
591 id_to_rx_adapter(uint8_t id)
592 {
593         return event_eth_rx_adapter ?
594                 event_eth_rx_adapter[id] : NULL;
595 }
596
597 static int
598 default_conf_cb(uint8_t id, uint8_t dev_id,
599                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
600 {
601         int ret;
602         struct rte_eventdev *dev;
603         struct rte_event_dev_config dev_conf;
604         int started;
605         uint8_t port_id;
606         struct rte_event_port_conf *port_conf = arg;
607         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
608
609         dev = &rte_eventdevs[rx_adapter->eventdev_id];
610         dev_conf = dev->data->dev_conf;
611
612         started = dev->data->dev_started;
613         if (started)
614                 rte_event_dev_stop(dev_id);
615         port_id = dev_conf.nb_event_ports;
616         dev_conf.nb_event_ports += 1;
617         ret = rte_event_dev_configure(dev_id, &dev_conf);
618         if (ret) {
619                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
620                                                 dev_id);
621                 if (started) {
622                         if (rte_event_dev_start(dev_id))
623                                 return -EIO;
624                 }
625                 return ret;
626         }
627
628         ret = rte_event_port_setup(dev_id, port_id, port_conf);
629         if (ret) {
630                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
631                                         port_id);
632                 return ret;
633         }
634
635         conf->event_port_id = port_id;
636         conf->max_nb_rx = 128;
637         if (started)
638                 ret = rte_event_dev_start(dev_id);
639         rx_adapter->default_cb_arg = 1;
640         return ret;
641 }
642
643 static int
644 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
645 {
646         int ret;
647         struct rte_service_spec service;
648         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
649
650         if (rx_adapter->service_inited)
651                 return 0;
652
653         memset(&service, 0, sizeof(service));
654         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
655                 "rte_event_eth_rx_adapter_%d", id);
656         service.socket_id = rx_adapter->socket_id;
657         service.callback = event_eth_rx_adapter_service_func;
658         service.callback_userdata = rx_adapter;
659         /* Service function handles locking for queue add/del updates */
660         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
661         ret = rte_service_component_register(&service, &rx_adapter->service_id);
662         if (ret) {
663                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
664                         service.name, ret);
665                 return ret;
666         }
667
668         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
669                 &rx_adapter_conf, rx_adapter->conf_arg);
670         if (ret) {
671                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
672                         ret);
673                 goto err_done;
674         }
675         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
676         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
677         rx_adapter->service_inited = 1;
678         return 0;
679
680 err_done:
681         rte_service_component_unregister(rx_adapter->service_id);
682         return ret;
683 }
684
685
686 static void
687 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
688                 struct eth_device_info *dev_info,
689                 int32_t rx_queue_id,
690                 uint8_t add)
691 {
692         struct eth_rx_queue_info *queue_info;
693         int enabled;
694         uint16_t i;
695
696         if (dev_info->rx_queue == NULL)
697                 return;
698
699         if (rx_queue_id == -1) {
700                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
701                         update_queue_info(rx_adapter, dev_info, i, add);
702         } else {
703                 queue_info = &dev_info->rx_queue[rx_queue_id];
704                 enabled = queue_info->queue_enabled;
705                 if (add) {
706                         rx_adapter->nb_queues += !enabled;
707                         dev_info->nb_dev_queues += !enabled;
708                 } else {
709                         rx_adapter->nb_queues -= enabled;
710                         dev_info->nb_dev_queues -= enabled;
711                 }
712                 queue_info->queue_enabled = !!add;
713         }
714 }
715
716 static int
717 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
718                             struct eth_device_info *dev_info,
719                             uint16_t rx_queue_id)
720 {
721         struct eth_rx_queue_info *queue_info;
722
723         if (rx_adapter->nb_queues == 0)
724                 return 0;
725
726         queue_info = &dev_info->rx_queue[rx_queue_id];
727         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
728         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
729         return 0;
730 }
731
732 static void
733 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
734                 struct eth_device_info *dev_info,
735                 uint16_t rx_queue_id,
736                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
737
738 {
739         struct eth_rx_queue_info *queue_info;
740         const struct rte_event *ev = &conf->ev;
741
742         queue_info = &dev_info->rx_queue[rx_queue_id];
743         queue_info->event_queue_id = ev->queue_id;
744         queue_info->sched_type = ev->sched_type;
745         queue_info->priority = ev->priority;
746         queue_info->wt = conf->servicing_weight;
747
748         if (conf->rx_queue_flags &
749                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
750                 queue_info->flow_id = ev->flow_id;
751                 queue_info->flow_id_mask = ~0;
752         }
753
754         /* The same queue can be added more than once */
755         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
756         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
757 }
758
759 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
760                 uint16_t eth_dev_id,
761                 int rx_queue_id,
762                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
763 {
764         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
765         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
766         uint32_t i;
767         int ret;
768
769         if (queue_conf->servicing_weight == 0) {
770
771                 struct rte_eth_dev_data *data = dev_info->dev->data;
772                 if (data->dev_conf.intr_conf.rxq) {
773                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
774                                         " not supported");
775                         return -ENOTSUP;
776                 }
777                 temp_conf = *queue_conf;
778
779                 /* Rx interrupts are disabled, so set wt = 1 */
780                 temp_conf.servicing_weight = 1;
781                 queue_conf = &temp_conf;
782         }
783
784         if (dev_info->rx_queue == NULL) {
785                 dev_info->rx_queue =
786                     rte_zmalloc_socket(rx_adapter->mem_name,
787                                        dev_info->dev->data->nb_rx_queues *
788                                        sizeof(struct eth_rx_queue_info), 0,
789                                        rx_adapter->socket_id);
790                 if (dev_info->rx_queue == NULL)
791                         return -ENOMEM;
792         }
793
794         if (rx_queue_id == -1) {
795                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
796                         event_eth_rx_adapter_queue_add(rx_adapter,
797                                                 dev_info, i,
798                                                 queue_conf);
799         } else {
800                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
801                                           (uint16_t)rx_queue_id,
802                                           queue_conf);
803         }
804
805         ret = eth_poll_wrr_calc(rx_adapter);
806         if (ret) {
807                 event_eth_rx_adapter_queue_del(rx_adapter,
808                                         dev_info, rx_queue_id);
809                 return ret;
810         }
811
812         return ret;
813 }
814
815 static int
816 rx_adapter_ctrl(uint8_t id, int start)
817 {
818         struct rte_event_eth_rx_adapter *rx_adapter;
819         struct rte_eventdev *dev;
820         struct eth_device_info *dev_info;
821         uint32_t i;
822         int use_service = 0;
823         int stop = !start;
824
825         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
826         rx_adapter = id_to_rx_adapter(id);
827         if (rx_adapter == NULL)
828                 return -EINVAL;
829
830         dev = &rte_eventdevs[rx_adapter->eventdev_id];
831
832         RTE_ETH_FOREACH_DEV(i) {
833                 dev_info = &rx_adapter->eth_devices[i];
834                 /* If start, check for num dev queues */
835                 if (start && !dev_info->nb_dev_queues)
836                         continue;
837                 /* If stop, check if dev has been started */
838                 if (stop && !dev_info->dev_rx_started)
839                         continue;
840                 use_service |= !dev_info->internal_event_port;
841                 dev_info->dev_rx_started = start;
842                 if (dev_info->internal_event_port == 0)
843                         continue;
844                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
845                                                 &rte_eth_devices[i]) :
846                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
847                                                 &rte_eth_devices[i]);
848         }
849
850         if (use_service)
851                 rte_service_runstate_set(rx_adapter->service_id, start);
852
853         return 0;
854 }
855
856 int
857 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
858                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
859                                 void *conf_arg)
860 {
861         struct rte_event_eth_rx_adapter *rx_adapter;
862         int ret;
863         int socket_id;
864         uint16_t i;
865         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
866         const uint8_t default_rss_key[] = {
867                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
868                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
869                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
870                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
871                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
872         };
873
874         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
875         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
876         if (conf_cb == NULL)
877                 return -EINVAL;
878
879         if (event_eth_rx_adapter == NULL) {
880                 ret = rte_event_eth_rx_adapter_init();
881                 if (ret)
882                         return ret;
883         }
884
885         rx_adapter = id_to_rx_adapter(id);
886         if (rx_adapter != NULL) {
887                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
888                 return -EEXIST;
889         }
890
891         socket_id = rte_event_dev_socket_id(dev_id);
892         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
893                 "rte_event_eth_rx_adapter_%d",
894                 id);
895
896         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
897                         RTE_CACHE_LINE_SIZE, socket_id);
898         if (rx_adapter == NULL) {
899                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
900                 return -ENOMEM;
901         }
902
903         rx_adapter->eventdev_id = dev_id;
904         rx_adapter->socket_id = socket_id;
905         rx_adapter->conf_cb = conf_cb;
906         rx_adapter->conf_arg = conf_arg;
907         strcpy(rx_adapter->mem_name, mem_name);
908         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
909                                         /* FIXME: incompatible with hotplug */
910                                         rte_eth_dev_count_total() *
911                                         sizeof(struct eth_device_info), 0,
912                                         socket_id);
913         rte_convert_rss_key((const uint32_t *)default_rss_key,
914                         (uint32_t *)rx_adapter->rss_key_be,
915                             RTE_DIM(default_rss_key));
916
917         if (rx_adapter->eth_devices == NULL) {
918                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
919                 rte_free(rx_adapter);
920                 return -ENOMEM;
921         }
922         rte_spinlock_init(&rx_adapter->rx_lock);
923         RTE_ETH_FOREACH_DEV(i)
924                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
925
926         event_eth_rx_adapter[id] = rx_adapter;
927         if (conf_cb == default_conf_cb)
928                 rx_adapter->default_cb_arg = 1;
929         return 0;
930 }
931
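/*
 * Usage sketch (illustration only; identifiers such as eventdev_id and
 * eth_port_id are placeholders and error handling is omitted):
 *
 *    struct rte_event_port_conf pconf = {
 *            .new_event_threshold = 1024,
 *            .dequeue_depth = 16,
 *            .enqueue_depth = 16,
 *    };
 *    struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *            .rx_queue_flags = 0,
 *            .servicing_weight = 1,
 *            .ev = {
 *                    .queue_id = 0,
 *                    .sched_type = RTE_SCHED_TYPE_ATOMIC,
 *                    .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *            },
 *    };
 *
 *    rte_event_eth_rx_adapter_create(0, eventdev_id, &pconf);
 *    rte_event_eth_rx_adapter_queue_add(0, eth_port_id, -1, &qconf);
 *    rte_event_eth_rx_adapter_start(0);
 *
 * When the adapter uses a service core, the service id can be obtained
 * with rte_event_eth_rx_adapter_service_id_get() and mapped to an lcore.
 */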
932 int
933 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
934                 struct rte_event_port_conf *port_config)
935 {
936         struct rte_event_port_conf *pc;
937         int ret;
938
939         if (port_config == NULL)
940                 return -EINVAL;
941         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
942
943         pc = rte_malloc(NULL, sizeof(*pc), 0);
944         if (pc == NULL)
945                 return -ENOMEM;
946         *pc = *port_config;
947         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
948                                         default_conf_cb,
949                                         pc);
950         if (ret)
951                 rte_free(pc);
952         return ret;
953 }
954
955 int
956 rte_event_eth_rx_adapter_free(uint8_t id)
957 {
958         struct rte_event_eth_rx_adapter *rx_adapter;
959
960         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
961
962         rx_adapter = id_to_rx_adapter(id);
963         if (rx_adapter == NULL)
964                 return -EINVAL;
965
966         if (rx_adapter->nb_queues) {
967                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
968                                 rx_adapter->nb_queues);
969                 return -EBUSY;
970         }
971
972         if (rx_adapter->default_cb_arg)
973                 rte_free(rx_adapter->conf_arg);
974         rte_free(rx_adapter->eth_devices);
975         rte_free(rx_adapter);
976         event_eth_rx_adapter[id] = NULL;
977
978         return 0;
979 }
980
981 int
982 rte_event_eth_rx_adapter_queue_add(uint8_t id,
983                 uint16_t eth_dev_id,
984                 int32_t rx_queue_id,
985                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
986 {
987         int ret;
988         uint32_t cap;
989         struct rte_event_eth_rx_adapter *rx_adapter;
990         struct rte_eventdev *dev;
991         struct eth_device_info *dev_info;
992         int start_service;
993
994         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
995         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
996
997         rx_adapter = id_to_rx_adapter(id);
998         if ((rx_adapter == NULL) || (queue_conf == NULL))
999                 return -EINVAL;
1000
1001         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1002         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1003                                                 eth_dev_id,
1004                                                 &cap);
1005         if (ret) {
1006                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1007                         " eth port %" PRIu16, id, eth_dev_id);
1008                 return ret;
1009         }
1010
1011         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1012                 && (queue_conf->rx_queue_flags &
1013                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1014                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1015                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
1016                                 eth_dev_id, id);
1017                 return -EINVAL;
1018         }
1019
1020         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1021                 (rx_queue_id != -1)) {
1022                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
1023                         "event queue, eth port: %" PRIu16 " adapter id: %"
1024                         PRIu8, eth_dev_id, id);
1025                 return -EINVAL;
1026         }
1027
1028         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1029                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1030                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1031                          (uint16_t)rx_queue_id);
1032                 return -EINVAL;
1033         }
1034
1035         start_service = 0;
1036         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1037
1038         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1039                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1040                                         -ENOTSUP);
1041                 if (dev_info->rx_queue == NULL) {
1042                         dev_info->rx_queue =
1043                             rte_zmalloc_socket(rx_adapter->mem_name,
1044                                         dev_info->dev->data->nb_rx_queues *
1045                                         sizeof(struct eth_rx_queue_info), 0,
1046                                         rx_adapter->socket_id);
1047                         if (dev_info->rx_queue == NULL)
1048                                 return -ENOMEM;
1049                 }
1050
1051                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1052                                 &rte_eth_devices[eth_dev_id],
1053                                 rx_queue_id, queue_conf);
1054                 if (ret == 0) {
1055                         update_queue_info(rx_adapter,
1056                                         &rx_adapter->eth_devices[eth_dev_id],
1057                                         rx_queue_id,
1058                                         1);
1059                 }
1060         } else {
1061                 rte_spinlock_lock(&rx_adapter->rx_lock);
1062                 ret = init_service(rx_adapter, id);
1063                 if (ret == 0)
1064                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1065                                         queue_conf);
1066                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1067                 if (ret == 0)
1068                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1069         }
1070
1071         if (ret)
1072                 return ret;
1073
1074         if (start_service)
1075                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1076
1077         return 0;
1078 }
1079
1080 int
1081 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
1082                                 int32_t rx_queue_id)
1083 {
1084         int ret = 0;
1085         struct rte_eventdev *dev;
1086         struct rte_event_eth_rx_adapter *rx_adapter;
1087         struct eth_device_info *dev_info;
1088         uint32_t cap;
1089         uint16_t i;
1090
1091         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1092         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1093
1094         rx_adapter = id_to_rx_adapter(id);
1095         if (rx_adapter == NULL)
1096                 return -EINVAL;
1097
1098         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1099         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1100                                                 eth_dev_id,
1101                                                 &cap);
1102         if (ret)
1103                 return ret;
1104
1105         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1106                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1107                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1108                          (uint16_t)rx_queue_id);
1109                 return -EINVAL;
1110         }
1111
1112         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1113
1114         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1115                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1116                                  -ENOTSUP);
1117                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1118                                                 &rte_eth_devices[eth_dev_id],
1119                                                 rx_queue_id);
1120                 if (ret == 0) {
1121                         update_queue_info(rx_adapter,
1122                                         &rx_adapter->eth_devices[eth_dev_id],
1123                                         rx_queue_id,
1124                                         0);
1125                         if (dev_info->nb_dev_queues == 0) {
1126                                 rte_free(dev_info->rx_queue);
1127                                 dev_info->rx_queue = NULL;
1128                         }
1129                 }
1130         } else {
1131                 int rc;
1132                 rte_spinlock_lock(&rx_adapter->rx_lock);
1133                 if (rx_queue_id == -1) {
1134                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1135                                 event_eth_rx_adapter_queue_del(rx_adapter,
1136                                                         dev_info,
1137                                                         i);
1138                 } else {
1139                         event_eth_rx_adapter_queue_del(rx_adapter,
1140                                                 dev_info,
1141                                                 (uint16_t)rx_queue_id);
1142                 }
1143
1144                 rc = eth_poll_wrr_calc(rx_adapter);
1145                 if (rc)
1146                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1147                                         rc);
1148
1149                 if (dev_info->nb_dev_queues == 0) {
1150                         rte_free(dev_info->rx_queue);
1151                         dev_info->rx_queue = NULL;
1152                 }
1153
1154                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1155                 rte_service_component_runstate_set(rx_adapter->service_id,
1156                                 sw_rx_adapter_queue_count(rx_adapter));
1157         }
1158
1159         return ret;
1160 }
1161
1162
1163 int
1164 rte_event_eth_rx_adapter_start(uint8_t id)
1165 {
1166         return rx_adapter_ctrl(id, 1);
1167 }
1168
1169 int
1170 rte_event_eth_rx_adapter_stop(uint8_t id)
1171 {
1172         return rx_adapter_ctrl(id, 0);
1173 }
1174
1175 int
1176 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1177                                struct rte_event_eth_rx_adapter_stats *stats)
1178 {
1179         struct rte_event_eth_rx_adapter *rx_adapter;
1180         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1181         struct rte_event_eth_rx_adapter_stats dev_stats;
1182         struct rte_eventdev *dev;
1183         struct eth_device_info *dev_info;
1184         uint32_t i;
1185         int ret;
1186
1187         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1188
1189         rx_adapter = id_to_rx_adapter(id);
1190         if (rx_adapter == NULL || stats == NULL)
1191                 return -EINVAL;
1192
1193         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1194         memset(stats, 0, sizeof(*stats));
1195         RTE_ETH_FOREACH_DEV(i) {
1196                 dev_info = &rx_adapter->eth_devices[i];
1197                 if (dev_info->internal_event_port == 0 ||
1198                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1199                         continue;
1200                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1201                                                 &rte_eth_devices[i],
1202                                                 &dev_stats);
1203                 if (ret)
1204                         continue;
1205                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1206                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1207         }
1208
1209         if (rx_adapter->service_inited)
1210                 *stats = rx_adapter->stats;
1211
1212         stats->rx_packets += dev_stats_sum.rx_packets;
1213         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1214         return 0;
1215 }
1216
1217 int
1218 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1219 {
1220         struct rte_event_eth_rx_adapter *rx_adapter;
1221         struct rte_eventdev *dev;
1222         struct eth_device_info *dev_info;
1223         uint32_t i;
1224
1225         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1226
1227         rx_adapter = id_to_rx_adapter(id);
1228         if (rx_adapter == NULL)
1229                 return -EINVAL;
1230
1231         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1232         RTE_ETH_FOREACH_DEV(i) {
1233                 dev_info = &rx_adapter->eth_devices[i];
1234                 if (dev_info->internal_event_port == 0 ||
1235                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1236                         continue;
1237                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1238                                                         &rte_eth_devices[i]);
1239         }
1240
1241         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1242         return 0;
1243 }
1244
1245 int
1246 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1247 {
1248         struct rte_event_eth_rx_adapter *rx_adapter;
1249
1250         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1251
1252         rx_adapter = id_to_rx_adapter(id);
1253         if (rx_adapter == NULL || service_id == NULL)
1254                 return -EINVAL;
1255
1256         if (rx_adapter->service_inited)
1257                 *service_id = rx_adapter->service_id;
1258
1259         return rx_adapter->service_inited ? 0 : -ESRCH;
1260 }