ethdev: deprecate port count function
[dpdk.git] / lib / librte_eventdev / rte_event_eth_rx_adapter.c
1 #include <rte_cycles.h>
2 #include <rte_common.h>
3 #include <rte_dev.h>
4 #include <rte_errno.h>
5 #include <rte_ethdev.h>
6 #include <rte_log.h>
7 #include <rte_malloc.h>
8 #include <rte_service_component.h>
9 #include <rte_thash.h>
10
11 #include "rte_eventdev.h"
12 #include "rte_eventdev_pmd.h"
13 #include "rte_event_eth_rx_adapter.h"
14
/* Number of mbufs requested from an Rx queue in one burst */
#define BATCH_SIZE              32
/* Value enq_block_count must reach before a blocked period is timestamped */
#define BLOCK_CNT_THRESHOLD     10
/* Capacity, in events, of the per adapter enqueue buffer */
#define ETH_EVENT_BUFFER_SIZE   (4 * BATCH_SIZE)

/* Size limit applied when formatting the adapter's service name */
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
/* Size of the adapter's memory allocation name */
#define ETH_RX_ADAPTER_MEM_NAME_LEN     32

/* Size, in bytes, of the RSS key stored in the adapter */
#define RSS_KEY_SIZE    40
23
/* One entry exists for every polled Rx queue that was added to the adapter */
struct eth_rx_poll_entry {
	uint8_t eth_dev_id;	/* Eth port to poll */
	uint16_t eth_rx_qid;	/* Eth Rx queue to poll */
};
34
35 /* Instance per adapter */
36 struct rte_eth_event_enqueue_buffer {
37         /* Count of events in this buffer */
38         uint16_t count;
39         /* Array of events in this buffer */
40         struct rte_event events[ETH_EVENT_BUFFER_SIZE];
41 };
42
43 struct rte_event_eth_rx_adapter {
44         /* RSS key */
45         uint8_t rss_key_be[RSS_KEY_SIZE];
46         /* Event device identifier */
47         uint8_t eventdev_id;
48         /* Per ethernet device structure */
49         struct eth_device_info *eth_devices;
50         /* Event port identifier */
51         uint8_t event_port_id;
52         /* Lock to serialize config updates with service function */
53         rte_spinlock_t rx_lock;
54         /* Max mbufs processed in any service function invocation */
55         uint32_t max_nb_rx;
56         /* Receive queues that need to be polled */
57         struct eth_rx_poll_entry *eth_rx_poll;
58         /* Size of the eth_rx_poll array */
59         uint16_t num_rx_polled;
60         /* Weighted round robin schedule */
61         uint32_t *wrr_sched;
62         /* wrr_sched[] size */
63         uint32_t wrr_len;
64         /* Next entry in wrr[] to begin polling */
65         uint32_t wrr_pos;
66         /* Event burst buffer */
67         struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
68         /* Per adapter stats */
69         struct rte_event_eth_rx_adapter_stats stats;
70         /* Block count, counts up to BLOCK_CNT_THRESHOLD */
71         uint16_t enq_block_count;
72         /* Block start ts */
73         uint64_t rx_enq_block_start_ts;
74         /* Configuration callback for rte_service configuration */
75         rte_event_eth_rx_adapter_conf_cb conf_cb;
76         /* Configuration callback argument */
77         void *conf_arg;
78         /* Set if  default_cb is being used */
79         int default_cb_arg;
80         /* Service initialization state */
81         uint8_t service_inited;
82         /* Total count of Rx queues in adapter */
83         uint32_t nb_queues;
84         /* Memory allocation name */
85         char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
86         /* Socket identifier cached from eventdev */
87         int socket_id;
88         /* Per adapter EAL service */
89         uint32_t service_id;
90 } __rte_cache_aligned;
91
/* State kept by the adapter for each ethernet device */
struct eth_device_info {
	/* The ethernet device itself */
	struct rte_eth_dev *dev;
	/* Per Rx queue state indexed by Rx queue id, NULL until allocated */
	struct eth_rx_queue_info *rx_queue;
	/* Non-zero when packets move from the ethdev to the eventdev
	 * through a hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Non-zero while the adapter is processing Rx queues of this
	 * device and packet processing was started; tells the code
	 * whether the PMD rx_adapter_stop callback has to be invoked
	 */
	uint8_t dev_rx_started;
	/* When nb_dev_queues > 0 the start callback is invoked
	 * unless that already happened
	 */
	uint16_t nb_dev_queues;
};
111
/* State kept by the adapter for each Rx queue */
struct eth_rx_queue_info {
	/* Non-zero once the queue has been added */
	int queue_enabled;
	/* Polling weight */
	uint16_t wt;
	/* Event queue that receives the packets */
	uint8_t event_queue_id;
	/* Schedule type used for the generated events */
	uint8_t sched_type;
	/* Priority used for the generated events */
	uint8_t priority;
	/* Flow identifier supplied by the application */
	uint32_t flow_id;
	/* ~0 when the application supplied a flow id, 0 otherwise */
	uint32_t flow_id_mask;
};
122
/* Array of adapter instance pointers indexed by adapter id; NULL until
 * rte_event_eth_rx_adapter_init() has pointed it at its memzone
 */
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
124
125 static inline int
126 valid_id(uint8_t id)
127 {
128         return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
129 }
130
/*
 * Log an error and make the calling function return retval when id does
 * not pass valid_id().
 */
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) \
do { \
	if (valid_id(id) == 0) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
137
138 static inline int
139 sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
140 {
141         return rx_adapter->num_rx_polled;
142 }
143
/*
 * Greatest common divisor of a and b (Euclid's algorithm).
 *
 * A zero operand yields the other operand, so gcd_u16(a, 0) == a and
 * gcd_u16(0, b) == b; the function never divides by zero.
 */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
	while (b != 0) {
		uint16_t r = a % b;

		a = b;
		b = r;
	}

	return a;
}
151
152 /* Returns the next queue in the polling sequence
153  *
154  * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
155  */
156 static int
157 wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
158          unsigned int n, int *cw,
159          struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
160          uint16_t gcd, int prev)
161 {
162         int i = prev;
163         uint16_t w;
164
165         while (1) {
166                 uint16_t q;
167                 uint8_t d;
168
169                 i = (i + 1) % n;
170                 if (i == 0) {
171                         *cw = *cw - gcd;
172                         if (*cw <= 0)
173                                 *cw = max_wt;
174                 }
175
176                 q = eth_rx_poll[i].eth_rx_qid;
177                 d = eth_rx_poll[i].eth_dev_id;
178                 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
179
180                 if ((int)w >= *cw)
181                         return i;
182         }
183 }
184
185 /* Precalculate WRR polling sequence for all queues in rx_adapter */
186 static int
187 eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
188 {
189         uint8_t d;
190         uint16_t q;
191         unsigned int i;
192
193         /* Initialize variables for calculation of wrr schedule */
194         uint16_t max_wrr_pos = 0;
195         unsigned int poll_q = 0;
196         uint16_t max_wt = 0;
197         uint16_t gcd = 0;
198
199         struct eth_rx_poll_entry *rx_poll = NULL;
200         uint32_t *rx_wrr = NULL;
201
202         if (rx_adapter->num_rx_polled) {
203                 size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
204                                 sizeof(*rx_adapter->eth_rx_poll),
205                                 RTE_CACHE_LINE_SIZE);
206                 rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
207                                              len,
208                                              RTE_CACHE_LINE_SIZE,
209                                              rx_adapter->socket_id);
210                 if (rx_poll == NULL)
211                         return -ENOMEM;
212
213                 /* Generate array of all queues to poll, the size of this
214                  * array is poll_q
215                  */
216                 RTE_ETH_FOREACH_DEV(d) {
217                         uint16_t nb_rx_queues;
218                         struct eth_device_info *dev_info =
219                                         &rx_adapter->eth_devices[d];
220                         nb_rx_queues = dev_info->dev->data->nb_rx_queues;
221                         if (dev_info->rx_queue == NULL)
222                                 continue;
223                         for (q = 0; q < nb_rx_queues; q++) {
224                                 struct eth_rx_queue_info *queue_info =
225                                         &dev_info->rx_queue[q];
226                                 if (queue_info->queue_enabled == 0)
227                                         continue;
228
229                                 uint16_t wt = queue_info->wt;
230                                 rx_poll[poll_q].eth_dev_id = d;
231                                 rx_poll[poll_q].eth_rx_qid = q;
232                                 max_wrr_pos += wt;
233                                 max_wt = RTE_MAX(max_wt, wt);
234                                 gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
235                                 poll_q++;
236                         }
237                 }
238
239                 len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
240                                 RTE_CACHE_LINE_SIZE);
241                 rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
242                                             len,
243                                             RTE_CACHE_LINE_SIZE,
244                                             rx_adapter->socket_id);
245                 if (rx_wrr == NULL) {
246                         rte_free(rx_poll);
247                         return -ENOMEM;
248                 }
249
250                 /* Generate polling sequence based on weights */
251                 int prev = -1;
252                 int cw = -1;
253                 for (i = 0; i < max_wrr_pos; i++) {
254                         rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
255                                              rx_poll, max_wt, gcd, prev);
256                         prev = rx_wrr[i];
257                 }
258         }
259
260         rte_free(rx_adapter->eth_rx_poll);
261         rte_free(rx_adapter->wrr_sched);
262
263         rx_adapter->eth_rx_poll = rx_poll;
264         rx_adapter->wrr_sched = rx_wrr;
265         rx_adapter->wrr_len = max_wrr_pos;
266
267         return 0;
268 }
269
270 static inline void
271 mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
272         struct ipv6_hdr **ipv6_hdr)
273 {
274         struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
275         struct vlan_hdr *vlan_hdr;
276
277         *ipv4_hdr = NULL;
278         *ipv6_hdr = NULL;
279
280         switch (eth_hdr->ether_type) {
281         case RTE_BE16(ETHER_TYPE_IPv4):
282                 *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
283                 break;
284
285         case RTE_BE16(ETHER_TYPE_IPv6):
286                 *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
287                 break;
288
289         case RTE_BE16(ETHER_TYPE_VLAN):
290                 vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
291                 switch (vlan_hdr->eth_proto) {
292                 case RTE_BE16(ETHER_TYPE_IPv4):
293                         *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
294                         break;
295                 case RTE_BE16(ETHER_TYPE_IPv6):
296                         *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
297                         break;
298                 default:
299                         break;
300                 }
301                 break;
302
303         default:
304                 break;
305         }
306 }
307
308 /* Calculate RSS hash for IPv4/6 */
309 static inline uint32_t
310 do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
311 {
312         uint32_t input_len;
313         void *tuple;
314         struct rte_ipv4_tuple ipv4_tuple;
315         struct rte_ipv6_tuple ipv6_tuple;
316         struct ipv4_hdr *ipv4_hdr;
317         struct ipv6_hdr *ipv6_hdr;
318
319         mtoip(m, &ipv4_hdr, &ipv6_hdr);
320
321         if (ipv4_hdr) {
322                 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
323                 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
324                 tuple = &ipv4_tuple;
325                 input_len = RTE_THASH_V4_L3_LEN;
326         } else if (ipv6_hdr) {
327                 rte_thash_load_v6_addrs(ipv6_hdr,
328                                         (union rte_thash_tuple *)&ipv6_tuple);
329                 tuple = &ipv6_tuple;
330                 input_len = RTE_THASH_V6_L3_LEN;
331         } else
332                 return 0;
333
334         return rte_softrss_be(tuple, input_len, rss_key_be);
335 }
336
337 static inline int
338 rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
339 {
340         return !!rx_adapter->enq_block_count;
341 }
342
343 static inline void
344 rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
345 {
346         if (rx_adapter->rx_enq_block_start_ts)
347                 return;
348
349         rx_adapter->enq_block_count++;
350         if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
351                 return;
352
353         rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
354 }
355
356 static inline void
357 rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
358                     struct rte_event_eth_rx_adapter_stats *stats)
359 {
360         if (unlikely(!stats->rx_enq_start_ts))
361                 stats->rx_enq_start_ts = rte_get_tsc_cycles();
362
363         if (likely(!rx_enq_blocked(rx_adapter)))
364                 return;
365
366         rx_adapter->enq_block_count = 0;
367         if (rx_adapter->rx_enq_block_start_ts) {
368                 stats->rx_enq_end_ts = rte_get_tsc_cycles();
369                 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
370                     rx_adapter->rx_enq_block_start_ts;
371                 rx_adapter->rx_enq_block_start_ts = 0;
372         }
373 }
374
375 /* Add event to buffer, free space check is done prior to calling
376  * this function
377  */
378 static inline void
379 buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
380                   struct rte_event *ev)
381 {
382         struct rte_eth_event_enqueue_buffer *buf =
383             &rx_adapter->event_enqueue_buffer;
384         rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
385 }
386
387 /* Enqueue buffered events to event device */
388 static inline uint16_t
389 flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
390 {
391         struct rte_eth_event_enqueue_buffer *buf =
392             &rx_adapter->event_enqueue_buffer;
393         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
394
395         uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
396                                         rx_adapter->event_port_id,
397                                         buf->events,
398                                         buf->count);
399         if (n != buf->count) {
400                 memmove(buf->events,
401                         &buf->events[n],
402                         (buf->count - n) * sizeof(struct rte_event));
403                 stats->rx_enq_retry++;
404         }
405
406         n ? rx_enq_block_end_ts(rx_adapter, stats) :
407                 rx_enq_block_start_ts(rx_adapter);
408
409         buf->count -= n;
410         stats->rx_enq_count += n;
411
412         return n;
413 }
414
415 static inline void
416 fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
417         uint8_t dev_id,
418         uint16_t rx_queue_id,
419         struct rte_mbuf **mbufs,
420         uint16_t num)
421 {
422         uint32_t i;
423         struct eth_device_info *eth_device_info =
424                                         &rx_adapter->eth_devices[dev_id];
425         struct eth_rx_queue_info *eth_rx_queue_info =
426                                         &eth_device_info->rx_queue[rx_queue_id];
427
428         int32_t qid = eth_rx_queue_info->event_queue_id;
429         uint8_t sched_type = eth_rx_queue_info->sched_type;
430         uint8_t priority = eth_rx_queue_info->priority;
431         uint32_t flow_id;
432         struct rte_event events[BATCH_SIZE];
433         struct rte_mbuf *m = mbufs[0];
434         uint32_t rss_mask;
435         uint32_t rss;
436         int do_rss;
437         uint64_t ts;
438
439         /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
440         rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
441         do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
442
443         if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
444                 ts = rte_get_tsc_cycles();
445                 for (i = 0; i < num; i++) {
446                         m = mbufs[i];
447
448                         m->timestamp = ts;
449                         m->ol_flags |= PKT_RX_TIMESTAMP;
450                 }
451         }
452
453         for (i = 0; i < num; i++) {
454                 m = mbufs[i];
455                 struct rte_event *ev = &events[i];
456
457                 rss = do_rss ?
458                         do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
459                 flow_id =
460                     eth_rx_queue_info->flow_id &
461                                 eth_rx_queue_info->flow_id_mask;
462                 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
463                 ev->flow_id = flow_id;
464                 ev->op = RTE_EVENT_OP_NEW;
465                 ev->sched_type = sched_type;
466                 ev->queue_id = qid;
467                 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
468                 ev->sub_event_type = 0;
469                 ev->priority = priority;
470                 ev->mbuf = m;
471
472                 buf_event_enqueue(rx_adapter, ev);
473         }
474 }
475
476 /*
477  * Polls receive queues added to the event adapter and enqueues received
478  * packets to the event device.
479  *
480  * The receive code enqueues initially to a temporary buffer, the
481  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
482  *
483  * If there isn't space available in the temporary buffer, packets from the
484  * Rx queue aren't dequeued from the eth device, this back pressures the
485  * eth device, in virtual device environments this back pressure is relayed to
486  * the hypervisor's switching layer where adjustments can be made to deal with
487  * it.
488  */
489 static inline uint32_t
490 eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
491 {
492         uint32_t num_queue;
493         uint16_t n;
494         uint32_t nb_rx = 0;
495         struct rte_mbuf *mbufs[BATCH_SIZE];
496         struct rte_eth_event_enqueue_buffer *buf;
497         uint32_t wrr_pos;
498         uint32_t max_nb_rx;
499
500         wrr_pos = rx_adapter->wrr_pos;
501         max_nb_rx = rx_adapter->max_nb_rx;
502         buf = &rx_adapter->event_enqueue_buffer;
503         struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
504
505         /* Iterate through a WRR sequence */
506         for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
507                 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
508                 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
509                 uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
510
511                 /* Don't do a batch dequeue from the rx queue if there isn't
512                  * enough space in the enqueue buffer.
513                  */
514                 if (buf->count >= BATCH_SIZE)
515                         flush_event_buffer(rx_adapter);
516                 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count))
517                         break;
518
519                 stats->rx_poll_count++;
520                 n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
521
522                 if (n) {
523                         stats->rx_packets += n;
524                         /* The check before rte_eth_rx_burst() ensures that
525                          * all n mbufs can be buffered
526                          */
527                         fill_event_buffer(rx_adapter, d, qid, mbufs, n);
528                         nb_rx += n;
529                         if (nb_rx > max_nb_rx) {
530                                 rx_adapter->wrr_pos =
531                                     (wrr_pos + 1) % rx_adapter->wrr_len;
532                                 return nb_rx;
533                         }
534                 }
535
536                 if (++wrr_pos == rx_adapter->wrr_len)
537                         wrr_pos = 0;
538         }
539
540         return nb_rx;
541 }
542
543 static int
544 event_eth_rx_adapter_service_func(void *args)
545 {
546         struct rte_event_eth_rx_adapter *rx_adapter = args;
547         struct rte_eth_event_enqueue_buffer *buf;
548
549         buf = &rx_adapter->event_enqueue_buffer;
550         if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
551                 return 0;
552         if (eth_rx_poll(rx_adapter) == 0 && buf->count)
553                 flush_event_buffer(rx_adapter);
554         rte_spinlock_unlock(&rx_adapter->rx_lock);
555         return 0;
556 }
557
558 static int
559 rte_event_eth_rx_adapter_init(void)
560 {
561         const char *name = "rte_event_eth_rx_adapter_array";
562         const struct rte_memzone *mz;
563         unsigned int sz;
564
565         sz = sizeof(*event_eth_rx_adapter) *
566             RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
567         sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
568
569         mz = rte_memzone_lookup(name);
570         if (mz == NULL) {
571                 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
572                                                  RTE_CACHE_LINE_SIZE);
573                 if (mz == NULL) {
574                         RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
575                                         PRId32, rte_errno);
576                         return -rte_errno;
577                 }
578         }
579
580         event_eth_rx_adapter = mz->addr;
581         return 0;
582 }
583
584 static inline struct rte_event_eth_rx_adapter *
585 id_to_rx_adapter(uint8_t id)
586 {
587         return event_eth_rx_adapter ?
588                 event_eth_rx_adapter[id] : NULL;
589 }
590
591 static int
592 default_conf_cb(uint8_t id, uint8_t dev_id,
593                 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
594 {
595         int ret;
596         struct rte_eventdev *dev;
597         struct rte_event_dev_config dev_conf;
598         int started;
599         uint8_t port_id;
600         struct rte_event_port_conf *port_conf = arg;
601         struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);
602
603         dev = &rte_eventdevs[rx_adapter->eventdev_id];
604         dev_conf = dev->data->dev_conf;
605
606         started = dev->data->dev_started;
607         if (started)
608                 rte_event_dev_stop(dev_id);
609         port_id = dev_conf.nb_event_ports;
610         dev_conf.nb_event_ports += 1;
611         ret = rte_event_dev_configure(dev_id, &dev_conf);
612         if (ret) {
613                 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
614                                                 dev_id);
615                 if (started) {
616                         if (rte_event_dev_start(dev_id))
617                                 return -EIO;
618                 }
619                 return ret;
620         }
621
622         ret = rte_event_port_setup(dev_id, port_id, port_conf);
623         if (ret) {
624                 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
625                                         port_id);
626                 return ret;
627         }
628
629         conf->event_port_id = port_id;
630         conf->max_nb_rx = 128;
631         if (started)
632                 ret = rte_event_dev_start(dev_id);
633         rx_adapter->default_cb_arg = 1;
634         return ret;
635 }
636
637 static int
638 init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
639 {
640         int ret;
641         struct rte_service_spec service;
642         struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
643
644         if (rx_adapter->service_inited)
645                 return 0;
646
647         memset(&service, 0, sizeof(service));
648         snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
649                 "rte_event_eth_rx_adapter_%d", id);
650         service.socket_id = rx_adapter->socket_id;
651         service.callback = event_eth_rx_adapter_service_func;
652         service.callback_userdata = rx_adapter;
653         /* Service function handles locking for queue add/del updates */
654         service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
655         ret = rte_service_component_register(&service, &rx_adapter->service_id);
656         if (ret) {
657                 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
658                         service.name, ret);
659                 return ret;
660         }
661
662         ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
663                 &rx_adapter_conf, rx_adapter->conf_arg);
664         if (ret) {
665                 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
666                         ret);
667                 goto err_done;
668         }
669         rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
670         rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
671         rx_adapter->service_inited = 1;
672         return 0;
673
674 err_done:
675         rte_service_component_unregister(rx_adapter->service_id);
676         return ret;
677 }
678
679
680 static void
681 update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
682                 struct eth_device_info *dev_info,
683                 int32_t rx_queue_id,
684                 uint8_t add)
685 {
686         struct eth_rx_queue_info *queue_info;
687         int enabled;
688         uint16_t i;
689
690         if (dev_info->rx_queue == NULL)
691                 return;
692
693         if (rx_queue_id == -1) {
694                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
695                         update_queue_info(rx_adapter, dev_info, i, add);
696         } else {
697                 queue_info = &dev_info->rx_queue[rx_queue_id];
698                 enabled = queue_info->queue_enabled;
699                 if (add) {
700                         rx_adapter->nb_queues += !enabled;
701                         dev_info->nb_dev_queues += !enabled;
702                 } else {
703                         rx_adapter->nb_queues -= enabled;
704                         dev_info->nb_dev_queues -= enabled;
705                 }
706                 queue_info->queue_enabled = !!add;
707         }
708 }
709
710 static int
711 event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
712                             struct eth_device_info *dev_info,
713                             uint16_t rx_queue_id)
714 {
715         struct eth_rx_queue_info *queue_info;
716
717         if (rx_adapter->nb_queues == 0)
718                 return 0;
719
720         queue_info = &dev_info->rx_queue[rx_queue_id];
721         rx_adapter->num_rx_polled -= queue_info->queue_enabled;
722         update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
723         return 0;
724 }
725
726 static void
727 event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
728                 struct eth_device_info *dev_info,
729                 uint16_t rx_queue_id,
730                 const struct rte_event_eth_rx_adapter_queue_conf *conf)
731
732 {
733         struct eth_rx_queue_info *queue_info;
734         const struct rte_event *ev = &conf->ev;
735
736         queue_info = &dev_info->rx_queue[rx_queue_id];
737         queue_info->event_queue_id = ev->queue_id;
738         queue_info->sched_type = ev->sched_type;
739         queue_info->priority = ev->priority;
740         queue_info->wt = conf->servicing_weight;
741
742         if (conf->rx_queue_flags &
743                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
744                 queue_info->flow_id = ev->flow_id;
745                 queue_info->flow_id_mask = ~0;
746         }
747
748         /* The same queue can be added more than once */
749         rx_adapter->num_rx_polled += !queue_info->queue_enabled;
750         update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
751 }
752
753 static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
754                 uint8_t eth_dev_id,
755                 int rx_queue_id,
756                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
757 {
758         struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
759         struct rte_event_eth_rx_adapter_queue_conf temp_conf;
760         uint32_t i;
761         int ret;
762
763         if (queue_conf->servicing_weight == 0) {
764
765                 struct rte_eth_dev_data *data = dev_info->dev->data;
766                 if (data->dev_conf.intr_conf.rxq) {
767                         RTE_EDEV_LOG_ERR("Interrupt driven queues"
768                                         " not supported");
769                         return -ENOTSUP;
770                 }
771                 temp_conf = *queue_conf;
772
773                 /* If Rx interrupts are disabled set wt = 1 */
774                 temp_conf.servicing_weight = 1;
775                 queue_conf = &temp_conf;
776         }
777
778         if (dev_info->rx_queue == NULL) {
779                 dev_info->rx_queue =
780                     rte_zmalloc_socket(rx_adapter->mem_name,
781                                        dev_info->dev->data->nb_rx_queues *
782                                        sizeof(struct eth_rx_queue_info), 0,
783                                        rx_adapter->socket_id);
784                 if (dev_info->rx_queue == NULL)
785                         return -ENOMEM;
786         }
787
788         if (rx_queue_id == -1) {
789                 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
790                         event_eth_rx_adapter_queue_add(rx_adapter,
791                                                 dev_info, i,
792                                                 queue_conf);
793         } else {
794                 event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
795                                           (uint16_t)rx_queue_id,
796                                           queue_conf);
797         }
798
799         ret = eth_poll_wrr_calc(rx_adapter);
800         if (ret) {
801                 event_eth_rx_adapter_queue_del(rx_adapter,
802                                         dev_info, rx_queue_id);
803                 return ret;
804         }
805
806         return ret;
807 }
808
809 static int
810 rx_adapter_ctrl(uint8_t id, int start)
811 {
812         struct rte_event_eth_rx_adapter *rx_adapter;
813         struct rte_eventdev *dev;
814         struct eth_device_info *dev_info;
815         uint32_t i;
816         int use_service = 0;
817         int stop = !start;
818
819         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
820         rx_adapter = id_to_rx_adapter(id);
821         if (rx_adapter == NULL)
822                 return -EINVAL;
823
824         dev = &rte_eventdevs[rx_adapter->eventdev_id];
825
826         RTE_ETH_FOREACH_DEV(i) {
827                 dev_info = &rx_adapter->eth_devices[i];
828                 /* if start  check for num dev queues */
829                 if (start && !dev_info->nb_dev_queues)
830                         continue;
831                 /* if stop check if dev has been started */
832                 if (stop && !dev_info->dev_rx_started)
833                         continue;
834                 use_service |= !dev_info->internal_event_port;
835                 dev_info->dev_rx_started = start;
836                 if (dev_info->internal_event_port == 0)
837                         continue;
838                 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
839                                                 &rte_eth_devices[i]) :
840                         (*dev->dev_ops->eth_rx_adapter_stop)(dev,
841                                                 &rte_eth_devices[i]);
842         }
843
844         if (use_service)
845                 rte_service_runstate_set(rx_adapter->service_id, start);
846
847         return 0;
848 }
849
850 int
851 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
852                                 rte_event_eth_rx_adapter_conf_cb conf_cb,
853                                 void *conf_arg)
854 {
855         struct rte_event_eth_rx_adapter *rx_adapter;
856         int ret;
857         int socket_id;
858         uint8_t i;
859         char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
860         const uint8_t default_rss_key[] = {
861                 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
862                 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
863                 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
864                 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
865                 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
866         };
867
868         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
869         RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
870         if (conf_cb == NULL)
871                 return -EINVAL;
872
873         if (event_eth_rx_adapter == NULL) {
874                 ret = rte_event_eth_rx_adapter_init();
875                 if (ret)
876                         return ret;
877         }
878
879         rx_adapter = id_to_rx_adapter(id);
880         if (rx_adapter != NULL) {
881                 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
882                 return -EEXIST;
883         }
884
885         socket_id = rte_event_dev_socket_id(dev_id);
886         snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
887                 "rte_event_eth_rx_adapter_%d",
888                 id);
889
890         rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
891                         RTE_CACHE_LINE_SIZE, socket_id);
892         if (rx_adapter == NULL) {
893                 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
894                 return -ENOMEM;
895         }
896
897         rx_adapter->eventdev_id = dev_id;
898         rx_adapter->socket_id = socket_id;
899         rx_adapter->conf_cb = conf_cb;
900         rx_adapter->conf_arg = conf_arg;
901         strcpy(rx_adapter->mem_name, mem_name);
902         rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
903                                         /* FIXME: incompatible with hotplug */
904                                         rte_eth_dev_count_total() *
905                                         sizeof(struct eth_device_info), 0,
906                                         socket_id);
907         rte_convert_rss_key((const uint32_t *)default_rss_key,
908                         (uint32_t *)rx_adapter->rss_key_be,
909                             RTE_DIM(default_rss_key));
910
911         if (rx_adapter->eth_devices == NULL) {
912                 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
913                 rte_free(rx_adapter);
914                 return -ENOMEM;
915         }
916         rte_spinlock_init(&rx_adapter->rx_lock);
917         RTE_ETH_FOREACH_DEV(i)
918                 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
919
920         event_eth_rx_adapter[id] = rx_adapter;
921         if (conf_cb == default_conf_cb)
922                 rx_adapter->default_cb_arg = 1;
923         return 0;
924 }
925
926 int
927 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
928                 struct rte_event_port_conf *port_config)
929 {
930         struct rte_event_port_conf *pc;
931         int ret;
932
933         if (port_config == NULL)
934                 return -EINVAL;
935         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
936
937         pc = rte_malloc(NULL, sizeof(*pc), 0);
938         if (pc == NULL)
939                 return -ENOMEM;
940         *pc = *port_config;
941         ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
942                                         default_conf_cb,
943                                         pc);
944         if (ret)
945                 rte_free(pc);
946         return ret;
947 }
948
949 int
950 rte_event_eth_rx_adapter_free(uint8_t id)
951 {
952         struct rte_event_eth_rx_adapter *rx_adapter;
953
954         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
955
956         rx_adapter = id_to_rx_adapter(id);
957         if (rx_adapter == NULL)
958                 return -EINVAL;
959
960         if (rx_adapter->nb_queues) {
961                 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
962                                 rx_adapter->nb_queues);
963                 return -EBUSY;
964         }
965
966         if (rx_adapter->default_cb_arg)
967                 rte_free(rx_adapter->conf_arg);
968         rte_free(rx_adapter->eth_devices);
969         rte_free(rx_adapter);
970         event_eth_rx_adapter[id] = NULL;
971
972         return 0;
973 }
974
975 int
976 rte_event_eth_rx_adapter_queue_add(uint8_t id,
977                 uint8_t eth_dev_id,
978                 int32_t rx_queue_id,
979                 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
980 {
981         int ret;
982         uint32_t cap;
983         struct rte_event_eth_rx_adapter *rx_adapter;
984         struct rte_eventdev *dev;
985         struct eth_device_info *dev_info;
986         int start_service;
987
988         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
989         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
990
991         rx_adapter = id_to_rx_adapter(id);
992         if ((rx_adapter == NULL) || (queue_conf == NULL))
993                 return -EINVAL;
994
995         dev = &rte_eventdevs[rx_adapter->eventdev_id];
996         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
997                                                 eth_dev_id,
998                                                 &cap);
999         if (ret) {
1000                 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
1001                         "eth port %" PRIu8, id, eth_dev_id);
1002                 return ret;
1003         }
1004
1005         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
1006                 && (queue_conf->rx_queue_flags &
1007                         RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
1008                 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
1009                                 " eth port: %" PRIu8 " adapter id: %" PRIu8,
1010                                 eth_dev_id, id);
1011                 return -EINVAL;
1012         }
1013
1014         if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
1015                 (rx_queue_id != -1)) {
1016                 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
1017                         "event queue id %u eth port %u", id, eth_dev_id);
1018                 return -EINVAL;
1019         }
1020
1021         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1022                         rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1023                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1024                          (uint16_t)rx_queue_id);
1025                 return -EINVAL;
1026         }
1027
1028         start_service = 0;
1029         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1030
1031         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1032                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
1033                                         -ENOTSUP);
1034                 if (dev_info->rx_queue == NULL) {
1035                         dev_info->rx_queue =
1036                             rte_zmalloc_socket(rx_adapter->mem_name,
1037                                         dev_info->dev->data->nb_rx_queues *
1038                                         sizeof(struct eth_rx_queue_info), 0,
1039                                         rx_adapter->socket_id);
1040                         if (dev_info->rx_queue == NULL)
1041                                 return -ENOMEM;
1042                 }
1043
1044                 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
1045                                 &rte_eth_devices[eth_dev_id],
1046                                 rx_queue_id, queue_conf);
1047                 if (ret == 0) {
1048                         update_queue_info(rx_adapter,
1049                                         &rx_adapter->eth_devices[eth_dev_id],
1050                                         rx_queue_id,
1051                                         1);
1052                 }
1053         } else {
1054                 rte_spinlock_lock(&rx_adapter->rx_lock);
1055                 ret = init_service(rx_adapter, id);
1056                 if (ret == 0)
1057                         ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
1058                                         queue_conf);
1059                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1060                 if (ret == 0)
1061                         start_service = !!sw_rx_adapter_queue_count(rx_adapter);
1062         }
1063
1064         if (ret)
1065                 return ret;
1066
1067         if (start_service)
1068                 rte_service_component_runstate_set(rx_adapter->service_id, 1);
1069
1070         return 0;
1071 }
1072
1073 int
1074 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id,
1075                                 int32_t rx_queue_id)
1076 {
1077         int ret = 0;
1078         struct rte_eventdev *dev;
1079         struct rte_event_eth_rx_adapter *rx_adapter;
1080         struct eth_device_info *dev_info;
1081         uint32_t cap;
1082         uint16_t i;
1083
1084         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1085         RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
1086
1087         rx_adapter = id_to_rx_adapter(id);
1088         if (rx_adapter == NULL)
1089                 return -EINVAL;
1090
1091         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1092         ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
1093                                                 eth_dev_id,
1094                                                 &cap);
1095         if (ret)
1096                 return ret;
1097
1098         if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
1099                 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
1100                 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
1101                          (uint16_t)rx_queue_id);
1102                 return -EINVAL;
1103         }
1104
1105         dev_info = &rx_adapter->eth_devices[eth_dev_id];
1106
1107         if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
1108                 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
1109                                  -ENOTSUP);
1110                 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
1111                                                 &rte_eth_devices[eth_dev_id],
1112                                                 rx_queue_id);
1113                 if (ret == 0) {
1114                         update_queue_info(rx_adapter,
1115                                         &rx_adapter->eth_devices[eth_dev_id],
1116                                         rx_queue_id,
1117                                         0);
1118                         if (dev_info->nb_dev_queues == 0) {
1119                                 rte_free(dev_info->rx_queue);
1120                                 dev_info->rx_queue = NULL;
1121                         }
1122                 }
1123         } else {
1124                 int rc;
1125                 rte_spinlock_lock(&rx_adapter->rx_lock);
1126                 if (rx_queue_id == -1) {
1127                         for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1128                                 event_eth_rx_adapter_queue_del(rx_adapter,
1129                                                         dev_info,
1130                                                         i);
1131                 } else {
1132                         event_eth_rx_adapter_queue_del(rx_adapter,
1133                                                 dev_info,
1134                                                 (uint16_t)rx_queue_id);
1135                 }
1136
1137                 rc = eth_poll_wrr_calc(rx_adapter);
1138                 if (rc)
1139                         RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
1140                                         rc);
1141
1142                 if (dev_info->nb_dev_queues == 0) {
1143                         rte_free(dev_info->rx_queue);
1144                         dev_info->rx_queue = NULL;
1145                 }
1146
1147                 rte_spinlock_unlock(&rx_adapter->rx_lock);
1148                 rte_service_component_runstate_set(rx_adapter->service_id,
1149                                 sw_rx_adapter_queue_count(rx_adapter));
1150         }
1151
1152         return ret;
1153 }
1154
1155
/* Start the adapter identified by id; see rx_adapter_ctrl() for errors. */
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
        const int start = 1;

        return rx_adapter_ctrl(id, start);
}
1161
/* Stop the adapter identified by id; see rx_adapter_ctrl() for errors. */
int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
        const int start = 0;

        return rx_adapter_ctrl(id, start);
}
1167
1168 int
1169 rte_event_eth_rx_adapter_stats_get(uint8_t id,
1170                                struct rte_event_eth_rx_adapter_stats *stats)
1171 {
1172         struct rte_event_eth_rx_adapter *rx_adapter;
1173         struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
1174         struct rte_event_eth_rx_adapter_stats dev_stats;
1175         struct rte_eventdev *dev;
1176         struct eth_device_info *dev_info;
1177         uint32_t i;
1178         int ret;
1179
1180         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1181
1182         rx_adapter = id_to_rx_adapter(id);
1183         if (rx_adapter  == NULL || stats == NULL)
1184                 return -EINVAL;
1185
1186         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1187         memset(stats, 0, sizeof(*stats));
1188         RTE_ETH_FOREACH_DEV(i) {
1189                 dev_info = &rx_adapter->eth_devices[i];
1190                 if (dev_info->internal_event_port == 0 ||
1191                         dev->dev_ops->eth_rx_adapter_stats_get == NULL)
1192                         continue;
1193                 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
1194                                                 &rte_eth_devices[i],
1195                                                 &dev_stats);
1196                 if (ret)
1197                         continue;
1198                 dev_stats_sum.rx_packets += dev_stats.rx_packets;
1199                 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
1200         }
1201
1202         if (rx_adapter->service_inited)
1203                 *stats = rx_adapter->stats;
1204
1205         stats->rx_packets += dev_stats_sum.rx_packets;
1206         stats->rx_enq_count += dev_stats_sum.rx_enq_count;
1207         return 0;
1208 }
1209
1210 int
1211 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
1212 {
1213         struct rte_event_eth_rx_adapter *rx_adapter;
1214         struct rte_eventdev *dev;
1215         struct eth_device_info *dev_info;
1216         uint32_t i;
1217
1218         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1219
1220         rx_adapter = id_to_rx_adapter(id);
1221         if (rx_adapter == NULL)
1222                 return -EINVAL;
1223
1224         dev = &rte_eventdevs[rx_adapter->eventdev_id];
1225         RTE_ETH_FOREACH_DEV(i) {
1226                 dev_info = &rx_adapter->eth_devices[i];
1227                 if (dev_info->internal_event_port == 0 ||
1228                         dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
1229                         continue;
1230                 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
1231                                                         &rte_eth_devices[i]);
1232         }
1233
1234         memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
1235         return 0;
1236 }
1237
1238 int
1239 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
1240 {
1241         struct rte_event_eth_rx_adapter *rx_adapter;
1242
1243         RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1244
1245         rx_adapter = id_to_rx_adapter(id);
1246         if (rx_adapter == NULL || service_id == NULL)
1247                 return -EINVAL;
1248
1249         if (rx_adapter->service_inited)
1250                 *service_id = rx_adapter->service_id;
1251
1252         return rx_adapter->service_inited ? 0 : -ESRCH;
1253 }