1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
26 #define BLOCK_CNT_THRESHOLD 10
27 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
32 #define RSS_KEY_SIZE 40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
35 /* Sentinel value used to detect an uninitialized file handle */
39 * Used to store port and queue ID of interrupting Rx queue
51 * There is an instance of this struct per polled Rx queue added to the
54 struct eth_rx_poll_entry {
55 /* Eth port to poll */
57 /* Eth rx queue to poll */
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63 /* Count of events in this buffer */
65 /* Array of events in this buffer */
66 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
69 struct rte_event_eth_rx_adapter {
71 uint8_t rss_key_be[RSS_KEY_SIZE];
72 /* Event device identifier */
74 /* Per ethernet device structure */
75 struct eth_device_info *eth_devices;
76 /* Event port identifier */
77 uint8_t event_port_id;
78 /* Lock to serialize config updates with service function */
79 rte_spinlock_t rx_lock;
80 /* Max mbufs processed in any service function invocation */
82 /* Receive queues that need to be polled */
83 struct eth_rx_poll_entry *eth_rx_poll;
84 /* Size of the eth_rx_poll array */
85 uint16_t num_rx_polled;
86 /* Weighted round robin schedule */
88 /* wrr_sched[] size */
90 /* Next entry in wrr[] to begin polling */
92 /* Event burst buffer */
93 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94 /* Per adapter stats */
95 struct rte_event_eth_rx_adapter_stats stats;
96 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97 uint16_t enq_block_count;
99 uint64_t rx_enq_block_start_ts;
100 /* epoll fd used to wait for Rx interrupts */
102 /* Number of interrupt driven Rx queues */
103 uint32_t num_rx_intr;
104 /* Used to send <dev id, queue id> of interrupting Rx queues from
105 * the interrupt thread to the Rx thread
107 struct rte_ring *intr_ring;
108 /* Rx Queue data (dev id, queue id) for the last non-empty
112 /* queue_data is valid */
114 /* Interrupt ring lock, synchronizes Rx thread
115 * and interrupt thread
117 rte_spinlock_t intr_ring_lock;
118 /* event array passed to rte_epoll_wait */
119 struct rte_epoll_event *epoll_events;
120 /* Count of interrupt vectors in use */
121 uint32_t num_intr_vec;
122 /* Thread blocked on Rx interrupts */
123 pthread_t rx_intr_thread;
124 /* Configuration callback for rte_service configuration */
125 rte_event_eth_rx_adapter_conf_cb conf_cb;
126 /* Configuration callback argument */
128 /* Set if default_cb is being used */
130 /* Service initialization state */
131 uint8_t service_inited;
132 /* Total count of Rx queues in adapter */
134 /* Memory allocation name */
135 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136 /* Socket identifier cached from eventdev */
138 /* Per adapter EAL service */
140 /* Adapter started flag */
144 } __rte_cache_aligned;
147 struct eth_device_info {
148 struct rte_eth_dev *dev;
149 struct eth_rx_queue_info *rx_queue;
151 rte_event_eth_rx_adapter_cb_fn cb_fn;
152 /* Rx callback argument */
154 /* Set if ethdev->eventdev packet transfer uses a
157 uint8_t internal_event_port;
158 /* Set if the adapter is processing Rx queues for
159 * this eth device and packet processing has been
160 * started; this lets the code know whether the PMD
161 * rx_adapter_stop callback needs to be invoked
163 uint8_t dev_rx_started;
164 /* Number of queues added for this device */
165 uint16_t nb_dev_queues;
166 /* Number of poll based queues
167 * If nb_rx_poll > 0, the start callback will
168 * be invoked if not already invoked
171 /* Number of interrupt based queues
172 * If nb_rx_intr > 0, the start callback will
173 * be invoked if not already invoked.
176 /* Number of queues that use the shared interrupt */
177 uint16_t nb_shared_intr;
178 /* sum(wrr(q)) for all queues within the device
179 * useful when deleting all device queues
182 /* Intr based queue index to start polling from; this is used
183 * if the number of shared interrupts is non-zero
186 /* Intr based queue indices */
187 uint16_t *intr_queue;
188 /* device generates a per Rx queue interrupt for
189 * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
192 /* shared interrupt enabled */
193 int shared_intr_enabled;
197 struct eth_rx_queue_info {
198 int queue_enabled; /* True if added */
200 uint16_t wt; /* Polling weight */
201 uint8_t event_queue_id; /* Event queue to enqueue packets to */
202 uint8_t sched_type; /* Sched type for events */
203 uint8_t priority; /* Event priority */
204 uint32_t flow_id; /* App provided flow identifier */
205 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
208 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
211 rxa_validate_id(uint8_t id)
213 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
216 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
217 if (!rxa_validate_id(id)) { \
218 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
224 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
226 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
229 /* Greatest common divisor */
230 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
234 return r ? rxa_gcd_u16(b, r) : b;
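/*
 * Editorial worked example (not part of the original source): for
 * servicing weights 4 and 6 the recursion evaluates as
 *
 *   rxa_gcd_u16(4, 6) -> rxa_gcd_u16(6, 4) -> rxa_gcd_u16(4, 2) -> 2
 *
 * i.e. the GCD is 2. The WRR scheduler below decrements its current
 * weight in steps of this GCD, so every distinct weight level is still
 * visited while the precomputed schedule stays as short as possible.
 */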
237 /* Returns the next queue in the polling sequence
239 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
242 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
243 unsigned int n, int *cw,
244 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
245 uint16_t gcd, int prev)
261 q = eth_rx_poll[i].eth_rx_qid;
262 d = eth_rx_poll[i].eth_dev_id;
263 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
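/*
 * Editorial worked example (assumed weights, not part of the original
 * source): with three polled queues A, B and C of servicing weights
 * 3, 2 and 1, max_wt = 3 and gcd = 1, successive calls to
 * rxa_wrr_next() visit the queues in the classic LVS order
 *
 *   A A B A B C  A A B A B C  ...
 *
 * Higher weight queues are interleaved more often while every queue is
 * still polled each cycle. The values returned are indices into
 * eth_rx_poll[], and rxa_calc_wrr_sequence() stores one full cycle in
 * wrr_sched[] so the service function only steps through a precomputed
 * array at run time.
 */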
271 rxa_shared_intr(struct eth_device_info *dev_info,
276 if (dev_info->dev->intr_handle == NULL)
279 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
280 return !multi_intr_cap ||
281 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
285 rxa_intr_queue(struct eth_device_info *dev_info,
288 struct eth_rx_queue_info *queue_info;
290 queue_info = &dev_info->rx_queue[rx_queue_id];
291 return dev_info->rx_queue &&
292 !dev_info->internal_event_port &&
293 queue_info->queue_enabled && queue_info->wt == 0;
297 rxa_polled_queue(struct eth_device_info *dev_info,
300 struct eth_rx_queue_info *queue_info;
302 queue_info = &dev_info->rx_queue[rx_queue_id];
303 return !dev_info->internal_event_port &&
304 dev_info->rx_queue &&
305 queue_info->queue_enabled && queue_info->wt != 0;
308 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
310 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
316 nbq = dev_info->dev->data->nb_rx_queues;
317 n = 0; /* non shared count */
318 s = 0; /* shared count */
320 if (rx_queue_id == -1) {
321 for (i = 0; i < nbq; i++) {
322 if (!rxa_shared_intr(dev_info, i))
323 n += add ? !rxa_intr_queue(dev_info, i) :
324 rxa_intr_queue(dev_info, i);
326 s += add ? !rxa_intr_queue(dev_info, i) :
327 rxa_intr_queue(dev_info, i);
331 if ((add && dev_info->nb_shared_intr == 0) ||
332 (!add && dev_info->nb_shared_intr))
336 if (!rxa_shared_intr(dev_info, rx_queue_id))
337 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
338 rxa_intr_queue(dev_info, rx_queue_id);
340 n = add ? !dev_info->nb_shared_intr :
341 dev_info->nb_shared_intr == 1;
347 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
350 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
351 struct eth_device_info *dev_info,
353 uint32_t *nb_rx_intr)
357 if (rx_queue_id == -1)
358 intr_diff = dev_info->nb_rx_intr;
360 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
362 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
365 /* Calculate nb_rx_* after adding interrupt mode Rx queues; the newly added
366 * interrupt queues could currently be poll mode Rx queues
369 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
370 struct eth_device_info *dev_info,
372 uint32_t *nb_rx_poll,
373 uint32_t *nb_rx_intr,
378 uint32_t wrr_len_diff;
380 if (rx_queue_id == -1) {
381 intr_diff = dev_info->dev->data->nb_rx_queues -
382 dev_info->nb_rx_intr;
383 poll_diff = dev_info->nb_rx_poll;
384 wrr_len_diff = dev_info->wrr_len;
386 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
387 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
388 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
392 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
393 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
394 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
397 /* Calculate size of the eth_rx_poll and wrr_sched arrays
398 * after deleting poll mode rx queues
401 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
402 struct eth_device_info *dev_info,
404 uint32_t *nb_rx_poll,
408 uint32_t wrr_len_diff;
410 if (rx_queue_id == -1) {
411 poll_diff = dev_info->nb_rx_poll;
412 wrr_len_diff = dev_info->wrr_len;
414 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
415 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
419 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
423 /* Calculate nb_rx_* after adding poll mode rx queues
426 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
427 struct eth_device_info *dev_info,
430 uint32_t *nb_rx_poll,
431 uint32_t *nb_rx_intr,
436 uint32_t wrr_len_diff;
438 if (rx_queue_id == -1) {
439 intr_diff = dev_info->nb_rx_intr;
440 poll_diff = dev_info->dev->data->nb_rx_queues -
441 dev_info->nb_rx_poll;
442 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
445 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
446 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
447 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
448 wt - dev_info->rx_queue[rx_queue_id].wt :
452 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
453 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
454 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
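/*
 * Editorial worked example (assumed values, not part of the original
 * source): adding a single queue with servicing weight wt = 3 that is
 * currently in interrupt mode gives intr_diff = 1, poll_diff = 1 and
 * wrr_len_diff = 3, so the totals computed above become
 *
 *   nb_rx_poll = num_rx_polled + 1
 *   nb_rx_intr = num_rx_intr - 1
 *   nb_wrr     = wrr_len + 3
 *
 * i.e. the queue moves from the interrupt set to the poll set and the
 * WRR schedule grows by its weight.
 */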
457 /* Calculate nb_rx_* after adding rx_queue_id */
459 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
460 struct eth_device_info *dev_info,
463 uint32_t *nb_rx_poll,
464 uint32_t *nb_rx_intr,
468 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
469 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
471 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
472 nb_rx_poll, nb_rx_intr, nb_wrr);
475 /* Calculate nb_rx_* after deleting rx_queue_id */
477 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
478 struct eth_device_info *dev_info,
480 uint32_t *nb_rx_poll,
481 uint32_t *nb_rx_intr,
484 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
486 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
491 * Allocate the rx_poll array
493 static struct eth_rx_poll_entry *
494 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
495 uint32_t num_rx_polled)
499 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
500 RTE_CACHE_LINE_SIZE);
501 return rte_zmalloc_socket(rx_adapter->mem_name,
504 rx_adapter->socket_id);
508 * Allocate the WRR array
511 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
515 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
516 RTE_CACHE_LINE_SIZE);
517 return rte_zmalloc_socket(rx_adapter->mem_name,
520 rx_adapter->socket_id);
524 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
527 struct eth_rx_poll_entry **rx_poll,
528 uint32_t **wrr_sched)
537 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
538 if (*rx_poll == NULL) {
543 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
544 if (*wrr_sched == NULL) {
551 /* Precalculate WRR polling sequence for all queues in rx_adapter */
553 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
554 struct eth_rx_poll_entry *rx_poll,
563 /* Initialize variables for calculation of wrr schedule */
564 uint16_t max_wrr_pos = 0;
565 unsigned int poll_q = 0;
572 /* Generate array of all queues to poll, the size of this
575 RTE_ETH_FOREACH_DEV(d) {
576 uint16_t nb_rx_queues;
577 struct eth_device_info *dev_info =
578 &rx_adapter->eth_devices[d];
579 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
580 if (dev_info->rx_queue == NULL)
582 if (dev_info->internal_event_port)
584 dev_info->wrr_len = 0;
585 for (q = 0; q < nb_rx_queues; q++) {
586 struct eth_rx_queue_info *queue_info =
587 &dev_info->rx_queue[q];
590 if (!rxa_polled_queue(dev_info, q))
593 rx_poll[poll_q].eth_dev_id = d;
594 rx_poll[poll_q].eth_rx_qid = q;
596 dev_info->wrr_len += wt;
597 max_wt = RTE_MAX(max_wt, wt);
598 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
603 /* Generate polling sequence based on weights */
606 for (i = 0; i < max_wrr_pos; i++) {
607 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
608 rx_poll, max_wt, gcd, prev);
614 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
615 struct rte_ipv6_hdr **ipv6_hdr)
617 struct rte_ether_hdr *eth_hdr =
618 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
619 struct rte_vlan_hdr *vlan_hdr;
624 switch (eth_hdr->ether_type) {
625 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
626 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
629 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
630 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
633 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
634 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
635 switch (vlan_hdr->eth_proto) {
636 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
637 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
639 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
640 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
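/*
 * Editorial usage sketch (caller pattern, not part of the original
 * source): rxa_mtoip() sets at most one of the two pointers, skipping a
 * single VLAN tag if present, so a caller only needs to test which
 * header was found:
 *
 *   struct rte_ipv4_hdr *v4;
 *   struct rte_ipv6_hdr *v6;
 *
 *   rxa_mtoip(m, &v4, &v6);
 *   if (v4 != NULL)
 *           build an IPv4 tuple from v4
 *   else if (v6 != NULL)
 *           build an IPv6 tuple from v6
 */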
652 /* Calculate RSS hash for IPv4/6 */
653 static inline uint32_t
654 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
658 struct rte_ipv4_tuple ipv4_tuple;
659 struct rte_ipv6_tuple ipv6_tuple;
660 struct rte_ipv4_hdr *ipv4_hdr;
661 struct rte_ipv6_hdr *ipv6_hdr;
663 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
666 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
667 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
669 input_len = RTE_THASH_V4_L3_LEN;
670 } else if (ipv6_hdr) {
671 rte_thash_load_v6_addrs(ipv6_hdr,
672 (union rte_thash_tuple *)&ipv6_tuple);
674 input_len = RTE_THASH_V6_L3_LEN;
678 return rte_softrss_be(tuple, input_len, rss_key_be);
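/*
 * Editorial note (assumed helper usage, not part of the original
 * source): rss_key_be holds the Toeplitz key already converted to the
 * byte order rte_softrss_be() expects, so no per-packet key conversion
 * is needed. A minimal standalone equivalent of the IPv4 path above,
 * with rss_key standing for an application supplied 40-byte key, looks
 * like:
 *
 *   uint8_t key_be[RSS_KEY_SIZE];
 *   uint32_t hash;
 *   struct rte_ipv4_tuple t = {
 *           .src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr),
 *           .dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr),
 *   };
 *
 *   rte_convert_rss_key((const uint32_t *)rss_key,
 *                       (uint32_t *)key_be, RSS_KEY_SIZE);
 *   hash = rte_softrss_be((uint32_t *)&t, RTE_THASH_V4_L3_LEN, key_be);
 */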
682 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
684 return !!rx_adapter->enq_block_count;
688 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
690 if (rx_adapter->rx_enq_block_start_ts)
693 rx_adapter->enq_block_count++;
694 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
697 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
701 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
702 struct rte_event_eth_rx_adapter_stats *stats)
704 if (unlikely(!stats->rx_enq_start_ts))
705 stats->rx_enq_start_ts = rte_get_tsc_cycles();
707 if (likely(!rxa_enq_blocked(rx_adapter)))
710 rx_adapter->enq_block_count = 0;
711 if (rx_adapter->rx_enq_block_start_ts) {
712 stats->rx_enq_end_ts = rte_get_tsc_cycles();
713 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
714 rx_adapter->rx_enq_block_start_ts;
715 rx_adapter->rx_enq_block_start_ts = 0;
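/*
 * Editorial note (stat interpretation, not part of the original
 * source): rx_enq_block_cycles accumulates TSC cycles spent blocked on
 * a full event device port; a start timestamp is only taken after
 * BLOCK_CNT_THRESHOLD consecutive blocked enqueues, which filters out
 * transient back pressure. An application can convert the counter to
 * seconds with, e.g.:
 *
 *   double blocked_sec = (double)stats.rx_enq_block_cycles /
 *                        (double)rte_get_tsc_hz();
 */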
719 /* Enqueue buffered events to event device */
720 static inline uint16_t
721 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
723 struct rte_eth_event_enqueue_buffer *buf =
724 &rx_adapter->event_enqueue_buffer;
725 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
727 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
728 rx_adapter->event_port_id,
731 if (n != buf->count) {
734 (buf->count - n) * sizeof(struct rte_event));
735 stats->rx_enq_retry++;
738 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
739 rxa_enq_block_start_ts(rx_adapter);
742 stats->rx_enq_count += n;
748 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
750 uint16_t rx_queue_id,
751 struct rte_mbuf **mbufs,
755 struct eth_device_info *dev_info =
756 &rx_adapter->eth_devices[eth_dev_id];
757 struct eth_rx_queue_info *eth_rx_queue_info =
758 &dev_info->rx_queue[rx_queue_id];
759 struct rte_eth_event_enqueue_buffer *buf =
760 &rx_adapter->event_enqueue_buffer;
761 struct rte_event *ev = &buf->events[buf->count];
762 int32_t qid = eth_rx_queue_info->event_queue_id;
763 uint8_t sched_type = eth_rx_queue_info->sched_type;
764 uint8_t priority = eth_rx_queue_info->priority;
766 struct rte_mbuf *m = mbufs[0];
771 struct rte_mbuf *cb_mbufs[BATCH_SIZE];
774 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
775 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
776 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
778 if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
779 ts = rte_get_tsc_cycles();
780 for (i = 0; i < num; i++) {
784 m->ol_flags |= PKT_RX_TIMESTAMP;
789 nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
790 ETH_EVENT_BUFFER_SIZE,
801 for (i = 0; i < num; i++) {
805 rxa_do_softrss(m, rx_adapter->rss_key_be) :
808 eth_rx_queue_info->flow_id &
809 eth_rx_queue_info->flow_id_mask;
810 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
811 ev->flow_id = flow_id;
812 ev->op = RTE_EVENT_OP_NEW;
813 ev->sched_type = sched_type;
815 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
816 ev->sub_event_type = 0;
817 ev->priority = priority;
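/*
 * Editorial note (flow id composition above, not part of the original
 * source): flow_id_mask is ~0 when the application passed
 * RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID, so the configured flow
 * id wins and the RSS hash is ignored; otherwise the mask is 0 and the
 * (hardware or software) RSS hash becomes the event flow id:
 *
 *   mask = ~0:  ev->flow_id = queue_info->flow_id
 *   mask =  0:  ev->flow_id = rss
 */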
825 /* Enqueue packets from <port, q> to event buffer */
826 static inline uint32_t
827 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
834 struct rte_mbuf *mbufs[BATCH_SIZE];
835 struct rte_eth_event_enqueue_buffer *buf =
836 &rx_adapter->event_enqueue_buffer;
837 struct rte_event_eth_rx_adapter_stats *stats =
844 /* Don't do a batch dequeue from the rx queue if there isn't
845 * enough space in the enqueue buffer.
847 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
848 if (buf->count >= BATCH_SIZE)
849 rxa_flush_event_buffer(rx_adapter);
851 stats->rx_poll_count++;
852 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
858 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
860 if (rx_count + nb_rx > max_rx)
865 rxa_flush_event_buffer(rx_adapter);
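/*
 * Editorial note (buffer sizing, not part of the original source): the
 * loop above only runs while at least BATCH_SIZE free event slots
 * remain, so every rte_eth_rx_burst() of up to BATCH_SIZE mbufs is
 * guaranteed to fit in the enqueue buffer:
 *
 *   free slots = RTE_DIM(buf->events) - buf->count
 *              = ETH_EVENT_BUFFER_SIZE - buf->count >= BATCH_SIZE
 *
 * With ETH_EVENT_BUFFER_SIZE defined as 4 * BATCH_SIZE, the buffer can
 * absorb several bursts between flushes.
 */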
871 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
878 struct eth_device_info *dev_info;
879 struct eth_rx_queue_info *queue_info;
886 dev_info = &rx_adapter->eth_devices[port_id];
887 queue_info = &dev_info->rx_queue[queue];
888 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
889 if (rxa_shared_intr(dev_info, queue))
890 intr_enabled = &dev_info->shared_intr_enabled;
892 intr_enabled = &queue_info->intr_enabled;
896 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
897 /* Entry should always be available.
898 * The ring size equals the maximum number of interrupt
899 * vectors supported (an interrupt vector is shared in
900 * case of shared interrupts)
903 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
904 " to ring: %s", strerror(-err));
906 rte_eth_dev_rx_intr_disable(port_id, queue);
908 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
912 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
913 uint32_t num_intr_vec)
915 if (rx_adapter->num_intr_vec + num_intr_vec >
916 RTE_EVENT_ETH_INTR_RING_SIZE) {
917 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
918 " %d needed %d limit %d", rx_adapter->num_intr_vec,
919 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
926 /* Delete entries for (dev, queue) from the interrupt ring */
928 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
929 struct eth_device_info *dev_info,
930 uint16_t rx_queue_id)
935 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
937 n = rte_ring_count(rx_adapter->intr_ring);
938 for (i = 0; i < n; i++) {
939 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
940 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
941 if (qd.port == dev_info->dev->data->port_id &&
942 qd.queue == rx_queue_id)
945 if (qd.port == dev_info->dev->data->port_id)
948 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
951 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
954 /* pthread callback handling interrupt mode receive queues
955 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
956 * interrupting queue to the adapter's ring buffer for interrupt events.
957 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
958 * the adapter service function.
961 rxa_intr_thread(void *arg)
963 struct rte_event_eth_rx_adapter *rx_adapter = arg;
964 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
968 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
969 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
971 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
973 for (i = 0; i < n; i++) {
974 rxa_intr_ring_enqueue(rx_adapter,
975 epoll_events[i].epdata.data);
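/*
 * Editorial sketch (data flow, not part of the original source): the
 * epdata.data value handed to rxa_intr_ring_enqueue() is the union
 * queue_data that rxa_config_intr() registered with
 * rte_eth_dev_rx_intr_ctl_q(); rxa_intr_ring_enqueue() reads it back as
 *
 *   union queue_data qd;
 *   qd.ptr = data;
 *
 * and qd.port / qd.queue then identify the interrupting Rx queue that
 * is pushed onto rx_adapter->intr_ring for the service function to
 * drain in rxa_intr_ring_dequeue().
 */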
982 /* Dequeue <port, q> from interrupt ring and enqueue received
985 static inline uint32_t
986 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
991 struct rte_eth_event_enqueue_buffer *buf;
992 rte_spinlock_t *ring_lock;
993 uint8_t max_done = 0;
995 if (rx_adapter->num_rx_intr == 0)
998 if (rte_ring_count(rx_adapter->intr_ring) == 0
999 && !rx_adapter->qd_valid)
1002 buf = &rx_adapter->event_enqueue_buffer;
1003 ring_lock = &rx_adapter->intr_ring_lock;
1005 if (buf->count >= BATCH_SIZE)
1006 rxa_flush_event_buffer(rx_adapter);
1008 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1009 struct eth_device_info *dev_info;
1012 union queue_data qd = rx_adapter->qd;
1015 if (!rx_adapter->qd_valid) {
1016 struct eth_rx_queue_info *queue_info;
1018 rte_spinlock_lock(ring_lock);
1019 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1021 rte_spinlock_unlock(ring_lock);
1027 rx_adapter->qd = qd;
1028 rx_adapter->qd_valid = 1;
1029 dev_info = &rx_adapter->eth_devices[port];
1030 if (rxa_shared_intr(dev_info, queue))
1031 dev_info->shared_intr_enabled = 1;
1033 queue_info = &dev_info->rx_queue[queue];
1034 queue_info->intr_enabled = 1;
1036 rte_eth_dev_rx_intr_enable(port, queue);
1037 rte_spinlock_unlock(ring_lock);
1042 dev_info = &rx_adapter->eth_devices[port];
1045 if (rxa_shared_intr(dev_info, queue)) {
1049 nb_queues = dev_info->dev->data->nb_rx_queues;
1051 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1052 uint8_t enq_buffer_full;
1054 if (!rxa_intr_queue(dev_info, i))
1056 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1057 rx_adapter->max_nb_rx,
1061 enq_buffer_full = !rxq_empty && n == 0;
1062 max_done = nb_rx > rx_adapter->max_nb_rx;
1064 if (enq_buffer_full || max_done) {
1065 dev_info->next_q_idx = i;
1070 rx_adapter->qd_valid = 0;
1072 /* Reinitialize for next interrupt */
1073 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1074 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1077 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1078 rx_adapter->max_nb_rx,
1080 rx_adapter->qd_valid = !rxq_empty;
1082 if (nb_rx > rx_adapter->max_nb_rx)
1088 rx_adapter->stats.rx_intr_packets += nb_rx;
1093 * Polls receive queues added to the event adapter and enqueues received
1094 * packets to the event device.
1096 * The receive code enqueues initially to a temporary buffer; the
1097 * temporary buffer is drained any time it holds >= BATCH_SIZE packets.
1099 * If there isn't space available in the temporary buffer, packets from the
1100 * Rx queue aren't dequeued from the eth device; this back pressures the
1101 * eth device. In virtual device environments this back pressure is relayed to
1102 * the hypervisor's switching layer where adjustments can be made to deal with
1105 static inline uint32_t
1106 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1110 struct rte_eth_event_enqueue_buffer *buf;
1114 wrr_pos = rx_adapter->wrr_pos;
1115 max_nb_rx = rx_adapter->max_nb_rx;
1116 buf = &rx_adapter->event_enqueue_buffer;
1118 /* Iterate through a WRR sequence */
1119 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1120 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1121 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1122 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1124 /* Don't do a batch dequeue from the rx queue if there isn't
1125 * enough space in the enqueue buffer.
1127 if (buf->count >= BATCH_SIZE)
1128 rxa_flush_event_buffer(rx_adapter);
1129 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1130 rx_adapter->wrr_pos = wrr_pos;
1134 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1136 if (nb_rx > max_nb_rx) {
1137 rx_adapter->wrr_pos =
1138 (wrr_pos + 1) % rx_adapter->wrr_len;
1142 if (++wrr_pos == rx_adapter->wrr_len)
1149 rxa_service_func(void *args)
1151 struct rte_event_eth_rx_adapter *rx_adapter = args;
1152 struct rte_event_eth_rx_adapter_stats *stats;
1154 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1156 if (!rx_adapter->rxa_started) {
1157 rte_spinlock_unlock(&rx_adapter->rx_lock);
1161 stats = &rx_adapter->stats;
1162 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1163 stats->rx_packets += rxa_poll(rx_adapter);
1164 rte_spinlock_unlock(&rx_adapter->rx_lock);
1169 rte_event_eth_rx_adapter_init(void)
1171 const char *name = "rte_event_eth_rx_adapter_array";
1172 const struct rte_memzone *mz;
1175 sz = sizeof(*event_eth_rx_adapter) *
1176 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1177 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1179 mz = rte_memzone_lookup(name);
1181 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1182 RTE_CACHE_LINE_SIZE);
1184 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1190 event_eth_rx_adapter = mz->addr;
1194 static inline struct rte_event_eth_rx_adapter *
1195 rxa_id_to_adapter(uint8_t id)
1197 return event_eth_rx_adapter ?
1198 event_eth_rx_adapter[id] : NULL;
1202 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1203 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1206 struct rte_eventdev *dev;
1207 struct rte_event_dev_config dev_conf;
1210 struct rte_event_port_conf *port_conf = arg;
1211 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1213 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1214 dev_conf = dev->data->dev_conf;
1216 started = dev->data->dev_started;
1218 rte_event_dev_stop(dev_id);
1219 port_id = dev_conf.nb_event_ports;
1220 dev_conf.nb_event_ports += 1;
1221 ret = rte_event_dev_configure(dev_id, &dev_conf);
1223 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1226 if (rte_event_dev_start(dev_id))
1232 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1234 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1239 conf->event_port_id = port_id;
1240 conf->max_nb_rx = 128;
1242 ret = rte_event_dev_start(dev_id);
1243 rx_adapter->default_cb_arg = 1;
1248 rxa_epoll_create1(void)
1252 fd = epoll_create1(EPOLL_CLOEXEC);
1253 return fd < 0 ? -errno : fd;
1260 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1262 if (rx_adapter->epd != INIT_FD)
1265 rx_adapter->epd = rxa_epoll_create1();
1266 if (rx_adapter->epd < 0) {
1267 int err = rx_adapter->epd;
1268 rx_adapter->epd = INIT_FD;
1269 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1277 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1280 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1282 if (rx_adapter->intr_ring)
1285 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1286 RTE_EVENT_ETH_INTR_RING_SIZE,
1287 rte_socket_id(), 0);
1288 if (!rx_adapter->intr_ring)
1291 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1292 RTE_EVENT_ETH_INTR_RING_SIZE *
1293 sizeof(struct rte_epoll_event),
1294 RTE_CACHE_LINE_SIZE,
1295 rx_adapter->socket_id);
1296 if (!rx_adapter->epoll_events) {
1301 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1303 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1304 "rx-intr-thread-%d", rx_adapter->id);
1306 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1307 NULL, rxa_intr_thread, rx_adapter);
1309 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1313 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1315 rte_ring_free(rx_adapter->intr_ring);
1316 rx_adapter->intr_ring = NULL;
1317 rx_adapter->epoll_events = NULL;
1322 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1326 err = pthread_cancel(rx_adapter->rx_intr_thread);
1328 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1331 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1333 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1335 rte_free(rx_adapter->epoll_events);
1336 rte_ring_free(rx_adapter->intr_ring);
1337 rx_adapter->intr_ring = NULL;
1338 rx_adapter->epoll_events = NULL;
1343 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1347 if (rx_adapter->num_rx_intr == 0)
1350 ret = rxa_destroy_intr_thread(rx_adapter);
1354 close(rx_adapter->epd);
1355 rx_adapter->epd = INIT_FD;
1361 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1362 struct eth_device_info *dev_info,
1363 uint16_t rx_queue_id)
1366 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1367 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1369 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1371 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1376 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1381 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1384 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1386 dev_info->shared_intr_enabled = 0;
1391 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1392 struct eth_device_info *dev_info,
1399 if (dev_info->nb_rx_intr == 0)
1403 if (rx_queue_id == -1) {
1404 s = dev_info->nb_shared_intr;
1405 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1409 q = dev_info->intr_queue[i];
1410 sintr = rxa_shared_intr(dev_info, q);
1413 if (!sintr || s == 0) {
1415 err = rxa_disable_intr(rx_adapter, dev_info,
1419 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1424 if (!rxa_intr_queue(dev_info, rx_queue_id))
1426 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1427 dev_info->nb_shared_intr == 1) {
1428 err = rxa_disable_intr(rx_adapter, dev_info,
1432 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1436 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1437 if (dev_info->intr_queue[i] == rx_queue_id) {
1438 for (; i < dev_info->nb_rx_intr - 1; i++)
1439 dev_info->intr_queue[i] =
1440 dev_info->intr_queue[i + 1];
1450 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1451 struct eth_device_info *dev_info,
1452 uint16_t rx_queue_id)
1455 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1456 union queue_data qd;
1458 uint16_t *intr_queue;
1459 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1461 if (rxa_intr_queue(dev_info, rx_queue_id))
1464 intr_queue = dev_info->intr_queue;
1465 if (dev_info->intr_queue == NULL) {
1467 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1468 dev_info->intr_queue =
1470 rx_adapter->mem_name,
1473 rx_adapter->socket_id);
1474 if (dev_info->intr_queue == NULL)
1478 init_fd = rx_adapter->epd;
1479 err = rxa_init_epd(rx_adapter);
1481 goto err_free_queue;
1483 qd.port = eth_dev_id;
1484 qd.queue = rx_queue_id;
1486 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1491 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1492 " Rx Queue %u err %d", rx_queue_id, err);
1496 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1498 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1499 " Rx Queue %u err %d", rx_queue_id, err);
1504 err = rxa_create_intr_thread(rx_adapter);
1507 dev_info->shared_intr_enabled = 1;
1509 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1514 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1516 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1517 " Rx Queue %u err %d", rx_queue_id, err);
1519 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1524 RTE_EDEV_LOG_ERR("Could not delete event for"
1525 " Rx Queue %u err %d", rx_queue_id, err1);
1528 if (init_fd == INIT_FD) {
1529 close(rx_adapter->epd);
1530 rx_adapter->epd = INIT_FD;
1533 if (intr_queue == NULL)
1534 rte_free(dev_info->intr_queue);
1540 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1541 struct eth_device_info *dev_info,
1547 int shared_done = (dev_info->nb_shared_intr > 0);
1549 if (rx_queue_id != -1) {
1550 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1552 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1556 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1558 if (rxa_shared_intr(dev_info, i) && shared_done)
1561 err = rxa_config_intr(rx_adapter, dev_info, i);
1563 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1566 dev_info->shared_intr_enabled = 1;
1575 shared_done = (dev_info->nb_shared_intr > 0);
1576 for (j = 0; j < i; j++) {
1577 if (rxa_intr_queue(dev_info, j))
1579 if (rxa_shared_intr(dev_info, j) && si != j)
1581 err = rxa_disable_intr(rx_adapter, dev_info, j);
1592 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1595 struct rte_service_spec service;
1596 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1598 if (rx_adapter->service_inited)
1601 memset(&service, 0, sizeof(service));
1602 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1603 "rte_event_eth_rx_adapter_%d", id);
1604 service.socket_id = rx_adapter->socket_id;
1605 service.callback = rxa_service_func;
1606 service.callback_userdata = rx_adapter;
1607 /* Service function handles locking for queue add/del updates */
1608 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1609 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1611 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1616 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1617 &rx_adapter_conf, rx_adapter->conf_arg);
1619 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1623 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1624 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1625 rx_adapter->service_inited = 1;
1626 rx_adapter->epd = INIT_FD;
1630 rte_service_component_unregister(rx_adapter->service_id);
1635 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1636 struct eth_device_info *dev_info,
1637 int32_t rx_queue_id,
1640 struct eth_rx_queue_info *queue_info;
1644 if (dev_info->rx_queue == NULL)
1647 if (rx_queue_id == -1) {
1648 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1649 rxa_update_queue(rx_adapter, dev_info, i, add);
1651 queue_info = &dev_info->rx_queue[rx_queue_id];
1652 enabled = queue_info->queue_enabled;
1654 rx_adapter->nb_queues += !enabled;
1655 dev_info->nb_dev_queues += !enabled;
1657 rx_adapter->nb_queues -= enabled;
1658 dev_info->nb_dev_queues -= enabled;
1660 queue_info->queue_enabled = !!add;
1665 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1666 struct eth_device_info *dev_info,
1667 int32_t rx_queue_id)
1674 if (rx_adapter->nb_queues == 0)
1677 if (rx_queue_id == -1) {
1678 uint16_t nb_rx_queues;
1681 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1682 for (i = 0; i < nb_rx_queues; i++)
1683 rxa_sw_del(rx_adapter, dev_info, i);
1687 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1688 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1689 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1690 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1691 rx_adapter->num_rx_polled -= pollq;
1692 dev_info->nb_rx_poll -= pollq;
1693 rx_adapter->num_rx_intr -= intrq;
1694 dev_info->nb_rx_intr -= intrq;
1695 dev_info->nb_shared_intr -= intrq && sintrq;
1699 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1700 struct eth_device_info *dev_info,
1701 int32_t rx_queue_id,
1702 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1704 struct eth_rx_queue_info *queue_info;
1705 const struct rte_event *ev = &conf->ev;
1710 if (rx_queue_id == -1) {
1711 uint16_t nb_rx_queues;
1714 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1715 for (i = 0; i < nb_rx_queues; i++)
1716 rxa_add_queue(rx_adapter, dev_info, i, conf);
1720 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1721 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1722 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1724 queue_info = &dev_info->rx_queue[rx_queue_id];
1725 queue_info->event_queue_id = ev->queue_id;
1726 queue_info->sched_type = ev->sched_type;
1727 queue_info->priority = ev->priority;
1728 queue_info->wt = conf->servicing_weight;
1730 if (conf->rx_queue_flags &
1731 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1732 queue_info->flow_id = ev->flow_id;
1733 queue_info->flow_id_mask = ~0;
1736 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1737 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1738 rx_adapter->num_rx_polled += !pollq;
1739 dev_info->nb_rx_poll += !pollq;
1740 rx_adapter->num_rx_intr -= intrq;
1741 dev_info->nb_rx_intr -= intrq;
1742 dev_info->nb_shared_intr -= intrq && sintrq;
1745 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1746 rx_adapter->num_rx_polled -= pollq;
1747 dev_info->nb_rx_poll -= pollq;
1748 rx_adapter->num_rx_intr += !intrq;
1749 dev_info->nb_rx_intr += !intrq;
1750 dev_info->nb_shared_intr += !intrq && sintrq;
1751 if (dev_info->nb_shared_intr == 1) {
1752 if (dev_info->multi_intr_cap)
1753 dev_info->next_q_idx =
1754 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1756 dev_info->next_q_idx = 0;
1761 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1762 uint16_t eth_dev_id,
1764 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1766 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1767 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1769 struct eth_rx_poll_entry *rx_poll;
1770 struct eth_rx_queue_info *rx_queue;
1772 uint16_t nb_rx_queues;
1773 uint32_t nb_rx_poll, nb_wrr;
1774 uint32_t nb_rx_intr;
1778 if (queue_conf->servicing_weight == 0) {
1779 struct rte_eth_dev_data *data = dev_info->dev->data;
1781 temp_conf = *queue_conf;
1782 if (!data->dev_conf.intr_conf.rxq) {
1783 /* If Rx interrupts are disabled set wt = 1 */
1784 temp_conf.servicing_weight = 1;
1786 queue_conf = &temp_conf;
1789 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1790 rx_queue = dev_info->rx_queue;
1791 wt = queue_conf->servicing_weight;
1793 if (dev_info->rx_queue == NULL) {
1794 dev_info->rx_queue =
1795 rte_zmalloc_socket(rx_adapter->mem_name,
1797 sizeof(struct eth_rx_queue_info), 0,
1798 rx_adapter->socket_id);
1799 if (dev_info->rx_queue == NULL)
1805 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1806 queue_conf->servicing_weight,
1807 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1809 if (dev_info->dev->intr_handle)
1810 dev_info->multi_intr_cap =
1811 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1813 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1816 goto err_free_rxqueue;
1819 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1821 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1823 goto err_free_rxqueue;
1825 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1827 goto err_free_rxqueue;
1831 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1832 num_intr_vec = rxa_nb_intr_vect(dev_info,
1834 /* interrupt based queues are being converted to
1835 * poll mode queues; delete the interrupt configuration
1838 ret = rxa_del_intr_queue(rx_adapter,
1839 dev_info, rx_queue_id);
1841 goto err_free_rxqueue;
1845 if (nb_rx_intr == 0) {
1846 ret = rxa_free_intr_resources(rx_adapter);
1848 goto err_free_rxqueue;
1854 if (rx_queue_id == -1) {
1855 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1856 dev_info->intr_queue[i] = i;
1858 if (!rxa_intr_queue(dev_info, rx_queue_id))
1859 dev_info->intr_queue[nb_rx_intr - 1] =
1866 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1867 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1869 rte_free(rx_adapter->eth_rx_poll);
1870 rte_free(rx_adapter->wrr_sched);
1872 rx_adapter->eth_rx_poll = rx_poll;
1873 rx_adapter->wrr_sched = rx_wrr;
1874 rx_adapter->wrr_len = nb_wrr;
1875 rx_adapter->num_intr_vec += num_intr_vec;
1879 if (rx_queue == NULL) {
1880 rte_free(dev_info->rx_queue);
1881 dev_info->rx_queue = NULL;
1891 rxa_ctrl(uint8_t id, int start)
1893 struct rte_event_eth_rx_adapter *rx_adapter;
1894 struct rte_eventdev *dev;
1895 struct eth_device_info *dev_info;
1897 int use_service = 0;
1900 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1901 rx_adapter = rxa_id_to_adapter(id);
1902 if (rx_adapter == NULL)
1905 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1907 RTE_ETH_FOREACH_DEV(i) {
1908 dev_info = &rx_adapter->eth_devices[i];
1909 /* if start check for num dev queues */
1910 if (start && !dev_info->nb_dev_queues)
1912 /* if stop check if dev has been started */
1913 if (stop && !dev_info->dev_rx_started)
1915 use_service |= !dev_info->internal_event_port;
1916 dev_info->dev_rx_started = start;
1917 if (dev_info->internal_event_port == 0)
1919 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1920 &rte_eth_devices[i]) :
1921 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1922 &rte_eth_devices[i]);
1926 rte_spinlock_lock(&rx_adapter->rx_lock);
1927 rx_adapter->rxa_started = start;
1928 rte_service_runstate_set(rx_adapter->service_id, start);
1929 rte_spinlock_unlock(&rx_adapter->rx_lock);
1936 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1937 rte_event_eth_rx_adapter_conf_cb conf_cb,
1940 struct rte_event_eth_rx_adapter *rx_adapter;
1944 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
1945 const uint8_t default_rss_key[] = {
1946 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1947 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1948 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1949 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1950 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1953 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1954 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1955 if (conf_cb == NULL)
1958 if (event_eth_rx_adapter == NULL) {
1959 ret = rte_event_eth_rx_adapter_init();
1964 rx_adapter = rxa_id_to_adapter(id);
1965 if (rx_adapter != NULL) {
1966 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1970 socket_id = rte_event_dev_socket_id(dev_id);
1971 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1972 "rte_event_eth_rx_adapter_%d",
1975 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1976 RTE_CACHE_LINE_SIZE, socket_id);
1977 if (rx_adapter == NULL) {
1978 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1982 rx_adapter->eventdev_id = dev_id;
1983 rx_adapter->socket_id = socket_id;
1984 rx_adapter->conf_cb = conf_cb;
1985 rx_adapter->conf_arg = conf_arg;
1986 rx_adapter->id = id;
1987 strcpy(rx_adapter->mem_name, mem_name);
1988 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1990 sizeof(struct eth_device_info), 0,
1992 rte_convert_rss_key((const uint32_t *)default_rss_key,
1993 (uint32_t *)rx_adapter->rss_key_be,
1994 RTE_DIM(default_rss_key));
1996 if (rx_adapter->eth_devices == NULL) {
1997 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1998 rte_free(rx_adapter);
2001 rte_spinlock_init(&rx_adapter->rx_lock);
2002 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2003 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2005 event_eth_rx_adapter[id] = rx_adapter;
2006 if (conf_cb == rxa_default_conf_cb)
2007 rx_adapter->default_cb_arg = 1;
2012 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2013 struct rte_event_port_conf *port_config)
2015 struct rte_event_port_conf *pc;
2018 if (port_config == NULL)
2020 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2022 pc = rte_malloc(NULL, sizeof(*pc), 0);
2026 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2027 rxa_default_conf_cb,
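/*
 * Editorial usage sketch (application side, assumed values, not part of
 * the original source): the simpler create() entry point supplies the
 * default configuration callback with an event port configuration that
 * the adapter uses for the event port it sets up for itself:
 *
 *   struct rte_event_port_conf pc = {
 *           .new_event_threshold = 4096,
 *           .dequeue_depth = 32,
 *           .enqueue_depth = 32,
 *   };
 *
 *   err = rte_event_eth_rx_adapter_create(id, dev_id, &pc);
 */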
2035 rte_event_eth_rx_adapter_free(uint8_t id)
2037 struct rte_event_eth_rx_adapter *rx_adapter;
2039 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2041 rx_adapter = rxa_id_to_adapter(id);
2042 if (rx_adapter == NULL)
2045 if (rx_adapter->nb_queues) {
2046 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2047 rx_adapter->nb_queues);
2051 if (rx_adapter->default_cb_arg)
2052 rte_free(rx_adapter->conf_arg);
2053 rte_free(rx_adapter->eth_devices);
2054 rte_free(rx_adapter);
2055 event_eth_rx_adapter[id] = NULL;
2061 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2062 uint16_t eth_dev_id,
2063 int32_t rx_queue_id,
2064 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2068 struct rte_event_eth_rx_adapter *rx_adapter;
2069 struct rte_eventdev *dev;
2070 struct eth_device_info *dev_info;
2072 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2073 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2075 rx_adapter = rxa_id_to_adapter(id);
2076 if ((rx_adapter == NULL) || (queue_conf == NULL))
2079 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2080 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2084 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2085 "eth port %" PRIu16, id, eth_dev_id);
2089 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2090 && (queue_conf->rx_queue_flags &
2091 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2092 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2093 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2098 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2099 (rx_queue_id != -1)) {
2100 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2101 "event queue, eth port: %" PRIu16 " adapter id: %"
2102 PRIu8, eth_dev_id, id);
2106 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2107 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2108 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2109 (uint16_t)rx_queue_id);
2113 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2115 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2116 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2118 if (dev_info->rx_queue == NULL) {
2119 dev_info->rx_queue =
2120 rte_zmalloc_socket(rx_adapter->mem_name,
2121 dev_info->dev->data->nb_rx_queues *
2122 sizeof(struct eth_rx_queue_info), 0,
2123 rx_adapter->socket_id);
2124 if (dev_info->rx_queue == NULL)
2128 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2129 &rte_eth_devices[eth_dev_id],
2130 rx_queue_id, queue_conf);
2132 dev_info->internal_event_port = 1;
2133 rxa_update_queue(rx_adapter,
2134 &rx_adapter->eth_devices[eth_dev_id],
2139 rte_spinlock_lock(&rx_adapter->rx_lock);
2140 dev_info->internal_event_port = 0;
2141 ret = rxa_init_service(rx_adapter, id);
2143 uint32_t service_id = rx_adapter->service_id;
2144 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2146 rte_service_component_runstate_set(service_id,
2147 rxa_sw_adapter_queue_count(rx_adapter));
2149 rte_spinlock_unlock(&rx_adapter->rx_lock);
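/*
 * Editorial usage sketch (application side, assumed values, not part of
 * the original source): connect every Rx queue of an ethdev to event
 * queue 2 with atomic scheduling and a servicing weight of 1:
 *
 *   struct rte_event_eth_rx_adapter_queue_conf qc = {
 *           .servicing_weight = 1,
 *           .ev = {
 *                   .queue_id = 2,
 *                   .sched_type = RTE_SCHED_TYPE_ATOMIC,
 *                   .priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *           },
 *   };
 *
 *   err = rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, -1, &qc);
 *
 * A rx_queue_id of -1 adds all Rx queues of the device; a servicing
 * weight of 0 requests interrupt mode when the ethdev has Rx interrupts
 * enabled (see rxa_sw_add() above).
 */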
2159 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2160 int32_t rx_queue_id)
2163 struct rte_eventdev *dev;
2164 struct rte_event_eth_rx_adapter *rx_adapter;
2165 struct eth_device_info *dev_info;
2167 uint32_t nb_rx_poll = 0;
2168 uint32_t nb_wrr = 0;
2169 uint32_t nb_rx_intr;
2170 struct eth_rx_poll_entry *rx_poll = NULL;
2171 uint32_t *rx_wrr = NULL;
2174 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2175 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2177 rx_adapter = rxa_id_to_adapter(id);
2178 if (rx_adapter == NULL)
2181 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2182 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2188 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2189 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2190 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2191 (uint16_t)rx_queue_id);
2195 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2197 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2198 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2200 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2201 &rte_eth_devices[eth_dev_id],
2204 rxa_update_queue(rx_adapter,
2205 &rx_adapter->eth_devices[eth_dev_id],
2208 if (dev_info->nb_dev_queues == 0) {
2209 rte_free(dev_info->rx_queue);
2210 dev_info->rx_queue = NULL;
2214 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2215 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2217 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2222 rte_spinlock_lock(&rx_adapter->rx_lock);
2225 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2227 num_intr_vec = rxa_nb_intr_vect(dev_info,
2229 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2235 if (nb_rx_intr == 0) {
2236 ret = rxa_free_intr_resources(rx_adapter);
2241 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2242 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2244 rte_free(rx_adapter->eth_rx_poll);
2245 rte_free(rx_adapter->wrr_sched);
2247 if (nb_rx_intr == 0) {
2248 rte_free(dev_info->intr_queue);
2249 dev_info->intr_queue = NULL;
2252 rx_adapter->eth_rx_poll = rx_poll;
2253 rx_adapter->wrr_sched = rx_wrr;
2254 rx_adapter->wrr_len = nb_wrr;
2255 rx_adapter->num_intr_vec += num_intr_vec;
2257 if (dev_info->nb_dev_queues == 0) {
2258 rte_free(dev_info->rx_queue);
2259 dev_info->rx_queue = NULL;
2262 rte_spinlock_unlock(&rx_adapter->rx_lock);
2269 rte_service_component_runstate_set(rx_adapter->service_id,
2270 rxa_sw_adapter_queue_count(rx_adapter));
2277 rte_event_eth_rx_adapter_start(uint8_t id)
2279 return rxa_ctrl(id, 1);
2283 rte_event_eth_rx_adapter_stop(uint8_t id)
2285 return rxa_ctrl(id, 0);
2289 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2290 struct rte_event_eth_rx_adapter_stats *stats)
2292 struct rte_event_eth_rx_adapter *rx_adapter;
2293 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2294 struct rte_event_eth_rx_adapter_stats dev_stats;
2295 struct rte_eventdev *dev;
2296 struct eth_device_info *dev_info;
2300 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2302 rx_adapter = rxa_id_to_adapter(id);
2303 if (rx_adapter == NULL || stats == NULL)
2306 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2307 memset(stats, 0, sizeof(*stats));
2308 RTE_ETH_FOREACH_DEV(i) {
2309 dev_info = &rx_adapter->eth_devices[i];
2310 if (dev_info->internal_event_port == 0 ||
2311 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2313 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2314 &rte_eth_devices[i],
2318 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2319 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2322 if (rx_adapter->service_inited)
2323 *stats = rx_adapter->stats;
2325 stats->rx_packets += dev_stats_sum.rx_packets;
2326 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2331 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2333 struct rte_event_eth_rx_adapter *rx_adapter;
2334 struct rte_eventdev *dev;
2335 struct eth_device_info *dev_info;
2338 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2340 rx_adapter = rxa_id_to_adapter(id);
2341 if (rx_adapter == NULL)
2344 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2345 RTE_ETH_FOREACH_DEV(i) {
2346 dev_info = &rx_adapter->eth_devices[i];
2347 if (dev_info->internal_event_port == 0 ||
2348 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2350 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2351 &rte_eth_devices[i]);
2354 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2359 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2361 struct rte_event_eth_rx_adapter *rx_adapter;
2363 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2365 rx_adapter = rxa_id_to_adapter(id);
2366 if (rx_adapter == NULL || service_id == NULL)
2369 if (rx_adapter->service_inited)
2370 *service_id = rx_adapter->service_id;
2372 return rx_adapter->service_inited ? 0 : -ESRCH;
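/*
 * Editorial usage sketch (application side, not part of the original
 * source): when the adapter runs as a software service (no
 * INTERNAL_PORT capability), the application is expected to map the
 * returned service id to a service lcore so the adapter actually gets
 * cycles:
 *
 *   uint32_t service_id;
 *
 *   if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0)
 *           rte_service_map_lcore_set(service_id, service_lcore, 1);
 *
 * where service_lcore is a core previously enabled with
 * rte_service_lcore_add().
 */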
2376 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2377 uint16_t eth_dev_id,
2378 rte_event_eth_rx_adapter_cb_fn cb_fn,
2381 struct rte_event_eth_rx_adapter *rx_adapter;
2382 struct eth_device_info *dev_info;
2386 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2387 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2389 rx_adapter = rxa_id_to_adapter(id);
2390 if (rx_adapter == NULL)
2393 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2394 if (dev_info->rx_queue == NULL)
2397 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2401 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2402 "eth port %" PRIu16, id, eth_dev_id);
2406 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2407 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2408 PRIu16, eth_dev_id);
2412 rte_spinlock_lock(&rx_adapter->rx_lock);
2413 dev_info->cb_fn = cb_fn;
2414 dev_info->cb_arg = cb_arg;
2415 rte_spinlock_unlock(&rx_adapter->rx_lock);