eventdev: fix Rx SW adapter stop
lib/librte_eventdev/rte_event_eth_rx_adapter.c (dpdk.git)
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 * All rights reserved.
 */
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE              32
#define BLOCK_CNT_THRESHOLD     10
#define ETH_EVENT_BUFFER_SIZE   (4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN     32

#define RSS_KEY_SIZE    40

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
        /* Eth port to poll */
        uint16_t eth_dev_id;
        /* Eth rx queue to poll */
        uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
        /* Count of events in this buffer */
        uint16_t count;
        /* Array of events in this buffer */
        struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
        /* RSS key */
        uint8_t rss_key_be[RSS_KEY_SIZE];
        /* Event device identifier */
        uint8_t eventdev_id;
        /* Per ethernet device structure */
        struct eth_device_info *eth_devices;
        /* Event port identifier */
        uint8_t event_port_id;
        /* Lock to serialize config updates with service function */
        rte_spinlock_t rx_lock;
        /* Max mbufs processed in any service function invocation */
        uint32_t max_nb_rx;
        /* Receive queues that need to be polled */
        struct eth_rx_poll_entry *eth_rx_poll;
        /* Size of the eth_rx_poll array */
        uint16_t num_rx_polled;
        /* Weighted round robin schedule */
        uint32_t *wrr_sched;
        /* wrr_sched[] size */
        uint32_t wrr_len;
        /* Next entry in wrr[] to begin polling */
        uint32_t wrr_pos;
        /* Event burst buffer */
        struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
        /* Per adapter stats */
        struct rte_event_eth_rx_adapter_stats stats;
        /* Block count, counts up to BLOCK_CNT_THRESHOLD */
        uint16_t enq_block_count;
        /* Block start ts */
        uint64_t rx_enq_block_start_ts;
        /* Configuration callback for rte_service configuration */
        rte_event_eth_rx_adapter_conf_cb conf_cb;
        /* Configuration callback argument */
        void *conf_arg;
        /* Set if default_cb is being used */
        int default_cb_arg;
        /* Service initialization state */
        uint8_t service_inited;
        /* Total count of Rx queues in adapter */
        uint32_t nb_queues;
        /* Memory allocation name */
        char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
        /* Socket identifier cached from eventdev */
        int socket_id;
        /* Per adapter EAL service */
        uint32_t service_id;
        /* Adapter started flag */
        uint8_t rxa_started;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
        struct rte_eth_dev *dev;
        struct eth_rx_queue_info *rx_queue;
        /* Set if ethdev->eventdev packet transfer uses a
         * hardware mechanism
         */
        uint8_t internal_event_port;
        /* Set if the adapter is processing rx queues for
         * this eth device and packet processing has been
         * started, allows for the code to know if the PMD
         * rx_adapter_stop callback needs to be invoked
         */
        uint8_t dev_rx_started;
        /* If nb_dev_queues > 0, the start callback will
         * be invoked if not already invoked
         */
        uint16_t nb_dev_queues;
};

/* Per Rx queue */
struct eth_rx_queue_info {
        int queue_enabled;      /* True if added */
        uint16_t wt;            /* Polling weight */
        uint8_t event_queue_id; /* Event queue to enqueue packets to */
        uint8_t sched_type;     /* Sched type for events */
        uint8_t priority;       /* Event priority */
        uint32_t flow_id;       /* App provided flow identifier */
        uint32_t flow_id_mask;  /* Set to ~0 if app provides flow id else 0 */
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
valid_id(uint8_t id)
{
        return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
        if (!valid_id(id)) { \
                RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
                return retval; \
        } \
} while (0)

static inline int
sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return rx_adapter->num_rx_polled;
}

/* Greatest common divisor */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
        uint16_t r = a % b;

        return r ? gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
         unsigned int n, int *cw,
         struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
         uint16_t gcd, int prev)
{
        int i = prev;
        uint16_t w;

        while (1) {
                uint16_t q;
                uint16_t d;

                i = (i + 1) % n;
                if (i == 0) {
                        *cw = *cw - gcd;
                        if (*cw <= 0)
                                *cw = max_wt;
                }

                q = eth_rx_poll[i].eth_rx_qid;
                d = eth_rx_poll[i].eth_dev_id;
                w = rx_adapter->eth_devices[d].rx_queue[q].wt;

                if ((int)w >= *cw)
                        return i;
        }
}
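
/*
 * Illustrative trace of wrr_next() (editor's example, not part of the
 * library): assuming three polled queues with servicing weights {5, 1, 1},
 * so max_wt = 5 and gcd = 1, repeated calls starting from prev = -1 and
 * cw = -1 yield the index sequence 0, 0, 0, 0, 0, 1, 2. The heaviest
 * queue is polled five times per cycle while the others are polled once,
 * and the cycle length equals the sum of the weights (7), which is the
 * max_wrr_pos value computed in eth_poll_wrr_calc() below.
 */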

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint16_t d;
        uint16_t q;
        unsigned int i;

        /* Initialize variables for calculation of wrr schedule */
        uint16_t max_wrr_pos = 0;
        unsigned int poll_q = 0;
        uint16_t max_wt = 0;
        uint16_t gcd = 0;

        struct eth_rx_poll_entry *rx_poll = NULL;
        uint32_t *rx_wrr = NULL;

        if (rx_adapter->num_rx_polled) {
                size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
                                sizeof(*rx_adapter->eth_rx_poll),
                                RTE_CACHE_LINE_SIZE);
                rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
                                             len,
                                             RTE_CACHE_LINE_SIZE,
                                             rx_adapter->socket_id);
                if (rx_poll == NULL)
                        return -ENOMEM;

                /* Generate array of all queues to poll, the size of this
                 * array is poll_q
                 */
                RTE_ETH_FOREACH_DEV(d) {
                        uint16_t nb_rx_queues;
                        struct eth_device_info *dev_info =
                                        &rx_adapter->eth_devices[d];
                        nb_rx_queues = dev_info->dev->data->nb_rx_queues;
                        if (dev_info->rx_queue == NULL)
                                continue;
                        if (dev_info->internal_event_port)
                                continue;
                        for (q = 0; q < nb_rx_queues; q++) {
                                struct eth_rx_queue_info *queue_info =
                                        &dev_info->rx_queue[q];
                                if (queue_info->queue_enabled == 0)
                                        continue;

                                uint16_t wt = queue_info->wt;
                                rx_poll[poll_q].eth_dev_id = d;
                                rx_poll[poll_q].eth_rx_qid = q;
                                max_wrr_pos += wt;
                                max_wt = RTE_MAX(max_wt, wt);
                                gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
                                poll_q++;
                        }
                }

                len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
                                RTE_CACHE_LINE_SIZE);
                rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
                                            len,
                                            RTE_CACHE_LINE_SIZE,
                                            rx_adapter->socket_id);
                if (rx_wrr == NULL) {
                        rte_free(rx_poll);
                        return -ENOMEM;
                }

                /* Generate polling sequence based on weights */
                int prev = -1;
                int cw = -1;
                for (i = 0; i < max_wrr_pos; i++) {
                        rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
                                             rx_poll, max_wt, gcd, prev);
                        prev = rx_wrr[i];
                }
        }

        rte_free(rx_adapter->eth_rx_poll);
        rte_free(rx_adapter->wrr_sched);

        rx_adapter->eth_rx_poll = rx_poll;
        rx_adapter->wrr_sched = rx_wrr;
        rx_adapter->wrr_len = max_wrr_pos;

        return 0;
}

static inline void
mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
        struct ipv6_hdr **ipv6_hdr)
{
        struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
        struct vlan_hdr *vlan_hdr;

        *ipv4_hdr = NULL;
        *ipv6_hdr = NULL;

        switch (eth_hdr->ether_type) {
        case RTE_BE16(ETHER_TYPE_IPv4):
                *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_IPv6):
                *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
                break;

        case RTE_BE16(ETHER_TYPE_VLAN):
                vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
                switch (vlan_hdr->eth_proto) {
                case RTE_BE16(ETHER_TYPE_IPv4):
                        *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
                        break;
                case RTE_BE16(ETHER_TYPE_IPv6):
                        *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
                        break;
                default:
                        break;
                }
                break;

        default:
                break;
        }
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
        uint32_t input_len;
        void *tuple;
        struct rte_ipv4_tuple ipv4_tuple;
        struct rte_ipv6_tuple ipv6_tuple;
        struct ipv4_hdr *ipv4_hdr;
        struct ipv6_hdr *ipv6_hdr;

        mtoip(m, &ipv4_hdr, &ipv6_hdr);

        if (ipv4_hdr) {
                ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
                ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
                tuple = &ipv4_tuple;
                input_len = RTE_THASH_V4_L3_LEN;
        } else if (ipv6_hdr) {
                rte_thash_load_v6_addrs(ipv6_hdr,
                                        (union rte_thash_tuple *)&ipv6_tuple);
                tuple = &ipv6_tuple;
                input_len = RTE_THASH_V6_L3_LEN;
        } else
                return 0;

        return rte_softrss_be(tuple, input_len, rss_key_be);
}

static inline int
rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
        return !!rx_adapter->enq_block_count;
}

static inline void
rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
        if (rx_adapter->rx_enq_block_start_ts)
                return;

        rx_adapter->enq_block_count++;
        if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
                return;

        rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
                    struct rte_event_eth_rx_adapter_stats *stats)
{
        if (unlikely(!stats->rx_enq_start_ts))
                stats->rx_enq_start_ts = rte_get_tsc_cycles();

        if (likely(!rx_enq_blocked(rx_adapter)))
                return;

        rx_adapter->enq_block_count = 0;
        if (rx_adapter->rx_enq_block_start_ts) {
                stats->rx_enq_end_ts = rte_get_tsc_cycles();
                stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
                    rx_adapter->rx_enq_block_start_ts;
                rx_adapter->rx_enq_block_start_ts = 0;
        }
}
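
/*
 * How the blocked-cycles accounting above fits together (editor's note,
 * not part of the library): a flush that enqueues nothing must repeat
 * BLOCK_CNT_THRESHOLD (10) consecutive times before
 * rx_enq_block_start_ts() records a start timestamp, which filters out
 * transient full conditions. Once the eventdev accepts events again,
 * rx_enq_block_end_ts() adds the elapsed TSC cycles to
 * stats->rx_enq_block_cycles and rearms the counter, so the stat
 * approximates the total time the adapter spent back-pressured.
 */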

/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
                  struct rte_event *ev)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
static inline uint16_t
flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

        uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
                                        rx_adapter->event_port_id,
                                        buf->events,
                                        buf->count);
        if (n != buf->count) {
                memmove(buf->events,
                        &buf->events[n],
                        (buf->count - n) * sizeof(struct rte_event));
                stats->rx_enq_retry++;
        }

        n ? rx_enq_block_end_ts(rx_adapter, stats) :
                rx_enq_block_start_ts(rx_adapter);

        buf->count -= n;
        stats->rx_enq_count += n;

        return n;
}
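
/*
 * Partial-enqueue example for flush_event_buffer() (editor's example,
 * not part of the library): if buf->count is 32 and
 * rte_event_enqueue_new_burst() accepts n = 20 events, the 12 unsent
 * events are shifted to the front of buf->events, buf->count drops to
 * 12, rx_enq_retry is incremented, and those events are retried on the
 * next flush. Only when n is 0 does the blocked-cycles bookkeeping
 * above start counting.
 */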

static inline void
fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
        uint16_t eth_dev_id,
        uint16_t rx_queue_id,
        struct rte_mbuf **mbufs,
        uint16_t num)
{
        uint32_t i;
        struct eth_device_info *eth_device_info =
                                        &rx_adapter->eth_devices[eth_dev_id];
        struct eth_rx_queue_info *eth_rx_queue_info =
                                        &eth_device_info->rx_queue[rx_queue_id];

        int32_t qid = eth_rx_queue_info->event_queue_id;
        uint8_t sched_type = eth_rx_queue_info->sched_type;
        uint8_t priority = eth_rx_queue_info->priority;
        uint32_t flow_id;
        struct rte_event events[BATCH_SIZE];
        struct rte_mbuf *m = mbufs[0];
        uint32_t rss_mask;
        uint32_t rss;
        int do_rss;
        uint64_t ts;

        /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
        rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
        do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

        if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
                ts = rte_get_tsc_cycles();
                for (i = 0; i < num; i++) {
                        m = mbufs[i];

                        m->timestamp = ts;
                        m->ol_flags |= PKT_RX_TIMESTAMP;
                }
        }

        for (i = 0; i < num; i++) {
                m = mbufs[i];
                struct rte_event *ev = &events[i];

                rss = do_rss ?
                        do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
                flow_id =
                    eth_rx_queue_info->flow_id &
                                eth_rx_queue_info->flow_id_mask;
                flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
                ev->flow_id = flow_id;
                ev->op = RTE_EVENT_OP_NEW;
                ev->sched_type = sched_type;
                ev->queue_id = qid;
                ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
                ev->sub_event_type = 0;
                ev->priority = priority;
                ev->mbuf = m;

                buf_event_enqueue(rx_adapter, ev);
        }
}
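
/*
 * Worked example of the flow_id composition above (editor's example):
 * with flow_id_mask = 0 (no application-supplied flow id), ev->flow_id
 * is taken entirely from the RSS hash, which is computed in software
 * only when the PMD did not set PKT_RX_RSS_HASH. With flow_id_mask = ~0
 * (queue added with RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID),
 * ev->flow_id is the application-provided value and the hash is ignored.
 */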

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline void
eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
        uint32_t num_queue;
        uint16_t n;
        uint32_t nb_rx = 0;
        struct rte_mbuf *mbufs[BATCH_SIZE];
        struct rte_eth_event_enqueue_buffer *buf;
        uint32_t wrr_pos;
        uint32_t max_nb_rx;

        wrr_pos = rx_adapter->wrr_pos;
        max_nb_rx = rx_adapter->max_nb_rx;
        buf = &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

        /* Iterate through a WRR sequence */
        for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
                unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
                uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
                uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

                /* Don't do a batch dequeue from the rx queue if there isn't
                 * enough space in the enqueue buffer.
                 */
                if (buf->count >= BATCH_SIZE)
                        flush_event_buffer(rx_adapter);
                if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
                        rx_adapter->wrr_pos = wrr_pos;
                        return;
                }

                stats->rx_poll_count++;
                n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);

                if (n) {
                        stats->rx_packets += n;
                        /* The check before rte_eth_rx_burst() ensures that
                         * all n mbufs can be buffered
                         */
                        fill_event_buffer(rx_adapter, d, qid, mbufs, n);
                        nb_rx += n;
                        if (nb_rx > max_nb_rx) {
                                rx_adapter->wrr_pos =
                                    (wrr_pos + 1) % rx_adapter->wrr_len;
                                break;
                        }
                }

                if (++wrr_pos == rx_adapter->wrr_len)
                        wrr_pos = 0;
        }

        if (buf->count >= BATCH_SIZE)
                flush_event_buffer(rx_adapter);
}

static int
event_eth_rx_adapter_service_func(void *args)
{
        struct rte_event_eth_rx_adapter *rx_adapter = args;

        if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
                return 0;
        if (!rx_adapter->rxa_started) {
                /* Release the lock before returning, the unlock was
                 * previously placed after the return and never executed,
                 * leaving the lock held and stalling the adapter.
                 */
                rte_spinlock_unlock(&rx_adapter->rx_lock);
                return 0;
        }
        eth_rx_poll(rx_adapter);
        rte_spinlock_unlock(&rx_adapter->rx_lock);
        return 0;
}

static int
rte_event_eth_rx_adapter_init(void)
{
        const char *name = "rte_event_eth_rx_adapter_array";
        const struct rte_memzone *mz;
        unsigned int sz;

        sz = sizeof(*event_eth_rx_adapter) *
            RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
        sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

        mz = rte_memzone_lookup(name);
        if (mz == NULL) {
                mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
                                                 RTE_CACHE_LINE_SIZE);
                if (mz == NULL) {
                        RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
                                        PRId32, rte_errno);
                        return -rte_errno;
                }
        }

        event_eth_rx_adapter = mz->addr;
        return 0;
}

static inline struct rte_event_eth_rx_adapter *
id_to_rx_adapter(uint8_t id)
{
        return event_eth_rx_adapter ?
                event_eth_rx_adapter[id] : NULL;
}

static int
default_conf_cb(uint8_t id, uint8_t dev_id,
                struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
        int ret;
        struct rte_eventdev *dev;
        struct rte_event_dev_config dev_conf;
        int started;
        uint8_t port_id;
        struct rte_event_port_conf *port_conf = arg;
        struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        dev_conf = dev->data->dev_conf;

        started = dev->data->dev_started;
        if (started)
                rte_event_dev_stop(dev_id);
        port_id = dev_conf.nb_event_ports;
        dev_conf.nb_event_ports += 1;
        ret = rte_event_dev_configure(dev_id, &dev_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
                                                dev_id);
                if (started) {
                        if (rte_event_dev_start(dev_id))
                                return -EIO;
                }
                return ret;
        }

        ret = rte_event_port_setup(dev_id, port_id, port_conf);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
                                        port_id);
                return ret;
        }

        conf->event_port_id = port_id;
        conf->max_nb_rx = 128;
        if (started)
                ret = rte_event_dev_start(dev_id);
        rx_adapter->default_cb_arg = 1;
        return ret;
}

static int
init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
        int ret;
        struct rte_service_spec service;
        struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

        if (rx_adapter->service_inited)
                return 0;

        memset(&service, 0, sizeof(service));
        snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
                "rte_event_eth_rx_adapter_%d", id);
        service.socket_id = rx_adapter->socket_id;
        service.callback = event_eth_rx_adapter_service_func;
        service.callback_userdata = rx_adapter;
        /* Service function handles locking for queue add/del updates */
        service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
        ret = rte_service_component_register(&service, &rx_adapter->service_id);
        if (ret) {
                RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
                        service.name, ret);
                return ret;
        }

        ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
                &rx_adapter_conf, rx_adapter->conf_arg);
        if (ret) {
                RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
                        ret);
                goto err_done;
        }
        rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
        rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
        rx_adapter->service_inited = 1;
        return 0;

err_done:
        rte_service_component_unregister(rx_adapter->service_id);
        return ret;
}


static void
update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                int32_t rx_queue_id,
                uint8_t add)
{
        struct eth_rx_queue_info *queue_info;
        int enabled;
        uint16_t i;

        if (dev_info->rx_queue == NULL)
                return;

        if (rx_queue_id == -1) {
                for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                        update_queue_info(rx_adapter, dev_info, i, add);
        } else {
                queue_info = &dev_info->rx_queue[rx_queue_id];
                enabled = queue_info->queue_enabled;
                if (add) {
                        rx_adapter->nb_queues += !enabled;
                        dev_info->nb_dev_queues += !enabled;
                } else {
                        rx_adapter->nb_queues -= enabled;
                        dev_info->nb_dev_queues -= enabled;
                }
                queue_info->queue_enabled = !!add;
        }
}

static int
event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
                            struct eth_device_info *dev_info,
                            uint16_t rx_queue_id)
{
        struct eth_rx_queue_info *queue_info;

        if (rx_adapter->nb_queues == 0)
                return 0;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        rx_adapter->num_rx_polled -= queue_info->queue_enabled;
        update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
        return 0;
}

static void
event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
                struct eth_device_info *dev_info,
                uint16_t rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
        struct eth_rx_queue_info *queue_info;
        const struct rte_event *ev = &conf->ev;

        queue_info = &dev_info->rx_queue[rx_queue_id];
        queue_info->event_queue_id = ev->queue_id;
        queue_info->sched_type = ev->sched_type;
        queue_info->priority = ev->priority;
        queue_info->wt = conf->servicing_weight;

        if (conf->rx_queue_flags &
                        RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
                queue_info->flow_id = ev->flow_id;
                queue_info->flow_id_mask = ~0;
        }

        /* The same queue can be added more than once */
        rx_adapter->num_rx_polled += !queue_info->queue_enabled;
        update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
}

static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
                uint16_t eth_dev_id,
                int rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
        struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
        struct rte_event_eth_rx_adapter_queue_conf temp_conf;
        uint32_t i;
        int ret;

        if (queue_conf->servicing_weight == 0) {

                struct rte_eth_dev_data *data = dev_info->dev->data;
                if (data->dev_conf.intr_conf.rxq) {
                        RTE_EDEV_LOG_ERR("Interrupt driven queues"
                                        " not supported");
                        return -ENOTSUP;
                }
                temp_conf = *queue_conf;

                /* If Rx interrupts are disabled set wt = 1 */
                temp_conf.servicing_weight = 1;
                queue_conf = &temp_conf;
        }

        if (dev_info->rx_queue == NULL) {
                dev_info->rx_queue =
                    rte_zmalloc_socket(rx_adapter->mem_name,
                                       dev_info->dev->data->nb_rx_queues *
                                       sizeof(struct eth_rx_queue_info), 0,
                                       rx_adapter->socket_id);
                if (dev_info->rx_queue == NULL)
                        return -ENOMEM;
        }

        if (rx_queue_id == -1) {
                for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                        event_eth_rx_adapter_queue_add(rx_adapter,
                                                dev_info, i,
                                                queue_conf);
        } else {
                event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
                                          (uint16_t)rx_queue_id,
                                          queue_conf);
        }

        ret = eth_poll_wrr_calc(rx_adapter);
        if (ret) {
                event_eth_rx_adapter_queue_del(rx_adapter,
                                        dev_info, rx_queue_id);
                return ret;
        }

        return ret;
}

static int
rx_adapter_ctrl(uint8_t id, int start)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;
        int use_service = 0;
        int stop = !start;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];

        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                /* if start check for num dev queues */
                if (start && !dev_info->nb_dev_queues)
                        continue;
                /* if stop check if dev has been started */
                if (stop && !dev_info->dev_rx_started)
                        continue;
                use_service |= !dev_info->internal_event_port;
                dev_info->dev_rx_started = start;
                if (dev_info->internal_event_port == 0)
                        continue;
                start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
                                                &rte_eth_devices[i]) :
                        (*dev->dev_ops->eth_rx_adapter_stop)(dev,
                                                &rte_eth_devices[i]);
        }

        if (use_service) {
                rte_spinlock_lock(&rx_adapter->rx_lock);
                rx_adapter->rxa_started = start;
                rte_service_runstate_set(rx_adapter->service_id, start);
                rte_spinlock_unlock(&rx_adapter->rx_lock);
        }

        return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
                                rte_event_eth_rx_adapter_conf_cb conf_cb,
                                void *conf_arg)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        int ret;
        int socket_id;
        uint16_t i;
        char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
        const uint8_t default_rss_key[] = {
                0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
                0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
                0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
                0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
                0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
        };

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
        if (conf_cb == NULL)
                return -EINVAL;

        if (event_eth_rx_adapter == NULL) {
                ret = rte_event_eth_rx_adapter_init();
                if (ret)
                        return ret;
        }

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter != NULL) {
                RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
                return -EEXIST;
        }

        socket_id = rte_event_dev_socket_id(dev_id);
        snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
                "rte_event_eth_rx_adapter_%d",
                id);

        rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
                        RTE_CACHE_LINE_SIZE, socket_id);
        if (rx_adapter == NULL) {
                RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
                return -ENOMEM;
        }

        rx_adapter->eventdev_id = dev_id;
        rx_adapter->socket_id = socket_id;
        rx_adapter->conf_cb = conf_cb;
        rx_adapter->conf_arg = conf_arg;
        strcpy(rx_adapter->mem_name, mem_name);
        rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
                                        /* FIXME: incompatible with hotplug */
                                        rte_eth_dev_count_total() *
                                        sizeof(struct eth_device_info), 0,
                                        socket_id);
        rte_convert_rss_key((const uint32_t *)default_rss_key,
                        (uint32_t *)rx_adapter->rss_key_be,
                            RTE_DIM(default_rss_key));

        if (rx_adapter->eth_devices == NULL) {
                RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
                rte_free(rx_adapter);
                return -ENOMEM;
        }
        rte_spinlock_init(&rx_adapter->rx_lock);
        RTE_ETH_FOREACH_DEV(i)
                rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

        event_eth_rx_adapter[id] = rx_adapter;
        if (conf_cb == default_conf_cb)
                rx_adapter->default_cb_arg = 1;
        return 0;
}

int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
                struct rte_event_port_conf *port_config)
{
        struct rte_event_port_conf *pc;
        int ret;

        if (port_config == NULL)
                return -EINVAL;
        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        pc = rte_malloc(NULL, sizeof(*pc), 0);
        if (pc == NULL)
                return -ENOMEM;
        *pc = *port_config;
        ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
                                        default_conf_cb,
                                        pc);
        if (ret)
                rte_free(pc);
        return ret;
}
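
/*
 * Illustrative application-side usage of the create/add/start API
 * (editor's sketch, not part of the library; the adapter, eventdev and
 * ethdev port ids and the port_conf values are assumptions):
 *
 *      struct rte_event_port_conf port_conf = {
 *              .new_event_threshold = 4096,
 *              .dequeue_depth = 32,
 *              .enqueue_depth = 32,
 *      };
 *      struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *              .ev.queue_id = 0,
 *              .ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *              .ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *              .servicing_weight = 1,
 *      };
 *
 *      ret = rte_event_eth_rx_adapter_create(adapter_id, eventdev_id,
 *                                            &port_conf);
 *      if (ret == 0)
 *              ret = rte_event_eth_rx_adapter_queue_add(adapter_id,
 *                                      eth_port_id, -1, &qconf);
 *      if (ret == 0)
 *              ret = rte_event_eth_rx_adapter_start(adapter_id);
 */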

int
rte_event_eth_rx_adapter_free(uint8_t id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        if (rx_adapter->nb_queues) {
                RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
                                rx_adapter->nb_queues);
                return -EBUSY;
        }

        if (rx_adapter->default_cb_arg)
                rte_free(rx_adapter->conf_arg);
        rte_free(rx_adapter->eth_devices);
        rte_free(rx_adapter);
        event_eth_rx_adapter[id] = NULL;

        return 0;
}

int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
                uint16_t eth_dev_id,
                int32_t rx_queue_id,
                const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
        int ret;
        uint32_t cap;
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        int start_service;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if ((rx_adapter == NULL) || (queue_conf == NULL))
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
                                                eth_dev_id,
                                                &cap);
        if (ret) {
                RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
                        " eth port %" PRIu16, id, eth_dev_id);
                return ret;
        }

        if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
                && (queue_conf->rx_queue_flags &
                        RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
                RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
                                " eth port: %" PRIu16 " adapter id: %" PRIu8,
                                eth_dev_id, id);
                return -EINVAL;
        }

        if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
                (rx_queue_id != -1)) {
                RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
                        "event queue, eth port: %" PRIu16 " adapter id: %"
                        PRIu8, eth_dev_id, id);
                return -EINVAL;
        }

        if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
                        rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
                RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
                         (uint16_t)rx_queue_id);
                return -EINVAL;
        }

        start_service = 0;
        dev_info = &rx_adapter->eth_devices[eth_dev_id];

        if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
                RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
                                        -ENOTSUP);
                if (dev_info->rx_queue == NULL) {
                        dev_info->rx_queue =
                            rte_zmalloc_socket(rx_adapter->mem_name,
                                        dev_info->dev->data->nb_rx_queues *
                                        sizeof(struct eth_rx_queue_info), 0,
                                        rx_adapter->socket_id);
                        if (dev_info->rx_queue == NULL)
                                return -ENOMEM;
                }

                ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
                                &rte_eth_devices[eth_dev_id],
                                rx_queue_id, queue_conf);
                if (ret == 0) {
                        dev_info->internal_event_port = 1;
                        update_queue_info(rx_adapter,
                                        &rx_adapter->eth_devices[eth_dev_id],
                                        rx_queue_id,
                                        1);
                }
        } else {
                rte_spinlock_lock(&rx_adapter->rx_lock);
                dev_info->internal_event_port = 0;
                ret = init_service(rx_adapter, id);
                if (ret == 0)
                        ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
                                        queue_conf);
                rte_spinlock_unlock(&rx_adapter->rx_lock);
                if (ret == 0)
                        start_service = !!sw_rx_adapter_queue_count(rx_adapter);
        }

        if (ret)
                return ret;

        if (start_service)
                rte_service_component_runstate_set(rx_adapter->service_id, 1);

        return 0;
}

int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
                                int32_t rx_queue_id)
{
        int ret = 0;
        struct rte_eventdev *dev;
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct eth_device_info *dev_info;
        uint32_t cap;
        uint16_t i;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
        RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
                                                eth_dev_id,
                                                &cap);
        if (ret)
                return ret;

        if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
                rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
                RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
                         (uint16_t)rx_queue_id);
                return -EINVAL;
        }

        dev_info = &rx_adapter->eth_devices[eth_dev_id];

        if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
                RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
                                 -ENOTSUP);
                ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
                                                &rte_eth_devices[eth_dev_id],
                                                rx_queue_id);
                if (ret == 0) {
                        update_queue_info(rx_adapter,
                                        &rx_adapter->eth_devices[eth_dev_id],
                                        rx_queue_id,
                                        0);
                        if (dev_info->nb_dev_queues == 0) {
                                rte_free(dev_info->rx_queue);
                                dev_info->rx_queue = NULL;
                        }
                }
        } else {
                int rc;
                rte_spinlock_lock(&rx_adapter->rx_lock);
                if (rx_queue_id == -1) {
                        for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
                                event_eth_rx_adapter_queue_del(rx_adapter,
                                                        dev_info,
                                                        i);
                } else {
                        event_eth_rx_adapter_queue_del(rx_adapter,
                                                dev_info,
                                                (uint16_t)rx_queue_id);
                }

                rc = eth_poll_wrr_calc(rx_adapter);
                if (rc)
                        RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
                                        rc);

                if (dev_info->nb_dev_queues == 0) {
                        rte_free(dev_info->rx_queue);
                        dev_info->rx_queue = NULL;
                }

                rte_spinlock_unlock(&rx_adapter->rx_lock);
                rte_service_component_runstate_set(rx_adapter->service_id,
                                sw_rx_adapter_queue_count(rx_adapter));
        }

        return ret;
}


int
rte_event_eth_rx_adapter_start(uint8_t id)
{
        return rx_adapter_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
        return rx_adapter_ctrl(id, 0);
}

int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
                               struct rte_event_eth_rx_adapter_stats *stats)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
        struct rte_event_eth_rx_adapter_stats dev_stats;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;
        int ret;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL || stats == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        memset(stats, 0, sizeof(*stats));
        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                if (dev_info->internal_event_port == 0 ||
                        dev->dev_ops->eth_rx_adapter_stats_get == NULL)
                        continue;
                ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
                                                &rte_eth_devices[i],
                                                &dev_stats);
                if (ret)
                        continue;
                dev_stats_sum.rx_packets += dev_stats.rx_packets;
                dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
        }

        if (rx_adapter->service_inited)
                *stats = rx_adapter->stats;

        stats->rx_packets += dev_stats_sum.rx_packets;
        stats->rx_enq_count += dev_stats_sum.rx_enq_count;
        return 0;
}

int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;
        struct rte_eventdev *dev;
        struct eth_device_info *dev_info;
        uint32_t i;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL)
                return -EINVAL;

        dev = &rte_eventdevs[rx_adapter->eventdev_id];
        RTE_ETH_FOREACH_DEV(i) {
                dev_info = &rx_adapter->eth_devices[i];
                if (dev_info->internal_event_port == 0 ||
                        dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
                        continue;
                (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
                                                        &rte_eth_devices[i]);
        }

        memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
        return 0;
}

int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
        struct rte_event_eth_rx_adapter *rx_adapter;

        RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

        rx_adapter = id_to_rx_adapter(id);
        if (rx_adapter == NULL || service_id == NULL)
                return -EINVAL;

        if (rx_adapter->service_inited)
                *service_id = rx_adapter->service_id;

        return rx_adapter->service_inited ? 0 : -ESRCH;
}
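
/*
 * Illustrative service-core setup (editor's sketch, not part of the
 * library; lcore_id is an assumption and must already be configured as
 * a service core, e.g. via the EAL -s option): a SW adapter does its
 * polling in the EAL service registered in init_service(), so the
 * application must map that service to a service lcore for
 * rte_event_eth_rx_adapter_start() to have any effect:
 *
 *      uint32_t service_id;
 *
 *      if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
 *                                                  &service_id) == 0) {
 *              rte_service_map_lcore_set(service_id, lcore_id, 1);
 *              rte_service_lcore_start(lcore_id);
 *      }
 */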