1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_eventdev_trace.h"
24 #include "rte_event_eth_rx_adapter.h"
27 #define BLOCK_CNT_THRESHOLD 10
28 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
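/* The enqueue buffer holds four Rx batches worth of events; the service
 * function flushes it once it holds at least BATCH_SIZE events and skips
 * polling a queue when fewer than BATCH_SIZE free slots remain.
 */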
30 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
31 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
33 #define RSS_KEY_SIZE 40
34 /* value written to intr thread pipe to signal thread exit */
35 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
36 /* Sentinel value indicating an uninitialized file handle */
40 * Used to store port and queue ID of interrupting Rx queue
52 * There is an instance of this struct per polled Rx queue added to the
55 struct eth_rx_poll_entry {
56 /* Eth port to poll */
58 /* Eth rx queue to poll */
62 /* Instance per adapter */
63 struct rte_eth_event_enqueue_buffer {
64 /* Count of events in this buffer */
66 /* Array of events in this buffer */
67 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
70 struct rte_event_eth_rx_adapter {
72 uint8_t rss_key_be[RSS_KEY_SIZE];
73 /* Event device identifier */
75 /* Per ethernet device structure */
76 struct eth_device_info *eth_devices;
77 /* Event port identifier */
78 uint8_t event_port_id;
79 /* Lock to serialize config updates with service function */
80 rte_spinlock_t rx_lock;
81 /* Max mbufs processed in any service function invocation */
83 /* Receive queues that need to be polled */
84 struct eth_rx_poll_entry *eth_rx_poll;
85 /* Size of the eth_rx_poll array */
86 uint16_t num_rx_polled;
87 /* Weighted round robin schedule */
89 /* wrr_sched[] size */
91 /* Next entry in wrr[] to begin polling */
93 /* Event burst buffer */
94 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
95 /* Per adapter stats */
96 struct rte_event_eth_rx_adapter_stats stats;
97 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
98 uint16_t enq_block_count;
100 uint64_t rx_enq_block_start_ts;
101 /* epoll fd used to wait for Rx interrupts */
103 /* Number of interrupt driven Rx queues */
104 uint32_t num_rx_intr;
105 /* Used to send <dev id, queue id> of interrupting Rx queues from
106 * the interrupt thread to the Rx thread
108 struct rte_ring *intr_ring;
109 /* Rx Queue data (dev id, queue id) for the last non-empty
113 /* queue_data is valid */
115 /* Interrupt ring lock, synchronizes Rx thread
116 * and interrupt thread
118 rte_spinlock_t intr_ring_lock;
119 /* Event array passed to rte_epoll_wait() */
120 struct rte_epoll_event *epoll_events;
121 /* Count of interrupt vectors in use */
122 uint32_t num_intr_vec;
123 /* Thread blocked on Rx interrupts */
124 pthread_t rx_intr_thread;
125 /* Configuration callback for rte_service configuration */
126 rte_event_eth_rx_adapter_conf_cb conf_cb;
127 /* Configuration callback argument */
129 /* Set if default_cb is being used */
131 /* Service initialization state */
132 uint8_t service_inited;
133 /* Total count of Rx queues in adapter */
135 /* Memory allocation name */
136 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
137 /* Socket identifier cached from eventdev */
139 /* Per adapter EAL service */
141 /* Adapter started flag */
145 } __rte_cache_aligned;
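/* Queue servicing summary for the software path (no internal event port):
 * queues added with a non-zero servicing weight are polled according to the
 * precomputed WRR schedule in wrr_sched[], while zero weight queues are
 * serviced in interrupt mode through the epoll fd, intr_ring and
 * rx_intr_thread members above.
 */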
148 struct eth_device_info {
149 struct rte_eth_dev *dev;
150 struct eth_rx_queue_info *rx_queue;
152 rte_event_eth_rx_adapter_cb_fn cb_fn;
153 /* Rx callback argument */
155 /* Set if ethdev->eventdev packet transfer uses a
158 uint8_t internal_event_port;
159 /* Set if the adapter is processing rx queues for
160 * this eth device and packet processing has been
161 * started; this lets the code know whether the PMD
162 * rx_adapter_stop callback needs to be invoked
164 uint8_t dev_rx_started;
165 /* Number of queues added for this device */
166 uint16_t nb_dev_queues;
167 /* Number of poll based queues
168 * If nb_rx_poll > 0, the start callback will
169 * be invoked if not already invoked
172 /* Number of interrupt based queues
173 * If nb_rx_intr > 0, the start callback will
174 * be invoked if not already invoked.
177 /* Number of queues that use the shared interrupt */
178 uint16_t nb_shared_intr;
179 /* sum(wrr(q)) for all queues within the device;
180 * useful when deleting all device queues
183 /* Intr based queue index to start polling from; this is used
184 * if the number of shared interrupts is non-zero
187 /* Intr based queue indices */
188 uint16_t *intr_queue;
189 /* device generates a per Rx queue interrupt for queue indices
190 * less than RTE_MAX_RXTX_INTR_VEC_ID - 1
193 /* shared interrupt enabled */
194 int shared_intr_enabled;
198 struct eth_rx_queue_info {
199 int queue_enabled; /* True if added */
201 uint16_t wt; /* Polling weight */
202 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
206 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
209 rxa_validate_id(uint8_t id)
211 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
214 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
215 if (!rxa_validate_id(id)) { \
216 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
222 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
224 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
227 /* Greatest common divisor */
228 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
232 return r ? rxa_gcd_u16(b, r) : b;
235 /* Returns the next queue in the polling sequence
237 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
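 *
 * Illustration (not taken from the original sources): with three polled
 * queues of weights 4, 2 and 1, max_wt = 4 and gcd = 1, repeated calls
 * produce the 7 entry schedule q0 q0 q0 q1 q0 q1 q2, i.e. each queue
 * appears in proportion to its weight with the heavier queues interleaved.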
240 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
241 unsigned int n, int *cw,
242 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
243 uint16_t gcd, int prev)
259 q = eth_rx_poll[i].eth_rx_qid;
260 d = eth_rx_poll[i].eth_dev_id;
261 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
269 rxa_shared_intr(struct eth_device_info *dev_info,
274 if (dev_info->dev->intr_handle == NULL)
277 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
278 return !multi_intr_cap ||
279 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
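/* Servicing weight convention: a queue added with servicing_weight == 0 is
 * an interrupt mode queue (rxa_intr_queue), a non-zero weight makes it a
 * polled queue (rxa_polled_queue). Queues of devices with an internal
 * event port match neither predicate since the PMD transfers their packets.
 */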
283 rxa_intr_queue(struct eth_device_info *dev_info,
286 struct eth_rx_queue_info *queue_info;
288 queue_info = &dev_info->rx_queue[rx_queue_id];
289 return dev_info->rx_queue &&
290 !dev_info->internal_event_port &&
291 queue_info->queue_enabled && queue_info->wt == 0;
295 rxa_polled_queue(struct eth_device_info *dev_info,
298 struct eth_rx_queue_info *queue_info;
300 queue_info = &dev_info->rx_queue[rx_queue_id];
301 return !dev_info->internal_event_port &&
302 dev_info->rx_queue &&
303 queue_info->queue_enabled && queue_info->wt != 0;
306 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
308 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
314 nbq = dev_info->dev->data->nb_rx_queues;
315 n = 0; /* non shared count */
316 s = 0; /* shared count */
318 if (rx_queue_id == -1) {
319 for (i = 0; i < nbq; i++) {
320 if (!rxa_shared_intr(dev_info, i))
321 n += add ? !rxa_intr_queue(dev_info, i) :
322 rxa_intr_queue(dev_info, i);
324 s += add ? !rxa_intr_queue(dev_info, i) :
325 rxa_intr_queue(dev_info, i);
329 if ((add && dev_info->nb_shared_intr == 0) ||
330 (!add && dev_info->nb_shared_intr))
334 if (!rxa_shared_intr(dev_info, rx_queue_id))
335 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
336 rxa_intr_queue(dev_info, rx_queue_id);
338 n = add ? !dev_info->nb_shared_intr :
339 dev_info->nb_shared_intr == 1;
345 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
348 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
349 struct eth_device_info *dev_info,
351 uint32_t *nb_rx_intr)
355 if (rx_queue_id == -1)
356 intr_diff = dev_info->nb_rx_intr;
358 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
360 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
363 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
364 * interrupt queues could currently be poll mode Rx queues
367 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
368 struct eth_device_info *dev_info,
370 uint32_t *nb_rx_poll,
371 uint32_t *nb_rx_intr,
376 uint32_t wrr_len_diff;
378 if (rx_queue_id == -1) {
379 intr_diff = dev_info->dev->data->nb_rx_queues -
380 dev_info->nb_rx_intr;
381 poll_diff = dev_info->nb_rx_poll;
382 wrr_len_diff = dev_info->wrr_len;
384 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
385 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
386 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
390 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
391 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
392 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
395 /* Calculate size of the eth_rx_poll and wrr_sched arrays
396 * after deleting poll mode rx queues
399 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
400 struct eth_device_info *dev_info,
402 uint32_t *nb_rx_poll,
406 uint32_t wrr_len_diff;
408 if (rx_queue_id == -1) {
409 poll_diff = dev_info->nb_rx_poll;
410 wrr_len_diff = dev_info->wrr_len;
412 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
413 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
417 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
418 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
421 /* Calculate nb_rx_* after adding poll mode rx queues
424 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
425 struct eth_device_info *dev_info,
428 uint32_t *nb_rx_poll,
429 uint32_t *nb_rx_intr,
434 uint32_t wrr_len_diff;
436 if (rx_queue_id == -1) {
437 intr_diff = dev_info->nb_rx_intr;
438 poll_diff = dev_info->dev->data->nb_rx_queues -
439 dev_info->nb_rx_poll;
440 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
443 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
444 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
445 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
446 wt - dev_info->rx_queue[rx_queue_id].wt :
450 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
451 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
452 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
455 /* Calculate nb_rx_* after adding rx_queue_id */
457 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
458 struct eth_device_info *dev_info,
461 uint32_t *nb_rx_poll,
462 uint32_t *nb_rx_intr,
466 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
467 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
469 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
470 nb_rx_poll, nb_rx_intr, nb_wrr);
473 /* Calculate nb_rx_* after deleting rx_queue_id */
475 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
476 struct eth_device_info *dev_info,
478 uint32_t *nb_rx_poll,
479 uint32_t *nb_rx_intr,
482 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
484 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
489 * Allocate the rx_poll array
491 static struct eth_rx_poll_entry *
492 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
493 uint32_t num_rx_polled)
497 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
498 RTE_CACHE_LINE_SIZE);
499 return rte_zmalloc_socket(rx_adapter->mem_name,
502 rx_adapter->socket_id);
506 * Allocate the WRR array
509 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
513 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
514 RTE_CACHE_LINE_SIZE);
515 return rte_zmalloc_socket(rx_adapter->mem_name,
518 rx_adapter->socket_id);
522 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
525 struct eth_rx_poll_entry **rx_poll,
526 uint32_t **wrr_sched)
535 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
536 if (*rx_poll == NULL) {
541 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
542 if (*wrr_sched == NULL) {
549 /* Precalculate WRR polling sequence for all queues in rx_adapter */
551 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
552 struct eth_rx_poll_entry *rx_poll,
561 /* Initialize variables for calculation of wrr schedule */
562 uint16_t max_wrr_pos = 0;
563 unsigned int poll_q = 0;
570 /* Generate array of all queues to poll, the size of this
573 RTE_ETH_FOREACH_DEV(d) {
574 uint16_t nb_rx_queues;
575 struct eth_device_info *dev_info =
576 &rx_adapter->eth_devices[d];
577 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
578 if (dev_info->rx_queue == NULL)
580 if (dev_info->internal_event_port)
582 dev_info->wrr_len = 0;
583 for (q = 0; q < nb_rx_queues; q++) {
584 struct eth_rx_queue_info *queue_info =
585 &dev_info->rx_queue[q];
588 if (!rxa_polled_queue(dev_info, q))
591 rx_poll[poll_q].eth_dev_id = d;
592 rx_poll[poll_q].eth_rx_qid = q;
594 dev_info->wrr_len += wt;
595 max_wt = RTE_MAX(max_wt, wt);
596 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
601 /* Generate polling sequence based on weights */
604 for (i = 0; i < max_wrr_pos; i++) {
605 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
606 rx_poll, max_wt, gcd, prev);
612 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
613 struct rte_ipv6_hdr **ipv6_hdr)
615 struct rte_ether_hdr *eth_hdr =
616 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
617 struct rte_vlan_hdr *vlan_hdr;
622 switch (eth_hdr->ether_type) {
623 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
624 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
627 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
628 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
631 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
632 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
633 switch (vlan_hdr->eth_proto) {
634 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
635 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
637 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
638 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
650 /* Calculate RSS hash for IPv4/6 */
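/* Software RSS is computed only when the mbuf carries no PKT_RX_RSS_HASH
 * from the NIC and the queue was not added with
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID; the result is folded into
 * ev->flow_id through flow_id_mask in rxa_buffer_mbufs().
 */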
651 static inline uint32_t
652 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
656 struct rte_ipv4_tuple ipv4_tuple;
657 struct rte_ipv6_tuple ipv6_tuple;
658 struct rte_ipv4_hdr *ipv4_hdr;
659 struct rte_ipv6_hdr *ipv6_hdr;
661 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
664 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
665 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
667 input_len = RTE_THASH_V4_L3_LEN;
668 } else if (ipv6_hdr) {
669 rte_thash_load_v6_addrs(ipv6_hdr,
670 (union rte_thash_tuple *)&ipv6_tuple);
672 input_len = RTE_THASH_V6_L3_LEN;
676 return rte_softrss_be(tuple, input_len, rss_key_be);
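/* Enqueue blocking statistics: after BLOCK_CNT_THRESHOLD consecutive
 * flushes that enqueue nothing to the event device, the start TSC is
 * latched in rx_enq_block_start_ts; once an enqueue succeeds again the
 * elapsed cycles are added to stats->rx_enq_block_cycles.
 */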
680 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
682 return !!rx_adapter->enq_block_count;
686 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
688 if (rx_adapter->rx_enq_block_start_ts)
691 rx_adapter->enq_block_count++;
692 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
695 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
699 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
700 struct rte_event_eth_rx_adapter_stats *stats)
702 if (unlikely(!stats->rx_enq_start_ts))
703 stats->rx_enq_start_ts = rte_get_tsc_cycles();
705 if (likely(!rxa_enq_blocked(rx_adapter)))
708 rx_adapter->enq_block_count = 0;
709 if (rx_adapter->rx_enq_block_start_ts) {
710 stats->rx_enq_end_ts = rte_get_tsc_cycles();
711 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
712 rx_adapter->rx_enq_block_start_ts;
713 rx_adapter->rx_enq_block_start_ts = 0;
717 /* Enqueue buffered events to event device */
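/* Uses rte_event_enqueue_new_burst(); on a partial enqueue the events that
 * were not accepted are moved to the front of the buffer and retried on the
 * next flush, and rx_enq_retry is incremented.
 */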
718 static inline uint16_t
719 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
721 struct rte_eth_event_enqueue_buffer *buf =
722 &rx_adapter->event_enqueue_buffer;
723 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
725 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
726 rx_adapter->event_port_id,
729 if (n != buf->count) {
732 (buf->count - n) * sizeof(struct rte_event));
733 stats->rx_enq_retry++;
736 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
737 rxa_enq_block_start_ts(rx_adapter);
740 stats->rx_enq_count += n;
746 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
748 uint16_t rx_queue_id,
749 struct rte_mbuf **mbufs,
753 struct eth_device_info *dev_info =
754 &rx_adapter->eth_devices[eth_dev_id];
755 struct eth_rx_queue_info *eth_rx_queue_info =
756 &dev_info->rx_queue[rx_queue_id];
757 struct rte_eth_event_enqueue_buffer *buf =
758 &rx_adapter->event_enqueue_buffer;
759 struct rte_event *ev = &buf->events[buf->count];
760 uint64_t event = eth_rx_queue_info->event;
761 uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
762 struct rte_mbuf *m = mbufs[0];
770 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
771 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
772 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
774 if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
775 ts = rte_get_tsc_cycles();
776 for (i = 0; i < num; i++) {
780 m->ol_flags |= PKT_RX_TIMESTAMP;
784 for (i = 0; i < num; i++) {
788 rxa_do_softrss(m, rx_adapter->rss_key_be) :
791 ev->flow_id = (rss & ~flow_id_mask) |
792 (ev->flow_id & flow_id_mask);
797 if (dev_info->cb_fn) {
800 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
801 ETH_EVENT_BUFFER_SIZE, buf->count, ev,
802 num, dev_info->cb_arg, &dropped);
803 if (unlikely(nb_cb > num))
804 RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
809 rx_adapter->stats.rx_dropped += dropped;
815 /* Enqueue packets from <port, q> to event buffer */
816 static inline uint32_t
817 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
824 struct rte_mbuf *mbufs[BATCH_SIZE];
825 struct rte_eth_event_enqueue_buffer *buf =
826 &rx_adapter->event_enqueue_buffer;
827 struct rte_event_eth_rx_adapter_stats *stats =
834 /* Don't do a batch dequeue from the rx queue if there isn't
835 * enough space in the enqueue buffer.
837 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
838 if (buf->count >= BATCH_SIZE)
839 rxa_flush_event_buffer(rx_adapter);
841 stats->rx_poll_count++;
842 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
848 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
850 if (rx_count + nb_rx > max_rx)
855 rxa_flush_event_buffer(rx_adapter);
861 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
868 struct eth_device_info *dev_info;
869 struct eth_rx_queue_info *queue_info;
876 dev_info = &rx_adapter->eth_devices[port_id];
877 queue_info = &dev_info->rx_queue[queue];
878 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
879 if (rxa_shared_intr(dev_info, queue))
880 intr_enabled = &dev_info->shared_intr_enabled;
882 intr_enabled = &queue_info->intr_enabled;
886 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
887 /* Entry should always be available.
888 * The ring size equals the maximum number of interrupt
889 * vectors supported (an interrupt vector is shared in
890 * case of shared interrupts)
893 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
894 " to ring: %s", strerror(-err));
896 rte_eth_dev_rx_intr_disable(port_id, queue);
898 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
902 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
903 uint32_t num_intr_vec)
905 if (rx_adapter->num_intr_vec + num_intr_vec >
906 RTE_EVENT_ETH_INTR_RING_SIZE) {
907 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
908 " %d needed %d limit %d", rx_adapter->num_intr_vec,
909 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
916 /* Delete entries for (dev, queue) from the interrupt ring */
918 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
919 struct eth_device_info *dev_info,
920 uint16_t rx_queue_id)
925 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
927 n = rte_ring_count(rx_adapter->intr_ring);
928 for (i = 0; i < n; i++) {
929 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
930 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
931 if (qd.port == dev_info->dev->data->port_id &&
932 qd.queue == rx_queue_id)
935 if (qd.port == dev_info->dev->data->port_id)
938 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
941 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
944 /* pthread callback handling interrupt mode receive queues
945 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
946 * interrupting queue to the adapter's ring buffer for interrupt events.
947 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
948 * the adapter service function.
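 *
 * For queues that cannot get a dedicated interrupt vector (rxa_shared_intr),
 * a ring entry effectively identifies only the device;
 * rxa_intr_ring_dequeue() then scans all of that device's interrupt mode
 * queues starting at next_q_idx.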
951 rxa_intr_thread(void *arg)
953 struct rte_event_eth_rx_adapter *rx_adapter = arg;
954 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
958 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
959 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
961 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
963 for (i = 0; i < n; i++) {
964 rxa_intr_ring_enqueue(rx_adapter,
965 epoll_events[i].epdata.data);
972 /* Dequeue <port, q> from interrupt ring and enqueue received
975 static inline uint32_t
976 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
981 struct rte_eth_event_enqueue_buffer *buf;
982 rte_spinlock_t *ring_lock;
983 uint8_t max_done = 0;
985 if (rx_adapter->num_rx_intr == 0)
988 if (rte_ring_count(rx_adapter->intr_ring) == 0
989 && !rx_adapter->qd_valid)
992 buf = &rx_adapter->event_enqueue_buffer;
993 ring_lock = &rx_adapter->intr_ring_lock;
995 if (buf->count >= BATCH_SIZE)
996 rxa_flush_event_buffer(rx_adapter);
998 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
999 struct eth_device_info *dev_info;
1002 union queue_data qd = rx_adapter->qd;
1005 if (!rx_adapter->qd_valid) {
1006 struct eth_rx_queue_info *queue_info;
1008 rte_spinlock_lock(ring_lock);
1009 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1011 rte_spinlock_unlock(ring_lock);
1017 rx_adapter->qd = qd;
1018 rx_adapter->qd_valid = 1;
1019 dev_info = &rx_adapter->eth_devices[port];
1020 if (rxa_shared_intr(dev_info, queue))
1021 dev_info->shared_intr_enabled = 1;
1023 queue_info = &dev_info->rx_queue[queue];
1024 queue_info->intr_enabled = 1;
1026 rte_eth_dev_rx_intr_enable(port, queue);
1027 rte_spinlock_unlock(ring_lock);
1032 dev_info = &rx_adapter->eth_devices[port];
1035 if (rxa_shared_intr(dev_info, queue)) {
1039 nb_queues = dev_info->dev->data->nb_rx_queues;
1041 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1042 uint8_t enq_buffer_full;
1044 if (!rxa_intr_queue(dev_info, i))
1046 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1047 rx_adapter->max_nb_rx,
1051 enq_buffer_full = !rxq_empty && n == 0;
1052 max_done = nb_rx > rx_adapter->max_nb_rx;
1054 if (enq_buffer_full || max_done) {
1055 dev_info->next_q_idx = i;
1060 rx_adapter->qd_valid = 0;
1062 /* Reinitialize for next interrupt */
1063 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1064 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1067 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1068 rx_adapter->max_nb_rx,
1070 rx_adapter->qd_valid = !rxq_empty;
1072 if (nb_rx > rx_adapter->max_nb_rx)
1078 rx_adapter->stats.rx_intr_packets += nb_rx;
1083 * Polls receive queues added to the event adapter and enqueues received
1084 * packets to the event device.
1086 * The receive code enqueues initially to a temporary buffer, the
1087 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1089 * If there isn't space available in the temporary buffer, packets from the
1090 * Rx queue aren't dequeued from the eth device; this back pressures the
1091 * eth device. In virtual device environments, this back pressure is relayed to
1092 * the hypervisor's switching layer where adjustments can be made to deal with
1095 static inline uint32_t
1096 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1100 struct rte_eth_event_enqueue_buffer *buf;
1104 wrr_pos = rx_adapter->wrr_pos;
1105 max_nb_rx = rx_adapter->max_nb_rx;
1106 buf = &rx_adapter->event_enqueue_buffer;
1108 /* Iterate through a WRR sequence */
1109 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1110 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1111 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1112 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1114 /* Don't do a batch dequeue from the rx queue if there isn't
1115 * enough space in the enqueue buffer.
1117 if (buf->count >= BATCH_SIZE)
1118 rxa_flush_event_buffer(rx_adapter);
1119 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1120 rx_adapter->wrr_pos = wrr_pos;
1124 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1126 if (nb_rx > max_nb_rx) {
1127 rx_adapter->wrr_pos =
1128 (wrr_pos + 1) % rx_adapter->wrr_len;
1132 if (++wrr_pos == rx_adapter->wrr_len)
1139 rxa_service_func(void *args)
1141 struct rte_event_eth_rx_adapter *rx_adapter = args;
1142 struct rte_event_eth_rx_adapter_stats *stats;
1144 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1146 if (!rx_adapter->rxa_started) {
1147 rte_spinlock_unlock(&rx_adapter->rx_lock);
1151 stats = &rx_adapter->stats;
1152 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1153 stats->rx_packets += rxa_poll(rx_adapter);
1154 rte_spinlock_unlock(&rx_adapter->rx_lock);
1159 rte_event_eth_rx_adapter_init(void)
1161 const char *name = "rte_event_eth_rx_adapter_array";
1162 const struct rte_memzone *mz;
1165 sz = sizeof(*event_eth_rx_adapter) *
1166 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1167 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1169 mz = rte_memzone_lookup(name);
1171 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1172 RTE_CACHE_LINE_SIZE);
1174 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1180 event_eth_rx_adapter = mz->addr;
1184 static inline struct rte_event_eth_rx_adapter *
1185 rxa_id_to_adapter(uint8_t id)
1187 return event_eth_rx_adapter ?
1188 event_eth_rx_adapter[id] : NULL;
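/* Default configuration callback used when the adapter is created with
 * rte_event_eth_rx_adapter_create(): it stops the event device if running,
 * reconfigures it with one additional event port, sets that port up with
 * the supplied rte_event_port_conf, returns it in conf->event_port_id and
 * restarts the event device if it was previously started.
 */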
1192 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1193 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1196 struct rte_eventdev *dev;
1197 struct rte_event_dev_config dev_conf;
1200 struct rte_event_port_conf *port_conf = arg;
1201 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1203 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1204 dev_conf = dev->data->dev_conf;
1206 started = dev->data->dev_started;
1208 rte_event_dev_stop(dev_id);
1209 port_id = dev_conf.nb_event_ports;
1210 dev_conf.nb_event_ports += 1;
1211 ret = rte_event_dev_configure(dev_id, &dev_conf);
1213 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1216 if (rte_event_dev_start(dev_id))
1222 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1224 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1229 conf->event_port_id = port_id;
1230 conf->max_nb_rx = 128;
1232 ret = rte_event_dev_start(dev_id);
1233 rx_adapter->default_cb_arg = 1;
1238 rxa_epoll_create1(void)
1242 fd = epoll_create1(EPOLL_CLOEXEC);
1243 return fd < 0 ? -errno : fd;
1250 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1252 if (rx_adapter->epd != INIT_FD)
1255 rx_adapter->epd = rxa_epoll_create1();
1256 if (rx_adapter->epd < 0) {
1257 int err = rx_adapter->epd;
1258 rx_adapter->epd = INIT_FD;
1259 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1267 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1270 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1272 if (rx_adapter->intr_ring)
1275 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1276 RTE_EVENT_ETH_INTR_RING_SIZE,
1277 rte_socket_id(), 0);
1278 if (!rx_adapter->intr_ring)
1281 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1282 RTE_EVENT_ETH_INTR_RING_SIZE *
1283 sizeof(struct rte_epoll_event),
1284 RTE_CACHE_LINE_SIZE,
1285 rx_adapter->socket_id);
1286 if (!rx_adapter->epoll_events) {
1291 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1293 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1294 "rx-intr-thread-%d", rx_adapter->id);
1296 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1297 NULL, rxa_intr_thread, rx_adapter);
1299 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1303 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1305 rte_ring_free(rx_adapter->intr_ring);
1306 rx_adapter->intr_ring = NULL;
1307 rx_adapter->epoll_events = NULL;
1312 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1316 err = pthread_cancel(rx_adapter->rx_intr_thread);
1318 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1321 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1323 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1325 rte_free(rx_adapter->epoll_events);
1326 rte_ring_free(rx_adapter->intr_ring);
1327 rx_adapter->intr_ring = NULL;
1328 rx_adapter->epoll_events = NULL;
1333 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1337 if (rx_adapter->num_rx_intr == 0)
1340 ret = rxa_destroy_intr_thread(rx_adapter);
1344 close(rx_adapter->epd);
1345 rx_adapter->epd = INIT_FD;
1351 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1352 struct eth_device_info *dev_info,
1353 uint16_t rx_queue_id)
1356 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1357 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1359 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1361 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1366 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1371 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1374 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1376 dev_info->shared_intr_enabled = 0;
1381 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1382 struct eth_device_info *dev_info,
1389 if (dev_info->nb_rx_intr == 0)
1393 if (rx_queue_id == -1) {
1394 s = dev_info->nb_shared_intr;
1395 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1399 q = dev_info->intr_queue[i];
1400 sintr = rxa_shared_intr(dev_info, q);
1403 if (!sintr || s == 0) {
1405 err = rxa_disable_intr(rx_adapter, dev_info,
1409 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1414 if (!rxa_intr_queue(dev_info, rx_queue_id))
1416 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1417 dev_info->nb_shared_intr == 1) {
1418 err = rxa_disable_intr(rx_adapter, dev_info,
1422 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1426 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1427 if (dev_info->intr_queue[i] == rx_queue_id) {
1428 for (; i < dev_info->nb_rx_intr - 1; i++)
1429 dev_info->intr_queue[i] =
1430 dev_info->intr_queue[i + 1];
1440 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1441 struct eth_device_info *dev_info,
1442 uint16_t rx_queue_id)
1445 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1446 union queue_data qd;
1448 uint16_t *intr_queue;
1449 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1451 if (rxa_intr_queue(dev_info, rx_queue_id))
1454 intr_queue = dev_info->intr_queue;
1455 if (dev_info->intr_queue == NULL) {
1457 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1458 dev_info->intr_queue =
1460 rx_adapter->mem_name,
1463 rx_adapter->socket_id);
1464 if (dev_info->intr_queue == NULL)
1468 init_fd = rx_adapter->epd;
1469 err = rxa_init_epd(rx_adapter);
1471 goto err_free_queue;
1473 qd.port = eth_dev_id;
1474 qd.queue = rx_queue_id;
1476 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1481 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1482 " Rx Queue %u err %d", rx_queue_id, err);
1486 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1488 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1489 " Rx Queue %u err %d", rx_queue_id, err);
1494 err = rxa_create_intr_thread(rx_adapter);
1497 dev_info->shared_intr_enabled = 1;
1499 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1504 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1506 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1507 " Rx Queue %u err %d", rx_queue_id, err);
1509 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1514 RTE_EDEV_LOG_ERR("Could not delete event for"
1515 " Rx Queue %u err %d", rx_queue_id, err1);
1518 if (init_fd == INIT_FD) {
1519 close(rx_adapter->epd);
1520 rx_adapter->epd = INIT_FD;
1523 if (intr_queue == NULL)
1524 rte_free(dev_info->intr_queue);
1530 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1531 struct eth_device_info *dev_info,
1537 int shared_done = (dev_info->nb_shared_intr > 0);
1539 if (rx_queue_id != -1) {
1540 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1542 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1546 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1548 if (rxa_shared_intr(dev_info, i) && shared_done)
1551 err = rxa_config_intr(rx_adapter, dev_info, i);
1553 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1556 dev_info->shared_intr_enabled = 1;
1565 shared_done = (dev_info->nb_shared_intr > 0);
1566 for (j = 0; j < i; j++) {
1567 if (rxa_intr_queue(dev_info, j))
1569 if (rxa_shared_intr(dev_info, j) && si != j)
1571 err = rxa_disable_intr(rx_adapter, dev_info, j);
1582 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1585 struct rte_service_spec service;
1586 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1588 if (rx_adapter->service_inited)
1591 memset(&service, 0, sizeof(service));
1592 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1593 "rte_event_eth_rx_adapter_%d", id);
1594 service.socket_id = rx_adapter->socket_id;
1595 service.callback = rxa_service_func;
1596 service.callback_userdata = rx_adapter;
1597 /* Service function handles locking for queue add/del updates */
1598 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1599 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1601 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1606 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1607 &rx_adapter_conf, rx_adapter->conf_arg);
1609 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1613 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1614 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1615 rx_adapter->service_inited = 1;
1616 rx_adapter->epd = INIT_FD;
1620 rte_service_component_unregister(rx_adapter->service_id);
1625 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1626 struct eth_device_info *dev_info,
1627 int32_t rx_queue_id,
1630 struct eth_rx_queue_info *queue_info;
1634 if (dev_info->rx_queue == NULL)
1637 if (rx_queue_id == -1) {
1638 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1639 rxa_update_queue(rx_adapter, dev_info, i, add);
1641 queue_info = &dev_info->rx_queue[rx_queue_id];
1642 enabled = queue_info->queue_enabled;
1644 rx_adapter->nb_queues += !enabled;
1645 dev_info->nb_dev_queues += !enabled;
1647 rx_adapter->nb_queues -= enabled;
1648 dev_info->nb_dev_queues -= enabled;
1650 queue_info->queue_enabled = !!add;
1655 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1656 struct eth_device_info *dev_info,
1657 int32_t rx_queue_id)
1664 if (rx_adapter->nb_queues == 0)
1667 if (rx_queue_id == -1) {
1668 uint16_t nb_rx_queues;
1671 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1672 for (i = 0; i < nb_rx_queues; i++)
1673 rxa_sw_del(rx_adapter, dev_info, i);
1677 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1678 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1679 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1680 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1681 rx_adapter->num_rx_polled -= pollq;
1682 dev_info->nb_rx_poll -= pollq;
1683 rx_adapter->num_rx_intr -= intrq;
1684 dev_info->nb_rx_intr -= intrq;
1685 dev_info->nb_shared_intr -= intrq && sintrq;
1689 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1690 struct eth_device_info *dev_info,
1691 int32_t rx_queue_id,
1692 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1694 struct eth_rx_queue_info *queue_info;
1695 const struct rte_event *ev = &conf->ev;
1699 struct rte_event *qi_ev;
1701 if (rx_queue_id == -1) {
1702 uint16_t nb_rx_queues;
1705 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1706 for (i = 0; i < nb_rx_queues; i++)
1707 rxa_add_queue(rx_adapter, dev_info, i, conf);
1711 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1712 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1713 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1715 queue_info = &dev_info->rx_queue[rx_queue_id];
1716 queue_info->wt = conf->servicing_weight;
1718 qi_ev = (struct rte_event *)&queue_info->event;
1719 qi_ev->event = ev->event;
1720 qi_ev->op = RTE_EVENT_OP_NEW;
1721 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1722 qi_ev->sub_event_type = 0;
1724 if (conf->rx_queue_flags &
1725 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1726 queue_info->flow_id_mask = ~0;
1730 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1731 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1732 rx_adapter->num_rx_polled += !pollq;
1733 dev_info->nb_rx_poll += !pollq;
1734 rx_adapter->num_rx_intr -= intrq;
1735 dev_info->nb_rx_intr -= intrq;
1736 dev_info->nb_shared_intr -= intrq && sintrq;
1739 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1740 rx_adapter->num_rx_polled -= pollq;
1741 dev_info->nb_rx_poll -= pollq;
1742 rx_adapter->num_rx_intr += !intrq;
1743 dev_info->nb_rx_intr += !intrq;
1744 dev_info->nb_shared_intr += !intrq && sintrq;
1745 if (dev_info->nb_shared_intr == 1) {
1746 if (dev_info->multi_intr_cap)
1747 dev_info->next_q_idx =
1748 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1750 dev_info->next_q_idx = 0;
1755 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1756 uint16_t eth_dev_id,
1758 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1760 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1761 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1763 struct eth_rx_poll_entry *rx_poll;
1764 struct eth_rx_queue_info *rx_queue;
1766 uint16_t nb_rx_queues;
1767 uint32_t nb_rx_poll, nb_wrr;
1768 uint32_t nb_rx_intr;
1772 if (queue_conf->servicing_weight == 0) {
1773 struct rte_eth_dev_data *data = dev_info->dev->data;
1775 temp_conf = *queue_conf;
1776 if (!data->dev_conf.intr_conf.rxq) {
1777 /* If Rx interrupts are disabled set wt = 1 */
1778 temp_conf.servicing_weight = 1;
1780 queue_conf = &temp_conf;
1783 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1784 rx_queue = dev_info->rx_queue;
1785 wt = queue_conf->servicing_weight;
1787 if (dev_info->rx_queue == NULL) {
1788 dev_info->rx_queue =
1789 rte_zmalloc_socket(rx_adapter->mem_name,
1791 sizeof(struct eth_rx_queue_info), 0,
1792 rx_adapter->socket_id);
1793 if (dev_info->rx_queue == NULL)
1799 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1800 queue_conf->servicing_weight,
1801 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1803 if (dev_info->dev->intr_handle)
1804 dev_info->multi_intr_cap =
1805 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1807 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1810 goto err_free_rxqueue;
1813 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1815 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1817 goto err_free_rxqueue;
1819 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1821 goto err_free_rxqueue;
1825 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1826 num_intr_vec = rxa_nb_intr_vect(dev_info,
1828 /* interrupt based queues are being converted to
1829 * poll mode queues; delete the interrupt configuration
1832 ret = rxa_del_intr_queue(rx_adapter,
1833 dev_info, rx_queue_id);
1835 goto err_free_rxqueue;
1839 if (nb_rx_intr == 0) {
1840 ret = rxa_free_intr_resources(rx_adapter);
1842 goto err_free_rxqueue;
1848 if (rx_queue_id == -1) {
1849 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1850 dev_info->intr_queue[i] = i;
1852 if (!rxa_intr_queue(dev_info, rx_queue_id))
1853 dev_info->intr_queue[nb_rx_intr - 1] =
1860 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1861 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1863 rte_free(rx_adapter->eth_rx_poll);
1864 rte_free(rx_adapter->wrr_sched);
1866 rx_adapter->eth_rx_poll = rx_poll;
1867 rx_adapter->wrr_sched = rx_wrr;
1868 rx_adapter->wrr_len = nb_wrr;
1869 rx_adapter->num_intr_vec += num_intr_vec;
1873 if (rx_queue == NULL) {
1874 rte_free(dev_info->rx_queue);
1875 dev_info->rx_queue = NULL;
1885 rxa_ctrl(uint8_t id, int start)
1887 struct rte_event_eth_rx_adapter *rx_adapter;
1888 struct rte_eventdev *dev;
1889 struct eth_device_info *dev_info;
1891 int use_service = 0;
1894 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1895 rx_adapter = rxa_id_to_adapter(id);
1896 if (rx_adapter == NULL)
1899 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1901 RTE_ETH_FOREACH_DEV(i) {
1902 dev_info = &rx_adapter->eth_devices[i];
1903 /* If starting, check that the device has queues added */
1904 if (start && !dev_info->nb_dev_queues)
1906 /* If stopping, check that the device has been started */
1907 if (stop && !dev_info->dev_rx_started)
1909 use_service |= !dev_info->internal_event_port;
1910 dev_info->dev_rx_started = start;
1911 if (dev_info->internal_event_port == 0)
1913 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1914 &rte_eth_devices[i]) :
1915 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1916 &rte_eth_devices[i]);
1920 rte_spinlock_lock(&rx_adapter->rx_lock);
1921 rx_adapter->rxa_started = start;
1922 rte_service_runstate_set(rx_adapter->service_id, start);
1923 rte_spinlock_unlock(&rx_adapter->rx_lock);
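/* Typical software adapter usage (a minimal sketch, not taken from this
 * file; error handling is omitted and all identifiers other than the rte_*
 * APIs are illustrative):
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.servicing_weight = 1,
 *	};
 *	uint32_t service_id;
 *
 *	rte_event_eth_rx_adapter_create(adapter_id, evdev_id, &pconf);
 *	rte_event_eth_rx_adapter_queue_add(adapter_id, eth_port_id, -1, &qconf);
 *	if (rte_event_eth_rx_adapter_service_id_get(adapter_id, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *	rte_event_eth_rx_adapter_start(adapter_id);
 */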
1930 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1931 rte_event_eth_rx_adapter_conf_cb conf_cb,
1934 struct rte_event_eth_rx_adapter *rx_adapter;
1938 char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
1939 const uint8_t default_rss_key[] = {
1940 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1941 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1942 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1943 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1944 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1947 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1948 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1949 if (conf_cb == NULL)
1952 if (event_eth_rx_adapter == NULL) {
1953 ret = rte_event_eth_rx_adapter_init();
1958 rx_adapter = rxa_id_to_adapter(id);
1959 if (rx_adapter != NULL) {
1960 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1964 socket_id = rte_event_dev_socket_id(dev_id);
1965 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1966 "rte_event_eth_rx_adapter_%d",
1969 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1970 RTE_CACHE_LINE_SIZE, socket_id);
1971 if (rx_adapter == NULL) {
1972 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1976 rx_adapter->eventdev_id = dev_id;
1977 rx_adapter->socket_id = socket_id;
1978 rx_adapter->conf_cb = conf_cb;
1979 rx_adapter->conf_arg = conf_arg;
1980 rx_adapter->id = id;
1981 strcpy(rx_adapter->mem_name, mem_name);
1982 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1984 sizeof(struct eth_device_info), 0,
1986 rte_convert_rss_key((const uint32_t *)default_rss_key,
1987 (uint32_t *)rx_adapter->rss_key_be,
1988 RTE_DIM(default_rss_key));
1990 if (rx_adapter->eth_devices == NULL) {
1991 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1992 rte_free(rx_adapter);
1995 rte_spinlock_init(&rx_adapter->rx_lock);
1996 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1997 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1999 event_eth_rx_adapter[id] = rx_adapter;
2000 if (conf_cb == rxa_default_conf_cb)
2001 rx_adapter->default_cb_arg = 1;
2002 rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2008 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2009 struct rte_event_port_conf *port_config)
2011 struct rte_event_port_conf *pc;
2014 if (port_config == NULL)
2016 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2018 pc = rte_malloc(NULL, sizeof(*pc), 0);
2022 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2023 rxa_default_conf_cb,
2031 rte_event_eth_rx_adapter_free(uint8_t id)
2033 struct rte_event_eth_rx_adapter *rx_adapter;
2035 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2037 rx_adapter = rxa_id_to_adapter(id);
2038 if (rx_adapter == NULL)
2041 if (rx_adapter->nb_queues) {
2042 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2043 rx_adapter->nb_queues);
2047 if (rx_adapter->default_cb_arg)
2048 rte_free(rx_adapter->conf_arg);
2049 rte_free(rx_adapter->eth_devices);
2050 rte_free(rx_adapter);
2051 event_eth_rx_adapter[id] = NULL;
2053 rte_eventdev_trace_eth_rx_adapter_free(id);
2058 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2059 uint16_t eth_dev_id,
2060 int32_t rx_queue_id,
2061 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2065 struct rte_event_eth_rx_adapter *rx_adapter;
2066 struct rte_eventdev *dev;
2067 struct eth_device_info *dev_info;
2069 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2070 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2072 rx_adapter = rxa_id_to_adapter(id);
2073 if ((rx_adapter == NULL) || (queue_conf == NULL))
2076 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2077 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2081 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2082 " eth port %" PRIu16, id, eth_dev_id);
2086 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2087 && (queue_conf->rx_queue_flags &
2088 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2089 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2090 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2095 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2096 (rx_queue_id != -1)) {
2097 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2098 "event queue, eth port: %" PRIu16 " adapter id: %"
2099 PRIu8, eth_dev_id, id);
2103 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2104 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2105 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2106 (uint16_t)rx_queue_id);
2110 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2112 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2113 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2115 if (dev_info->rx_queue == NULL) {
2116 dev_info->rx_queue =
2117 rte_zmalloc_socket(rx_adapter->mem_name,
2118 dev_info->dev->data->nb_rx_queues *
2119 sizeof(struct eth_rx_queue_info), 0,
2120 rx_adapter->socket_id);
2121 if (dev_info->rx_queue == NULL)
2125 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2126 &rte_eth_devices[eth_dev_id],
2127 rx_queue_id, queue_conf);
2129 dev_info->internal_event_port = 1;
2130 rxa_update_queue(rx_adapter,
2131 &rx_adapter->eth_devices[eth_dev_id],
2136 rte_spinlock_lock(&rx_adapter->rx_lock);
2137 dev_info->internal_event_port = 0;
2138 ret = rxa_init_service(rx_adapter, id);
2140 uint32_t service_id = rx_adapter->service_id;
2141 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2143 rte_service_component_runstate_set(service_id,
2144 rxa_sw_adapter_queue_count(rx_adapter));
2146 rte_spinlock_unlock(&rx_adapter->rx_lock);
2149 rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2150 rx_queue_id, queue_conf, ret);
2158 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2159 int32_t rx_queue_id)
2162 struct rte_eventdev *dev;
2163 struct rte_event_eth_rx_adapter *rx_adapter;
2164 struct eth_device_info *dev_info;
2166 uint32_t nb_rx_poll = 0;
2167 uint32_t nb_wrr = 0;
2168 uint32_t nb_rx_intr;
2169 struct eth_rx_poll_entry *rx_poll = NULL;
2170 uint32_t *rx_wrr = NULL;
2173 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2174 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2176 rx_adapter = rxa_id_to_adapter(id);
2177 if (rx_adapter == NULL)
2180 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2181 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2187 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2188 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2189 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2190 (uint16_t)rx_queue_id);
2194 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2196 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2197 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2199 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2200 &rte_eth_devices[eth_dev_id],
2203 rxa_update_queue(rx_adapter,
2204 &rx_adapter->eth_devices[eth_dev_id],
2207 if (dev_info->nb_dev_queues == 0) {
2208 rte_free(dev_info->rx_queue);
2209 dev_info->rx_queue = NULL;
2213 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2214 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2216 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2221 rte_spinlock_lock(&rx_adapter->rx_lock);
2224 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2226 num_intr_vec = rxa_nb_intr_vect(dev_info,
2228 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2234 if (nb_rx_intr == 0) {
2235 ret = rxa_free_intr_resources(rx_adapter);
2240 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2241 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2243 rte_free(rx_adapter->eth_rx_poll);
2244 rte_free(rx_adapter->wrr_sched);
2246 if (nb_rx_intr == 0) {
2247 rte_free(dev_info->intr_queue);
2248 dev_info->intr_queue = NULL;
2251 rx_adapter->eth_rx_poll = rx_poll;
2252 rx_adapter->wrr_sched = rx_wrr;
2253 rx_adapter->wrr_len = nb_wrr;
2254 rx_adapter->num_intr_vec += num_intr_vec;
2256 if (dev_info->nb_dev_queues == 0) {
2257 rte_free(dev_info->rx_queue);
2258 dev_info->rx_queue = NULL;
2261 rte_spinlock_unlock(&rx_adapter->rx_lock);
2268 rte_service_component_runstate_set(rx_adapter->service_id,
2269 rxa_sw_adapter_queue_count(rx_adapter));
2272 rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2278 rte_event_eth_rx_adapter_start(uint8_t id)
2280 rte_eventdev_trace_eth_rx_adapter_start(id);
2281 return rxa_ctrl(id, 1);
2285 rte_event_eth_rx_adapter_stop(uint8_t id)
2287 rte_eventdev_trace_eth_rx_adapter_stop(id);
2288 return rxa_ctrl(id, 0);
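/* For devices with an internal event port the PMD's eth_rx_adapter_stats_get
 * callback is queried and its rx_packets and rx_enq_count are summed; the
 * sums are added on top of the software adapter's own counters, which are
 * reported only when the service path has been initialized.
 */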
2292 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2293 struct rte_event_eth_rx_adapter_stats *stats)
2295 struct rte_event_eth_rx_adapter *rx_adapter;
2296 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2297 struct rte_event_eth_rx_adapter_stats dev_stats;
2298 struct rte_eventdev *dev;
2299 struct eth_device_info *dev_info;
2303 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2305 rx_adapter = rxa_id_to_adapter(id);
2306 if (rx_adapter == NULL || stats == NULL)
2309 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2310 memset(stats, 0, sizeof(*stats));
2311 RTE_ETH_FOREACH_DEV(i) {
2312 dev_info = &rx_adapter->eth_devices[i];
2313 if (dev_info->internal_event_port == 0 ||
2314 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2316 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2317 &rte_eth_devices[i],
2321 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2322 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2325 if (rx_adapter->service_inited)
2326 *stats = rx_adapter->stats;
2328 stats->rx_packets += dev_stats_sum.rx_packets;
2329 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2334 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2336 struct rte_event_eth_rx_adapter *rx_adapter;
2337 struct rte_eventdev *dev;
2338 struct eth_device_info *dev_info;
2341 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2343 rx_adapter = rxa_id_to_adapter(id);
2344 if (rx_adapter == NULL)
2347 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2348 RTE_ETH_FOREACH_DEV(i) {
2349 dev_info = &rx_adapter->eth_devices[i];
2350 if (dev_info->internal_event_port == 0 ||
2351 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2353 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2354 &rte_eth_devices[i]);
2357 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2362 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2364 struct rte_event_eth_rx_adapter *rx_adapter;
2366 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2368 rx_adapter = rxa_id_to_adapter(id);
2369 if (rx_adapter == NULL || service_id == NULL)
2372 if (rx_adapter->service_inited)
2373 *service_id = rx_adapter->service_id;
2375 return rx_adapter->service_inited ? 0 : -ESRCH;
2379 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2380 uint16_t eth_dev_id,
2381 rte_event_eth_rx_adapter_cb_fn cb_fn,
2384 struct rte_event_eth_rx_adapter *rx_adapter;
2385 struct eth_device_info *dev_info;
2389 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2390 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2392 rx_adapter = rxa_id_to_adapter(id);
2393 if (rx_adapter == NULL)
2396 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2397 if (dev_info->rx_queue == NULL)
2400 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2404 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2405 " eth port %" PRIu16, id, eth_dev_id);
2409 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2410 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2411 PRIu16, eth_dev_id);
2415 rte_spinlock_lock(&rx_adapter->rx_lock);
2416 dev_info->cb_fn = cb_fn;
2417 dev_info->cb_arg = cb_arg;
2418 rte_spinlock_unlock(&rx_adapter->rx_lock);