1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
26 #define BLOCK_CNT_THRESHOLD 10
27 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
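/* Note (editorial): the enqueue buffer is flushed once it holds at least
 * BATCH_SIZE events (see rxa_flush_event_buffer()); a likely rationale for
 * the 4x BATCH_SIZE sizing is to leave headroom for further Rx bursts while
 * earlier events are still pending retry against the event device.
 */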
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
32 #define RSS_KEY_SIZE 40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
35 /* Sentinel value used to detect an uninitialized file handle */
39 * Used to store port and queue ID of interrupting Rx queue
51 * There is an instance of this struct per polled Rx queue added to the adapter
54 struct eth_rx_poll_entry {
55 /* Eth port to poll */
57 /* Eth rx queue to poll */
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63 /* Count of events in this buffer */
65 /* Array of events in this buffer */
66 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
69 struct rte_event_eth_rx_adapter {
71 uint8_t rss_key_be[RSS_KEY_SIZE];
72 /* Event device identifier */
74 /* Per ethernet device structure */
75 struct eth_device_info *eth_devices;
76 /* Event port identifier */
77 uint8_t event_port_id;
78 /* Lock to serialize config updates with service function */
79 rte_spinlock_t rx_lock;
80 /* Max mbufs processed in any service function invocation */
82 /* Receive queues that need to be polled */
83 struct eth_rx_poll_entry *eth_rx_poll;
84 /* Size of the eth_rx_poll array */
85 uint16_t num_rx_polled;
86 /* Weighted round robin schedule */
88 /* wrr_sched[] size */
90 /* Next entry in wrr[] to begin polling */
92 /* Event burst buffer */
93 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94 /* Per adapter stats */
95 struct rte_event_eth_rx_adapter_stats stats;
96 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97 uint16_t enq_block_count;
99 uint64_t rx_enq_block_start_ts;
100 /* epoll fd used to wait for Rx interrupts */
102 /* Num of interrupt driven interrupt queues */
103 uint32_t num_rx_intr;
104 /* Used to send <dev id, queue id> of interrupting Rx queues from
105 * the interrupt thread to the Rx thread
107 struct rte_ring *intr_ring;
108 /* Rx Queue data (dev id, queue id) for the last non-empty
112 /* queue_data is valid */
114 /* Interrupt ring lock, synchronizes Rx thread
115 * and interrupt thread
117 rte_spinlock_t intr_ring_lock;
118 /* event array passed to rte_poll_wait */
119 struct rte_epoll_event *epoll_events;
120 /* Count of interrupt vectors in use */
121 uint32_t num_intr_vec;
122 /* Thread blocked on Rx interrupts */
123 pthread_t rx_intr_thread;
124 /* Configuration callback for rte_service configuration */
125 rte_event_eth_rx_adapter_conf_cb conf_cb;
126 /* Configuration callback argument */
128 /* Set if default_cb is being used */
130 /* Service initialization state */
131 uint8_t service_inited;
132 /* Total count of Rx queues in adapter */
134 /* Memory allocation name */
135 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136 /* Socket identifier cached from eventdev */
138 /* Per adapter EAL service */
140 /* Adapter started flag */
144 } __rte_cache_aligned;
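/* Note (editorial): one instance of the structure above exists per adapter
 * id; the instances are tracked through the event_eth_rx_adapter pointer
 * array, which is placed in the "rte_event_eth_rx_adapter_array" memzone
 * reserved by rte_event_eth_rx_adapter_init() below.
 */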
147 struct eth_device_info {
148 struct rte_eth_dev *dev;
149 struct eth_rx_queue_info *rx_queue;
151 rte_event_eth_rx_adapter_cb_fn cb_fn;
152 /* Rx callback argument */
154 /* Set if ethdev->eventdev packet transfer uses a hardware mechanism
157 uint8_t internal_event_port;
158 /* Set if the adapter is processing rx queues for
159 * this eth device and packet processing has been
160 * started; this lets the code know if the PMD
161 * rx_adapter_stop callback needs to be invoked
163 uint8_t dev_rx_started;
164 /* Number of queues added for this device */
165 uint16_t nb_dev_queues;
166 /* Number of poll based queues
167 * If nb_rx_poll > 0, the start callback will
168 * be invoked if not already invoked
171 /* Number of interrupt based queues
172 * If nb_rx_intr > 0, the start callback will
173 * be invoked if not already invoked.
176 /* Number of queues that use the shared interrupt */
177 uint16_t nb_shared_intr;
178 /* sum(wrr(q)) for all queues within the device
179 * useful when deleting all device queues
182 /* Intr based queue index to start polling from; this is used
183 * if the number of shared interrupts is non-zero
186 /* Intr based queue indices */
187 uint16_t *intr_queue;
188 /* device generates a per Rx queue interrupt for queue
189 * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
192 /* shared interrupt enabled */
193 int shared_intr_enabled;
197 struct eth_rx_queue_info {
198 int queue_enabled; /* True if added */
200 uint16_t wt; /* Polling weight */
201 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
205 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
208 rxa_validate_id(uint8_t id)
210 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
213 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
214 if (!rxa_validate_id(id)) { \
215 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
221 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
223 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
226 /* Greatest common divisor */
227 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
231 return r ? rxa_gcd_u16(b, r) : b;
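/* Note (editorial): the gcd of all polling weights is used by rxa_wrr_next()
 * below to step the current weight of the interleaved WRR schedule.
 * Illustrative example with hypothetical weights: queues q0 and q1 with
 * weights 2 and 1 give max_wt = 2, gcd = 1 and the repeating polling
 * sequence q0, q0, q1.
 */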
234 /* Returns the next queue in the polling sequence
236 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
239 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
240 unsigned int n, int *cw,
241 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
242 uint16_t gcd, int prev)
258 q = eth_rx_poll[i].eth_rx_qid;
259 d = eth_rx_poll[i].eth_dev_id;
260 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
268 rxa_shared_intr(struct eth_device_info *dev_info,
273 if (dev_info->dev->intr_handle == NULL)
276 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
277 return !multi_intr_cap ||
278 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
282 rxa_intr_queue(struct eth_device_info *dev_info,
285 struct eth_rx_queue_info *queue_info;
287 queue_info = &dev_info->rx_queue[rx_queue_id];
288 return dev_info->rx_queue &&
289 !dev_info->internal_event_port &&
290 queue_info->queue_enabled && queue_info->wt == 0;
294 rxa_polled_queue(struct eth_device_info *dev_info,
297 struct eth_rx_queue_info *queue_info;
299 queue_info = &dev_info->rx_queue[rx_queue_id];
300 return !dev_info->internal_event_port &&
301 dev_info->rx_queue &&
302 queue_info->queue_enabled && queue_info->wt != 0;
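/* Note (editorial): a queue added with servicing_weight == 0 is treated as
 * interrupt driven (wt == 0 above) only if the ethdev was configured with
 * intr_conf.rxq set; otherwise rxa_sw_add() forces the weight to 1 and the
 * queue is polled.
 */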
305 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
307 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
313 nbq = dev_info->dev->data->nb_rx_queues;
314 n = 0; /* non shared count */
315 s = 0; /* shared count */
317 if (rx_queue_id == -1) {
318 for (i = 0; i < nbq; i++) {
319 if (!rxa_shared_intr(dev_info, i))
320 n += add ? !rxa_intr_queue(dev_info, i) :
321 rxa_intr_queue(dev_info, i);
323 s += add ? !rxa_intr_queue(dev_info, i) :
324 rxa_intr_queue(dev_info, i);
328 if ((add && dev_info->nb_shared_intr == 0) ||
329 (!add && dev_info->nb_shared_intr))
333 if (!rxa_shared_intr(dev_info, rx_queue_id))
334 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
335 rxa_intr_queue(dev_info, rx_queue_id);
337 n = add ? !dev_info->nb_shared_intr :
338 dev_info->nb_shared_intr == 1;
344 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
347 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
348 struct eth_device_info *dev_info,
350 uint32_t *nb_rx_intr)
354 if (rx_queue_id == -1)
355 intr_diff = dev_info->nb_rx_intr;
357 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
359 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
362 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
363 * interrupt queues could currently be poll mode Rx queues
366 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
367 struct eth_device_info *dev_info,
369 uint32_t *nb_rx_poll,
370 uint32_t *nb_rx_intr,
375 uint32_t wrr_len_diff;
377 if (rx_queue_id == -1) {
378 intr_diff = dev_info->dev->data->nb_rx_queues -
379 dev_info->nb_rx_intr;
380 poll_diff = dev_info->nb_rx_poll;
381 wrr_len_diff = dev_info->wrr_len;
383 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
384 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
385 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
389 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
390 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
391 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
394 /* Calculate size of the eth_rx_poll and wrr_sched arrays
395 * after deleting poll mode rx queues
398 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
399 struct eth_device_info *dev_info,
401 uint32_t *nb_rx_poll,
405 uint32_t wrr_len_diff;
407 if (rx_queue_id == -1) {
408 poll_diff = dev_info->nb_rx_poll;
409 wrr_len_diff = dev_info->wrr_len;
411 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
412 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
416 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
417 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
420 /* Calculate nb_rx_* after adding poll mode rx queues
423 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
424 struct eth_device_info *dev_info,
427 uint32_t *nb_rx_poll,
428 uint32_t *nb_rx_intr,
433 uint32_t wrr_len_diff;
435 if (rx_queue_id == -1) {
436 intr_diff = dev_info->nb_rx_intr;
437 poll_diff = dev_info->dev->data->nb_rx_queues -
438 dev_info->nb_rx_poll;
439 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
442 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
443 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
444 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
445 wt - dev_info->rx_queue[rx_queue_id].wt :
449 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
450 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
451 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
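/* Illustrative example (hypothetical numbers): if a queue currently in
 * interrupt mode is re-added with weight wt = 3, then intr_diff = 1,
 * poll_diff = 1 and wrr_len_diff = 3, i.e. the queue moves from the
 * interrupt count to the poll count and contributes 3 slots to the new
 * WRR sequence.
 */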
454 /* Calculate nb_rx_* after adding rx_queue_id */
456 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
457 struct eth_device_info *dev_info,
460 uint32_t *nb_rx_poll,
461 uint32_t *nb_rx_intr,
465 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
466 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
468 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
469 nb_rx_poll, nb_rx_intr, nb_wrr);
472 /* Calculate nb_rx_* after deleting rx_queue_id */
474 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
475 struct eth_device_info *dev_info,
477 uint32_t *nb_rx_poll,
478 uint32_t *nb_rx_intr,
481 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
483 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
488 * Allocate the rx_poll array
490 static struct eth_rx_poll_entry *
491 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
492 uint32_t num_rx_polled)
496 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
497 RTE_CACHE_LINE_SIZE);
498 return rte_zmalloc_socket(rx_adapter->mem_name,
501 rx_adapter->socket_id);
505 * Allocate the WRR array
508 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
512 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
513 RTE_CACHE_LINE_SIZE);
514 return rte_zmalloc_socket(rx_adapter->mem_name,
517 rx_adapter->socket_id);
521 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
524 struct eth_rx_poll_entry **rx_poll,
525 uint32_t **wrr_sched)
534 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
535 if (*rx_poll == NULL) {
540 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
541 if (*wrr_sched == NULL) {
548 /* Precalculate WRR polling sequence for all queues in rx_adapter */
550 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
551 struct eth_rx_poll_entry *rx_poll,
560 /* Initialize variables for calculation of wrr schedule */
561 uint16_t max_wrr_pos = 0;
562 unsigned int poll_q = 0;
569 /* Generate array of all queues to poll, the size of this
572 RTE_ETH_FOREACH_DEV(d) {
573 uint16_t nb_rx_queues;
574 struct eth_device_info *dev_info =
575 &rx_adapter->eth_devices[d];
576 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
577 if (dev_info->rx_queue == NULL)
579 if (dev_info->internal_event_port)
581 dev_info->wrr_len = 0;
582 for (q = 0; q < nb_rx_queues; q++) {
583 struct eth_rx_queue_info *queue_info =
584 &dev_info->rx_queue[q];
587 if (!rxa_polled_queue(dev_info, q))
590 rx_poll[poll_q].eth_dev_id = d;
591 rx_poll[poll_q].eth_rx_qid = q;
593 dev_info->wrr_len += wt;
594 max_wt = RTE_MAX(max_wt, wt);
595 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
600 /* Generate polling sequence based on weights */
603 for (i = 0; i < max_wrr_pos; i++) {
604 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
605 rx_poll, max_wt, gcd, prev);
611 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
612 struct rte_ipv6_hdr **ipv6_hdr)
614 struct rte_ether_hdr *eth_hdr =
615 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
616 struct rte_vlan_hdr *vlan_hdr;
621 switch (eth_hdr->ether_type) {
622 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
623 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
626 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
627 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
630 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
631 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
632 switch (vlan_hdr->eth_proto) {
633 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
634 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
636 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
637 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
649 /* Calculate RSS hash for IPv4/6 */
650 static inline uint32_t
651 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
655 struct rte_ipv4_tuple ipv4_tuple;
656 struct rte_ipv6_tuple ipv6_tuple;
657 struct rte_ipv4_hdr *ipv4_hdr;
658 struct rte_ipv6_hdr *ipv6_hdr;
660 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
663 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
664 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
666 input_len = RTE_THASH_V4_L3_LEN;
667 } else if (ipv6_hdr) {
668 rte_thash_load_v6_addrs(ipv6_hdr,
669 (union rte_thash_tuple *)&ipv6_tuple);
671 input_len = RTE_THASH_V6_L3_LEN;
675 return rte_softrss_be(tuple, input_len, rss_key_be);
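/* Note (editorial): rte_softrss_be() expects the RSS key in the byte-swapped
 * form produced by rte_convert_rss_key(); rss_key_be is prepared that way
 * from the default 40-byte key in rte_event_eth_rx_adapter_create_ext().
 */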
679 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
681 return !!rx_adapter->enq_block_count;
685 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
687 if (rx_adapter->rx_enq_block_start_ts)
690 rx_adapter->enq_block_count++;
691 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
694 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
698 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
699 struct rte_event_eth_rx_adapter_stats *stats)
701 if (unlikely(!stats->rx_enq_start_ts))
702 stats->rx_enq_start_ts = rte_get_tsc_cycles();
704 if (likely(!rxa_enq_blocked(rx_adapter)))
707 rx_adapter->enq_block_count = 0;
708 if (rx_adapter->rx_enq_block_start_ts) {
709 stats->rx_enq_end_ts = rte_get_tsc_cycles();
710 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
711 rx_adapter->rx_enq_block_start_ts;
712 rx_adapter->rx_enq_block_start_ts = 0;
716 /* Enqueue buffered events to event device */
717 static inline uint16_t
718 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
720 struct rte_eth_event_enqueue_buffer *buf =
721 &rx_adapter->event_enqueue_buffer;
722 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
724 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
725 rx_adapter->event_port_id,
728 if (n != buf->count) {
731 (buf->count - n) * sizeof(struct rte_event));
732 stats->rx_enq_retry++;
735 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
736 rxa_enq_block_start_ts(rx_adapter);
739 stats->rx_enq_count += n;
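/* Note (editorial): events the event device did not accept are moved to the
 * head of the buffer by the memmove above and retried on the next flush;
 * together with the free-space check done before each Rx burst this is how
 * enqueue back pressure reaches the ethdev.
 */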
745 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
747 uint16_t rx_queue_id,
748 struct rte_mbuf **mbufs,
752 struct eth_device_info *dev_info =
753 &rx_adapter->eth_devices[eth_dev_id];
754 struct eth_rx_queue_info *eth_rx_queue_info =
755 &dev_info->rx_queue[rx_queue_id];
756 struct rte_eth_event_enqueue_buffer *buf =
757 &rx_adapter->event_enqueue_buffer;
758 struct rte_event *ev = &buf->events[buf->count];
759 uint64_t event = eth_rx_queue_info->event;
760 uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
761 struct rte_mbuf *m = mbufs[0];
769 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
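/* (flag != 0) evaluates to 1 when the hash is valid, so the expression
 * above is ~(1 - 1) = 0xffffffff; when the flag is clear it is ~(0 - 1) = 0.
 */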
771 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
773 if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
774 ts = rte_get_tsc_cycles();
775 for (i = 0; i < num; i++) {
779 m->ol_flags |= PKT_RX_TIMESTAMP;
783 for (i = 0; i < num; i++) {
787 rxa_do_softrss(m, rx_adapter->rss_key_be) :
790 ev->flow_id = (rss & ~flow_id_mask) |
791 (ev->flow_id & flow_id_mask);
796 if (dev_info->cb_fn) {
799 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
800 ETH_EVENT_BUFFER_SIZE, buf->count, ev,
801 num, dev_info->cb_arg, &dropped);
802 if (unlikely(nb_cb > num))
803 RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
808 rx_adapter->stats.rx_dropped += dropped;
814 /* Enqueue packets from <port, q> to event buffer */
815 static inline uint32_t
816 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
823 struct rte_mbuf *mbufs[BATCH_SIZE];
824 struct rte_eth_event_enqueue_buffer *buf =
825 &rx_adapter->event_enqueue_buffer;
826 struct rte_event_eth_rx_adapter_stats *stats =
833 /* Don't do a batch dequeue from the rx queue if there isn't
834 * enough space in the enqueue buffer.
836 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
837 if (buf->count >= BATCH_SIZE)
838 rxa_flush_event_buffer(rx_adapter);
840 stats->rx_poll_count++;
841 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
847 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
849 if (rx_count + nb_rx > max_rx)
854 rxa_flush_event_buffer(rx_adapter);
860 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
867 struct eth_device_info *dev_info;
868 struct eth_rx_queue_info *queue_info;
875 dev_info = &rx_adapter->eth_devices[port_id];
876 queue_info = &dev_info->rx_queue[queue];
877 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
878 if (rxa_shared_intr(dev_info, queue))
879 intr_enabled = &dev_info->shared_intr_enabled;
881 intr_enabled = &queue_info->intr_enabled;
885 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
886 /* Entry should always be available.
887 * The ring size equals the maximum number of interrupt
888 * vectors supported (an interrupt vector is shared in
889 * case of shared interrupts)
892 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
893 " to ring: %s", strerror(-err));
895 rte_eth_dev_rx_intr_disable(port_id, queue);
897 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
901 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
902 uint32_t num_intr_vec)
904 if (rx_adapter->num_intr_vec + num_intr_vec >
905 RTE_EVENT_ETH_INTR_RING_SIZE) {
906 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
907 " %d needed %d limit %d", rx_adapter->num_intr_vec,
908 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
915 /* Delete entries for (dev, queue) from the interrupt ring */
917 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
918 struct eth_device_info *dev_info,
919 uint16_t rx_queue_id)
924 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
926 n = rte_ring_count(rx_adapter->intr_ring);
927 for (i = 0; i < n; i++) {
928 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
929 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
930 if (qd.port == dev_info->dev->data->port_id &&
931 qd.queue == rx_queue_id)
934 if (qd.port == dev_info->dev->data->port_id)
937 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
940 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
943 /* pthread callback handling interrupt mode receive queues
944 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
945 * interrupting queue to the adapter's ring buffer for interrupt events.
946 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
947 * the adapter service function.
950 rxa_intr_thread(void *arg)
952 struct rte_event_eth_rx_adapter *rx_adapter = arg;
953 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
957 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
958 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
960 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
962 for (i = 0; i < n; i++) {
963 rxa_intr_ring_enqueue(rx_adapter,
964 epoll_events[i].epdata.data);
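/* Note (editorial): epdata.data carries the packed <port id, queue id>
 * union queue_data value registered via rte_eth_dev_rx_intr_ctl_q() in
 * rxa_config_intr(), which rxa_intr_ring_enqueue() unpacks.
 */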
971 /* Dequeue <port, q> from interrupt ring and enqueue the received packets
974 static inline uint32_t
975 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
980 struct rte_eth_event_enqueue_buffer *buf;
981 rte_spinlock_t *ring_lock;
982 uint8_t max_done = 0;
984 if (rx_adapter->num_rx_intr == 0)
987 if (rte_ring_count(rx_adapter->intr_ring) == 0
988 && !rx_adapter->qd_valid)
991 buf = &rx_adapter->event_enqueue_buffer;
992 ring_lock = &rx_adapter->intr_ring_lock;
994 if (buf->count >= BATCH_SIZE)
995 rxa_flush_event_buffer(rx_adapter);
997 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
998 struct eth_device_info *dev_info;
1001 union queue_data qd = rx_adapter->qd;
1004 if (!rx_adapter->qd_valid) {
1005 struct eth_rx_queue_info *queue_info;
1007 rte_spinlock_lock(ring_lock);
1008 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1010 rte_spinlock_unlock(ring_lock);
1016 rx_adapter->qd = qd;
1017 rx_adapter->qd_valid = 1;
1018 dev_info = &rx_adapter->eth_devices[port];
1019 if (rxa_shared_intr(dev_info, queue))
1020 dev_info->shared_intr_enabled = 1;
1022 queue_info = &dev_info->rx_queue[queue];
1023 queue_info->intr_enabled = 1;
1025 rte_eth_dev_rx_intr_enable(port, queue);
1026 rte_spinlock_unlock(ring_lock);
1031 dev_info = &rx_adapter->eth_devices[port];
1034 if (rxa_shared_intr(dev_info, queue)) {
1038 nb_queues = dev_info->dev->data->nb_rx_queues;
1040 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1041 uint8_t enq_buffer_full;
1043 if (!rxa_intr_queue(dev_info, i))
1045 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1046 rx_adapter->max_nb_rx,
1050 enq_buffer_full = !rxq_empty && n == 0;
1051 max_done = nb_rx > rx_adapter->max_nb_rx;
1053 if (enq_buffer_full || max_done) {
1054 dev_info->next_q_idx = i;
1059 rx_adapter->qd_valid = 0;
1061 /* Reinitialize for next interrupt */
1062 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1063 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1066 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1067 rx_adapter->max_nb_rx,
1069 rx_adapter->qd_valid = !rxq_empty;
1071 if (nb_rx > rx_adapter->max_nb_rx)
1077 rx_adapter->stats.rx_intr_packets += nb_rx;
1082 * Polls receive queues added to the event adapter and enqueues received
1083 * packets to the event device.
1085 * The receive code enqueues initially to a temporary buffer; the
1086 * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1088 * If there isn't space available in the temporary buffer, packets from the
1089 * Rx queue aren't dequeued from the eth device; this back pressures the
1090 * eth device. In virtual device environments this back pressure is relayed to
1091 * the hypervisor's switching layer, where adjustments can be made to deal with it.
1094 static inline uint32_t
1095 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1099 struct rte_eth_event_enqueue_buffer *buf;
1103 wrr_pos = rx_adapter->wrr_pos;
1104 max_nb_rx = rx_adapter->max_nb_rx;
1105 buf = &rx_adapter->event_enqueue_buffer;
1107 /* Iterate through a WRR sequence */
1108 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1109 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1110 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1111 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1113 /* Don't do a batch dequeue from the rx queue if there isn't
1114 * enough space in the enqueue buffer.
1116 if (buf->count >= BATCH_SIZE)
1117 rxa_flush_event_buffer(rx_adapter);
1118 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1119 rx_adapter->wrr_pos = wrr_pos;
1123 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1125 if (nb_rx > max_nb_rx) {
1126 rx_adapter->wrr_pos =
1127 (wrr_pos + 1) % rx_adapter->wrr_len;
1131 if (++wrr_pos == rx_adapter->wrr_len)
1138 rxa_service_func(void *args)
1140 struct rte_event_eth_rx_adapter *rx_adapter = args;
1141 struct rte_event_eth_rx_adapter_stats *stats;
1143 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1145 if (!rx_adapter->rxa_started) {
1146 rte_spinlock_unlock(&rx_adapter->rx_lock);
1150 stats = &rx_adapter->stats;
1151 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1152 stats->rx_packets += rxa_poll(rx_adapter);
1153 rte_spinlock_unlock(&rx_adapter->rx_lock);
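/* Note (editorial): the trylock above means the service function does not
 * wait for control plane operations (queue add/del, start/stop) that hold
 * rx_lock; it returns without doing any work and the service core retries
 * on its next iteration.
 */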
1158 rte_event_eth_rx_adapter_init(void)
1160 const char *name = "rte_event_eth_rx_adapter_array";
1161 const struct rte_memzone *mz;
1164 sz = sizeof(*event_eth_rx_adapter) *
1165 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1166 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1168 mz = rte_memzone_lookup(name);
1170 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1171 RTE_CACHE_LINE_SIZE);
1173 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1179 event_eth_rx_adapter = mz->addr;
1183 static inline struct rte_event_eth_rx_adapter *
1184 rxa_id_to_adapter(uint8_t id)
1186 return event_eth_rx_adapter ?
1187 event_eth_rx_adapter[id] : NULL;
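/* Default configuration callback, used when the adapter is created with
 * rte_event_eth_rx_adapter_create(): it stops the event device if it was
 * running, grows nb_event_ports by one, reconfigures the device, sets up
 * the new port with the rte_event_port_conf passed through arg, and
 * restarts the device if it had been started.
 */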
1191 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1192 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1195 struct rte_eventdev *dev;
1196 struct rte_event_dev_config dev_conf;
1199 struct rte_event_port_conf *port_conf = arg;
1200 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1202 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1203 dev_conf = dev->data->dev_conf;
1205 started = dev->data->dev_started;
1207 rte_event_dev_stop(dev_id);
1208 port_id = dev_conf.nb_event_ports;
1209 dev_conf.nb_event_ports += 1;
1210 ret = rte_event_dev_configure(dev_id, &dev_conf);
1212 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1215 if (rte_event_dev_start(dev_id))
1221 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1223 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1228 conf->event_port_id = port_id;
1229 conf->max_nb_rx = 128;
1231 ret = rte_event_dev_start(dev_id);
1232 rx_adapter->default_cb_arg = 1;
1237 rxa_epoll_create1(void)
1241 fd = epoll_create1(EPOLL_CLOEXEC);
1242 return fd < 0 ? -errno : fd;
1249 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1251 if (rx_adapter->epd != INIT_FD)
1254 rx_adapter->epd = rxa_epoll_create1();
1255 if (rx_adapter->epd < 0) {
1256 int err = rx_adapter->epd;
1257 rx_adapter->epd = INIT_FD;
1258 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1266 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1269 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1271 if (rx_adapter->intr_ring)
1274 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1275 RTE_EVENT_ETH_INTR_RING_SIZE,
1276 rte_socket_id(), 0);
1277 if (!rx_adapter->intr_ring)
1280 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1281 RTE_EVENT_ETH_INTR_RING_SIZE *
1282 sizeof(struct rte_epoll_event),
1283 RTE_CACHE_LINE_SIZE,
1284 rx_adapter->socket_id);
1285 if (!rx_adapter->epoll_events) {
1290 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1292 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1293 "rx-intr-thread-%d", rx_adapter->id);
1295 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1296 NULL, rxa_intr_thread, rx_adapter);
1298 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1302 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1304 rte_ring_free(rx_adapter->intr_ring);
1305 rx_adapter->intr_ring = NULL;
1306 rx_adapter->epoll_events = NULL;
1311 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1315 err = pthread_cancel(rx_adapter->rx_intr_thread);
1317 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1320 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1322 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1324 rte_free(rx_adapter->epoll_events);
1325 rte_ring_free(rx_adapter->intr_ring);
1326 rx_adapter->intr_ring = NULL;
1327 rx_adapter->epoll_events = NULL;
1332 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1336 if (rx_adapter->num_rx_intr == 0)
1339 ret = rxa_destroy_intr_thread(rx_adapter);
1343 close(rx_adapter->epd);
1344 rx_adapter->epd = INIT_FD;
1350 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1351 struct eth_device_info *dev_info,
1352 uint16_t rx_queue_id)
1355 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1356 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1358 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1360 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1365 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1370 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1373 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1375 dev_info->shared_intr_enabled = 0;
1380 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1381 struct eth_device_info *dev_info,
1388 if (dev_info->nb_rx_intr == 0)
1392 if (rx_queue_id == -1) {
1393 s = dev_info->nb_shared_intr;
1394 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1398 q = dev_info->intr_queue[i];
1399 sintr = rxa_shared_intr(dev_info, q);
1402 if (!sintr || s == 0) {
1404 err = rxa_disable_intr(rx_adapter, dev_info,
1408 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1413 if (!rxa_intr_queue(dev_info, rx_queue_id))
1415 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1416 dev_info->nb_shared_intr == 1) {
1417 err = rxa_disable_intr(rx_adapter, dev_info,
1421 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1425 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1426 if (dev_info->intr_queue[i] == rx_queue_id) {
1427 for (; i < dev_info->nb_rx_intr - 1; i++)
1428 dev_info->intr_queue[i] =
1429 dev_info->intr_queue[i + 1];
1439 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1440 struct eth_device_info *dev_info,
1441 uint16_t rx_queue_id)
1444 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1445 union queue_data qd;
1447 uint16_t *intr_queue;
1448 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1450 if (rxa_intr_queue(dev_info, rx_queue_id))
1453 intr_queue = dev_info->intr_queue;
1454 if (dev_info->intr_queue == NULL) {
1456 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1457 dev_info->intr_queue =
1459 rx_adapter->mem_name,
1462 rx_adapter->socket_id);
1463 if (dev_info->intr_queue == NULL)
1467 init_fd = rx_adapter->epd;
1468 err = rxa_init_epd(rx_adapter);
1470 goto err_free_queue;
1472 qd.port = eth_dev_id;
1473 qd.queue = rx_queue_id;
1475 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1480 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1481 " Rx Queue %u err %d", rx_queue_id, err);
1485 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1487 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1488 " Rx Queue %u err %d", rx_queue_id, err);
1493 err = rxa_create_intr_thread(rx_adapter);
1496 dev_info->shared_intr_enabled = 1;
1498 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1503 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1505 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1506 " Rx Queue %u err %d", rx_queue_id, err);
1508 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1513 RTE_EDEV_LOG_ERR("Could not delete event for"
1514 " Rx Queue %u err %d", rx_queue_id, err1);
1517 if (init_fd == INIT_FD) {
1518 close(rx_adapter->epd);
1519 rx_adapter->epd = -1;
1522 if (intr_queue == NULL)
1523 rte_free(dev_info->intr_queue);
1529 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1530 struct eth_device_info *dev_info,
1536 int shared_done = (dev_info->nb_shared_intr > 0);
1538 if (rx_queue_id != -1) {
1539 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1541 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1545 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1547 if (rxa_shared_intr(dev_info, i) && shared_done)
1550 err = rxa_config_intr(rx_adapter, dev_info, i);
1552 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1555 dev_info->shared_intr_enabled = 1;
1564 shared_done = (dev_info->nb_shared_intr > 0);
1565 for (j = 0; j < i; j++) {
1566 if (rxa_intr_queue(dev_info, j))
1568 if (rxa_shared_intr(dev_info, j) && si != j)
1570 err = rxa_disable_intr(rx_adapter, dev_info, j);
1581 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1584 struct rte_service_spec service;
1585 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1587 if (rx_adapter->service_inited)
1590 memset(&service, 0, sizeof(service));
1591 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1592 "rte_event_eth_rx_adapter_%d", id);
1593 service.socket_id = rx_adapter->socket_id;
1594 service.callback = rxa_service_func;
1595 service.callback_userdata = rx_adapter;
1596 /* Service function handles locking for queue add/del updates */
1597 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1598 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1600 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1605 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1606 &rx_adapter_conf, rx_adapter->conf_arg);
1608 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1612 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1613 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1614 rx_adapter->service_inited = 1;
1615 rx_adapter->epd = INIT_FD;
1619 rte_service_component_unregister(rx_adapter->service_id);
1624 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1625 struct eth_device_info *dev_info,
1626 int32_t rx_queue_id,
1629 struct eth_rx_queue_info *queue_info;
1633 if (dev_info->rx_queue == NULL)
1636 if (rx_queue_id == -1) {
1637 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1638 rxa_update_queue(rx_adapter, dev_info, i, add);
1640 queue_info = &dev_info->rx_queue[rx_queue_id];
1641 enabled = queue_info->queue_enabled;
1643 rx_adapter->nb_queues += !enabled;
1644 dev_info->nb_dev_queues += !enabled;
1646 rx_adapter->nb_queues -= enabled;
1647 dev_info->nb_dev_queues -= enabled;
1649 queue_info->queue_enabled = !!add;
1654 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1655 struct eth_device_info *dev_info,
1656 int32_t rx_queue_id)
1663 if (rx_adapter->nb_queues == 0)
1666 if (rx_queue_id == -1) {
1667 uint16_t nb_rx_queues;
1670 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1671 for (i = 0; i < nb_rx_queues; i++)
1672 rxa_sw_del(rx_adapter, dev_info, i);
1676 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1677 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1678 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1679 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1680 rx_adapter->num_rx_polled -= pollq;
1681 dev_info->nb_rx_poll -= pollq;
1682 rx_adapter->num_rx_intr -= intrq;
1683 dev_info->nb_rx_intr -= intrq;
1684 dev_info->nb_shared_intr -= intrq && sintrq;
1688 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1689 struct eth_device_info *dev_info,
1690 int32_t rx_queue_id,
1691 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1693 struct eth_rx_queue_info *queue_info;
1694 const struct rte_event *ev = &conf->ev;
1698 struct rte_event *qi_ev;
1700 if (rx_queue_id == -1) {
1701 uint16_t nb_rx_queues;
1704 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1705 for (i = 0; i < nb_rx_queues; i++)
1706 rxa_add_queue(rx_adapter, dev_info, i, conf);
1710 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1711 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1712 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1714 queue_info = &dev_info->rx_queue[rx_queue_id];
1715 queue_info->wt = conf->servicing_weight;
1717 qi_ev = (struct rte_event *)&queue_info->event;
1718 qi_ev->event = ev->event;
1719 qi_ev->op = RTE_EVENT_OP_NEW;
1720 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1721 qi_ev->sub_event_type = 0;
1723 if (conf->rx_queue_flags &
1724 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1725 queue_info->flow_id_mask = ~0;
1729 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1730 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1731 rx_adapter->num_rx_polled += !pollq;
1732 dev_info->nb_rx_poll += !pollq;
1733 rx_adapter->num_rx_intr -= intrq;
1734 dev_info->nb_rx_intr -= intrq;
1735 dev_info->nb_shared_intr -= intrq && sintrq;
1738 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1739 rx_adapter->num_rx_polled -= pollq;
1740 dev_info->nb_rx_poll -= pollq;
1741 rx_adapter->num_rx_intr += !intrq;
1742 dev_info->nb_rx_intr += !intrq;
1743 dev_info->nb_shared_intr += !intrq && sintrq;
1744 if (dev_info->nb_shared_intr == 1) {
1745 if (dev_info->multi_intr_cap)
1746 dev_info->next_q_idx =
1747 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1749 dev_info->next_q_idx = 0;
1754 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1755 uint16_t eth_dev_id,
1757 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1759 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1760 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1762 struct eth_rx_poll_entry *rx_poll;
1763 struct eth_rx_queue_info *rx_queue;
1765 uint16_t nb_rx_queues;
1766 uint32_t nb_rx_poll, nb_wrr;
1767 uint32_t nb_rx_intr;
1771 if (queue_conf->servicing_weight == 0) {
1772 struct rte_eth_dev_data *data = dev_info->dev->data;
1774 temp_conf = *queue_conf;
1775 if (!data->dev_conf.intr_conf.rxq) {
1776 /* If Rx interrupts are disabled set wt = 1 */
1777 temp_conf.servicing_weight = 1;
1779 queue_conf = &temp_conf;
1782 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1783 rx_queue = dev_info->rx_queue;
1784 wt = queue_conf->servicing_weight;
1786 if (dev_info->rx_queue == NULL) {
1787 dev_info->rx_queue =
1788 rte_zmalloc_socket(rx_adapter->mem_name,
1790 sizeof(struct eth_rx_queue_info), 0,
1791 rx_adapter->socket_id);
1792 if (dev_info->rx_queue == NULL)
1798 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1799 queue_conf->servicing_weight,
1800 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1802 if (dev_info->dev->intr_handle)
1803 dev_info->multi_intr_cap =
1804 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1806 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1809 goto err_free_rxqueue;
1812 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1814 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1816 goto err_free_rxqueue;
1818 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1820 goto err_free_rxqueue;
1824 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1825 num_intr_vec = rxa_nb_intr_vect(dev_info,
1827 /* interrupt based queues are being converted to
1828 * poll mode queues; delete the interrupt configuration
1831 ret = rxa_del_intr_queue(rx_adapter,
1832 dev_info, rx_queue_id);
1834 goto err_free_rxqueue;
1838 if (nb_rx_intr == 0) {
1839 ret = rxa_free_intr_resources(rx_adapter);
1841 goto err_free_rxqueue;
1847 if (rx_queue_id == -1) {
1848 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1849 dev_info->intr_queue[i] = i;
1851 if (!rxa_intr_queue(dev_info, rx_queue_id))
1852 dev_info->intr_queue[nb_rx_intr - 1] =
1859 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1860 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1862 rte_free(rx_adapter->eth_rx_poll);
1863 rte_free(rx_adapter->wrr_sched);
1865 rx_adapter->eth_rx_poll = rx_poll;
1866 rx_adapter->wrr_sched = rx_wrr;
1867 rx_adapter->wrr_len = nb_wrr;
1868 rx_adapter->num_intr_vec += num_intr_vec;
1872 if (rx_queue == NULL) {
1873 rte_free(dev_info->rx_queue);
1874 dev_info->rx_queue = NULL;
1884 rxa_ctrl(uint8_t id, int start)
1886 struct rte_event_eth_rx_adapter *rx_adapter;
1887 struct rte_eventdev *dev;
1888 struct eth_device_info *dev_info;
1890 int use_service = 0;
1893 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1894 rx_adapter = rxa_id_to_adapter(id);
1895 if (rx_adapter == NULL)
1898 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1900 RTE_ETH_FOREACH_DEV(i) {
1901 dev_info = &rx_adapter->eth_devices[i];
1902 /* if starting, check that the device has queues added */
1903 if (start && !dev_info->nb_dev_queues)
1905 /* if stopping, check that the device has been started */
1906 if (stop && !dev_info->dev_rx_started)
1908 use_service |= !dev_info->internal_event_port;
1909 dev_info->dev_rx_started = start;
1910 if (dev_info->internal_event_port == 0)
1912 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1913 &rte_eth_devices[i]) :
1914 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1915 &rte_eth_devices[i]);
1919 rte_spinlock_lock(&rx_adapter->rx_lock);
1920 rx_adapter->rxa_started = start;
1921 rte_service_runstate_set(rx_adapter->service_id, start);
1922 rte_spinlock_unlock(&rx_adapter->rx_lock);
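/* Note (editorial): rxa_started is read by rxa_service_func() under the
 * same lock, so once the adapter is stopped the service function becomes a
 * no-op even if the service core keeps running.
 */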
1929 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1930 rte_event_eth_rx_adapter_conf_cb conf_cb,
1933 struct rte_event_eth_rx_adapter *rx_adapter;
1937 char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
1938 const uint8_t default_rss_key[] = {
1939 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1940 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1941 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1942 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1943 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1946 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1947 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1948 if (conf_cb == NULL)
1951 if (event_eth_rx_adapter == NULL) {
1952 ret = rte_event_eth_rx_adapter_init();
1957 rx_adapter = rxa_id_to_adapter(id);
1958 if (rx_adapter != NULL) {
1959 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1963 socket_id = rte_event_dev_socket_id(dev_id);
1964 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1965 "rte_event_eth_rx_adapter_%d",
1968 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1969 RTE_CACHE_LINE_SIZE, socket_id);
1970 if (rx_adapter == NULL) {
1971 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1975 rx_adapter->eventdev_id = dev_id;
1976 rx_adapter->socket_id = socket_id;
1977 rx_adapter->conf_cb = conf_cb;
1978 rx_adapter->conf_arg = conf_arg;
1979 rx_adapter->id = id;
1980 strcpy(rx_adapter->mem_name, mem_name);
1981 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1983 sizeof(struct eth_device_info), 0,
1985 rte_convert_rss_key((const uint32_t *)default_rss_key,
1986 (uint32_t *)rx_adapter->rss_key_be,
1987 RTE_DIM(default_rss_key));
1989 if (rx_adapter->eth_devices == NULL) {
1990 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1991 rte_free(rx_adapter);
1994 rte_spinlock_init(&rx_adapter->rx_lock);
1995 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1996 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1998 event_eth_rx_adapter[id] = rx_adapter;
1999 if (conf_cb == rxa_default_conf_cb)
2000 rx_adapter->default_cb_arg = 1;
2005 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2006 struct rte_event_port_conf *port_config)
2008 struct rte_event_port_conf *pc;
2011 if (port_config == NULL)
2013 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2015 pc = rte_malloc(NULL, sizeof(*pc), 0);
2019 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2020 rxa_default_conf_cb,
2028 rte_event_eth_rx_adapter_free(uint8_t id)
2030 struct rte_event_eth_rx_adapter *rx_adapter;
2032 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2034 rx_adapter = rxa_id_to_adapter(id);
2035 if (rx_adapter == NULL)
2038 if (rx_adapter->nb_queues) {
2039 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2040 rx_adapter->nb_queues);
2044 if (rx_adapter->default_cb_arg)
2045 rte_free(rx_adapter->conf_arg);
2046 rte_free(rx_adapter->eth_devices);
2047 rte_free(rx_adapter);
2048 event_eth_rx_adapter[id] = NULL;
2054 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2055 uint16_t eth_dev_id,
2056 int32_t rx_queue_id,
2057 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2061 struct rte_event_eth_rx_adapter *rx_adapter;
2062 struct rte_eventdev *dev;
2063 struct eth_device_info *dev_info;
2065 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2066 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2068 rx_adapter = rxa_id_to_adapter(id);
2069 if ((rx_adapter == NULL) || (queue_conf == NULL))
2072 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2073 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2077 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2078 "eth port %" PRIu16, id, eth_dev_id);
2082 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2083 && (queue_conf->rx_queue_flags &
2084 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2085 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2086 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2091 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2092 (rx_queue_id != -1)) {
2093 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2094 "event queue, eth port: %" PRIu16 " adapter id: %"
2095 PRIu8, eth_dev_id, id);
2099 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2100 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2101 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2102 (uint16_t)rx_queue_id);
2106 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2108 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2109 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2111 if (dev_info->rx_queue == NULL) {
2112 dev_info->rx_queue =
2113 rte_zmalloc_socket(rx_adapter->mem_name,
2114 dev_info->dev->data->nb_rx_queues *
2115 sizeof(struct eth_rx_queue_info), 0,
2116 rx_adapter->socket_id);
2117 if (dev_info->rx_queue == NULL)
2121 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2122 &rte_eth_devices[eth_dev_id],
2123 rx_queue_id, queue_conf);
2125 dev_info->internal_event_port = 1;
2126 rxa_update_queue(rx_adapter,
2127 &rx_adapter->eth_devices[eth_dev_id],
2132 rte_spinlock_lock(&rx_adapter->rx_lock);
2133 dev_info->internal_event_port = 0;
2134 ret = rxa_init_service(rx_adapter, id);
2136 uint32_t service_id = rx_adapter->service_id;
2137 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2139 rte_service_component_runstate_set(service_id,
2140 rxa_sw_adapter_queue_count(rx_adapter));
2142 rte_spinlock_unlock(&rx_adapter->rx_lock);
2152 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2153 int32_t rx_queue_id)
2156 struct rte_eventdev *dev;
2157 struct rte_event_eth_rx_adapter *rx_adapter;
2158 struct eth_device_info *dev_info;
2160 uint32_t nb_rx_poll = 0;
2161 uint32_t nb_wrr = 0;
2162 uint32_t nb_rx_intr;
2163 struct eth_rx_poll_entry *rx_poll = NULL;
2164 uint32_t *rx_wrr = NULL;
2167 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2168 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2170 rx_adapter = rxa_id_to_adapter(id);
2171 if (rx_adapter == NULL)
2174 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2175 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2181 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2182 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2183 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2184 (uint16_t)rx_queue_id);
2188 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2190 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2191 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2193 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2194 &rte_eth_devices[eth_dev_id],
2197 rxa_update_queue(rx_adapter,
2198 &rx_adapter->eth_devices[eth_dev_id],
2201 if (dev_info->nb_dev_queues == 0) {
2202 rte_free(dev_info->rx_queue);
2203 dev_info->rx_queue = NULL;
2207 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2208 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2210 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2215 rte_spinlock_lock(&rx_adapter->rx_lock);
2218 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2220 num_intr_vec = rxa_nb_intr_vect(dev_info,
2222 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2228 if (nb_rx_intr == 0) {
2229 ret = rxa_free_intr_resources(rx_adapter);
2234 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2235 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2237 rte_free(rx_adapter->eth_rx_poll);
2238 rte_free(rx_adapter->wrr_sched);
2240 if (nb_rx_intr == 0) {
2241 rte_free(dev_info->intr_queue);
2242 dev_info->intr_queue = NULL;
2245 rx_adapter->eth_rx_poll = rx_poll;
2246 rx_adapter->wrr_sched = rx_wrr;
2247 rx_adapter->wrr_len = nb_wrr;
2248 rx_adapter->num_intr_vec += num_intr_vec;
2250 if (dev_info->nb_dev_queues == 0) {
2251 rte_free(dev_info->rx_queue);
2252 dev_info->rx_queue = NULL;
2255 rte_spinlock_unlock(&rx_adapter->rx_lock);
2262 rte_service_component_runstate_set(rx_adapter->service_id,
2263 rxa_sw_adapter_queue_count(rx_adapter));
2270 rte_event_eth_rx_adapter_start(uint8_t id)
2272 return rxa_ctrl(id, 1);
2276 rte_event_eth_rx_adapter_stop(uint8_t id)
2278 return rxa_ctrl(id, 0);
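/* Illustrative usage sketch (editorial, not part of this file). The adapter
 * id, event device id, ethdev port id and the service lcore below are
 * hypothetical; it assumes a service lcore has already been added with
 * rte_service_lcore_add().
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &pconf);
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *	rte_event_eth_rx_adapter_start(0);
 */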
2282 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2283 struct rte_event_eth_rx_adapter_stats *stats)
2285 struct rte_event_eth_rx_adapter *rx_adapter;
2286 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2287 struct rte_event_eth_rx_adapter_stats dev_stats;
2288 struct rte_eventdev *dev;
2289 struct eth_device_info *dev_info;
2293 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2295 rx_adapter = rxa_id_to_adapter(id);
2296 if (rx_adapter == NULL || stats == NULL)
2299 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2300 memset(stats, 0, sizeof(*stats));
2301 RTE_ETH_FOREACH_DEV(i) {
2302 dev_info = &rx_adapter->eth_devices[i];
2303 if (dev_info->internal_event_port == 0 ||
2304 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2306 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2307 &rte_eth_devices[i],
2311 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2312 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2315 if (rx_adapter->service_inited)
2316 *stats = rx_adapter->stats;
2318 stats->rx_packets += dev_stats_sum.rx_packets;
2319 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2324 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2326 struct rte_event_eth_rx_adapter *rx_adapter;
2327 struct rte_eventdev *dev;
2328 struct eth_device_info *dev_info;
2331 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2333 rx_adapter = rxa_id_to_adapter(id);
2334 if (rx_adapter == NULL)
2337 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2338 RTE_ETH_FOREACH_DEV(i) {
2339 dev_info = &rx_adapter->eth_devices[i];
2340 if (dev_info->internal_event_port == 0 ||
2341 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2343 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2344 &rte_eth_devices[i]);
2347 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2352 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2354 struct rte_event_eth_rx_adapter *rx_adapter;
2356 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2358 rx_adapter = rxa_id_to_adapter(id);
2359 if (rx_adapter == NULL || service_id == NULL)
2362 if (rx_adapter->service_inited)
2363 *service_id = rx_adapter->service_id;
2365 return rx_adapter->service_inited ? 0 : -ESRCH;
2369 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2370 uint16_t eth_dev_id,
2371 rte_event_eth_rx_adapter_cb_fn cb_fn,
2374 struct rte_event_eth_rx_adapter *rx_adapter;
2375 struct eth_device_info *dev_info;
2379 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2380 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2382 rx_adapter = rxa_id_to_adapter(id);
2383 if (rx_adapter == NULL)
2386 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2387 if (dev_info->rx_queue == NULL)
2390 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2394 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2395 "eth port %" PRIu16, id, eth_dev_id);
2399 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2400 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2401 PRIu16, eth_dev_id);
2405 rte_spinlock_lock(&rx_adapter->rx_lock);
2406 dev_info->cb_fn = cb_fn;
2407 dev_info->cb_arg = cb_arg;
2408 rte_spinlock_unlock(&rx_adapter->rx_lock);