eventdev: convert eth Rx adapter files to SPDX license tag
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 * All rights reserved.
 */
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE              32
#define BLOCK_CNT_THRESHOLD     10
#define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN     32

#define RSS_KEY_SIZE    40

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
        /* Eth port to poll */
        uint8_t eth_dev_id;
        /* Eth rx queue to poll */
        uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
        /* Count of events in this buffer */
        uint16_t count;
        /* Array of events in this buffer */
        struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
        /* RSS key */
        uint8_t rss_key_be[RSS_KEY_SIZE];
        /* Event device identifier */
        uint8_t eventdev_id;
        /* Per ethernet device structure */
        struct eth_device_info *eth_devices;
        /* Event port identifier */
        uint8_t event_port_id;
        /* Lock to serialize config updates with service function */
        rte_spinlock_t rx_lock;
        /* Max mbufs processed in any service function invocation */
        uint32_t max_nb_rx;
        /* Receive queues that need to be polled */
        struct eth_rx_poll_entry *eth_rx_poll;
        /* Size of the eth_rx_poll array */
        uint16_t num_rx_polled;
        /* Weighted round robin schedule */
        uint32_t *wrr_sched;
        /* wrr_sched[] size */
        uint32_t wrr_len;
        /* Next entry in wrr[] to begin polling */
        uint32_t wrr_pos;
        /* Event burst buffer */
        struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
        /* Per adapter stats */
        struct rte_event_eth_rx_adapter_stats stats;
        /* Block count, counts up to BLOCK_CNT_THRESHOLD */
        uint16_t enq_block_count;
        /* Block start ts */
        uint64_t rx_enq_block_start_ts;
        /* Configuration callback for rte_service configuration */
        rte_event_eth_rx_adapter_conf_cb conf_cb;
        /* Configuration callback argument */
        void *conf_arg;
        /* Set if default_cb is being used */
        int default_cb_arg;
        /* Service initialization state */
        uint8_t service_inited;
        /* Total count of Rx queues in adapter */
        uint32_t nb_queues;
        /* Memory allocation name */
        char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
        /* Socket identifier cached from eventdev */
        int socket_id;
        /* Per adapter EAL service */
        uint32_t service_id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
        struct rte_eth_dev *dev;
        struct eth_rx_queue_info *rx_queue;
        /* Set if ethdev->eventdev packet transfer uses a
         * hardware mechanism
         */
        uint8_t internal_event_port;
        /* Set if the adapter is processing rx queues for
         * this eth device and packet processing has been
         * started; allows the code to know if the PMD
         * rx_adapter_stop callback needs to be invoked
         */
        uint8_t dev_rx_started;
        /* If nb_dev_queues > 0, the start callback will
         * be invoked if not already invoked
         */
        uint16_t nb_dev_queues;
};

/* Per Rx queue */
struct eth_rx_queue_info {
        int queue_enabled;      /* True if added */
        uint16_t wt;            /* Polling weight */
        uint8_t event_queue_id; /* Event queue to enqueue packets to */
        uint8_t sched_type;     /* Sched type for events */
        uint8_t priority;       /* Event priority */
        uint32_t flow_id;       /* App provided flow identifier */
        uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
valid_id(uint8_t id)
{
        return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
        if (!valid_id(id)) { \
                RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
                return retval; \
        } \
} while (0)

static inline int
sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return rx_adapter->num_rx_polled;
}

/* Greatest common divisor */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
        uint16_t r = a % b;

        return r ? gcd_u16(b, r) : b;
}
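
/*
 * For illustration: gcd_u16() reduces the step size used by the WRR
 * scheduler below, e.g. weights {4, 6} give gcd_u16(4, 6) = 2, so the
 * current weight can be decremented in steps of 2 rather than 1.
 */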

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
         unsigned int n, int *cw,
         struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
         uint16_t gcd, int prev)
{
        int i = prev;
        uint16_t w;

        while (1) {
                uint16_t q;
                uint8_t d;

                i = (i + 1) % n;
                if (i == 0) {
                        *cw = *cw - gcd;
                        if (*cw <= 0)
                                *cw = max_wt;
                }

                q = eth_rx_poll[i].eth_rx_qid;
                d = eth_rx_poll[i].eth_dev_id;
                w = rx_adapter->eth_devices[d].rx_queue[q].wt;

                if ((int)w >= *cw)
                        return i;
        }
}
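
/*
 * Worked example (for illustration, not used by the code): queues A, B
 * and C with weights 4, 3 and 2 give max_wt = 4 and gcd = 1; repeated
 * calls to wrr_next() produce the cycle
 *
 *      A, A, B, A, B, C, A, B, C
 *
 * i.e. sum-of-weights (9) slots per cycle, each queue appearing in
 * proportion to its weight.
 */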

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint8_t d;
        uint16_t q;
        unsigned int i;

        /* Initialize variables for calculation of wrr schedule */
        uint16_t max_wrr_pos = 0;
        unsigned int poll_q = 0;
        uint16_t max_wt = 0;
        uint16_t gcd = 0;

        struct eth_rx_poll_entry *rx_poll = NULL;
        uint32_t *rx_wrr = NULL;

        if (rx_adapter->num_rx_polled) {
                size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
                                sizeof(*rx_adapter->eth_rx_poll),
                                RTE_CACHE_LINE_SIZE);
                rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
                                             len,
                                             RTE_CACHE_LINE_SIZE,
                                             rx_adapter->socket_id);
                if (rx_poll == NULL)
                        return -ENOMEM;

                /* Generate array of all queues to poll, the size of this
                 * array is poll_q
                 */
                RTE_ETH_FOREACH_DEV(d) {
                        uint16_t nb_rx_queues;
                        struct eth_device_info *dev_info =
                                        &rx_adapter->eth_devices[d];
                        nb_rx_queues = dev_info->dev->data->nb_rx_queues;
                        if (dev_info->rx_queue == NULL)
                                continue;
                        for (q = 0; q < nb_rx_queues; q++) {
                                struct eth_rx_queue_info *queue_info =
                                        &dev_info->rx_queue[q];
                                if (queue_info->queue_enabled == 0)
                                        continue;

                                uint16_t wt = queue_info->wt;
                                rx_poll[poll_q].eth_dev_id = d;
                                rx_poll[poll_q].eth_rx_qid = q;
                                max_wrr_pos += wt;
                                max_wt = RTE_MAX(max_wt, wt);
                                gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
                                poll_q++;
                        }
                }

                len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
                                RTE_CACHE_LINE_SIZE);
                rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
                                            len,
                                            RTE_CACHE_LINE_SIZE,
                                            rx_adapter->socket_id);
                if (rx_wrr == NULL) {
                        rte_free(rx_poll);
                        return -ENOMEM;
                }

                /* Generate polling sequence based on weights */
                int prev = -1;
                int cw = -1;
                for (i = 0; i < max_wrr_pos; i++) {
                        rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
                                             rx_poll, max_wt, gcd, prev);
                        prev = rx_wrr[i];
                }
        }

        rte_free(rx_adapter->eth_rx_poll);
        rte_free(rx_adapter->wrr_sched);

        rx_adapter->eth_rx_poll = rx_poll;
        rx_adapter->wrr_sched = rx_wrr;
        rx_adapter->wrr_len = max_wrr_pos;

        return 0;
}

static inline void
mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
        struct ipv6_hdr **ipv6_hdr)
{
        struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
        struct vlan_hdr *vlan_hdr;

        *ipv4_hdr = NULL;
        *ipv6_hdr = NULL;

        switch (eth_hdr->ether_type) {
        case RTE_BE16(ETHER_TYPE_IPv4):
                *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_IPv6):
                *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_VLAN):
                vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
                switch (vlan_hdr->eth_proto) {
                case RTE_BE16(ETHER_TYPE_IPv4):
                        *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
                        break;
                case RTE_BE16(ETHER_TYPE_IPv6):
                        *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
                        break;
                default:
                        break;
                }
                break;

        default:
                break;
        }
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
        uint32_t input_len;
        void *tuple;
        struct rte_ipv4_tuple ipv4_tuple;
        struct rte_ipv6_tuple ipv6_tuple;
        struct ipv4_hdr *ipv4_hdr;
        struct ipv6_hdr *ipv6_hdr;

        mtoip(m, &ipv4_hdr, &ipv6_hdr);

        if (ipv4_hdr) {
                ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
                ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
                tuple = &ipv4_tuple;
                input_len = RTE_THASH_V4_L3_LEN;
        } else if (ipv6_hdr) {
                rte_thash_load_v6_addrs(ipv6_hdr,
                                        (union rte_thash_tuple *)&ipv6_tuple);
                tuple = &ipv6_tuple;
                input_len = RTE_THASH_V6_L3_LEN;
        } else
                return 0;

        return rte_softrss_be(tuple, input_len, rss_key_be);
}
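
/*
 * Note (for illustration): for IPv4 the hash input is only the L3
 * 2-tuple {src_addr, dst_addr} in host byte order (RTE_THASH_V4_L3_LEN
 * words); L4 ports are not included, so all packets between an address
 * pair receive the same software RSS hash.
 */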

static inline int
rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return !!rx_adapter->enq_block_count;
}

static inline void
rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
        if (rx_adapter->rx_enq_block_start_ts)
                return;

        rx_adapter->enq_block_count++;
        if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
                return;

        rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
                    struct rte_event_eth_rx_adapter_stats *stats)
{
        if (unlikely(!stats->rx_enq_start_ts))
                stats->rx_enq_start_ts = rte_get_tsc_cycles();

        if (likely(!rx_enq_blocked(rx_adapter)))
                return;

        rx_adapter->enq_block_count = 0;
        if (rx_adapter->rx_enq_block_start_ts) {
                stats->rx_enq_end_ts = rte_get_tsc_cycles();
                stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
                    rx_adapter->rx_enq_block_start_ts;
                rx_adapter->rx_enq_block_start_ts = 0;
        }
}

/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
                  struct rte_event *ev)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
static inline uint16_t
flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

        uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
                                        rx_adapter->event_port_id,
                                        buf->events,
                                        buf->count);
        if (n != buf->count) {
                memmove(buf->events,
                        &buf->events[n],
                        (buf->count - n) * sizeof(struct rte_event));
                stats->rx_enq_retry++;
        }

        n ? rx_enq_block_end_ts(rx_adapter, stats) :
                rx_enq_block_start_ts(rx_adapter);

        buf->count -= n;
        stats->rx_enq_count += n;

        return n;
}
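
/*
 * For illustration: if buf->count is 32 and the event device accepts
 * only n = 20 events, the remaining 12 are moved to the front of
 * buf->events, rx_enq_retry is incremented and buf->count drops to 12;
 * the leftover events are retried on a later flush.
 */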

static inline void
fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
        uint8_t dev_id,
        uint16_t rx_queue_id,
        struct rte_mbuf **mbufs,
        uint16_t num)
{
        uint32_t i;
        struct eth_device_info *eth_device_info =
                                        &rx_adapter->eth_devices[dev_id];
        struct eth_rx_queue_info *eth_rx_queue_info =
                                        &eth_device_info->rx_queue[rx_queue_id];

        int32_t qid = eth_rx_queue_info->event_queue_id;
        uint8_t sched_type = eth_rx_queue_info->sched_type;
        uint8_t priority = eth_rx_queue_info->priority;
        uint32_t flow_id;
        struct rte_event events[BATCH_SIZE];
        struct rte_mbuf *m = mbufs[0];
        uint32_t rss_mask;
        uint32_t rss;
        int do_rss;
        uint64_t ts;

        /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
        rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
        do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

        if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
                ts = rte_get_tsc_cycles();
                for (i = 0; i < num; i++) {
                        m = mbufs[i];

                        m->timestamp = ts;
                        m->ol_flags |= PKT_RX_TIMESTAMP;
                }
        }

        for (i = 0; i < num; i++) {
                m = mbufs[i];
                struct rte_event *ev = &events[i];

                rss = do_rss ?
                        do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
                flow_id =
                    eth_rx_queue_info->flow_id &
                                eth_rx_queue_info->flow_id_mask;
                flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
                ev->flow_id = flow_id;
                ev->op = RTE_EVENT_OP_NEW;
                ev->sched_type = sched_type;
                ev->queue_id = qid;
                ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
                ev->sub_event_type = 0;
                ev->priority = priority;
                ev->mbuf = m;

                buf_event_enqueue(rx_adapter, ev);
        }
}
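
/*
 * For illustration: with an application flow id of 0x10 and
 * flow_id_mask = ~0, the event flow id is always 0x10; with
 * flow_id_mask = 0, the flow id is taken entirely from the hardware or
 * software RSS hash.
 */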

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code first enqueues to a temporary buffer; the buffer is
 * drained any time it holds >= BATCH_SIZE packets.
 *
 * If there isn't space available in the temporary buffer, packets aren't
 * dequeued from the Rx queue of the eth device; this back pressures the
 * eth device. In virtual device environments, the back pressure is relayed
 * to the hypervisor's switching layer, where adjustments can be made to
 * deal with it.
 */
static inline uint32_t
eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint32_t num_queue;
        uint16_t n;
        uint32_t nb_rx = 0;
        struct rte_mbuf *mbufs[BATCH_SIZE];
        struct rte_eth_event_enqueue_buffer *buf;
        uint32_t wrr_pos;
        uint32_t max_nb_rx;

        wrr_pos = rx_adapter->wrr_pos;
        max_nb_rx = rx_adapter->max_nb_rx;
        buf = &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

        /* Iterate through a WRR sequence */
        for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
                unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
                uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
                uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

                /* Don't do a batch dequeue from the rx queue if there isn't
                 * enough space in the enqueue buffer.
                 */
                if (buf->count >= BATCH_SIZE)
                        flush_event_buffer(rx_adapter);
                if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
                        break;

                stats->rx_poll_count++;
                n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);

                if (n) {
                        stats->rx_packets += n;
                        /* The check before rte_eth_rx_burst() ensures that
                         * all n mbufs can be buffered
                         */
                        fill_event_buffer(rx_adapter, d, qid, mbufs, n);
                        nb_rx += n;
                        if (nb_rx > max_nb_rx) {
                                rx_adapter->wrr_pos =
                                    (wrr_pos + 1) % rx_adapter->wrr_len;
                                return nb_rx;
                        }
                }

                if (++wrr_pos == rx_adapter->wrr_len)
                        wrr_pos = 0;
        }

        return nb_rx;
}
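
/*
 * For illustration: with BATCH_SIZE = 32 and ETH_EVENT_BUFFER_SIZE =
 * 128, a queue is polled only while at least 32 buffer slots are free.
 * If a flush leaves buf->count at 100, then 128 - 100 = 28 < 32 and the
 * loop breaks; the unread packets stay in the NIC queue, which is the
 * back pressure described above.
 */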

static int
event_eth_rx_adapter_service_func(void *args)
{
        struct rte_event_eth_rx_adapter *rx_adapter = args;
        struct rte_eth_event_enqueue_buffer *buf;

        buf = &rx_adapter->event_enqueue_buffer;
        if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
                return 0;
        if (eth_rx_poll(rx_adapter) == 0 && buf->count)
                flush_event_buffer(rx_adapter);
        rte_spinlock_unlock(&rx_adapter->rx_lock);
        return 0;
}
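
/*
 * Application-side sketch (not part of this file; assumes a created
 * adapter "id" and a spare lcore "lcore_id"):
 *
 *      uint32_t service_id;
 *
 *      if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *              rte_service_lcore_add(lcore_id);
 *              rte_service_map_lcore_set(service_id, lcore_id, 1);
 *              rte_service_lcore_start(lcore_id);
 *      }
 *
 * The service core then runs event_eth_rx_adapter_service_func() once
 * the adapter is started.
 */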

static int
rte_event_eth_rx_adapter_init(void)
{
        const char *name = "rte_event_eth_rx_adapter_array";
        const struct rte_memzone *mz;
        unsigned int sz;

        sz = sizeof(*event_eth_rx_adapter) *
            RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

        mz = rte_memzone_lookup(name);
        if (mz == NULL) {
                mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
                                                 RTE_CACHE_LINE_SIZE);
                if (mz == NULL) {
                        RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
                                        PRId32, rte_errno);
                        return -rte_errno;
                }
        }

        event_eth_rx_adapter = mz->addr;
        return 0;
}

static inline struct rte_event_eth_rx_adapter *
id_to_rx_adapter(uint8_t id)
{
        return event_eth_rx_adapter ?
                event_eth_rx_adapter[id] : NULL;
}

static int
default_conf_cb(uint8_t id, uint8_t dev_id,
                struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
        int ret;
        struct rte_eventdev *dev;
        struct rte_event_dev_config dev_conf;
        int started;
        uint8_t port_id;
        struct rte_event_port_conf *port_conf = arg;
        struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        dev_conf = dev->data->dev_conf;

        started = dev->data->dev_started;
        if (started)
                rte_event_dev_stop(dev_id);
        port_id = dev_conf.nb_event_ports;
        dev_conf.nb_event_ports += 1;
        ret = rte_event_dev_configure(dev_id, &dev_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to configure event dev %u",
                                                dev_id);
                if (started) {
                        if (rte_event_dev_start(dev_id))
                                return -EIO;
                }
                return ret;
        }

        ret = rte_event_port_setup(dev_id, port_id, port_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to setup event port %u",
                                        port_id);
                return ret;
        }

        conf->event_port_id = port_id;
        conf->max_nb_rx = 128;
        if (started)
                ret = rte_event_dev_start(dev_id);
        rx_adapter->default_cb_arg = 1;
        return ret;
}

static int
init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
        int ret;
        struct rte_service_spec service;
        struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

        if (rx_adapter->service_inited)
                return 0;

        memset(&service, 0, sizeof(service));
        snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
                "rte_event_eth_rx_adapter_%d", id);
        service.socket_id = rx_adapter->socket_id;
        service.callback = event_eth_rx_adapter_service_func;
        service.callback_userdata = rx_adapter;
        /* Service function handles locking for queue add/del updates */
        service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
        ret = rte_service_component_register(&service, &rx_adapter->service_id);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
                        service.name, ret);
                return ret;
        }

        ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
                &rx_adapter_conf, rx_adapter->conf_arg);
        if (ret) {
                RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
                        ret);
                goto err_done;
        }
        rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
        rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
        rx_adapter->service_inited = 1;
        return 0;

err_done:
        rte_service_component_unregister(rx_adapter->service_id);
        return ret;
}

static void
update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                int32_t rx_queue_id,
                uint8_t add)
{
        struct eth_rx_queue_info *queue_info;
        int enabled;
        uint16_t i;

        if (dev_info->rx_queue == NULL)
                return;

        if (rx_queue_id == -1) {
                for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                        update_queue_info(rx_adapter, dev_info, i, add);
        } else {
                queue_info = &dev_info->rx_queue[rx_queue_id];
                enabled = queue_info->queue_enabled;
                if (add) {
                        rx_adapter->nb_queues += !enabled;
                        dev_info->nb_dev_queues += !enabled;
                } else {
                        rx_adapter->nb_queues -= enabled;
                        dev_info->nb_dev_queues -= enabled;
                }
                queue_info->queue_enabled = !!add;
        }
}

static int
event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
                            struct eth_device_info *dev_info,
                            uint16_t rx_queue_id)
{
        struct eth_rx_queue_info *queue_info;

        if (rx_adapter->nb_queues == 0)
                return 0;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        rx_adapter->num_rx_polled -= queue_info->queue_enabled;
        update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
        return 0;
}

static void
event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                uint16_t rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
        struct eth_rx_queue_info *queue_info;
        const struct rte_event *ev = &conf->ev;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        queue_info->event_queue_id = ev->queue_id;
        queue_info->sched_type = ev->sched_type;
        queue_info->priority = ev->priority;
        queue_info->wt = conf->servicing_weight;

        if (conf->rx_queue_flags &
                        RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
                queue_info->flow_id = ev->flow_id;
                queue_info->flow_id_mask = ~0;
        }

        /* The same queue can be added more than once */
        rx_adapter->num_rx_polled += !queue_info->queue_enabled;
        update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
}

static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
                uint8_t eth_dev_id,
                int rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
        struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
        struct rte_event_eth_rx_adapter_queue_conf temp_conf;
        uint32_t i;
        int ret;

        if (queue_conf->servicing_weight == 0) {
                struct rte_eth_dev_data *data = dev_info->dev->data;

                if (data->dev_conf.intr_conf.rxq) {
                        RTE_EDEV_LOG_ERR("Interrupt driven queues"
                                        " not supported");
                        return -ENOTSUP;
                }
                temp_conf = *queue_conf;

                /* If Rx interrupts are disabled set wt = 1 */
                temp_conf.servicing_weight = 1;
                queue_conf = &temp_conf;
        }

        if (dev_info->rx_queue == NULL) {
                dev_info->rx_queue =
                    rte_zmalloc_socket(rx_adapter->mem_name,
                                       dev_info->dev->data->nb_rx_queues *
                                       sizeof(struct eth_rx_queue_info), 0,
                                       rx_adapter->socket_id);
                if (dev_info->rx_queue == NULL)
                        return -ENOMEM;
        }

        if (rx_queue_id == -1) {
                for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                        event_eth_rx_adapter_queue_add(rx_adapter,
                                                dev_info, i,
                                                queue_conf);
        } else {
                event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
                                          (uint16_t)rx_queue_id,
                                          queue_conf);
        }

        ret = eth_poll_wrr_calc(rx_adapter);
        if (ret) {
                event_eth_rx_adapter_queue_del(rx_adapter,
                                        dev_info, rx_queue_id);
                return ret;
        }

        return ret;
}

static int
rx_adapter_ctrl(uint8_t id, int start)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;
        int use_service = 0;
        int stop = !start;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];

        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                /* If starting, skip devices with no queues added */
                if (start && !dev_info->nb_dev_queues)
                        continue;
                /* If stopping, skip devices that were never started */
                if (stop && !dev_info->dev_rx_started)
                        continue;
                use_service |= !dev_info->internal_event_port;
                dev_info->dev_rx_started = start;
                if (dev_info->internal_event_port == 0)
                        continue;
                start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
                                                &rte_eth_devices[i]) :
                        (*dev->dev_ops->eth_rx_adapter_stop)(dev,
                                                &rte_eth_devices[i]);
        }

        if (use_service)
                rte_service_runstate_set(rx_adapter->service_id, start);

        return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
                                rte_event_eth_rx_adapter_conf_cb conf_cb,
                                void *conf_arg)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        int ret;
        int socket_id;
        uint8_t i;
        char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
        const uint8_t default_rss_key[] = {
                0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
                0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
                0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
                0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
                0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
        };

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
        if (conf_cb == NULL)
                return -EINVAL;

        if (event_eth_rx_adapter == NULL) {
                ret = rte_event_eth_rx_adapter_init();
                if (ret)
                        return ret;
        }

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter != NULL) {
                RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
                return -EEXIST;
        }

        socket_id = rte_event_dev_socket_id(dev_id);
        snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
                "rte_event_eth_rx_adapter_%d",
                id);

        rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
                        RTE_CACHE_LINE_SIZE, socket_id);
        if (rx_adapter == NULL) {
                RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
                return -ENOMEM;
        }

        rx_adapter->eventdev_id = dev_id;
        rx_adapter->socket_id = socket_id;
        rx_adapter->conf_cb = conf_cb;
        rx_adapter->conf_arg = conf_arg;
        strcpy(rx_adapter->mem_name, mem_name);
        rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
                                        /* FIXME: incompatible with hotplug */
                                        rte_eth_dev_count_total() *
                                        sizeof(struct eth_device_info), 0,
                                        socket_id);
        rte_convert_rss_key((const uint32_t *)default_rss_key,
                        (uint32_t *)rx_adapter->rss_key_be,
                        RTE_DIM(default_rss_key));

        if (rx_adapter->eth_devices == NULL) {
                RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
                rte_free(rx_adapter);
                return -ENOMEM;
        }
        rte_spinlock_init(&rx_adapter->rx_lock);
        RTE_ETH_FOREACH_DEV(i)
                rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

        event_eth_rx_adapter[id] = rx_adapter;
        if (conf_cb == default_conf_cb)
                rx_adapter->default_cb_arg = 1;
        return 0;
}

int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
                struct rte_event_port_conf *port_config)
{
        struct rte_event_port_conf *pc;
        int ret;

        if (port_config == NULL)
                return -EINVAL;
        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        pc = rte_malloc(NULL, sizeof(*pc), 0);
        if (pc == NULL)
                return -ENOMEM;
        *pc = *port_config;
        ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
                                        default_conf_cb,
                                        pc);
        if (ret)
                rte_free(pc);
        return ret;
}
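
/*
 * Application-side sketch (not part of this file; values are
 * illustrative and "dev_id" is a configured event device):
 *
 *      struct rte_event_port_conf pc = {
 *              .new_event_threshold = 1024,
 *              .dequeue_depth = 16,
 *              .enqueue_depth = 16,
 *      };
 *      int ret = rte_event_eth_rx_adapter_create(0, dev_id, &pc);
 *
 * The adapter copies *pc and default_conf_cb() later reconfigures the
 * event device with one extra port for the adapter's enqueues.
 */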

int
rte_event_eth_rx_adapter_free(uint8_t id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        if (rx_adapter->nb_queues) {
                RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
                                rx_adapter->nb_queues);
                return -EBUSY;
        }

        if (rx_adapter->default_cb_arg)
                rte_free(rx_adapter->conf_arg);
        rte_free(rx_adapter->eth_devices);
        rte_free(rx_adapter);
        event_eth_rx_adapter[id] = NULL;

        return 0;
}

int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
                uint8_t eth_dev_id,
                int32_t rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
        int ret;
        uint32_t cap;
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        int start_service;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if ((rx_adapter == NULL) || (queue_conf == NULL))
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
                                                eth_dev_id,
                                                &cap);
        if (ret) {
                RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
                        " eth port %" PRIu8, id, eth_dev_id);
                return ret;
        }

        if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
                && (queue_conf->rx_queue_flags &
                        RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
                RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
                                " eth port: %" PRIu8 " adapter id: %" PRIu8,
                                eth_dev_id, id);
                return -EINVAL;
        }

        if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
                (rx_queue_id != -1)) {
                RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
                        "event queue, adapter id %u eth port %u", id, eth_dev_id);
                return -EINVAL;
        }

        if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
                        rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
                RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
                         (uint16_t)rx_queue_id);
                return -EINVAL;
        }

        start_service = 0;
        dev_info = &rx_adapter->eth_devices[eth_dev_id];

        if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
                RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
                                        -ENOTSUP);
                if (dev_info->rx_queue == NULL) {
                        dev_info->rx_queue =
                            rte_zmalloc_socket(rx_adapter->mem_name,
                                        dev_info->dev->data->nb_rx_queues *
                                        sizeof(struct eth_rx_queue_info), 0,
                                        rx_adapter->socket_id);
                        if (dev_info->rx_queue == NULL)
                                return -ENOMEM;
                }

                ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
                                &rte_eth_devices[eth_dev_id],
                                rx_queue_id, queue_conf);
                if (ret == 0) {
                        update_queue_info(rx_adapter,
                                        &rx_adapter->eth_devices[eth_dev_id],
                                        rx_queue_id,
                                        1);
                }
        } else {
                rte_spinlock_lock(&rx_adapter->rx_lock);
                ret = init_service(rx_adapter, id);
                if (ret == 0)
                        ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
                                        queue_conf);
                rte_spinlock_unlock(&rx_adapter->rx_lock);
                if (ret == 0)
                        start_service = !!sw_rx_adapter_queue_count(rx_adapter);
        }

        if (ret)
                return ret;

        if (start_service)
                rte_service_component_runstate_set(rx_adapter->service_id, 1);

        return 0;
}
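
/*
 * Application-side sketch (not part of this file): connect all Rx
 * queues of eth port 0 to event queue 2 with atomic scheduling:
 *
 *      struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *              .rx_queue_flags = 0,
 *              .servicing_weight = 1,
 *              .ev = {
 *                      .queue_id = 2,
 *                      .sched_type = RTE_SCHED_TYPE_ATOMIC,
 *                      .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *              },
 *      };
 *      int ret = rte_event_eth_rx_adapter_queue_add(id, 0, -1, &qconf);
 *
 * rx_queue_id = -1 applies the configuration to every Rx queue of the
 * port, subject to the capability checks above.
 */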

int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
                                int32_t rx_queue_id)
{
        int ret = 0;
        struct rte_eventdev *dev;
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct eth_device_info *dev_info;
        uint32_t cap;
        uint16_t i;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
                                                eth_dev_id,
                                                &cap);
        if (ret)
                return ret;

        if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
                rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
                RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
                         (uint16_t)rx_queue_id);
                return -EINVAL;
        }

        dev_info = &rx_adapter->eth_devices[eth_dev_id];

        if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
                RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
                                 -ENOTSUP);
                ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
                                                &rte_eth_devices[eth_dev_id],
                                                rx_queue_id);
                if (ret == 0) {
                        update_queue_info(rx_adapter,
                                        &rx_adapter->eth_devices[eth_dev_id],
                                        rx_queue_id,
                                        0);
                        if (dev_info->nb_dev_queues == 0) {
                                rte_free(dev_info->rx_queue);
                                dev_info->rx_queue = NULL;
                        }
                }
        } else {
                int rc;

                rte_spinlock_lock(&rx_adapter->rx_lock);
                if (rx_queue_id == -1) {
                        for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                                event_eth_rx_adapter_queue_del(rx_adapter,
                                                        dev_info,
                                                        i);
                } else {
                        event_eth_rx_adapter_queue_del(rx_adapter,
                                                dev_info,
                                                (uint16_t)rx_queue_id);
                }

                rc = eth_poll_wrr_calc(rx_adapter);
                if (rc)
                        RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
                                        rc);

                if (dev_info->nb_dev_queues == 0) {
                        rte_free(dev_info->rx_queue);
                        dev_info->rx_queue = NULL;
                }

                rte_spinlock_unlock(&rx_adapter->rx_lock);
                rte_service_component_runstate_set(rx_adapter->service_id,
                                sw_rx_adapter_queue_count(rx_adapter));
        }

        return ret;
}

int
rte_event_eth_rx_adapter_start(uint8_t id)
{
        return rx_adapter_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
        return rx_adapter_ctrl(id, 0);
}

int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
                               struct rte_event_eth_rx_adapter_stats *stats)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
        struct rte_event_eth_rx_adapter_stats dev_stats;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;
        int ret;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL || stats == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        memset(stats, 0, sizeof(*stats));
        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                if (dev_info->internal_event_port == 0 ||
                        dev->dev_ops->eth_rx_adapter_stats_get == NULL)
                        continue;
                ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
                                                &rte_eth_devices[i],
                                                &dev_stats);
                if (ret)
                        continue;
                dev_stats_sum.rx_packets += dev_stats.rx_packets;
                dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
        }

        if (rx_adapter->service_inited)
                *stats = rx_adapter->stats;

        stats->rx_packets += dev_stats_sum.rx_packets;
        stats->rx_enq_count += dev_stats_sum.rx_enq_count;
        return 0;
}
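
/*
 * Application-side sketch (not part of this file):
 *
 *      struct rte_event_eth_rx_adapter_stats stats;
 *
 *      if (rte_event_eth_rx_adapter_stats_get(id, &stats) == 0)
 *              printf("rx %" PRIu64 " enq %" PRIu64 " retry %" PRIu64 "\n",
 *                      stats.rx_packets, stats.rx_enq_count,
 *                      stats.rx_enq_retry);
 */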

int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                if (dev_info->internal_event_port == 0 ||
                        dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
                        continue;
                (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
                                                        &rte_eth_devices[i]);
        }

        memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
        return 0;
}

int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL || service_id == NULL)
                return -EINVAL;

        if (rx_adapter->service_inited)
                *service_id = rx_adapter->service_id;

        return rx_adapter->service_inited ? 0 : -ESRCH;
}