1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
27 #define BLOCK_CNT_THRESHOLD 10
28 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
30 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
31 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
33 #define RSS_KEY_SIZE 40
34 /* value written to intr thread pipe to signal thread exit */
35 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
36 /* Sentinel value to detect initialized file handle */
40 * Used to store port and queue ID of interrupting Rx queue
52 * There is an instance of this struct per polled Rx queue added to the
55 struct eth_rx_poll_entry {
56 /* Eth port to poll */
58 /* Eth rx queue to poll */
62 /* Instance per adapter */
63 struct rte_eth_event_enqueue_buffer {
64 /* Count of events in this buffer */
66 /* Array of events in this buffer */
67 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
70 struct rte_event_eth_rx_adapter {
72 uint8_t rss_key_be[RSS_KEY_SIZE];
73 /* Event device identifier */
75 /* Per ethernet device structure */
76 struct eth_device_info *eth_devices;
77 /* Event port identifier */
78 uint8_t event_port_id;
79 /* Lock to serialize config updates with service function */
80 rte_spinlock_t rx_lock;
81 /* Max mbufs processed in any service function invocation */
83 /* Receive queues that need to be polled */
84 struct eth_rx_poll_entry *eth_rx_poll;
85 /* Size of the eth_rx_poll array */
86 uint16_t num_rx_polled;
87 /* Weighted round robin schedule */
89 /* wrr_sched[] size */
91 /* Next entry in wrr[] to begin polling */
93 /* Event burst buffer */
94 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
95 /* Per adapter stats */
96 struct rte_event_eth_rx_adapter_stats stats;
97 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
98 uint16_t enq_block_count;
100 uint64_t rx_enq_block_start_ts;
101 /* epoll fd used to wait for Rx interrupts */
103 /* Number of interrupt driven queues */
104 uint32_t num_rx_intr;
105 /* Used to send <dev id, queue id> of interrupting Rx queues from
106 * the interrupt thread to the Rx thread
108 struct rte_ring *intr_ring;
109 /* Rx Queue data (dev id, queue id) for the last non-empty
113 /* queue_data is valid */
115 /* Interrupt ring lock, synchronizes Rx thread
116 * and interrupt thread
118 rte_spinlock_t intr_ring_lock;
119 /* Event array passed to rte_epoll_wait */
120 struct rte_epoll_event *epoll_events;
121 /* Count of interrupt vectors in use */
122 uint32_t num_intr_vec;
123 /* Thread blocked on Rx interrupts */
124 pthread_t rx_intr_thread;
125 /* Configuration callback for rte_service configuration */
126 rte_event_eth_rx_adapter_conf_cb conf_cb;
127 /* Configuration callback argument */
129 /* Set if default_cb is being used */
131 /* Service initialization state */
132 uint8_t service_inited;
133 /* Total count of Rx queues in adapter */
135 /* Memory allocation name */
136 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
137 /* Socket identifier cached from eventdev */
139 /* Per adapter EAL service */
141 /* Adapter started flag */
145 } __rte_cache_aligned;
148 struct eth_device_info {
149 struct rte_eth_dev *dev;
150 struct eth_rx_queue_info *rx_queue;
152 rte_event_eth_rx_adapter_cb_fn cb_fn;
153 /* Rx callback argument */
155 /* Set if ethdev->eventdev packet transfer uses a
158 uint8_t internal_event_port;
159 /* Set if the adapter is processing rx queues for
160 * this eth device and packet processing has been
161 * started, allows for the code to know if the PMD
162 * rx_adapter_stop callback needs to be invoked
164 uint8_t dev_rx_started;
165 /* Number of queues added for this device */
166 uint16_t nb_dev_queues;
167 /* Number of poll based queues
168 * If nb_rx_poll > 0, the start callback will
169 * be invoked if not already invoked
172 /* Number of interrupt based queues
173 * If nb_rx_intr > 0, the start callback will
174 * be invoked if not already invoked.
177 /* Number of queues that use the shared interrupt */
178 uint16_t nb_shared_intr;
179 /* sum(wrr(q)) for all queues within the device
180 * useful when deleting all device queues
183 /* Intr based queue index to start polling from, this is used
184 * if the number of shared interrupts is non-zero
187 /* Intr based queue indices */
188 uint16_t *intr_queue;
189 /* device generates a per Rx queue interrupt for queue
190 * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
193 /* shared interrupt enabled */
194 int shared_intr_enabled;
198 struct eth_rx_queue_info {
199 int queue_enabled; /* True if added */
201 uint16_t wt; /* Polling weight */
202 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
206 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
209 rxa_validate_id(uint8_t id)
211 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
214 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
215 if (!rxa_validate_id(id)) { \
216 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
222 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
224 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
227 /* Greatest common divisor */
228 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
232 return r ? rxa_gcd_u16(b, r) : b;
235 /* Returns the next queue in the polling sequence
237 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
240 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
241 unsigned int n, int *cw,
242 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
243 uint16_t gcd, int prev)
259 q = eth_rx_poll[i].eth_rx_qid;
260 d = eth_rx_poll[i].eth_dev_id;
261 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
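/* Illustrative sketch (example only, not part of the adapter): the
 * interleaved weighted round robin selection described at the URL above.
 * The names below (wrr_demo_next, wt[]) are hypothetical; the real
 * rxa_wrr_next() applies the same idea to the eth_rx_poll[] entries using
 * the per queue servicing weights.
 */
#if 0
static int
wrr_demo_next(const uint16_t *wt, unsigned int n, uint16_t max_wt,
	      uint16_t gcd, int *cw, int prev)
{
	int i = prev;

	while (1) {
		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}
		/* pick the first queue whose weight reaches the
		 * current weight threshold
		 */
		if (wt[i] >= *cw)
			return i;
	}
}
#endif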
269 rxa_shared_intr(struct eth_device_info *dev_info,
274 if (dev_info->dev->intr_handle == NULL)
277 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
278 return !multi_intr_cap ||
279 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
283 rxa_intr_queue(struct eth_device_info *dev_info,
286 struct eth_rx_queue_info *queue_info;
288 queue_info = &dev_info->rx_queue[rx_queue_id];
289 return dev_info->rx_queue &&
290 !dev_info->internal_event_port &&
291 queue_info->queue_enabled && queue_info->wt == 0;
295 rxa_polled_queue(struct eth_device_info *dev_info,
298 struct eth_rx_queue_info *queue_info;
300 queue_info = &dev_info->rx_queue[rx_queue_id];
301 return !dev_info->internal_event_port &&
302 dev_info->rx_queue &&
303 queue_info->queue_enabled && queue_info->wt != 0;
306 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
308 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
314 nbq = dev_info->dev->data->nb_rx_queues;
315 n = 0; /* non shared count */
316 s = 0; /* shared count */
318 if (rx_queue_id == -1) {
319 for (i = 0; i < nbq; i++) {
320 if (!rxa_shared_intr(dev_info, i))
321 n += add ? !rxa_intr_queue(dev_info, i) :
322 rxa_intr_queue(dev_info, i);
324 s += add ? !rxa_intr_queue(dev_info, i) :
325 rxa_intr_queue(dev_info, i);
329 if ((add && dev_info->nb_shared_intr == 0) ||
330 (!add && dev_info->nb_shared_intr))
334 if (!rxa_shared_intr(dev_info, rx_queue_id))
335 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
336 rxa_intr_queue(dev_info, rx_queue_id);
338 n = add ? !dev_info->nb_shared_intr :
339 dev_info->nb_shared_intr == 1;
345 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
348 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
349 struct eth_device_info *dev_info,
351 uint32_t *nb_rx_intr)
355 if (rx_queue_id == -1)
356 intr_diff = dev_info->nb_rx_intr;
358 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
360 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
363 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
364 * interrupt queues could currently be poll mode Rx queues
367 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
368 struct eth_device_info *dev_info,
370 uint32_t *nb_rx_poll,
371 uint32_t *nb_rx_intr,
376 uint32_t wrr_len_diff;
378 if (rx_queue_id == -1) {
379 intr_diff = dev_info->dev->data->nb_rx_queues -
380 dev_info->nb_rx_intr;
381 poll_diff = dev_info->nb_rx_poll;
382 wrr_len_diff = dev_info->wrr_len;
384 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
385 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
386 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
390 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
391 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
392 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
395 /* Calculate size of the eth_rx_poll and wrr_sched arrays
396 * after deleting poll mode rx queues
399 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
400 struct eth_device_info *dev_info,
402 uint32_t *nb_rx_poll,
406 uint32_t wrr_len_diff;
408 if (rx_queue_id == -1) {
409 poll_diff = dev_info->nb_rx_poll;
410 wrr_len_diff = dev_info->wrr_len;
412 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
413 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
417 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
418 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 /* Calculate nb_rx_* after adding poll mode rx queues
424 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
425 struct eth_device_info *dev_info,
428 uint32_t *nb_rx_poll,
429 uint32_t *nb_rx_intr,
434 uint32_t wrr_len_diff;
436 if (rx_queue_id == -1) {
437 intr_diff = dev_info->nb_rx_intr;
438 poll_diff = dev_info->dev->data->nb_rx_queues -
439 dev_info->nb_rx_poll;
440 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
443 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
444 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
445 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
446 wt - dev_info->rx_queue[rx_queue_id].wt :
450 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
451 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
452 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
455 /* Calculate nb_rx_* after adding rx_queue_id */
457 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
458 struct eth_device_info *dev_info,
461 uint32_t *nb_rx_poll,
462 uint32_t *nb_rx_intr,
466 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
467 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
469 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
470 nb_rx_poll, nb_rx_intr, nb_wrr);
473 /* Calculate nb_rx_* after deleting rx_queue_id */
475 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
476 struct eth_device_info *dev_info,
478 uint32_t *nb_rx_poll,
479 uint32_t *nb_rx_intr,
482 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
484 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
489 * Allocate the rx_poll array
491 static struct eth_rx_poll_entry *
492 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
493 uint32_t num_rx_polled)
497 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
498 RTE_CACHE_LINE_SIZE);
499 return rte_zmalloc_socket(rx_adapter->mem_name,
502 rx_adapter->socket_id);
506 * Allocate the WRR array
509 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
513 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
514 RTE_CACHE_LINE_SIZE);
515 return rte_zmalloc_socket(rx_adapter->mem_name,
518 rx_adapter->socket_id);
522 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
525 struct eth_rx_poll_entry **rx_poll,
526 uint32_t **wrr_sched)
535 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
536 if (*rx_poll == NULL) {
541 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
542 if (*wrr_sched == NULL) {
549 /* Precalculate WRR polling sequence for all queues in rx_adapter */
551 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
552 struct eth_rx_poll_entry *rx_poll,
561 /* Initialize variables for calculation of wrr schedule */
562 uint16_t max_wrr_pos = 0;
563 unsigned int poll_q = 0;
570 /* Generate array of all queues to poll, the size of this
573 RTE_ETH_FOREACH_DEV(d) {
574 uint16_t nb_rx_queues;
575 struct eth_device_info *dev_info =
576 &rx_adapter->eth_devices[d];
577 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
578 if (dev_info->rx_queue == NULL)
580 if (dev_info->internal_event_port)
582 dev_info->wrr_len = 0;
583 for (q = 0; q < nb_rx_queues; q++) {
584 struct eth_rx_queue_info *queue_info =
585 &dev_info->rx_queue[q];
588 if (!rxa_polled_queue(dev_info, q))
591 rx_poll[poll_q].eth_dev_id = d;
592 rx_poll[poll_q].eth_rx_qid = q;
594 dev_info->wrr_len += wt;
595 max_wt = RTE_MAX(max_wt, wt);
596 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
601 /* Generate polling sequence based on weights */
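	/* For example (assuming the interleaved WRR sketched earlier): two
	 * polled queues with weights 3 and 1 give max_wt = 3, gcd = 1 and a
	 * sequence of length 4 (the weight sum): Q0, Q0, Q0, Q1.
	 */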
604 for (i = 0; i < max_wrr_pos; i++) {
605 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
606 rx_poll, max_wt, gcd, prev);
612 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
613 struct rte_ipv6_hdr **ipv6_hdr)
615 struct rte_ether_hdr *eth_hdr =
616 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
617 struct rte_vlan_hdr *vlan_hdr;
622 switch (eth_hdr->ether_type) {
623 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
624 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
627 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
628 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
631 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
632 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
633 switch (vlan_hdr->eth_proto) {
634 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
635 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
637 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
638 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
650 /* Calculate RSS hash for IPv4/6 */
651 static inline uint32_t
652 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
656 struct rte_ipv4_tuple ipv4_tuple;
657 struct rte_ipv6_tuple ipv6_tuple;
658 struct rte_ipv4_hdr *ipv4_hdr;
659 struct rte_ipv6_hdr *ipv6_hdr;
661 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
664 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
665 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
667 input_len = RTE_THASH_V4_L3_LEN;
668 } else if (ipv6_hdr) {
669 rte_thash_load_v6_addrs(ipv6_hdr,
670 (union rte_thash_tuple *)&ipv6_tuple);
672 input_len = RTE_THASH_V6_L3_LEN;
676 return rte_softrss_be(tuple, input_len, rss_key_be);
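/* Note: the rss_key_be passed to rxa_do_softrss() is the RSS key already
 * converted to the byte-swapped layout expected by rte_softrss_be(); see
 * the rte_convert_rss_key() call in rte_event_eth_rx_adapter_create_ext().
 */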
680 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
682 return !!rx_adapter->enq_block_count;
686 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
688 if (rx_adapter->rx_enq_block_start_ts)
691 rx_adapter->enq_block_count++;
692 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
695 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
699 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
700 struct rte_event_eth_rx_adapter_stats *stats)
702 if (unlikely(!stats->rx_enq_start_ts))
703 stats->rx_enq_start_ts = rte_get_tsc_cycles();
705 if (likely(!rxa_enq_blocked(rx_adapter)))
708 rx_adapter->enq_block_count = 0;
709 if (rx_adapter->rx_enq_block_start_ts) {
710 stats->rx_enq_end_ts = rte_get_tsc_cycles();
711 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
712 rx_adapter->rx_enq_block_start_ts;
713 rx_adapter->rx_enq_block_start_ts = 0;
717 /* Enqueue buffered events to event device */
718 static inline uint16_t
719 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
721 struct rte_eth_event_enqueue_buffer *buf =
722 &rx_adapter->event_enqueue_buffer;
723 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
725 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
726 rx_adapter->event_port_id,
729 if (n != buf->count) {
732 (buf->count - n) * sizeof(struct rte_event));
733 stats->rx_enq_retry++;
736 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
737 rxa_enq_block_start_ts(rx_adapter);
740 stats->rx_enq_count += n;
746 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
748 uint16_t rx_queue_id,
749 struct rte_mbuf **mbufs,
753 struct eth_device_info *dev_info =
754 &rx_adapter->eth_devices[eth_dev_id];
755 struct eth_rx_queue_info *eth_rx_queue_info =
756 &dev_info->rx_queue[rx_queue_id];
757 struct rte_eth_event_enqueue_buffer *buf =
758 &rx_adapter->event_enqueue_buffer;
759 struct rte_event *ev = &buf->events[buf->count];
760 uint64_t event = eth_rx_queue_info->event;
761 uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
762 struct rte_mbuf *m = mbufs[0];
769 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
770 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
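	/* Software RSS is needed only when the NIC did not provide a hash
	 * (rss_mask == 0) and the application did not supply a flow id
	 * (flow_id_mask == 0); otherwise the flow id computation below keeps
	 * either the queue event's flow id or the mbuf hash.
	 */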
773 for (i = 0; i < num; i++) {
777 rxa_do_softrss(m, rx_adapter->rss_key_be) :
780 ev->flow_id = (rss & ~flow_id_mask) |
781 (ev->flow_id & flow_id_mask);
786 if (dev_info->cb_fn) {
789 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
790 ETH_EVENT_BUFFER_SIZE, buf->count, ev,
791 num, dev_info->cb_arg, &dropped);
792 if (unlikely(nb_cb > num))
793 RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
798 rx_adapter->stats.rx_dropped += dropped;
804 /* Enqueue packets from <port, q> to event buffer */
805 static inline uint32_t
806 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
813 struct rte_mbuf *mbufs[BATCH_SIZE];
814 struct rte_eth_event_enqueue_buffer *buf =
815 &rx_adapter->event_enqueue_buffer;
816 struct rte_event_eth_rx_adapter_stats *stats =
823 /* Don't do a batch dequeue from the rx queue if there isn't
824 * enough space in the enqueue buffer.
826 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
827 if (buf->count >= BATCH_SIZE)
828 rxa_flush_event_buffer(rx_adapter);
830 stats->rx_poll_count++;
831 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
837 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
839 if (rx_count + nb_rx > max_rx)
844 rxa_flush_event_buffer(rx_adapter);
850 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
857 struct eth_device_info *dev_info;
858 struct eth_rx_queue_info *queue_info;
865 dev_info = &rx_adapter->eth_devices[port_id];
866 queue_info = &dev_info->rx_queue[queue];
867 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
868 if (rxa_shared_intr(dev_info, queue))
869 intr_enabled = &dev_info->shared_intr_enabled;
871 intr_enabled = &queue_info->intr_enabled;
875 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
876 /* Entry should always be available.
877 * The ring size equals the maximum number of interrupt
878 * vectors supported (an interrupt vector is shared in
879 * case of shared interrupts)
882 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
883 " to ring: %s", strerror(-err));
885 rte_eth_dev_rx_intr_disable(port_id, queue);
887 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
891 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
892 uint32_t num_intr_vec)
894 if (rx_adapter->num_intr_vec + num_intr_vec >
895 RTE_EVENT_ETH_INTR_RING_SIZE) {
896 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
897 " %d needed %d limit %d", rx_adapter->num_intr_vec,
898 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
905 /* Delete entries for (dev, queue) from the interrupt ring */
907 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
908 struct eth_device_info *dev_info,
909 uint16_t rx_queue_id)
914 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
916 n = rte_ring_count(rx_adapter->intr_ring);
917 for (i = 0; i < n; i++) {
918 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
919 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
920 if (qd.port == dev_info->dev->data->port_id &&
921 qd.queue == rx_queue_id)
924 if (qd.port == dev_info->dev->data->port_id)
927 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
930 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
933 /* pthread callback handling interrupt mode receive queues
934 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
935 * interrupting queue to the adapter's ring buffer for interrupt events.
936 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
937 * the adapter service function.
940 rxa_intr_thread(void *arg)
942 struct rte_event_eth_rx_adapter *rx_adapter = arg;
943 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
947 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
948 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
950 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
952 for (i = 0; i < n; i++) {
953 rxa_intr_ring_enqueue(rx_adapter,
954 epoll_events[i].epdata.data);
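		/* The data enqueued above is the union queue_data
		 * (<port, queue>) that rxa_config_intr() registered
		 * with rte_eth_dev_rx_intr_ctl_q() for this queue.
		 */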
961 /* Dequeue <port, q> from interrupt ring and enqueue received
964 static inline uint32_t
965 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
970 struct rte_eth_event_enqueue_buffer *buf;
971 rte_spinlock_t *ring_lock;
972 uint8_t max_done = 0;
974 if (rx_adapter->num_rx_intr == 0)
977 if (rte_ring_count(rx_adapter->intr_ring) == 0
978 && !rx_adapter->qd_valid)
981 buf = &rx_adapter->event_enqueue_buffer;
982 ring_lock = &rx_adapter->intr_ring_lock;
984 if (buf->count >= BATCH_SIZE)
985 rxa_flush_event_buffer(rx_adapter);
987 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
988 struct eth_device_info *dev_info;
991 union queue_data qd = rx_adapter->qd;
994 if (!rx_adapter->qd_valid) {
995 struct eth_rx_queue_info *queue_info;
997 rte_spinlock_lock(ring_lock);
998 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1000 rte_spinlock_unlock(ring_lock);
1006 rx_adapter->qd = qd;
1007 rx_adapter->qd_valid = 1;
1008 dev_info = &rx_adapter->eth_devices[port];
1009 if (rxa_shared_intr(dev_info, queue))
1010 dev_info->shared_intr_enabled = 1;
1012 queue_info = &dev_info->rx_queue[queue];
1013 queue_info->intr_enabled = 1;
1015 rte_eth_dev_rx_intr_enable(port, queue);
1016 rte_spinlock_unlock(ring_lock);
1021 dev_info = &rx_adapter->eth_devices[port];
1024 if (rxa_shared_intr(dev_info, queue)) {
1028 nb_queues = dev_info->dev->data->nb_rx_queues;
1030 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1031 uint8_t enq_buffer_full;
1033 if (!rxa_intr_queue(dev_info, i))
1035 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1036 rx_adapter->max_nb_rx,
1040 enq_buffer_full = !rxq_empty && n == 0;
1041 max_done = nb_rx > rx_adapter->max_nb_rx;
1043 if (enq_buffer_full || max_done) {
1044 dev_info->next_q_idx = i;
1049 rx_adapter->qd_valid = 0;
1051 /* Reinitialize for next interrupt */
1052 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1053 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1056 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1057 rx_adapter->max_nb_rx,
1059 rx_adapter->qd_valid = !rxq_empty;
1061 if (nb_rx > rx_adapter->max_nb_rx)
1067 rx_adapter->stats.rx_intr_packets += nb_rx;
1072 * Polls receive queues added to the event adapter and enqueues received
1073 * packets to the event device.
1075 * The receive code enqueues initially to a temporary buffer; the
1076 * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1078 * If there isn't space available in the temporary buffer, packets from the
1079 * Rx queue aren't dequeued from the eth device; this back pressures the
1080 * eth device. In virtual device environments this back pressure is relayed to
1081 * the hypervisor's switching layer where adjustments can be made to deal with
1084 static inline uint32_t
1085 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1089 struct rte_eth_event_enqueue_buffer *buf;
1093 wrr_pos = rx_adapter->wrr_pos;
1094 max_nb_rx = rx_adapter->max_nb_rx;
1095 buf = &rx_adapter->event_enqueue_buffer;
1097 /* Iterate through a WRR sequence */
1098 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1099 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1100 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1101 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1103 /* Don't do a batch dequeue from the rx queue if there isn't
1104 * enough space in the enqueue buffer.
1106 if (buf->count >= BATCH_SIZE)
1107 rxa_flush_event_buffer(rx_adapter);
1108 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1109 rx_adapter->wrr_pos = wrr_pos;
1113 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1115 if (nb_rx > max_nb_rx) {
1116 rx_adapter->wrr_pos =
1117 (wrr_pos + 1) % rx_adapter->wrr_len;
1121 if (++wrr_pos == rx_adapter->wrr_len)
1128 rxa_service_func(void *args)
1130 struct rte_event_eth_rx_adapter *rx_adapter = args;
1131 struct rte_event_eth_rx_adapter_stats *stats;
1133 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1135 if (!rx_adapter->rxa_started) {
1136 rte_spinlock_unlock(&rx_adapter->rx_lock);
1140 stats = &rx_adapter->stats;
1141 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1142 stats->rx_packets += rxa_poll(rx_adapter);
1143 rte_spinlock_unlock(&rx_adapter->rx_lock);
1148 rte_event_eth_rx_adapter_init(void)
1150 const char *name = "rte_event_eth_rx_adapter_array";
1151 const struct rte_memzone *mz;
1154 sz = sizeof(*event_eth_rx_adapter) *
1155 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1156 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1158 mz = rte_memzone_lookup(name);
1160 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1161 RTE_CACHE_LINE_SIZE);
1163 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1169 event_eth_rx_adapter = mz->addr;
1173 static inline struct rte_event_eth_rx_adapter *
1174 rxa_id_to_adapter(uint8_t id)
1176 return event_eth_rx_adapter ?
1177 event_eth_rx_adapter[id] : NULL;
1181 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1182 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1185 struct rte_eventdev *dev;
1186 struct rte_event_dev_config dev_conf;
1189 struct rte_event_port_conf *port_conf = arg;
1190 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1192 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1193 dev_conf = dev->data->dev_conf;
1195 started = dev->data->dev_started;
1197 rte_event_dev_stop(dev_id);
1198 port_id = dev_conf.nb_event_ports;
1199 dev_conf.nb_event_ports += 1;
1200 ret = rte_event_dev_configure(dev_id, &dev_conf);
1202 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1205 if (rte_event_dev_start(dev_id))
1211 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1213 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1218 conf->event_port_id = port_id;
1219 conf->max_nb_rx = 128;
1221 ret = rte_event_dev_start(dev_id);
1222 rx_adapter->default_cb_arg = 1;
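/* Illustrative sketch (example only): an application supplied configuration
 * callback for rte_event_eth_rx_adapter_create_ext() that hands the adapter
 * an event port set up when the event device was configured, instead of
 * letting rxa_default_conf_cb() above reconfigure the device. The callback
 * name and the use of conf_arg to carry the port id are hypothetical.
 */
#if 0
static int
app_rx_adapter_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	RTE_SET_USED(id);
	RTE_SET_USED(dev_id);

	/* conf_arg passed to create_ext() points to the event port id */
	conf->event_port_id = *(uint8_t *)arg;
	conf->max_nb_rx = 128;
	return 0;
}
#endif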
1227 rxa_epoll_create1(void)
1231 fd = epoll_create1(EPOLL_CLOEXEC);
1232 return fd < 0 ? -errno : fd;
1239 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1241 if (rx_adapter->epd != INIT_FD)
1244 rx_adapter->epd = rxa_epoll_create1();
1245 if (rx_adapter->epd < 0) {
1246 int err = rx_adapter->epd;
1247 rx_adapter->epd = INIT_FD;
1248 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1256 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1259 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1261 if (rx_adapter->intr_ring)
1264 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1265 RTE_EVENT_ETH_INTR_RING_SIZE,
1266 rte_socket_id(), 0);
1267 if (!rx_adapter->intr_ring)
1270 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1271 RTE_EVENT_ETH_INTR_RING_SIZE *
1272 sizeof(struct rte_epoll_event),
1273 RTE_CACHE_LINE_SIZE,
1274 rx_adapter->socket_id);
1275 if (!rx_adapter->epoll_events) {
1280 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1282 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1283 "rx-intr-thread-%d", rx_adapter->id);
1285 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1286 NULL, rxa_intr_thread, rx_adapter);
1288 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1292 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1294 rte_ring_free(rx_adapter->intr_ring);
1295 rx_adapter->intr_ring = NULL;
1296 rx_adapter->epoll_events = NULL;
1301 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1305 err = pthread_cancel(rx_adapter->rx_intr_thread);
1307 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1310 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1312 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1314 rte_free(rx_adapter->epoll_events);
1315 rte_ring_free(rx_adapter->intr_ring);
1316 rx_adapter->intr_ring = NULL;
1317 rx_adapter->epoll_events = NULL;
1322 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1326 if (rx_adapter->num_rx_intr == 0)
1329 ret = rxa_destroy_intr_thread(rx_adapter);
1333 close(rx_adapter->epd);
1334 rx_adapter->epd = INIT_FD;
1340 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1341 struct eth_device_info *dev_info,
1342 uint16_t rx_queue_id)
1345 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1346 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1348 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1350 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1355 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1360 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1363 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1365 dev_info->shared_intr_enabled = 0;
1370 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1371 struct eth_device_info *dev_info,
1378 if (dev_info->nb_rx_intr == 0)
1382 if (rx_queue_id == -1) {
1383 s = dev_info->nb_shared_intr;
1384 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1388 q = dev_info->intr_queue[i];
1389 sintr = rxa_shared_intr(dev_info, q);
1392 if (!sintr || s == 0) {
1394 err = rxa_disable_intr(rx_adapter, dev_info,
1398 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1403 if (!rxa_intr_queue(dev_info, rx_queue_id))
1405 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1406 dev_info->nb_shared_intr == 1) {
1407 err = rxa_disable_intr(rx_adapter, dev_info,
1411 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1415 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1416 if (dev_info->intr_queue[i] == rx_queue_id) {
1417 for (; i < dev_info->nb_rx_intr - 1; i++)
1418 dev_info->intr_queue[i] =
1419 dev_info->intr_queue[i + 1];
1429 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1430 struct eth_device_info *dev_info,
1431 uint16_t rx_queue_id)
1434 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1435 union queue_data qd;
1437 uint16_t *intr_queue;
1438 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1440 if (rxa_intr_queue(dev_info, rx_queue_id))
1443 intr_queue = dev_info->intr_queue;
1444 if (dev_info->intr_queue == NULL) {
1446 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1447 dev_info->intr_queue =
1449 rx_adapter->mem_name,
1452 rx_adapter->socket_id);
1453 if (dev_info->intr_queue == NULL)
1457 init_fd = rx_adapter->epd;
1458 err = rxa_init_epd(rx_adapter);
1460 goto err_free_queue;
1462 qd.port = eth_dev_id;
1463 qd.queue = rx_queue_id;
1465 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1470 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1471 " Rx Queue %u err %d", rx_queue_id, err);
1475 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1477 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1478 " Rx Queue %u err %d", rx_queue_id, err);
1483 err = rxa_create_intr_thread(rx_adapter);
1486 dev_info->shared_intr_enabled = 1;
1488 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1493 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1495 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1496 " Rx Queue %u err %d", rx_queue_id, err);
1498 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1503 RTE_EDEV_LOG_ERR("Could not delete event for"
1504 " Rx Queue %u err %d", rx_queue_id, err1);
1507 if (init_fd == INIT_FD) {
1508 close(rx_adapter->epd);
1509 rx_adapter->epd = -1;
1512 if (intr_queue == NULL)
1513 rte_free(dev_info->intr_queue);
1519 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1520 struct eth_device_info *dev_info,
1526 int shared_done = (dev_info->nb_shared_intr > 0);
1528 if (rx_queue_id != -1) {
1529 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1531 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1535 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1537 if (rxa_shared_intr(dev_info, i) && shared_done)
1540 err = rxa_config_intr(rx_adapter, dev_info, i);
1542 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1545 dev_info->shared_intr_enabled = 1;
1554 shared_done = (dev_info->nb_shared_intr > 0);
1555 for (j = 0; j < i; j++) {
1556 if (rxa_intr_queue(dev_info, j))
1558 if (rxa_shared_intr(dev_info, j) && si != j)
1560 err = rxa_disable_intr(rx_adapter, dev_info, j);
1571 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1574 struct rte_service_spec service;
1575 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1577 if (rx_adapter->service_inited)
1580 memset(&service, 0, sizeof(service));
1581 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1582 "rte_event_eth_rx_adapter_%d", id);
1583 service.socket_id = rx_adapter->socket_id;
1584 service.callback = rxa_service_func;
1585 service.callback_userdata = rx_adapter;
1586 /* Service function handles locking for queue add/del updates */
1587 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1588 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1590 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1595 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1596 &rx_adapter_conf, rx_adapter->conf_arg);
1598 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1602 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1603 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1604 rx_adapter->service_inited = 1;
1605 rx_adapter->epd = INIT_FD;
1609 rte_service_component_unregister(rx_adapter->service_id);
1614 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1615 struct eth_device_info *dev_info,
1616 int32_t rx_queue_id,
1619 struct eth_rx_queue_info *queue_info;
1623 if (dev_info->rx_queue == NULL)
1626 if (rx_queue_id == -1) {
1627 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1628 rxa_update_queue(rx_adapter, dev_info, i, add);
1630 queue_info = &dev_info->rx_queue[rx_queue_id];
1631 enabled = queue_info->queue_enabled;
1633 rx_adapter->nb_queues += !enabled;
1634 dev_info->nb_dev_queues += !enabled;
1636 rx_adapter->nb_queues -= enabled;
1637 dev_info->nb_dev_queues -= enabled;
1639 queue_info->queue_enabled = !!add;
1644 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1645 struct eth_device_info *dev_info,
1646 int32_t rx_queue_id)
1653 if (rx_adapter->nb_queues == 0)
1656 if (rx_queue_id == -1) {
1657 uint16_t nb_rx_queues;
1660 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1661 for (i = 0; i < nb_rx_queues; i++)
1662 rxa_sw_del(rx_adapter, dev_info, i);
1666 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1667 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1668 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1669 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1670 rx_adapter->num_rx_polled -= pollq;
1671 dev_info->nb_rx_poll -= pollq;
1672 rx_adapter->num_rx_intr -= intrq;
1673 dev_info->nb_rx_intr -= intrq;
1674 dev_info->nb_shared_intr -= intrq && sintrq;
1678 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1679 struct eth_device_info *dev_info,
1680 int32_t rx_queue_id,
1681 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1683 struct eth_rx_queue_info *queue_info;
1684 const struct rte_event *ev = &conf->ev;
1688 struct rte_event *qi_ev;
1690 if (rx_queue_id == -1) {
1691 uint16_t nb_rx_queues;
1694 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1695 for (i = 0; i < nb_rx_queues; i++)
1696 rxa_add_queue(rx_adapter, dev_info, i, conf);
1700 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1701 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1702 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1704 queue_info = &dev_info->rx_queue[rx_queue_id];
1705 queue_info->wt = conf->servicing_weight;
1707 qi_ev = (struct rte_event *)&queue_info->event;
1708 qi_ev->event = ev->event;
1709 qi_ev->op = RTE_EVENT_OP_NEW;
1710 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1711 qi_ev->sub_event_type = 0;
1713 if (conf->rx_queue_flags &
1714 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1715 queue_info->flow_id_mask = ~0;
1719 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1720 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1721 rx_adapter->num_rx_polled += !pollq;
1722 dev_info->nb_rx_poll += !pollq;
1723 rx_adapter->num_rx_intr -= intrq;
1724 dev_info->nb_rx_intr -= intrq;
1725 dev_info->nb_shared_intr -= intrq && sintrq;
1728 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1729 rx_adapter->num_rx_polled -= pollq;
1730 dev_info->nb_rx_poll -= pollq;
1731 rx_adapter->num_rx_intr += !intrq;
1732 dev_info->nb_rx_intr += !intrq;
1733 dev_info->nb_shared_intr += !intrq && sintrq;
1734 if (dev_info->nb_shared_intr == 1) {
1735 if (dev_info->multi_intr_cap)
1736 dev_info->next_q_idx =
1737 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1739 dev_info->next_q_idx = 0;
1744 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1745 uint16_t eth_dev_id,
1747 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1749 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1750 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1752 struct eth_rx_poll_entry *rx_poll;
1753 struct eth_rx_queue_info *rx_queue;
1755 uint16_t nb_rx_queues;
1756 uint32_t nb_rx_poll, nb_wrr;
1757 uint32_t nb_rx_intr;
1761 if (queue_conf->servicing_weight == 0) {
1762 struct rte_eth_dev_data *data = dev_info->dev->data;
1764 temp_conf = *queue_conf;
1765 if (!data->dev_conf.intr_conf.rxq) {
1766 /* If Rx interrupts are disabled set wt = 1 */
1767 temp_conf.servicing_weight = 1;
1769 queue_conf = &temp_conf;
1772 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1773 rx_queue = dev_info->rx_queue;
1774 wt = queue_conf->servicing_weight;
1776 if (dev_info->rx_queue == NULL) {
1777 dev_info->rx_queue =
1778 rte_zmalloc_socket(rx_adapter->mem_name,
1780 sizeof(struct eth_rx_queue_info), 0,
1781 rx_adapter->socket_id);
1782 if (dev_info->rx_queue == NULL)
1788 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1789 queue_conf->servicing_weight,
1790 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1792 if (dev_info->dev->intr_handle)
1793 dev_info->multi_intr_cap =
1794 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1796 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1799 goto err_free_rxqueue;
1802 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1804 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1806 goto err_free_rxqueue;
1808 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1810 goto err_free_rxqueue;
1814 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1815 num_intr_vec = rxa_nb_intr_vect(dev_info,
1817 /* interrupt based queues are being converted to
1818 * poll mode queues, delete the interrupt configuration
1821 ret = rxa_del_intr_queue(rx_adapter,
1822 dev_info, rx_queue_id);
1824 goto err_free_rxqueue;
1828 if (nb_rx_intr == 0) {
1829 ret = rxa_free_intr_resources(rx_adapter);
1831 goto err_free_rxqueue;
1837 if (rx_queue_id == -1) {
1838 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1839 dev_info->intr_queue[i] = i;
1841 if (!rxa_intr_queue(dev_info, rx_queue_id))
1842 dev_info->intr_queue[nb_rx_intr - 1] =
1849 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1850 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1852 rte_free(rx_adapter->eth_rx_poll);
1853 rte_free(rx_adapter->wrr_sched);
1855 rx_adapter->eth_rx_poll = rx_poll;
1856 rx_adapter->wrr_sched = rx_wrr;
1857 rx_adapter->wrr_len = nb_wrr;
1858 rx_adapter->num_intr_vec += num_intr_vec;
1862 if (rx_queue == NULL) {
1863 rte_free(dev_info->rx_queue);
1864 dev_info->rx_queue = NULL;
1874 rxa_ctrl(uint8_t id, int start)
1876 struct rte_event_eth_rx_adapter *rx_adapter;
1877 struct rte_eventdev *dev;
1878 struct eth_device_info *dev_info;
1880 int use_service = 0;
1883 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1884 rx_adapter = rxa_id_to_adapter(id);
1885 if (rx_adapter == NULL)
1888 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1890 RTE_ETH_FOREACH_DEV(i) {
1891 dev_info = &rx_adapter->eth_devices[i];
1892 /* if start, check for num dev queues */
1893 if (start && !dev_info->nb_dev_queues)
1895 /* if stop, check if dev has been started */
1896 if (stop && !dev_info->dev_rx_started)
1898 use_service |= !dev_info->internal_event_port;
1899 dev_info->dev_rx_started = start;
1900 if (dev_info->internal_event_port == 0)
1902 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1903 &rte_eth_devices[i]) :
1904 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1905 &rte_eth_devices[i]);
1909 rte_spinlock_lock(&rx_adapter->rx_lock);
1910 rx_adapter->rxa_started = start;
1911 rte_service_runstate_set(rx_adapter->service_id, start);
1912 rte_spinlock_unlock(&rx_adapter->rx_lock);
1919 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1920 rte_event_eth_rx_adapter_conf_cb conf_cb,
1923 struct rte_event_eth_rx_adapter *rx_adapter;
1927 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1928 const uint8_t default_rss_key[] = {
1929 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1930 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1931 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1932 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1933 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1936 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1937 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1938 if (conf_cb == NULL)
1941 if (event_eth_rx_adapter == NULL) {
1942 ret = rte_event_eth_rx_adapter_init();
1947 rx_adapter = rxa_id_to_adapter(id);
1948 if (rx_adapter != NULL) {
1949 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1953 socket_id = rte_event_dev_socket_id(dev_id);
1954 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1955 "rte_event_eth_rx_adapter_%d",
1958 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1959 RTE_CACHE_LINE_SIZE, socket_id);
1960 if (rx_adapter == NULL) {
1961 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1965 rx_adapter->eventdev_id = dev_id;
1966 rx_adapter->socket_id = socket_id;
1967 rx_adapter->conf_cb = conf_cb;
1968 rx_adapter->conf_arg = conf_arg;
1969 rx_adapter->id = id;
1970 strcpy(rx_adapter->mem_name, mem_name);
1971 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1973 sizeof(struct eth_device_info), 0,
1975 rte_convert_rss_key((const uint32_t *)default_rss_key,
1976 (uint32_t *)rx_adapter->rss_key_be,
1977 RTE_DIM(default_rss_key));
1979 if (rx_adapter->eth_devices == NULL) {
1980 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1981 rte_free(rx_adapter);
1984 rte_spinlock_init(&rx_adapter->rx_lock);
1985 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1986 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1988 event_eth_rx_adapter[id] = rx_adapter;
1989 if (conf_cb == rxa_default_conf_cb)
1990 rx_adapter->default_cb_arg = 1;
1991 rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
1997 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
1998 struct rte_event_port_conf *port_config)
2000 struct rte_event_port_conf *pc;
2003 if (port_config == NULL)
2005 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2007 pc = rte_malloc(NULL, sizeof(*pc), 0);
2011 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2012 rxa_default_conf_cb,
2020 rte_event_eth_rx_adapter_free(uint8_t id)
2022 struct rte_event_eth_rx_adapter *rx_adapter;
2024 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2026 rx_adapter = rxa_id_to_adapter(id);
2027 if (rx_adapter == NULL)
2030 if (rx_adapter->nb_queues) {
2031 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2032 rx_adapter->nb_queues);
2036 if (rx_adapter->default_cb_arg)
2037 rte_free(rx_adapter->conf_arg);
2038 rte_free(rx_adapter->eth_devices);
2039 rte_free(rx_adapter);
2040 event_eth_rx_adapter[id] = NULL;
2042 rte_eventdev_trace_eth_rx_adapter_free(id);
2047 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2048 uint16_t eth_dev_id,
2049 int32_t rx_queue_id,
2050 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2054 struct rte_event_eth_rx_adapter *rx_adapter;
2055 struct rte_eventdev *dev;
2056 struct eth_device_info *dev_info;
2058 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2059 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2061 rx_adapter = rxa_id_to_adapter(id);
2062 if ((rx_adapter == NULL) || (queue_conf == NULL))
2065 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2066 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2070 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2071 "eth port %" PRIu16, id, eth_dev_id);
2075 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2076 && (queue_conf->rx_queue_flags &
2077 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2078 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2079 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2084 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2085 (rx_queue_id != -1)) {
2086 RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
2087 "event queue, eth port: %" PRIu16 " adapter id: %"
2088 PRIu8, eth_dev_id, id);
2092 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2093 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2094 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2095 (uint16_t)rx_queue_id);
2099 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2101 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2102 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2104 if (dev_info->rx_queue == NULL) {
2105 dev_info->rx_queue =
2106 rte_zmalloc_socket(rx_adapter->mem_name,
2107 dev_info->dev->data->nb_rx_queues *
2108 sizeof(struct eth_rx_queue_info), 0,
2109 rx_adapter->socket_id);
2110 if (dev_info->rx_queue == NULL)
2114 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2115 &rte_eth_devices[eth_dev_id],
2116 rx_queue_id, queue_conf);
2118 dev_info->internal_event_port = 1;
2119 rxa_update_queue(rx_adapter,
2120 &rx_adapter->eth_devices[eth_dev_id],
2125 rte_spinlock_lock(&rx_adapter->rx_lock);
2126 dev_info->internal_event_port = 0;
2127 ret = rxa_init_service(rx_adapter, id);
2129 uint32_t service_id = rx_adapter->service_id;
2130 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2132 rte_service_component_runstate_set(service_id,
2133 rxa_sw_adapter_queue_count(rx_adapter));
2135 rte_spinlock_unlock(&rx_adapter->rx_lock);
2138 rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2139 rx_queue_id, queue_conf, ret);
2147 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2148 int32_t rx_queue_id)
2151 struct rte_eventdev *dev;
2152 struct rte_event_eth_rx_adapter *rx_adapter;
2153 struct eth_device_info *dev_info;
2155 uint32_t nb_rx_poll = 0;
2156 uint32_t nb_wrr = 0;
2157 uint32_t nb_rx_intr;
2158 struct eth_rx_poll_entry *rx_poll = NULL;
2159 uint32_t *rx_wrr = NULL;
2162 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2163 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2165 rx_adapter = rxa_id_to_adapter(id);
2166 if (rx_adapter == NULL)
2169 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2170 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2176 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2177 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2178 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2179 (uint16_t)rx_queue_id);
2183 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2185 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2186 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2188 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2189 &rte_eth_devices[eth_dev_id],
2192 rxa_update_queue(rx_adapter,
2193 &rx_adapter->eth_devices[eth_dev_id],
2196 if (dev_info->nb_dev_queues == 0) {
2197 rte_free(dev_info->rx_queue);
2198 dev_info->rx_queue = NULL;
2202 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2203 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2205 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2210 rte_spinlock_lock(&rx_adapter->rx_lock);
2213 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2215 num_intr_vec = rxa_nb_intr_vect(dev_info,
2217 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2223 if (nb_rx_intr == 0) {
2224 ret = rxa_free_intr_resources(rx_adapter);
2229 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2230 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2232 rte_free(rx_adapter->eth_rx_poll);
2233 rte_free(rx_adapter->wrr_sched);
2235 if (nb_rx_intr == 0) {
2236 rte_free(dev_info->intr_queue);
2237 dev_info->intr_queue = NULL;
2240 rx_adapter->eth_rx_poll = rx_poll;
2241 rx_adapter->wrr_sched = rx_wrr;
2242 rx_adapter->wrr_len = nb_wrr;
2243 rx_adapter->num_intr_vec += num_intr_vec;
2245 if (dev_info->nb_dev_queues == 0) {
2246 rte_free(dev_info->rx_queue);
2247 dev_info->rx_queue = NULL;
2250 rte_spinlock_unlock(&rx_adapter->rx_lock);
2257 rte_service_component_runstate_set(rx_adapter->service_id,
2258 rxa_sw_adapter_queue_count(rx_adapter));
2261 rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2267 rte_event_eth_rx_adapter_start(uint8_t id)
2269 rte_eventdev_trace_eth_rx_adapter_start(id);
2270 return rxa_ctrl(id, 1);
2274 rte_event_eth_rx_adapter_stop(uint8_t id)
2276 rte_eventdev_trace_eth_rx_adapter_stop(id);
2277 return rxa_ctrl(id, 0);
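/* Illustrative usage sketch (example only, application side): create adapter 0
 * on event device 0, connect every Rx queue of ethdev port 0 to event queue 0
 * and start the adapter. The ids and the rte_event_port_conf values are
 * hypothetical.
 */
#if 0
static int
app_setup_rx_adapter(void)
{
	struct rte_event_port_conf port_conf = {
		.new_event_threshold = 4096,
		.dequeue_depth = 32,
		.enqueue_depth = 32,
	};
	struct rte_event_eth_rx_adapter_queue_conf qconf;
	int ret;

	memset(&qconf, 0, sizeof(qconf));
	qconf.servicing_weight = 1;
	qconf.ev.queue_id = 0;
	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
	qconf.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL;

	ret = rte_event_eth_rx_adapter_create(0, 0, &port_conf);
	if (ret)
		return ret;

	/* rx_queue_id of -1 adds all Rx queues of the eth device */
	ret = rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
	if (ret)
		return ret;

	return rte_event_eth_rx_adapter_start(0);
}
#endif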
2281 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2282 struct rte_event_eth_rx_adapter_stats *stats)
2284 struct rte_event_eth_rx_adapter *rx_adapter;
2285 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2286 struct rte_event_eth_rx_adapter_stats dev_stats;
2287 struct rte_eventdev *dev;
2288 struct eth_device_info *dev_info;
2292 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2294 rx_adapter = rxa_id_to_adapter(id);
2295 if (rx_adapter == NULL || stats == NULL)
2298 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2299 memset(stats, 0, sizeof(*stats));
2300 RTE_ETH_FOREACH_DEV(i) {
2301 dev_info = &rx_adapter->eth_devices[i];
2302 if (dev_info->internal_event_port == 0 ||
2303 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2305 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2306 &rte_eth_devices[i],
2310 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2311 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2314 if (rx_adapter->service_inited)
2315 *stats = rx_adapter->stats;
2317 stats->rx_packets += dev_stats_sum.rx_packets;
2318 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2323 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2325 struct rte_event_eth_rx_adapter *rx_adapter;
2326 struct rte_eventdev *dev;
2327 struct eth_device_info *dev_info;
2330 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2332 rx_adapter = rxa_id_to_adapter(id);
2333 if (rx_adapter == NULL)
2336 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2337 RTE_ETH_FOREACH_DEV(i) {
2338 dev_info = &rx_adapter->eth_devices[i];
2339 if (dev_info->internal_event_port == 0 ||
2340 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2342 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2343 &rte_eth_devices[i]);
2346 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2351 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2353 struct rte_event_eth_rx_adapter *rx_adapter;
2355 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2357 rx_adapter = rxa_id_to_adapter(id);
2358 if (rx_adapter == NULL || service_id == NULL)
2361 if (rx_adapter->service_inited)
2362 *service_id = rx_adapter->service_id;
2364 return rx_adapter->service_inited ? 0 : -ESRCH;
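/* Illustrative sketch (example only): when the adapter uses the service
 * function in this file (i.e. the eventdev does not have the
 * RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT capability), the application
 * must map the adapter's service to a service lcore and start it. The
 * service lcore id 1 is hypothetical.
 */
#if 0
static int
app_launch_rx_adapter_service(uint8_t id)
{
	uint32_t service_id;
	int ret;

	ret = rte_event_eth_rx_adapter_service_id_get(id, &service_id);
	if (ret)
		return ret; /* -ESRCH: adapter has no service component */

	ret = rte_service_lcore_add(1);
	if (ret && ret != -EALREADY)
		return ret;

	ret = rte_service_map_lcore_set(service_id, 1, 1);
	if (ret)
		return ret;

	return rte_service_lcore_start(1);
}
#endif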
2368 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2369 uint16_t eth_dev_id,
2370 rte_event_eth_rx_adapter_cb_fn cb_fn,
2373 struct rte_event_eth_rx_adapter *rx_adapter;
2374 struct eth_device_info *dev_info;
2378 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2379 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2381 rx_adapter = rxa_id_to_adapter(id);
2382 if (rx_adapter == NULL)
2385 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2386 if (dev_info->rx_queue == NULL)
2389 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2393 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2394 "eth port %" PRIu16, id, eth_dev_id);
2398 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2399 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2400 PRIu16, eth_dev_id);
2404 rte_spinlock_lock(&rx_adapter->rx_lock);
2405 dev_info->cb_fn = cb_fn;
2406 dev_info->cb_arg = cb_arg;
2407 rte_spinlock_unlock(&rx_adapter->rx_lock);