/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"
#define BATCH_SIZE		32
#define BLOCK_CNT_THRESHOLD	10
#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
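/*
 * The enqueue buffer holds four rte_eth_rx_burst() batches, so rxa_poll()
 * can always buffer a full burst once it has checked that at least
 * BATCH_SIZE slots are free; when fewer are free, mbufs are left on the
 * Rx ring, back pressuring the ethdev.
 */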
#define ETH_RX_ADAPTER_SERVICE_NAME_LEN	32
#define ETH_RX_ADAPTER_MEM_NAME_LEN	32

#define RSS_KEY_SIZE	40
/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};
/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};
struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
} __rte_cache_aligned;
/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* If nb_dev_queues > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_dev_queues;
};
/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};
static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
static inline int
rxa_validate_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}
#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)
static inline unsigned int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled;
}
/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? rxa_gcd_u16(b, r) : b;
}
/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint16_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}
/* Precalculate WRR polling sequence for all queues in rx_adapter */
static int
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;

	if (rx_adapter->num_rx_polled) {
		size_t len = RTE_ALIGN(rx_adapter->num_rx_polled *
				sizeof(*rx_adapter->eth_rx_poll),
				RTE_CACHE_LINE_SIZE);
		rx_poll = rte_zmalloc_socket(rx_adapter->mem_name,
					     len,
					     RTE_CACHE_LINE_SIZE,
					     rx_adapter->socket_id);
		if (rx_poll == NULL)
			return -ENOMEM;

		/* Generate array of all queues to poll, the size of this
		 * array is poll_q
		 */
		RTE_ETH_FOREACH_DEV(d) {
			uint16_t nb_rx_queues;
			struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[d];
			nb_rx_queues = dev_info->dev->data->nb_rx_queues;
			if (dev_info->rx_queue == NULL)
				continue;
			if (dev_info->internal_event_port)
				continue;
			for (q = 0; q < nb_rx_queues; q++) {
				struct eth_rx_queue_info *queue_info =
					&dev_info->rx_queue[q];
				if (queue_info->queue_enabled == 0)
					continue;

				uint16_t wt = queue_info->wt;
				rx_poll[poll_q].eth_dev_id = d;
				rx_poll[poll_q].eth_rx_qid = q;
				max_wrr_pos += wt;
				max_wt = RTE_MAX(max_wt, wt);
				gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
				poll_q++;
			}
		}

		len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr),
				RTE_CACHE_LINE_SIZE);
		rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name,
					    len,
					    RTE_CACHE_LINE_SIZE,
					    rx_adapter->socket_id);
		if (rx_wrr == NULL) {
			rte_free(rx_poll);
			return -ENOMEM;
		}

		/* Generate polling sequence based on weights */
		int prev = -1;
		int cw = -1;
		for (i = 0; i < max_wrr_pos; i++) {
			rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
					rx_poll, max_wt, gcd, prev);
			prev = rx_wrr[i];
		}
	}

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = max_wrr_pos;

	return 0;
}
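/*
 * Illustrative example (not part of the code): with three polled queues
 * A, B and C whose servicing weights are 4, 2 and 1, max_wt = 4, gcd = 1
 * and wrr_len = 4 + 2 + 1 = 7. Repeated calls to rxa_wrr_next() yield the
 * polling sequence
 *
 *	A A A B A B C
 *
 * i.e. each queue appears in wrr_sched[] in proportion to its weight
 * within one pass of the schedule.
 */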
static inline void
rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
		}
		break;
	}
}
/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}
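/*
 * Note: only the L3 source/destination addresses feed the Toeplitz hash
 * above; L4 ports are not included, and packets that are neither IPv4 nor
 * IPv6 hash to 0. rss_key_be holds the byte-swapped key prepared by
 * rte_convert_rss_key() in rte_event_eth_rx_adapter_create_ext().
 */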
static inline int
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}
static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}
static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		    struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rxa_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}
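/*
 * Worked example: if BLOCK_CNT_THRESHOLD (10) consecutive
 * rxa_flush_event_buffer() calls enqueue nothing, rxa_enq_block_start_ts()
 * records the TSC; the first flush that later enqueues at least one event
 * calls rxa_enq_block_end_ts(), which adds the elapsed cycles to
 * stats->rx_enq_block_cycles and clears the block state.
 */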
/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;

	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}
/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	/* Events not accepted by the event device are retained for the
	 * next flush
	 */
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
		rxa_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}
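/*
 * Example of the retry path: with buf->count = 32 and n = 20 accepted by
 * rte_event_enqueue_new_burst(), the 12 unsent events are moved to the
 * front of buf->events[] and buf->count becomes 12; they are retried on
 * the next flush rather than dropped.
 */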
static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t eth_dev_id,
	uint16_t rx_queue_id,
	struct rte_mbuf **mbufs,
	uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
				&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint64_t ts;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

	if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
		ts = rte_get_tsc_cycles();
		for (i = 0; i < num; i++) {
			m = mbufs[i];

			m->timestamp = ts;
			m->ol_flags |= PKT_RX_TIMESTAMP;
		}
	}

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			rxa_do_softrss(m, rx_adapter->rss_key_be) :
			m->hash.rss;
		flow_id =
		    eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		rxa_buffer_event(rx_adapter, ev);
	}
}
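/*
 * Flow id composition, by example: if the application added the queue with
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID, flow_id_mask is ~0 and
 * ev->flow_id is exactly the configured flow_id. Otherwise the mask is 0
 * and ev->flow_id is the RSS hash, taken from m->hash.rss when the PMD set
 * PKT_RX_RSS_HASH and computed in software by rxa_do_softrss() when it
 * did not.
 */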
/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline void
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint16_t n;
	uint32_t nb_rx = 0;
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return;
		}

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE);
		if (n) {
			stats->rx_packets += n;
			/* The check before rte_eth_rx_burst() ensures that
			 * all n mbufs can be buffered
			 */
			rxa_buffer_mbufs(rx_adapter, d, qid, mbufs, n);
			nb_rx += n;
			if (nb_rx > max_nb_rx) {
				rx_adapter->wrr_pos =
					(wrr_pos + 1) % rx_adapter->wrr_len;
				break;
			}
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);
}
static int
rxa_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}
	rxa_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}
static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}
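/*
 * Note: the lookup-before-reserve pattern above lets a secondary process
 * attach to the adapter array already reserved by the primary process
 * instead of failing the reservation.
 */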
static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}
static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;
	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}
	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
					port_id);
		return ret;
	}
	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}
static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = rxa_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}
static void
rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_update_queue(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	if (rx_adapter->nb_queues == 0)
		return;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	rx_adapter->num_rx_polled -= queue_info->queue_enabled;
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
}
static void
rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id,
	const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	/* The same queue can be added more than once */
	rx_adapter->num_rx_polled += !queue_info->queue_enabled;
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
}
static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	uint32_t i;
	int ret;

	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;
		if (data->dev_conf.intr_conf.rxq) {
			RTE_EDEV_LOG_ERR("Interrupt driven queues"
					" not supported");
			return -ENOTSUP;
		}
		temp_conf = *queue_conf;

		/* If Rx interrupts are disabled set wt = 1 */
		temp_conf.servicing_weight = 1;
		queue_conf = &temp_conf;
	}

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       dev_info->dev->data->nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_add_queue(rx_adapter, dev_info, i, queue_conf);
	} else
		rxa_add_queue(rx_adapter, dev_info, (uint16_t)rx_queue_id,
			queue_conf);

	ret = rxa_calc_wrr_sequence(rx_adapter);
	if (ret)
		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);

	return ret;
}
static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}
int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					/* FIXME: incompatible with hotplug */
					rte_eth_dev_count_total() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			    RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	RTE_ETH_FOREACH_DEV(i)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	return 0;
}
int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					rxa_default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}
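/*
 * A minimal creation sketch (illustrative; the adapter/eventdev ids and
 * port configuration values are hypothetical):
 *
 *	struct rte_event_port_conf pconf;
 *	uint8_t adapter_id = 0, evdev_id = 0;
 *
 *	rte_event_port_default_conf_get(evdev_id, 0, &pconf);
 *	if (rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &pconf))
 *		rte_exit(EXIT_FAILURE, "adapter create failed\n");
 *
 * With this variant, rxa_default_conf_cb() later creates the adapter's
 * event port on the first queue add, reconfiguring the event device with
 * one extra port.
 */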
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu32 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	int start_service;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	start_service = 0;
	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0)
			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret == 0)
			start_service =
				!!rxa_sw_adapter_queue_count(rx_adapter);
	}

	if (ret)
		return ret;

	if (start_service)
		rte_service_component_runstate_set(rx_adapter->service_id, 1);

	return 0;
}
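/*
 * A minimal queue add sketch (illustrative; the event queue id, weight and
 * flow id are hypothetical):
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.rx_queue_flags =
 *			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = 0,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *			.flow_id = 1,
 *		},
 *	};
 *
 *	rte_event_eth_rx_adapter_queue_add(adapter_id, 0, -1, &qconf);
 *
 * An rx_queue_id of -1 connects every Rx queue of ethdev port 0 to the
 * adapter with the same configuration.
 */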
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint16_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		int rc;

		rte_spinlock_lock(&rx_adapter->rx_lock);
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				rxa_sw_del(rx_adapter, dev_info, i);
		} else
			rxa_sw_del(rx_adapter, dev_info, (uint16_t)rx_queue_id);

		rc = rxa_calc_wrr_sequence(rx_adapter);
		if (rc)
			RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32,
					rc);

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}

		rte_spinlock_unlock(&rx_adapter->rx_lock);
		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	return ret;
}
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rxa_ctrl(id, 1);
}
int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rxa_ctrl(id, 0);
}
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}
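/*
 * A minimal sketch of running the adapter's service on a service core
 * (illustrative; the lcore id is hypothetical and error handling is
 * omitted):
 *
 *	uint32_t service_id;
 *	uint32_t service_lcore = 2;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(adapter_id,
 *						    &service_id) == 0) {
 *		rte_service_lcore_add(service_lcore);
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *		rte_service_lcore_start(service_lcore);
 *	}
 *	rte_event_eth_rx_adapter_start(adapter_id);
 *
 * Applications that prefer to run the polling themselves can instead call
 * rte_service_run_iter_on_app_lcore() with the same service id.
 */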