/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE		32
#define BLOCK_CNT_THRESHOLD	10
#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

#define RSS_KEY_SIZE	40
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};
struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
} __rte_cache_aligned;
/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* If nb_dev_queues > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_dev_queues;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
valid_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!valid_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)

static inline uint32_t
sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled;
}
/* Greatest common divisor */
static uint16_t gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
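/*
 * Illustration (not from the original source): with three queues of
 * weights {4, 2, 1}, max_wt = 4 and gcd = 1, so one WRR cycle is seven
 * polls long and visits the weight-4 queue four times, the weight-2 queue
 * twice and the weight-1 queue once, with the heavier queues served first
 * as the current weight *cw steps down from max_wt towards gcd.
 */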
wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;

	if (rx_adapter->num_rx_polled) {
		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
				sizeof(*rx_adapter->eth_rx_poll),
				RTE_CACHE_LINE_SIZE);
		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
					     len,
					     RTE_CACHE_LINE_SIZE,
					     rx_adapter->socket_id);
		if (rx_poll == NULL)
			return -ENOMEM;

		/* Generate array of all queues to poll, the size of this
		 * array is poll_q
		 */
		RTE_ETH_FOREACH_DEV(d) {
			uint16_t nb_rx_queues;
			struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[d];
			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
			if (dev_info->rx_queue == NULL)
				continue;
			for (q = 0; q < nb_rx_queues; q++) {
				struct eth_rx_queue_info *queue_info =
					&dev_info->rx_queue[q];
				if (queue_info->queue_enabled == 0)
					continue;

				uint16_t wt = queue_info->wt;
				rx_poll[poll_q].eth_dev_id = d;
				rx_poll[poll_q].eth_rx_qid = q;
				max_wrr_pos += wt;
				max_wt = RTE_MAX(max_wt, wt);
				gcd = (gcd) ? gcd_u16(gcd, wt) : wt;
				poll_q++;
			}
		}

		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
				RTE_CACHE_LINE_SIZE);
		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
					    len,
					    RTE_CACHE_LINE_SIZE,
					    rx_adapter->socket_id);
		if (rx_wrr == NULL) {
			rte_free(rx_poll);
			return -ENOMEM;
		}

		/* Generate polling sequence based on weights */
		int prev = -1;
		int cw = -1;
		for (i = 0; i < max_wrr_pos; i++) {
			rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw,
					     rx_poll, max_wt, gcd, prev);
			prev = rx_wrr[i];
		}
	}

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = max_wrr_pos;

	return 0;
}
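/*
 * Note: callers recompute the poll array and WRR schedule while holding
 * rx_adapter->rx_lock, which the service function also takes, so the
 * eth_rx_poll/wrr_sched pointers are never replaced underneath an
 * in-progress poll.
 */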
static inline void
mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}
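/*
 * Note: rte_softrss_be() expects the key in the converted layout produced
 * by rte_convert_rss_key() at adapter creation, and only the L3
 * source/destination addresses are hashed here (RTE_THASH_V4_L3_LEN /
 * RTE_THASH_V6_L3_LEN), approximating a typical NIC RSS hash.
 */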
static inline int
rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		    struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rx_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}
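/*
 * Enqueue-block accounting: each completely blocked flush bumps
 * enq_block_count; after BLOCK_CNT_THRESHOLD consecutive blocked attempts a
 * start timestamp is latched, and the cycles until the next successful
 * enqueue are accumulated in stats->rx_enq_block_cycles.
 */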
/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
		  struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
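/*
 * If the event device accepts only part of the burst, the unsent events are
 * moved to the front of the buffer and retried on a later flush; completely
 * blocked enqueues feed the block-cycle accounting above.
 */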
static inline uint16_t
flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rx_enq_block_end_ts(rx_adapter, stats) :
		rx_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}
static inline void
fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t eth_dev_id,
	uint16_t rx_queue_id,
	struct rte_mbuf **mbufs,
	uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
					&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint64_t ts;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
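	/*
	 * Software RSS is computed only when the mbuf carries no hardware
	 * RSS hash and the application has not pinned a flow id for this
	 * queue; otherwise the hardware hash or the configured flow id is
	 * used below when composing ev->flow_id.
	 */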
	if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
		ts = rte_get_tsc_cycles();
		for (i = 0; i < num; i++) {
			m = mbufs[i];
			m->timestamp = ts;
			m->ol_flags |= PKT_RX_TIMESTAMP;
		}
	}

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss;
		flow_id =
			eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		buf_event_enqueue(rx_adapter, ev);
	}
}
/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
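/*
 * Note: ETH_EVENT_BUFFER_SIZE is 4 * BATCH_SIZE, so the buffer is flushed
 * once it holds at least BATCH_SIZE events and an rte_eth_rx_burst() of up
 * to BATCH_SIZE mbufs is only issued when at least BATCH_SIZE slots are
 * free, which keeps the buffer from overflowing.
 */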
static inline uint32_t
eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint16_t n;
	uint32_t nb_rx = 0;
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return nb_rx;
		}

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
		if (n) {
			stats->rx_packets += n;
			/* The check before rte_eth_rx_burst() ensures that
			 * all n mbufs can be buffered
			 */
			fill_event_buffer(rx_adapter, d, qid, mbufs, n);
			nb_rx += n;
			if (nb_rx > max_nb_rx) {
				rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
				return nb_rx;
			}
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	return nb_rx;
}
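/*
 * The service function uses a trylock so that a service lcore never spins
 * while a control-path thread holds rx_lock for a queue add/del or WRR
 * recalculation; the poll is simply skipped and retried on the next service
 * invocation.
 */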
static int
event_eth_rx_adapter_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;
	struct rte_eth_event_enqueue_buffer *buf;

	buf = &rx_adapter->event_enqueue_buffer;
	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (eth_rx_poll(rx_adapter) == 0 && buf->count)
		flush_event_buffer(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}
static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}

static inline struct rte_event_eth_rx_adapter *
id_to_rx_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}
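/*
 * Default configuration callback registered by
 * rte_event_eth_rx_adapter_create(): it stops the event device if it is
 * running, reconfigures it with one additional event port, sets that port
 * up with the port configuration passed in arg, restarts the device if
 * needed and returns the new port in conf->event_port_id.
 */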
static int
default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
						port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}
static int
init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = event_eth_rx_adapter_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}
static void
update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			update_queue_info(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}
static void
event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter,
			       struct eth_device_info *dev_info,
			       uint16_t rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	if (rx_adapter->nb_queues == 0)
		return;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 0);
}

static void
event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		uint16_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	/* The same queue can be added more than once */
	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
	update_queue_info(rx_adapter, dev_info, rx_queue_id, 1);
}
static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	uint32_t i;
	int ret;

	if (queue_conf->servicing_weight == 0) {

		struct rte_eth_dev_data *data = dev_info->dev->data;
		if (data->dev_conf.intr_conf.rxq) {
			RTE_EDEV_LOG_ERR("Interrupt driven queues"
					" not supported");
			return -ENOTSUP;
		}
		temp_conf = *queue_conf;

		/* If Rx interrupts are disabled set wt = 1 */
		temp_conf.servicing_weight = 1;
		queue_conf = &temp_conf;
	}

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       dev_info->dev->data->nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			event_eth_rx_adapter_queue_add(rx_adapter,
						dev_info, i, queue_conf);
	} else {
		event_eth_rx_adapter_queue_add(rx_adapter, dev_info,
					(uint16_t)rx_queue_id,
					queue_conf);
	}

	ret = eth_poll_wrr_calc(rx_adapter);
	if (ret)
		event_eth_rx_adapter_queue_del(rx_adapter,
					dev_info, rx_queue_id);

	return ret;
}
static int
rx_adapter_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service)
		rte_service_runstate_set(rx_adapter->service_id, start);

	return 0;
}
int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					/* FIXME: incompatible with hotplug */
					rte_eth_dev_count_total() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}

	rte_spinlock_init(&rx_adapter->rx_lock);
	RTE_ETH_FOREACH_DEV(i)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == default_conf_cb)
		rx_adapter->default_cb_arg = 1;

	return 0;
}
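/*
 * Example (illustrative, not part of the original source): an application
 * that is content with the default event port setup can simply do
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	rte_event_eth_rx_adapter_create(id, dev_id, &pc);
 *
 * which copies the port configuration and registers default_conf_cb() as
 * the configuration callback.
 */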
int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	int start_service = 0;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0)
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		ret = init_service(rx_adapter, id);
		if (ret == 0)
			ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret == 0)
			start_service = !!sw_rx_adapter_queue_count(rx_adapter);
	}

	if (ret)
		return ret;

	if (start_service)
		rte_service_component_runstate_set(rx_adapter->service_id, 1);

	return 0;
}
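/*
 * Note: for PMDs advertising RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT the
 * queue is handed straight to the event device driver above and no service
 * is involved; otherwise the queue is polled by the adapter's service
 * function, whose run state is enabled once at least one polled queue
 * exists.
 */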
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint16_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
					-ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			update_queue_info(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		int rc;

		rte_spinlock_lock(&rx_adapter->rx_lock);
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				event_eth_rx_adapter_queue_del(rx_adapter,
							dev_info,
							i);
		} else {
			event_eth_rx_adapter_queue_del(rx_adapter,
						dev_info,
						(uint16_t)rx_queue_id);
		}

		rc = eth_poll_wrr_calc(rx_adapter);
		if (rc)
			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
					rc);

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}

		rte_spinlock_unlock(&rx_adapter->rx_lock);
		rte_service_component_runstate_set(rx_adapter->service_id,
				sw_rx_adapter_queue_count(rx_adapter));
	}

	return ret;
}
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rx_adapter_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rx_adapter_ctrl(id, 0);
}
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;

	return 0;
}
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));

	return 0;
}
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = id_to_rx_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}
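/*
 * Usage sketch (illustrative, not part of the original source): when the
 * adapter runs as a software service, the application must give that
 * service an lcore to run on before starting the adapter, e.g.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(lcore_id);
 *		rte_service_map_lcore_set(service_id, lcore_id, 1);
 *		rte_service_lcore_start(lcore_id);
 *	}
 *	rte_event_eth_rx_adapter_start(id);
 */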