eventdev: fix internal port logic in Rx adapter
lib/librte_eventdev/rte_event_eth_rx_adapter.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation.
3  * All rights reserved.
4  */
5 #include <rte_cycles.h>
6 #include <rte_common.h>
7 #include <rte_dev.h>
8 #include <rte_errno.h>
9 #include <rte_ethdev.h>
10 #include <rte_log.h>
11 #include <rte_malloc.h>
12 #include <rte_service_component.h>
13 #include <rte_thash.h>
14
15 #include "rte_eventdev.h"
16 #include "rte_eventdev_pmd.h"
17 #include "rte_event_eth_rx_adapter.h"
18
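/* Up to BATCH_SIZE mbufs are read per rte_eth_rx_burst() call and buffered as
 * events before being enqueued to the event device; the enqueue buffer holds
 * four such batches so that polling can continue for a while when the event
 * device back pressures the enqueue. BLOCK_CNT_THRESHOLD is the number of
 * consecutive fully blocked enqueues after which blocked cycles start being
 * counted in the adapter statistics.
 */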
19 #define BATCH_SIZE              32
20 #define BLOCK_CNT_THRESHOLD     10
21 #define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)
22
23 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
24 #define ETH_RX_ADAPTER_MEM_NAME_LEN     32
25
26 #define RSS_KEY_SIZE    40
27
28 /*
29  * There is an instance of this struct per polled Rx queue added to the
30  * adapter
31  */
32 struct eth_rx_poll_entry {
33         /* Eth port to poll */
34         uint16_t eth_dev_id;
35         /* Eth rx queue to poll */
36         uint16_t eth_rx_qid;
37 };
38
39 /* Instance per adapter */
40 struct rte_eth_event_enqueue_buffer {
41         /* Count of events in this buffer */
42         uint16_t count;
43         /* Array of events in this buffer */
44         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
45 };
46
47 struct rte_event_eth_rx_adapter {
48         /* RSS key */
49         uint8_t rss_key_be[RSS_KEY_SIZE];
50         /* Event device identifier */
51         uint8_t eventdev_id;
52         /* Per ethernet device structure */
53         struct eth_device_info *eth_devices;
54         /* Event port identifier */
55         uint8_t event_port_id;
56         /* Lock to serialize config updates with service function */
57         rte_spinlock_t rx_lock;
58         /* Max mbufs processed in any service function invocation */
59         uint32_t max_nb_rx;
60         /* Receive queues that need to be polled */
61         struct eth_rx_poll_entry *eth_rx_poll;
62         /* Size of the eth_rx_poll array */
63         uint16_t num_rx_polled;
64         /* Weighted round robin schedule */
65         uint32_t *wrr_sched;
66         /* wrr_sched[] size */
67         uint32_t wrr_len;
68         /* Next entry in wrr_sched[] to begin polling */
69         uint32_t wrr_pos;
70         /* Event burst buffer */
71         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
72         /* Per adapter stats */
73         struct rte_event_eth_rx_adapter_stats stats;
74         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
75         uint16_t enq_block_count;
76         /* Block start ts */
77         uint64_t rx_enq_block_start_ts;
78         /* Configuration callback for rte_service configuration */
79         rte_event_eth_rx_adapter_conf_cb conf_cb;
80         /* Configuration callback argument */
81         void *conf_arg;
82         /* Set if the default conf callback is being used */
83         int default_cb_arg;
84         /* Service initialization state */
85         uint8_t service_inited;
86         /* Total count of Rx queues in adapter */
87         uint32_t nb_queues;
88         /* Memory allocation name */
89         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
90         /* Socket identifier cached from eventdev */
91         int socket_id;
92         /* Per adapter EAL service */
93         uint32_t service_id;
94 } __rte_cache_aligned;
95
96 /* Per eth device */
97 struct eth_device_info {
98         struct rte_eth_dev *dev;
99         struct eth_rx_queue_info *rx_queue;
100         /* Set if ethdev->eventdev packet transfer uses a
101          * hardware mechanism
102          */
103         uint8_t internal_event_port;
104         /* Set if the adapter is processing Rx queues for
105          * this eth device and packet processing has been
106          * started; used to determine whether the PMD
107          * rx_adapter_stop callback needs to be invoked
108          */
109         uint8_t dev_rx_started;
110         /* Number of queues added for this device; the start
111          * callback is invoked only if > 0 and not already invoked
112          */
113         uint16_t nb_dev_queues;
114 };
115
116 /* Per Rx queue */
117 struct eth_rx_queue_info {
118         int queue_enabled;      /* True if added */
119         uint16_t wt;            /* Polling weight */
120         uint8_t event_queue_id; /* Event queue to enqueue packets to */
121         uint8_t sched_type;     /* Sched type for events */
122         uint8_t priority;       /* Event priority */
123         uint32_t flow_id;       /* App provided flow identifier */
124         uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
125 };
126
127 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
128
129 static inline int
130 valid_id(uint8_t id)
131 {
132         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
133 }
134
135 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
136         if (!valid_id(id)) { \
137                 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
138                 return retval; \
139         } \
140 } while (0)
141
142 static inline int
143 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
144 {
145         return rx_adapter->num_rx_polled;
146 }
147
148 /* Greatest common divisor */
149 static uint16_t gcd_u16(uint16_t a, uint16_t b)
150 {
151         uint16_t r = a % b;
152
153         return r ? gcd_u16(b, r) : b;
154 }
155
156 /* Returns the next queue in the polling sequence
157  *
158  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
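 *
 * For example, with three queues of weights {4, 2, 1} (max_wt = 4, gcd = 1),
 * successive calls return queue indices 0, 0, 0, 1, 0, 1, 2 over one cycle of
 * wrr_len = 7 slots, i.e. each queue is polled in proportion to its weight.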
159  */
160 static int
161 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
162          unsigned int n, int *cw,
163          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
164          uint16_t gcd, int prev)
165 {
166         int i = prev;
167         uint16_t w;
168
169         while (1) {
170                 uint16_t q;
171                 uint16_t d;
172
173                 i = (i + 1) % n;
174                 if (i == 0) {
175                         *cw = *cw - gcd;
176                         if (*cw <= 0)
177                                 *cw = max_wt;
178                 }
179
180                 q = eth_rx_poll[i].eth_rx_qid;
181                 d = eth_rx_poll[i].eth_dev_id;
182                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
183
184                 if ((int)w >= *cw)
185                         return i;
186         }
187 }
188
189 /* Precalculate WRR polling sequence for all queues in rx_adapter */
190 static int
191 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
192 {
193         uint16_t d;
194         uint16_t q;
195         unsigned int i;
196
197         /* Initialize variables for calculation of wrr schedule */
198         uint16_t max_wrr_pos = 0;
199         unsigned int poll_q = 0;
200         uint16_t max_wt = 0;
201         uint16_t gcd = 0;
202
203         struct eth_rx_poll_entry *rx_poll = NULL;
204         uint32_t *rx_wrr = NULL;
205
206         if (rx_adapter->num_rx_polled) {
207                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
208                                 sizeof(*rx_adapter->eth_rx_poll),
209                                 RTE_CACHE_LINE_SIZE);
210                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
211                                              len,
212                                              RTE_CACHE_LINE_SIZE,
213                                              rx_adapter->socket_id);
214                 if (rx_poll == NULL)
215                         return -ENOMEM;
216
217                 /* Generate the array of all queues to poll; on exit,
218                  * poll_q is the number of entries filled in
219                  */
220                 RTE_ETH_FOREACH_DEV(d) {
221                         uint16_t nb_rx_queues;
222                         struct eth_device_info *dev_info =
223                                         &rx_adapter->eth_devices[d];
224                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
225                         if (dev_info->rx_queue == NULL)
226                                 continue;
227                         if (dev_info->internal_event_port)
228                                 continue;
229                         for (q = 0; q < nb_rx_queues; q++) {
230                                 struct eth_rx_queue_info *queue_info =
231                                         &dev_info->rx_queue[q];
232                                 if (queue_info->queue_enabled == 0)
233                                         continue;
234
235                                 uint16_t wt = queue_info->wt;
236                                 rx_poll[poll_q].eth_dev_id = d;
237                                 rx_poll[poll_q].eth_rx_qid = q;
238                                 max_wrr_pos += wt;
239                                 max_wt = RTE_MAX(max_wt, wt);
240                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
241                                 poll_q++;
242                         }
243                 }
244
245                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
246                                 RTE_CACHE_LINE_SIZE);
247                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
248                                             len,
249                                             RTE_CACHE_LINE_SIZE,
250                                             rx_adapter->socket_id);
251                 if (rx_wrr == NULL) {
252                         rte_free(rx_poll);
253                         return -ENOMEM;
254                 }
255
256                 /* Generate polling sequence based on weights */
257                 int prev = -1;
258                 int cw = -1;
259                 for (i = 0; i < max_wrr_pos; i++) {
260                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
261                                              rx_poll, max_wt, gcd, prev);
262                         prev = rx_wrr[i];
263                 }
264         }
265
266         rte_free(rx_adapter->eth_rx_poll);
267         rte_free(rx_adapter->wrr_sched);
268
269         rx_adapter->eth_rx_poll = rx_poll;
270         rx_adapter->wrr_sched = rx_wrr;
271         rx_adapter->wrr_len = max_wrr_pos;
272
273         return 0;
274 }
275
276 static inline void
277 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
278         struct ipv6_hdr **ipv6_hdr)
279 {
280         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
281         struct vlan_hdr *vlan_hdr;
282
283         *ipv4_hdr = NULL;
284         *ipv6_hdr = NULL;
285
286         switch (eth_hdr->ether_type) {
287         case RTE_BE16(ETHER_TYPE_IPv4):
288                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
289                 break;
290
291         case RTE_BE16(ETHER_TYPE_IPv6):
292                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
293                 break;
294
295         case RTE_BE16(ETHER_TYPE_VLAN):
296                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
297                 switch (vlan_hdr->eth_proto) {
298                 case RTE_BE16(ETHER_TYPE_IPv4):
299                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
300                         break;
301                 case RTE_BE16(ETHER_TYPE_IPv6):
302                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
303                         break;
304                 default:
305                         break;
306                 }
307                 break;
308
309         default:
310                 break;
311         }
312 }
313
314 /* Calculate RSS hash for IPv4/6 */
315 static inline uint32_t
316 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
317 {
318         uint32_t input_len;
319         void *tuple;
320         struct rte_ipv4_tuple ipv4_tuple;
321         struct rte_ipv6_tuple ipv6_tuple;
322         struct ipv4_hdr *ipv4_hdr;
323         struct ipv6_hdr *ipv6_hdr;
324
325         mtoip(m, &ipv4_hdr, &ipv6_hdr);
326
327         if (ipv4_hdr) {
328                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
329                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
330                 tuple = &ipv4_tuple;
331                 input_len = RTE_THASH_V4_L3_LEN;
332         } else if (ipv6_hdr) {
333                 rte_thash_load_v6_addrs(ipv6_hdr,
334                                         (union rte_thash_tuple *)&ipv6_tuple);
335                 tuple = &ipv6_tuple;
336                 input_len = RTE_THASH_V6_L3_LEN;
337         } else
338                 return 0;
339
340         return rte_softrss_be(tuple, input_len, rss_key_be);
341 }
342
343 static inline int
344 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
345 {
346         return !!rx_adapter->enq_block_count;
347 }
348
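/* Called when an enqueue to the event device is completely blocked; the block
 * start timestamp is taken only after BLOCK_CNT_THRESHOLD consecutive blocked
 * attempts so that transient back pressure is not counted.
 */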
349 static inline void
350 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
351 {
352         if (rx_adapter->rx_enq_block_start_ts)
353                 return;
354
355         rx_adapter->enq_block_count++;
356         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
357                 return;
358
359         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
360 }
361
362 static inline void
363 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
364                     struct rte_event_eth_rx_adapter_stats *stats)
365 {
366         if (unlikely(!stats->rx_enq_start_ts))
367                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
368
369         if (likely(!rx_enq_blocked(rx_adapter)))
370                 return;
371
372         rx_adapter->enq_block_count = 0;
373         if (rx_adapter->rx_enq_block_start_ts) {
374                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
375                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
376                     rx_adapter->rx_enq_block_start_ts;
377                 rx_adapter->rx_enq_block_start_ts = 0;
378         }
379 }
380
381 /* Add event to buffer; the free space check is done prior to
382  * calling this function
383  */
384 static inline void
385 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
386                   struct rte_event *ev)
387 {
388         struct rte_eth_event_enqueue_buffer *buf =
389             &rx_adapter->event_enqueue_buffer;
390         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
391 }
392
393 /* Enqueue buffered events to event device */
394 static inline uint16_t
395 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
396 {
397         struct rte_eth_event_enqueue_buffer *buf =
398             &rx_adapter->event_enqueue_buffer;
399         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
400
401         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
402                                         rx_adapter->event_port_id,
403                                         buf->events,
404                                         buf->count);
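        /* If the event device accepted only n of the buffered events, shift
         * the remainder to the front of the buffer; they are retried on the
         * next flush.
         */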
405         if (n != buf->count) {
406                 memmove(buf->events,
407                         &buf->events[n],
408                         (buf->count - n) * sizeof(struct rte_event));
409                 stats->rx_enq_retry++;
410         }
411
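        /* A completely blocked enqueue (n == 0) starts or extends a blocked
         * period; any successful enqueue ends it and credits the blocked
         * cycles to the stats.
         */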
412         n ? rx_enq_block_end_ts(rx_adapter, stats) :
413                 rx_enq_block_start_ts(rx_adapter);
414
415         buf->count -= n;
416         stats->rx_enq_count += n;
417
418         return n;
419 }
420
421 static inline void
422 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
423         uint16_t eth_dev_id,
424         uint16_t rx_queue_id,
425         struct rte_mbuf **mbufs,
426         uint16_t num)
427 {
428         uint32_t i;
429         struct eth_device_info *eth_device_info =
430                                         &rx_adapter->eth_devices[eth_dev_id];
431         struct eth_rx_queue_info *eth_rx_queue_info =
432                                         &eth_device_info->rx_queue[rx_queue_id];
433
434         int32_t qid = eth_rx_queue_info->event_queue_id;
435         uint8_t sched_type = eth_rx_queue_info->sched_type;
436         uint8_t priority = eth_rx_queue_info->priority;
437         uint32_t flow_id;
438         struct rte_event events[BATCH_SIZE];
439         struct rte_mbuf *m = mbufs[0];
440         uint32_t rss_mask;
441         uint32_t rss;
442         int do_rss;
443         uint64_t ts;
444
445         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
446         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
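        /* Compute a software RSS hash only when the mbuf carries no
         * NIC-provided hash and the application did not supply a flow id
         * for this queue.
         */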
447         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
448
449         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
450                 ts = rte_get_tsc_cycles();
451                 for (i = 0; i < num; i++) {
452                         m = mbufs[i];
453
454                         m->timestamp = ts;
455                         m->ol_flags |= PKT_RX_TIMESTAMP;
456                 }
457         }
458
459         for (i = 0; i < num; i++) {
460                 m = mbufs[i];
461                 struct rte_event *ev = &events[i];
462
463                 rss = do_rss ?
464                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
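                /* Take the application-supplied flow id bits where the mask
                 * is set and fill the remaining bits from the RSS hash.
                 */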
465                 flow_id =
466                     eth_rx_queue_info->flow_id &
467                                 eth_rx_queue_info->flow_id_mask;
468                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
469                 ev->flow_id = flow_id;
470                 ev->op = RTE_EVENT_OP_NEW;
471                 ev->sched_type = sched_type;
472                 ev->queue_id = qid;
473                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
474                 ev->sub_event_type = 0;
475                 ev->priority = priority;
476                 ev->mbuf = m;
477
478                 buf_event_enqueue(rx_adapter, ev);
479         }
480 }
481
482 /*
483  * Polls receive queues added to the event adapter and enqueues received
484  * packets to the event device.
485  *
486  * The receive code enqueues initially to a temporary buffer; the
487  * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
488  *
489  * If there isn't space available in the temporary buffer, packets from
490  * the Rx queue aren't dequeued from the eth device; this back pressures
491  * the eth device. In virtual device environments, this back pressure is
492  * relayed to the hypervisor's switching layer, where adjustments can be
493  * made to deal with it.
494  */
495 static inline void
496 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
497 {
498         uint32_t num_queue;
499         uint16_t n;
500         uint32_t nb_rx = 0;
501         struct rte_mbuf *mbufs[BATCH_SIZE];
502         struct rte_eth_event_enqueue_buffer *buf;
503         uint32_t wrr_pos;
504         uint32_t max_nb_rx;
505
506         wrr_pos = rx_adapter->wrr_pos;
507         max_nb_rx = rx_adapter->max_nb_rx;
508         buf = &rx_adapter->event_enqueue_buffer;
509         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
510
511         /* Iterate through a WRR sequence */
512         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
513                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
514                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
515                 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
516
517                 /* Don't do a batch dequeue from the rx queue if there isn't
518                  * enough space in the enqueue buffer.
519                  */
520                 if (buf->count >= BATCH_SIZE)
521                         flush_event_buffer(rx_adapter);
522                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
523                         rx_adapter->wrr_pos = wrr_pos;
524                         return;
525                 }
526
527                 stats->rx_poll_count++;
528                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
529
530                 if (n) {
531                         stats->rx_packets += n;
532                         /* The check before rte_eth_rx_burst() ensures that
533                          * all n mbufs can be buffered
534                          */
535                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
536                         nb_rx += n;
537                         if (nb_rx > max_nb_rx) {
538                                 rx_adapter->wrr_pos =
539                                     (wrr_pos + 1) % rx_adapter->wrr_len;
540                                 break;
541                         }
542                 }
543
544                 if (++wrr_pos == rx_adapter->wrr_len)
545                         wrr_pos = 0;
546         }
547
548         if (buf->count >= BATCH_SIZE)
549                 flush_event_buffer(rx_adapter);
550 }
551
552 static int
553 event_eth_rx_adapter_service_func(void *args)
554 {
555         struct rte_event_eth_rx_adapter *rx_adapter = args;
556
557         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
558                 return 0;
559         eth_rx_poll(rx_adapter);
560         rte_spinlock_unlock(&rx_adapter->rx_lock);
561         return 0;
562 }
563
564 static int
565 rte_event_eth_rx_adapter_init(void)
566 {
567         const char *name = "rte_event_eth_rx_adapter_array";
568         const struct rte_memzone *mz;
569         unsigned int sz;
570
571         sz = sizeof(*event_eth_rx_adapter) *
572             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
573         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
574
575         mz = rte_memzone_lookup(name);
576         if (mz == NULL) {
577                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
578                                                  RTE_CACHE_LINE_SIZE);
579                 if (mz == NULL) {
580                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
581                                         PRId32, rte_errno);
582                         return -rte_errno;
583                 }
584         }
585
586         event_eth_rx_adapter = mz->addr;
587         return 0;
588 }
589
590 static inline struct rte_event_eth_rx_adapter *
591 id_to_rx_adapter(uint8_t id)
592 {
593         return event_eth_rx_adapter ?
594                 event_eth_rx_adapter[id] : NULL;
595 }
596
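/* Default configuration callback: stops the event device if it is running,
 * reconfigures it with one additional event port for the adapter's use, sets
 * up that port with the rte_event_port_conf passed to
 * rte_event_eth_rx_adapter_create(), and restarts the device if it had been
 * running.
 */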
597 static int
598 default_conf_cb(uint8_t id, uint8_t dev_id,
599                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
600 {
601         int ret;
602         struct rte_eventdev *dev;
603         struct rte_event_dev_config dev_conf;
604         int started;
605         uint8_t port_id;
606         struct rte_event_port_conf *port_conf = arg;
607         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
608
609         dev = &rte_eventdevs[rx_adapter->eventdev_id];
610         dev_conf = dev->data->dev_conf;
611
612         started = dev->data->dev_started;
613         if (started)
614                 rte_event_dev_stop(dev_id);
615         port_id = dev_conf.nb_event_ports;
616         dev_conf.nb_event_ports += 1;
617         ret = rte_event_dev_configure(dev_id, &dev_conf);
618         if (ret) {
619                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
620                                                 dev_id);
621                 if (started) {
622                         if (rte_event_dev_start(dev_id))
623                                 return -EIO;
624                 }
625                 return ret;
626         }
627
628         ret = rte_event_port_setup(dev_id, port_id, port_conf);
629         if (ret) {
630                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
631                                         port_id);
632                 return ret;
633         }
634
635         conf->event_port_id = port_id;
636         conf->max_nb_rx = 128;
637         if (started)
638                 ret = rte_event_dev_start(dev_id);
639         rx_adapter->default_cb_arg = 1;
640         return ret;
641 }
642
643 static int
644 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
645 {
646         int ret;
647         struct rte_service_spec service;
648         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
649
650         if (rx_adapter->service_inited)
651                 return 0;
652
653         memset(&service, 0, sizeof(service));
654         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
655                 "rte_event_eth_rx_adapter_%d", id);
656         service.socket_id = rx_adapter->socket_id;
657         service.callback = event_eth_rx_adapter_service_func;
658         service.callback_userdata = rx_adapter;
659         /* Service function handles locking for queue add/del updates */
660         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
661         ret = rte_service_component_register(&service, &rx_adapter->service_id);
662         if (ret) {
663                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
664                         service.name, ret);
665                 return ret;
666         }
667
668         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
669                 &rx_adapter_conf, rx_adapter->conf_arg);
670         if (ret) {
671                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
672                         ret);
673                 goto err_done;
674         }
675         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
676         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
677         rx_adapter->service_inited = 1;
678         return 0;
679
680 err_done:
681         rte_service_component_unregister(rx_adapter->service_id);
682         return ret;
683 }
684
685
686 static void
687 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
688                 struct eth_device_info *dev_info,
689                 int32_t rx_queue_id,
690                 uint8_t add)
691 {
692         struct eth_rx_queue_info *queue_info;
693         int enabled;
694         uint16_t i;
695
696         if (dev_info->rx_queue == NULL)
697                 return;
698
699         if (rx_queue_id == -1) {
700                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
701                         update_queue_info(rx_adapter, dev_info, i, add);
702         } else {
703                 queue_info = &dev_info->rx_queue[rx_queue_id];
704                 enabled = queue_info->queue_enabled;
705                 if (add) {
706                         rx_adapter->nb_queues += !enabled;
707                         dev_info->nb_dev_queues += !enabled;
708                 } else {
709                         rx_adapter->nb_queues -= enabled;
710                         dev_info->nb_dev_queues -= enabled;
711                 }
712                 queue_info->queue_enabled = !!add;
713         }
714 }
715
716 static int
717 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
718                             struct eth_device_info *dev_info,
719                             uint16_t rx_queue_id)
720 {
721         struct eth_rx_queue_info *queue_info;
722
723         if (rx_adapter->nb_queues == 0)
724                 return 0;
725
726         queue_info = &dev_info->rx_queue[rx_queue_id];
727         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
728         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
729         return 0;
730 }
731
732 static void
733 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
734                 struct eth_device_info *dev_info,
735                 uint16_t rx_queue_id,
736                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
737
738 {
739         struct eth_rx_queue_info *queue_info;
740         const struct rte_event *ev = &conf->ev;
741
742         queue_info = &dev_info->rx_queue[rx_queue_id];
743         queue_info->event_queue_id = ev->queue_id;
744         queue_info->sched_type = ev->sched_type;
745         queue_info->priority = ev->priority;
746         queue_info->wt = conf->servicing_weight;
747
748         if (conf->rx_queue_flags &
749                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
750                 queue_info->flow_id = ev->flow_id;
751                 queue_info->flow_id_mask = ~0;
752         }
753
754         /* The same queue can be added more than once */
755         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
756         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
757 }
758
759 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
760                 uint16_t eth_dev_id,
761                 int rx_queue_id,
762                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
763 {
764         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
765         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
766         uint32_t i;
767         int ret;
768
769         if (queue_conf->servicing_weight == 0) {
770
771                 struct rte_eth_dev_data *data = dev_info->dev->data;
772                 if (data->dev_conf.intr_conf.rxq) {
773                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
774                                         " not supported");
775                         return -ENOTSUP;
776                 }
777                 temp_conf = *queue_conf;
778
779                 /* Rx interrupts not in use; poll the queue with weight 1 */
780                 temp_conf.servicing_weight = 1;
781                 queue_conf = &temp_conf;
782         }
783
784         if (dev_info->rx_queue == NULL) {
785                 dev_info->rx_queue =
786                     rte_zmalloc_socket(rx_adapter->mem_name,
787                                        dev_info->dev->data->nb_rx_queues *
788                                        sizeof(struct eth_rx_queue_info), 0,
789                                        rx_adapter->socket_id);
790                 if (dev_info->rx_queue == NULL)
791                         return -ENOMEM;
792         }
793
794         if (rx_queue_id == -1) {
795                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
796                         event_eth_rx_adapter_queue_add(rx_adapter,
797                                                 dev_info, i,
798                                                 queue_conf);
799         } else {
800                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
801                                           (uint16_t)rx_queue_id,
802                                           queue_conf);
803         }
804
805         ret = eth_poll_wrr_calc(rx_adapter);
806         if (ret) {
807                 event_eth_rx_adapter_queue_del(rx_adapter,
808                                         dev_info, rx_queue_id);
809                 return ret;
810         }
811
812         return ret;
813 }
814
815 static int
816 rx_adapter_ctrl(uint8_t id, int start)
817 {
818         struct rte_event_eth_rx_adapter *rx_adapter;
819         struct rte_eventdev *dev;
820         struct eth_device_info *dev_info;
821         uint32_t i;
822         int use_service = 0;
823         int stop = !start;
824
825         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
826         rx_adapter = id_to_rx_adapter(id);
827         if (rx_adapter == NULL)
828                 return -EINVAL;
829
830         dev = &rte_eventdevs[rx_adapter->eventdev_id];
831
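        /* Devices with an internal event port are started/stopped through the
         * eventdev PMD callbacks; all other devices are polled by the
         * adapter's service function, whose run state is set below.
         */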
832         RTE_ETH_FOREACH_DEV(i) {
833                 dev_info = &rx_adapter->eth_devices[i];
834                 /* if starting, check that the device has queues added */
835                 if (start && !dev_info->nb_dev_queues)
836                         continue;
837                 /* if stopping, check that the device has been started */
838                 if (stop && !dev_info->dev_rx_started)
839                         continue;
840                 use_service |= !dev_info->internal_event_port;
841                 dev_info->dev_rx_started = start;
842                 if (dev_info->internal_event_port == 0)
843                         continue;
844                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
845                                                 &rte_eth_devices[i]) :
846                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
847                                                 &rte_eth_devices[i]);
848         }
849
850         if (use_service)
851                 rte_service_runstate_set(rx_adapter->service_id, start);
852
853         return 0;
854 }
855
856 int
857 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
858                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
859                                 void *conf_arg)
860 {
861         struct rte_event_eth_rx_adapter *rx_adapter;
862         int ret;
863         int socket_id;
864         uint16_t i;
865         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
866         const uint8_t default_rss_key[] = {
867                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
868                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
869                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
870                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
871                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
872         };
873
874         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
875         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
876         if (conf_cb == NULL)
877                 return -EINVAL;
878
879         if (event_eth_rx_adapter == NULL) {
880                 ret = rte_event_eth_rx_adapter_init();
881                 if (ret)
882                         return ret;
883         }
884
885         rx_adapter = id_to_rx_adapter(id);
886         if (rx_adapter != NULL) {
887                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
888                 return -EEXIST;
889         }
890
891         socket_id = rte_event_dev_socket_id(dev_id);
892         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
893                 "rte_event_eth_rx_adapter_%d",
894                 id);
895
896         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
897                         RTE_CACHE_LINE_SIZE, socket_id);
898         if (rx_adapter == NULL) {
899                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
900                 return -ENOMEM;
901         }
902
903         rx_adapter->eventdev_id = dev_id;
904         rx_adapter->socket_id = socket_id;
905         rx_adapter->conf_cb = conf_cb;
906         rx_adapter->conf_arg = conf_arg;
907         strcpy(rx_adapter->mem_name, mem_name);
908         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
909                                         /* FIXME: incompatible with hotplug */
910                                         rte_eth_dev_count_total() *
911                                         sizeof(struct eth_device_info), 0,
912                                         socket_id);
913         rte_convert_rss_key((const uint32_t *)default_rss_key,
914                         (uint32_t *)rx_adapter->rss_key_be,
915                             RTE_DIM(default_rss_key));
916
917         if (rx_adapter->eth_devices == NULL) {
918                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
919                 rte_free(rx_adapter);
920                 return -ENOMEM;
921         }
922         rte_spinlock_init(&rx_adapter->rx_lock);
923         RTE_ETH_FOREACH_DEV(i)
924                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
925
926         event_eth_rx_adapter[id] = rx_adapter;
927         if (conf_cb == default_conf_cb)
928                 rx_adapter->default_cb_arg = 1;
929         return 0;
930 }
931
932 int
933 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
934                 struct rte_event_port_conf *port_config)
935 {
936         struct rte_event_port_conf *pc;
937         int ret;
938
939         if (port_config == NULL)
940                 return -EINVAL;
941         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
942
943         pc = rte_malloc(NULL, sizeof(*pc), 0);
944         if (pc == NULL)
945                 return -ENOMEM;
946         *pc = *port_config;
947         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
948                                         default_conf_cb,
949                                         pc);
950         if (ret)
951                 rte_free(pc);
952         return ret;
953 }
954
955 int
956 rte_event_eth_rx_adapter_free(uint8_t id)
957 {
958         struct rte_event_eth_rx_adapter *rx_adapter;
959
960         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
961
962         rx_adapter = id_to_rx_adapter(id);
963         if (rx_adapter == NULL)
964                 return -EINVAL;
965
966         if (rx_adapter->nb_queues) {
967                 RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
968                                 rx_adapter->nb_queues);
969                 return -EBUSY;
970         }
971
972         if (rx_adapter->default_cb_arg)
973                 rte_free(rx_adapter->conf_arg);
974         rte_free(rx_adapter->eth_devices);
975         rte_free(rx_adapter);
976         event_eth_rx_adapter[id] = NULL;
977
978         return 0;
979 }
980
981 int
982 rte_event_eth_rx_adapter_queue_add(uint8_t id,
983                 uint16_t eth_dev_id,
984                 int32_t rx_queue_id,
985                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
986 {
987         int ret;
988         uint32_t cap;
989         struct rte_event_eth_rx_adapter *rx_adapter;
990         struct rte_eventdev *dev;
991         struct eth_device_info *dev_info;
992         int start_service;
993
994         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
995         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
996
997         rx_adapter = id_to_rx_adapter(id);
998         if ((rx_adapter == NULL) || (queue_conf == NULL))
999                 return -EINVAL;
1000
1001         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1002         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1003                                                 eth_dev_id,
1004                                                 &cap);
1005         if (ret) {
1006                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1007                         " eth port %" PRIu16, id, eth_dev_id);
1008                 return ret;
1009         }
1010
1011         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1012                 && (queue_conf->rx_queue_flags &
1013                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1014                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1015                                 " eth port: %" PRIu16 " adapter id: %" PRIu8,
1016                                 eth_dev_id, id);
1017                 return -EINVAL;
1018         }
1019
1020         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1021                 (rx_queue_id != -1)) {
1022                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
1023                         "event queue, eth port: %" PRIu16 " adapter id: %"
1024                         PRIu8, eth_dev_id, id);
1025                 return -EINVAL;
1026         }
1027
1028         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1029                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1030                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1031                          (uint16_t)rx_queue_id);
1032                 return -EINVAL;
1033         }
1034
1035         start_service = 0;
1036         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1037
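        /* If the eventdev PMD implements an internal event port for this eth
         * device, queue management is delegated to the PMD; otherwise the
         * queue is added to the set polled by the adapter's service function.
         */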
1038         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1039                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1040                                         -ENOTSUP);
1041                 if (dev_info->rx_queue == NULL) {
1042                         dev_info->rx_queue =
1043                             rte_zmalloc_socket(rx_adapter->mem_name,
1044                                         dev_info->dev->data->nb_rx_queues *
1045                                         sizeof(struct eth_rx_queue_info), 0,
1046                                         rx_adapter->socket_id);
1047                         if (dev_info->rx_queue == NULL)
1048                                 return -ENOMEM;
1049                 }
1050
1051                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1052                                 &rte_eth_devices[eth_dev_id],
1053                                 rx_queue_id, queue_conf);
1054                 if (ret == 0) {
1055                         dev_info->internal_event_port = 1;
1056                         update_queue_info(rx_adapter,
1057                                         &rx_adapter->eth_devices[eth_dev_id],
1058                                         rx_queue_id,
1059                                         1);
1060                 }
1061         } else {
1062                 rte_spinlock_lock(&rx_adapter->rx_lock);
1063                 dev_info->internal_event_port = 0;
1064                 ret = init_service(rx_adapter, id);
1065                 if (ret == 0)
1066                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1067                                         queue_conf);
1068                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1069                 if (ret == 0)
1070                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1071         }
1072
1073         if (ret)
1074                 return ret;
1075
1076         if (start_service)
1077                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1078
1079         return 0;
1080 }
1081
1082 int
1083 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
1084                                 int32_t rx_queue_id)
1085 {
1086         int ret = 0;
1087         struct rte_eventdev *dev;
1088         struct rte_event_eth_rx_adapter *rx_adapter;
1089         struct eth_device_info *dev_info;
1090         uint32_t cap;
1091         uint16_t i;
1092
1093         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1094         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1095
1096         rx_adapter = id_to_rx_adapter(id);
1097         if (rx_adapter == NULL)
1098                 return -EINVAL;
1099
1100         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1101         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1102                                                 eth_dev_id,
1103                                                 &cap);
1104         if (ret)
1105                 return ret;
1106
1107         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1108                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1109                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1110                          (uint16_t)rx_queue_id);
1111                 return -EINVAL;
1112         }
1113
1114         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1115
1116         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1117                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1118                                  -ENOTSUP);
1119                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1120                                                 &rte_eth_devices[eth_dev_id],
1121                                                 rx_queue_id);
1122                 if (ret == 0) {
1123                         update_queue_info(rx_adapter,
1124                                         &rx_adapter->eth_devices[eth_dev_id],
1125                                         rx_queue_id,
1126                                         0);
1127                         if (dev_info->nb_dev_queues == 0) {
1128                                 rte_free(dev_info->rx_queue);
1129                                 dev_info->rx_queue = NULL;
1130                         }
1131                 }
1132         } else {
1133                 int rc;
1134                 rte_spinlock_lock(&rx_adapter->rx_lock);
1135                 if (rx_queue_id == -1) {
1136                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1137                                 event_eth_rx_adapter_queue_del(rx_adapter,
1138                                                         dev_info,
1139                                                         i);
1140                 } else {
1141                         event_eth_rx_adapter_queue_del(rx_adapter,
1142                                                 dev_info,
1143                                                 (uint16_t)rx_queue_id);
1144                 }
1145
1146                 rc = eth_poll_wrr_calc(rx_adapter);
1147                 if (rc)
1148                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1149                                         rc);
1150
1151                 if (dev_info->nb_dev_queues == 0) {
1152                         rte_free(dev_info->rx_queue);
1153                         dev_info->rx_queue = NULL;
1154                 }
1155
1156                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1157                 rte_service_component_runstate_set(rx_adapter->service_id,
1158                                 sw_rx_adapter_queue_count(rx_adapter));
1159         }
1160
1161         return ret;
1162 }
1163
1164
1165 int
1166 rte_event_eth_rx_adapter_start(uint8_t id)
1167 {
1168         return rx_adapter_ctrl(id, 1);
1169 }
1170
1171 int
1172 rte_event_eth_rx_adapter_stop(uint8_t id)
1173 {
1174         return rx_adapter_ctrl(id, 0);
1175 }
1176
1177 int
1178 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1179                                struct rte_event_eth_rx_adapter_stats *stats)
1180 {
1181         struct rte_event_eth_rx_adapter *rx_adapter;
1182         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1183         struct rte_event_eth_rx_adapter_stats dev_stats;
1184         struct rte_eventdev *dev;
1185         struct eth_device_info *dev_info;
1186         uint32_t i;
1187         int ret;
1188
1189         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1190
1191         rx_adapter = id_to_rx_adapter(id);
1192         if (rx_adapter == NULL || stats == NULL)
1193                 return -EINVAL;
1194
1195         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1196         memset(stats, 0, sizeof(*stats));
1197         RTE_ETH_FOREACH_DEV(i) {
1198                 dev_info = &rx_adapter->eth_devices[i];
1199                 if (dev_info->internal_event_port == 0 ||
1200                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1201                         continue;
1202                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1203                                                 &rte_eth_devices[i],
1204                                                 &dev_stats);
1205                 if (ret)
1206                         continue;
1207                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1208                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1209         }
1210
1211         if (rx_adapter->service_inited)
1212                 *stats = rx_adapter->stats;
1213
1214         stats->rx_packets += dev_stats_sum.rx_packets;
1215         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1216         return 0;
1217 }
1218
1219 int
1220 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1221 {
1222         struct rte_event_eth_rx_adapter *rx_adapter;
1223         struct rte_eventdev *dev;
1224         struct eth_device_info *dev_info;
1225         uint32_t i;
1226
1227         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1228
1229         rx_adapter = id_to_rx_adapter(id);
1230         if (rx_adapter == NULL)
1231                 return -EINVAL;
1232
1233         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1234         RTE_ETH_FOREACH_DEV(i) {
1235                 dev_info = &rx_adapter->eth_devices[i];
1236                 if (dev_info->internal_event_port == 0 ||
1237                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1238                         continue;
1239                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1240                                                         &rte_eth_devices[i]);
1241         }
1242
1243         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1244         return 0;
1245 }
1246
1247 int
1248 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1249 {
1250         struct rte_event_eth_rx_adapter *rx_adapter;
1251
1252         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1253
1254         rx_adapter = id_to_rx_adapter(id);
1255         if (rx_adapter == NULL || service_id == NULL)
1256                 return -EINVAL;
1257
1258         if (rx_adapter->service_inited)
1259                 *service_id = rx_adapter->service_id;
1260
1261         return rx_adapter->service_inited ? 0 : -ESRCH;
1262 }