1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
26 #define BLOCK_CNT_THRESHOLD 10
27 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
32 #define RSS_KEY_SIZE 40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
35 /* Sentinel value to detect uninitialized file handle */
39 * Used to store port and queue ID of interrupting Rx queue
51 * There is an instance of this struct per polled Rx queue added to the adapter
54 struct eth_rx_poll_entry {
55 /* Eth port to poll */
57 /* Eth rx queue to poll */
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63 /* Count of events in this buffer */
65 /* Array of events in this buffer */
66 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
69 struct rte_event_eth_rx_adapter {
71 uint8_t rss_key_be[RSS_KEY_SIZE];
72 /* Event device identifier */
74 /* Per ethernet device structure */
75 struct eth_device_info *eth_devices;
76 /* Event port identifier */
77 uint8_t event_port_id;
78 /* Lock to serialize config updates with service function */
79 rte_spinlock_t rx_lock;
80 /* Max mbufs processed in any service function invocation */
82 /* Receive queues that need to be polled */
83 struct eth_rx_poll_entry *eth_rx_poll;
84 /* Size of the eth_rx_poll array */
85 uint16_t num_rx_polled;
86 /* Weighted round robin schedule */
88 /* wrr_sched[] size */
90 /* Next entry in wrr[] to begin polling */
92 /* Event burst buffer */
93 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94 /* Per adapter stats */
95 struct rte_event_eth_rx_adapter_stats stats;
96 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97 uint16_t enq_block_count;
99 uint64_t rx_enq_block_start_ts;
100 /* epoll fd used to wait for Rx interrupts */
102 /* Number of interrupt driven Rx queues */
103 uint32_t num_rx_intr;
104 /* Used to send <dev id, queue id> of interrupting Rx queues from
105 * the interrupt thread to the Rx thread
107 struct rte_ring *intr_ring;
108 /* Rx Queue data (dev id, queue id) for the last non-empty
112 /* queue_data is valid */
114 /* Interrupt ring lock, synchronizes Rx thread
115 * and interrupt thread
117 rte_spinlock_t intr_ring_lock;
118 /* event array passed to rte_epoll_wait */
119 struct rte_epoll_event *epoll_events;
120 /* Count of interrupt vectors in use */
121 uint32_t num_intr_vec;
122 /* Thread blocked on Rx interrupts */
123 pthread_t rx_intr_thread;
124 /* Configuration callback for rte_service configuration */
125 rte_event_eth_rx_adapter_conf_cb conf_cb;
126 /* Configuration callback argument */
128 /* Set if default_cb is being used */
130 /* Service initialization state */
131 uint8_t service_inited;
132 /* Total count of Rx queues in adapter */
134 /* Memory allocation name */
135 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136 /* Socket identifier cached from eventdev */
138 /* Per adapter EAL service */
140 /* Adapter started flag */
144 } __rte_cache_aligned;
147 struct eth_device_info {
148 struct rte_eth_dev *dev;
149 struct eth_rx_queue_info *rx_queue;
151 rte_event_eth_rx_adapter_cb_fn cb_fn;
152 /* Rx callback argument */
154 /* Set if ethdev->eventdev packet transfer uses a hardware mechanism */
157 uint8_t internal_event_port;
158 /* Set if the adapter is processing rx queues for
159 * this eth device and packet processing has been
160 * started, allows for the code to know if the PMD
161 * rx_adapter_stop callback needs to be invoked
163 uint8_t dev_rx_started;
164 /* Number of queues added for this device */
165 uint16_t nb_dev_queues;
166 /* Number of poll based queues
167 * If nb_rx_poll > 0, the start callback will
168 * be invoked if not already invoked
171 /* Number of interrupt based queues
172 * If nb_rx_intr > 0, the start callback will
173 * be invoked if not already invoked.
176 /* Number of queues that use the shared interrupt */
177 uint16_t nb_shared_intr;
178 /* sum(wrr(q)) for all queues within the device
179 * useful when deleting all device queues
182 /* Intr based queue index to start polling from, this is used
183 * if the number of shared interrupts is non-zero
186 /* Intr based queue indices */
187 uint16_t *intr_queue;
188 /* device generates a per Rx queue interrupt for queue indices
189  * < RTE_MAX_RXTX_INTR_VEC_ID - 1
192 /* shared interrupt enabled */
193 int shared_intr_enabled;
197 struct eth_rx_queue_info {
198 int queue_enabled; /* True if added */
200 uint16_t wt; /* Polling weight */
201 uint8_t event_queue_id; /* Event queue to enqueue packets to */
202 uint8_t sched_type; /* Sched type for events */
203 uint8_t priority; /* Event priority */
204 uint32_t flow_id; /* App provided flow identifier */
205 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
208 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
211 rxa_validate_id(uint8_t id)
213 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
216 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
217 if (!rxa_validate_id(id)) { \
218 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
224 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
226 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
229 /* Greatest common divisor */
230 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
234 return r ? rxa_gcd_u16(b, r) : b;
237 /* Returns the next queue in the polling sequence
239 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
242 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
243 unsigned int n, int *cw,
244 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
245 uint16_t gcd, int prev)
261 q = eth_rx_poll[i].eth_rx_qid;
262 d = eth_rx_poll[i].eth_dev_id;
263 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
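/* For reference, a minimal sketch of the interleaved weighted round robin
 * selection described at the page linked above; the names (n, cw, gcd,
 * max_wt) mirror the parameters of rxa_wrr_next() and wt[i] stands for the
 * weight of the i-th polled queue. Illustrative only:
 *
 *	while (1) {
 *		i = (i + 1) % n;
 *		if (i == 0) {
 *			cw = cw - gcd;
 *			if (cw <= 0)
 *				cw = max_wt;
 *		}
 *		if (wt[i] >= cw)
 *			return i;
 *	}
 */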
271 rxa_shared_intr(struct eth_device_info *dev_info,
276 if (dev_info->dev->intr_handle == NULL)
279 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
280 return !multi_intr_cap ||
281 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
285 rxa_intr_queue(struct eth_device_info *dev_info,
288 struct eth_rx_queue_info *queue_info;
290 queue_info = &dev_info->rx_queue[rx_queue_id];
291 return dev_info->rx_queue &&
292 !dev_info->internal_event_port &&
293 queue_info->queue_enabled && queue_info->wt == 0;
297 rxa_polled_queue(struct eth_device_info *dev_info,
300 struct eth_rx_queue_info *queue_info;
302 queue_info = &dev_info->rx_queue[rx_queue_id];
303 return !dev_info->internal_event_port &&
304 dev_info->rx_queue &&
305 queue_info->queue_enabled && queue_info->wt != 0;
308 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
310 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
316 nbq = dev_info->dev->data->nb_rx_queues;
317 n = 0; /* non shared count */
318 s = 0; /* shared count */
320 if (rx_queue_id == -1) {
321 for (i = 0; i < nbq; i++) {
322 if (!rxa_shared_intr(dev_info, i))
323 n += add ? !rxa_intr_queue(dev_info, i) :
324 rxa_intr_queue(dev_info, i);
326 s += add ? !rxa_intr_queue(dev_info, i) :
327 rxa_intr_queue(dev_info, i);
331 if ((add && dev_info->nb_shared_intr == 0) ||
332 (!add && dev_info->nb_shared_intr))
336 if (!rxa_shared_intr(dev_info, rx_queue_id))
337 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
338 rxa_intr_queue(dev_info, rx_queue_id);
340 n = add ? !dev_info->nb_shared_intr :
341 dev_info->nb_shared_intr == 1;
347 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
350 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
351 struct eth_device_info *dev_info,
353 uint32_t *nb_rx_intr)
357 if (rx_queue_id == -1)
358 intr_diff = dev_info->nb_rx_intr;
360 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
362 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
365 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
366 * interrupt queues could currently be poll mode Rx queues
369 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
370 struct eth_device_info *dev_info,
372 uint32_t *nb_rx_poll,
373 uint32_t *nb_rx_intr,
378 uint32_t wrr_len_diff;
380 if (rx_queue_id == -1) {
381 intr_diff = dev_info->dev->data->nb_rx_queues -
382 dev_info->nb_rx_intr;
383 poll_diff = dev_info->nb_rx_poll;
384 wrr_len_diff = dev_info->wrr_len;
386 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
387 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
388 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
392 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
393 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
394 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
397 /* Calculate size of the eth_rx_poll and wrr_sched arrays
398 * after deleting poll mode rx queues
401 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
402 struct eth_device_info *dev_info,
404 uint32_t *nb_rx_poll,
408 uint32_t wrr_len_diff;
410 if (rx_queue_id == -1) {
411 poll_diff = dev_info->nb_rx_poll;
412 wrr_len_diff = dev_info->wrr_len;
414 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
415 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
419 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
420 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
423 /* Calculate nb_rx_* after adding poll mode rx queues
426 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
427 struct eth_device_info *dev_info,
430 uint32_t *nb_rx_poll,
431 uint32_t *nb_rx_intr,
436 uint32_t wrr_len_diff;
438 if (rx_queue_id == -1) {
439 intr_diff = dev_info->nb_rx_intr;
440 poll_diff = dev_info->dev->data->nb_rx_queues -
441 dev_info->nb_rx_poll;
442 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
445 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
446 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
447 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
448 wt - dev_info->rx_queue[rx_queue_id].wt :
452 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
453 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
454 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
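/* Worked example (illustrative values, assuming rx_queue_id == -1): for a
 * device with 4 Rx queues, of which 1 is currently polled with weight 2
 * (device wrr_len = 2) and 1 is in interrupt mode, adding all queues with
 * weight wt = 3 gives poll_diff = 4 - 1 = 3, intr_diff = 1 and
 * wrr_len_diff = 3 * 4 - 2 = 10; the adapter's polled queue count grows
 * by 3, its interrupt queue count drops by 1 and the WRR array grows by 10.
 */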
457 /* Calculate nb_rx_* after adding rx_queue_id */
459 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
460 struct eth_device_info *dev_info,
463 uint32_t *nb_rx_poll,
464 uint32_t *nb_rx_intr,
468 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
469 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
471 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
472 nb_rx_poll, nb_rx_intr, nb_wrr);
475 /* Calculate nb_rx_* after deleting rx_queue_id */
477 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
478 struct eth_device_info *dev_info,
480 uint32_t *nb_rx_poll,
481 uint32_t *nb_rx_intr,
484 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
486 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
491 * Allocate the rx_poll array
493 static struct eth_rx_poll_entry *
494 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
495 uint32_t num_rx_polled)
499 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
500 RTE_CACHE_LINE_SIZE);
501 return rte_zmalloc_socket(rx_adapter->mem_name,
504 rx_adapter->socket_id);
508 * Allocate the WRR array
511 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
515 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
516 RTE_CACHE_LINE_SIZE);
517 return rte_zmalloc_socket(rx_adapter->mem_name,
520 rx_adapter->socket_id);
524 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
527 struct eth_rx_poll_entry **rx_poll,
528 uint32_t **wrr_sched)
537 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
538 if (*rx_poll == NULL) {
543 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
544 if (*wrr_sched == NULL) {
551 /* Precalculate WRR polling sequence for all queues in rx_adapter */
553 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
554 struct eth_rx_poll_entry *rx_poll,
563 /* Initialize variables for calculation of wrr schedule */
564 uint16_t max_wrr_pos = 0;
565 unsigned int poll_q = 0;
572 /* Generate array of all queues to poll, the size of this
575 RTE_ETH_FOREACH_DEV(d) {
576 uint16_t nb_rx_queues;
577 struct eth_device_info *dev_info =
578 &rx_adapter->eth_devices[d];
579 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
580 if (dev_info->rx_queue == NULL)
582 if (dev_info->internal_event_port)
584 dev_info->wrr_len = 0;
585 for (q = 0; q < nb_rx_queues; q++) {
586 struct eth_rx_queue_info *queue_info =
587 &dev_info->rx_queue[q];
590 if (!rxa_polled_queue(dev_info, q))
593 rx_poll[poll_q].eth_dev_id = d;
594 rx_poll[poll_q].eth_rx_qid = q;
596 dev_info->wrr_len += wt;
597 max_wt = RTE_MAX(max_wt, wt);
598 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
603 /* Generate polling sequence based on weights */
606 for (i = 0; i < max_wrr_pos; i++) {
607 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
608 rx_poll, max_wt, gcd, prev);
614 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
615 struct rte_ipv6_hdr **ipv6_hdr)
617 struct rte_ether_hdr *eth_hdr =
618 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
619 struct rte_vlan_hdr *vlan_hdr;
624 switch (eth_hdr->ether_type) {
625 case RTE_BE16(RTE_ETHER_TYPE_IPv4):
626 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
629 case RTE_BE16(RTE_ETHER_TYPE_IPv6):
630 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
633 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
634 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
635 switch (vlan_hdr->eth_proto) {
636 case RTE_BE16(RTE_ETHER_TYPE_IPv4):
637 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
639 case RTE_BE16(RTE_ETHER_TYPE_IPv6):
640 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
652 /* Calculate RSS hash for IPv4/6 */
653 static inline uint32_t
654 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
658 struct rte_ipv4_tuple ipv4_tuple;
659 struct rte_ipv6_tuple ipv6_tuple;
660 struct rte_ipv4_hdr *ipv4_hdr;
661 struct rte_ipv6_hdr *ipv6_hdr;
663 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
666 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
667 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
669 input_len = RTE_THASH_V4_L3_LEN;
670 } else if (ipv6_hdr) {
671 rte_thash_load_v6_addrs(ipv6_hdr,
672 (union rte_thash_tuple *)&ipv6_tuple);
674 input_len = RTE_THASH_V6_L3_LEN;
678 return rte_softrss_be(tuple, input_len, rss_key_be);
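/* Note on the key format: rte_softrss_be() expects the 40 byte Toeplitz key
 * in byte swapped (big endian) form, which is why the adapter converts its
 * RSS key once at creation time. A minimal sketch of that conversion using
 * the rte_thash.h helper (rss_key is a placeholder for the original key):
 *
 *	uint8_t key_be[RSS_KEY_SIZE];
 *
 *	rte_convert_rss_key((const uint32_t *)rss_key,
 *			(uint32_t *)key_be, RSS_KEY_SIZE);
 */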
682 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
684 return !!rx_adapter->enq_block_count;
688 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
690 if (rx_adapter->rx_enq_block_start_ts)
693 rx_adapter->enq_block_count++;
694 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
697 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
701 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
702 struct rte_event_eth_rx_adapter_stats *stats)
704 if (unlikely(!stats->rx_enq_start_ts))
705 stats->rx_enq_start_ts = rte_get_tsc_cycles();
707 if (likely(!rxa_enq_blocked(rx_adapter)))
710 rx_adapter->enq_block_count = 0;
711 if (rx_adapter->rx_enq_block_start_ts) {
712 stats->rx_enq_end_ts = rte_get_tsc_cycles();
713 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
714 rx_adapter->rx_enq_block_start_ts;
715 rx_adapter->rx_enq_block_start_ts = 0;
719 /* Add event to buffer, free space check is done prior to calling this function
723 rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
724 struct rte_event *ev)
726 struct rte_eth_event_enqueue_buffer *buf =
727 &rx_adapter->event_enqueue_buffer;
728 rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
731 /* Enqueue buffered events to event device */
732 static inline uint16_t
733 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
735 struct rte_eth_event_enqueue_buffer *buf =
736 &rx_adapter->event_enqueue_buffer;
737 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
739 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
740 rx_adapter->event_port_id,
743 if (n != buf->count) {
746 (buf->count - n) * sizeof(struct rte_event));
747 stats->rx_enq_retry++;
750 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
751 rxa_enq_block_start_ts(rx_adapter);
754 stats->rx_enq_count += n;
760 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
762 uint16_t rx_queue_id,
763 struct rte_mbuf **mbufs,
767 struct eth_device_info *dev_info =
768 &rx_adapter->eth_devices[eth_dev_id];
769 struct eth_rx_queue_info *eth_rx_queue_info =
770 &dev_info->rx_queue[rx_queue_id];
771 struct rte_eth_event_enqueue_buffer *buf =
772 &rx_adapter->event_enqueue_buffer;
773 int32_t qid = eth_rx_queue_info->event_queue_id;
774 uint8_t sched_type = eth_rx_queue_info->sched_type;
775 uint8_t priority = eth_rx_queue_info->priority;
777 struct rte_event events[BATCH_SIZE];
778 struct rte_mbuf *m = mbufs[0];
783 struct rte_mbuf *cb_mbufs[BATCH_SIZE];
786 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
787 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
788 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
790 if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
791 ts = rte_get_tsc_cycles();
792 for (i = 0; i < num; i++) {
796 m->ol_flags |= PKT_RX_TIMESTAMP;
801 nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
802 ETH_EVENT_BUFFER_SIZE,
813 for (i = 0; i < num; i++) {
815 struct rte_event *ev = &events[i];
818 rxa_do_softrss(m, rx_adapter->rss_key_be) :
821 eth_rx_queue_info->flow_id &
822 eth_rx_queue_info->flow_id_mask;
823 flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
824 ev->flow_id = flow_id;
825 ev->op = RTE_EVENT_OP_NEW;
826 ev->sched_type = sched_type;
828 ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
829 ev->sub_event_type = 0;
830 ev->priority = priority;
833 rxa_buffer_event(rx_adapter, ev);
837 /* Enqueue packets from <port, q> to event buffer */
838 static inline uint32_t
839 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
846 struct rte_mbuf *mbufs[BATCH_SIZE];
847 struct rte_eth_event_enqueue_buffer *buf =
848 &rx_adapter->event_enqueue_buffer;
849 struct rte_event_eth_rx_adapter_stats *stats =
856 /* Don't do a batch dequeue from the rx queue if there isn't
857 * enough space in the enqueue buffer.
859 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
860 if (buf->count >= BATCH_SIZE)
861 rxa_flush_event_buffer(rx_adapter);
863 stats->rx_poll_count++;
864 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
870 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
872 if (rx_count + nb_rx > max_rx)
877 rxa_flush_event_buffer(rx_adapter);
883 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
890 struct eth_device_info *dev_info;
891 struct eth_rx_queue_info *queue_info;
898 dev_info = &rx_adapter->eth_devices[port_id];
899 queue_info = &dev_info->rx_queue[queue];
900 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
901 if (rxa_shared_intr(dev_info, queue))
902 intr_enabled = &dev_info->shared_intr_enabled;
904 intr_enabled = &queue_info->intr_enabled;
908 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
909 /* Entry should always be available.
910 * The ring size equals the maximum number of interrupt
911 * vectors supported (an interrupt vector is shared in
912 * case of shared interrupts)
915 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
916 " to ring: %s", strerror(-err));
918 rte_eth_dev_rx_intr_disable(port_id, queue);
920 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
924 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
925 uint32_t num_intr_vec)
927 if (rx_adapter->num_intr_vec + num_intr_vec >
928 RTE_EVENT_ETH_INTR_RING_SIZE) {
929 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
930 " %d needed %d limit %d", rx_adapter->num_intr_vec,
931 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
938 /* Delete entries for (dev, queue) from the interrupt ring */
940 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
941 struct eth_device_info *dev_info,
942 uint16_t rx_queue_id)
947 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
949 n = rte_ring_count(rx_adapter->intr_ring);
950 for (i = 0; i < n; i++) {
951 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
952 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
953 if (qd.port == dev_info->dev->data->port_id &&
954 qd.queue == rx_queue_id)
957 if (qd.port == dev_info->dev->data->port_id)
960 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
963 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
966 /* pthread callback handling interrupt mode receive queues
967 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
968 * interrupting queue to the adapter's ring buffer for interrupt events.
969 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
970 * the adapter service function.
973 rxa_intr_thread(void *arg)
975 struct rte_event_eth_rx_adapter *rx_adapter = arg;
976 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
980 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
981 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
983 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
985 for (i = 0; i < n; i++) {
986 rxa_intr_ring_enqueue(rx_adapter,
987 epoll_events[i].epdata.data);
994 /* Dequeue <port, q> from interrupt ring and enqueue received packets to the event device
997 static inline uint32_t
998 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1003 struct rte_eth_event_enqueue_buffer *buf;
1004 rte_spinlock_t *ring_lock;
1005 uint8_t max_done = 0;
1007 if (rx_adapter->num_rx_intr == 0)
1010 if (rte_ring_count(rx_adapter->intr_ring) == 0
1011 && !rx_adapter->qd_valid)
1014 buf = &rx_adapter->event_enqueue_buffer;
1015 ring_lock = &rx_adapter->intr_ring_lock;
1017 if (buf->count >= BATCH_SIZE)
1018 rxa_flush_event_buffer(rx_adapter);
1020 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
1021 struct eth_device_info *dev_info;
1024 union queue_data qd = rx_adapter->qd;
1027 if (!rx_adapter->qd_valid) {
1028 struct eth_rx_queue_info *queue_info;
1030 rte_spinlock_lock(ring_lock);
1031 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1033 rte_spinlock_unlock(ring_lock);
1039 rx_adapter->qd = qd;
1040 rx_adapter->qd_valid = 1;
1041 dev_info = &rx_adapter->eth_devices[port];
1042 if (rxa_shared_intr(dev_info, queue))
1043 dev_info->shared_intr_enabled = 1;
1045 queue_info = &dev_info->rx_queue[queue];
1046 queue_info->intr_enabled = 1;
1048 rte_eth_dev_rx_intr_enable(port, queue);
1049 rte_spinlock_unlock(ring_lock);
1054 dev_info = &rx_adapter->eth_devices[port];
1057 if (rxa_shared_intr(dev_info, queue)) {
1061 nb_queues = dev_info->dev->data->nb_rx_queues;
1063 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1064 uint8_t enq_buffer_full;
1066 if (!rxa_intr_queue(dev_info, i))
1068 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1069 rx_adapter->max_nb_rx,
1073 enq_buffer_full = !rxq_empty && n == 0;
1074 max_done = nb_rx > rx_adapter->max_nb_rx;
1076 if (enq_buffer_full || max_done) {
1077 dev_info->next_q_idx = i;
1082 rx_adapter->qd_valid = 0;
1084 /* Reinitialize for next interrupt */
1085 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1086 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1089 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1090 rx_adapter->max_nb_rx,
1092 rx_adapter->qd_valid = !rxq_empty;
1094 if (nb_rx > rx_adapter->max_nb_rx)
1100 rx_adapter->stats.rx_intr_packets += nb_rx;
1105 * Polls receive queues added to the event adapter and enqueues received
1106 * packets to the event device.
1108 * The receive code enqueues initially to a temporary buffer, the
1109 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1111 * If there isn't space available in the temporary buffer, packets from the
1112 * Rx queue aren't dequeued from the eth device, this back pressures the
1113 * eth device, in virtual device environments this back pressure is relayed to
1114 * the hypervisor's switching layer where adjustments can be made to deal with it.
1117 static inline uint32_t
1118 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1122 struct rte_eth_event_enqueue_buffer *buf;
1126 wrr_pos = rx_adapter->wrr_pos;
1127 max_nb_rx = rx_adapter->max_nb_rx;
1128 buf = &rx_adapter->event_enqueue_buffer;
1130 /* Iterate through a WRR sequence */
1131 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1132 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1133 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1134 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1136 /* Don't do a batch dequeue from the rx queue if there isn't
1137 * enough space in the enqueue buffer.
1139 if (buf->count >= BATCH_SIZE)
1140 rxa_flush_event_buffer(rx_adapter);
1141 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1142 rx_adapter->wrr_pos = wrr_pos;
1146 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1148 if (nb_rx > max_nb_rx) {
1149 rx_adapter->wrr_pos =
1150 (wrr_pos + 1) % rx_adapter->wrr_len;
1154 if (++wrr_pos == rx_adapter->wrr_len)
1161 rxa_service_func(void *args)
1163 struct rte_event_eth_rx_adapter *rx_adapter = args;
1164 struct rte_event_eth_rx_adapter_stats *stats;
1166 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1168 if (!rx_adapter->rxa_started) {
1169 rte_spinlock_unlock(&rx_adapter->rx_lock);
1173 stats = &rx_adapter->stats;
1174 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1175 stats->rx_packets += rxa_poll(rx_adapter);
1176 rte_spinlock_unlock(&rx_adapter->rx_lock);
1181 rte_event_eth_rx_adapter_init(void)
1183 const char *name = "rte_event_eth_rx_adapter_array";
1184 const struct rte_memzone *mz;
1187 sz = sizeof(*event_eth_rx_adapter) *
1188 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1189 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1191 mz = rte_memzone_lookup(name);
1193 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1194 RTE_CACHE_LINE_SIZE);
1196 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1202 event_eth_rx_adapter = mz->addr;
1206 static inline struct rte_event_eth_rx_adapter *
1207 rxa_id_to_adapter(uint8_t id)
1209 return event_eth_rx_adapter ?
1210 event_eth_rx_adapter[id] : NULL;
1214 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1215 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1218 struct rte_eventdev *dev;
1219 struct rte_event_dev_config dev_conf;
1222 struct rte_event_port_conf *port_conf = arg;
1223 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1225 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1226 dev_conf = dev->data->dev_conf;
1228 started = dev->data->dev_started;
1230 rte_event_dev_stop(dev_id);
1231 port_id = dev_conf.nb_event_ports;
1232 dev_conf.nb_event_ports += 1;
1233 ret = rte_event_dev_configure(dev_id, &dev_conf);
1235 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1238 if (rte_event_dev_start(dev_id))
1244 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1246 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1251 conf->event_port_id = port_id;
1252 conf->max_nb_rx = 128;
1254 ret = rte_event_dev_start(dev_id);
1255 rx_adapter->default_cb_arg = 1;
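/* The default callback above is used when the adapter is created with
 * rte_event_eth_rx_adapter_create(); it reconfigures the event device with
 * one extra port for the adapter's use. A minimal usage sketch, with
 * placeholder port configuration values:
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *
 *	ret = rte_event_eth_rx_adapter_create(id, dev_id, &port_conf);
 */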
1260 rxa_epoll_create1(void)
1264 fd = epoll_create1(EPOLL_CLOEXEC);
1265 return fd < 0 ? -errno : fd;
1272 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1274 if (rx_adapter->epd != INIT_FD)
1277 rx_adapter->epd = rxa_epoll_create1();
1278 if (rx_adapter->epd < 0) {
1279 int err = rx_adapter->epd;
1280 rx_adapter->epd = INIT_FD;
1281 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1289 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1292 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1294 if (rx_adapter->intr_ring)
1297 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1298 RTE_EVENT_ETH_INTR_RING_SIZE,
1299 rte_socket_id(), 0);
1300 if (!rx_adapter->intr_ring)
1303 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1304 RTE_EVENT_ETH_INTR_RING_SIZE *
1305 sizeof(struct rte_epoll_event),
1306 RTE_CACHE_LINE_SIZE,
1307 rx_adapter->socket_id);
1308 if (!rx_adapter->epoll_events) {
1313 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1315 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1316 "rx-intr-thread-%d", rx_adapter->id);
1318 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1319 NULL, rxa_intr_thread, rx_adapter);
1321 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1325 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1327 rte_ring_free(rx_adapter->intr_ring);
1328 rx_adapter->intr_ring = NULL;
1329 rx_adapter->epoll_events = NULL;
1334 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1338 err = pthread_cancel(rx_adapter->rx_intr_thread);
1340 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1343 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1345 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1347 rte_free(rx_adapter->epoll_events);
1348 rte_ring_free(rx_adapter->intr_ring);
1349 rx_adapter->intr_ring = NULL;
1350 rx_adapter->epoll_events = NULL;
1355 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1359 if (rx_adapter->num_rx_intr == 0)
1362 ret = rxa_destroy_intr_thread(rx_adapter);
1366 close(rx_adapter->epd);
1367 rx_adapter->epd = INIT_FD;
1373 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1374 struct eth_device_info *dev_info,
1375 uint16_t rx_queue_id)
1378 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1379 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1381 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1383 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1388 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1393 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1396 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1398 dev_info->shared_intr_enabled = 0;
1403 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1404 struct eth_device_info *dev_info,
1411 if (dev_info->nb_rx_intr == 0)
1415 if (rx_queue_id == -1) {
1416 s = dev_info->nb_shared_intr;
1417 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1421 q = dev_info->intr_queue[i];
1422 sintr = rxa_shared_intr(dev_info, q);
1425 if (!sintr || s == 0) {
1427 err = rxa_disable_intr(rx_adapter, dev_info,
1431 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1436 if (!rxa_intr_queue(dev_info, rx_queue_id))
1438 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1439 dev_info->nb_shared_intr == 1) {
1440 err = rxa_disable_intr(rx_adapter, dev_info,
1444 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1448 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1449 if (dev_info->intr_queue[i] == rx_queue_id) {
1450 for (; i < dev_info->nb_rx_intr - 1; i++)
1451 dev_info->intr_queue[i] =
1452 dev_info->intr_queue[i + 1];
1462 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1463 struct eth_device_info *dev_info,
1464 uint16_t rx_queue_id)
1467 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1468 union queue_data qd;
1470 uint16_t *intr_queue;
1471 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1473 if (rxa_intr_queue(dev_info, rx_queue_id))
1476 intr_queue = dev_info->intr_queue;
1477 if (dev_info->intr_queue == NULL) {
1479 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1480 dev_info->intr_queue =
1482 rx_adapter->mem_name,
1485 rx_adapter->socket_id);
1486 if (dev_info->intr_queue == NULL)
1490 init_fd = rx_adapter->epd;
1491 err = rxa_init_epd(rx_adapter);
1493 goto err_free_queue;
1495 qd.port = eth_dev_id;
1496 qd.queue = rx_queue_id;
1498 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1503 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1504 " Rx Queue %u err %d", rx_queue_id, err);
1508 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1510 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1511 " Rx Queue %u err %d", rx_queue_id, err);
1516 err = rxa_create_intr_thread(rx_adapter);
1519 dev_info->shared_intr_enabled = 1;
1521 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1526 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1528 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1529 " Rx Queue %u err %d", rx_queue_id, err);
1531 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1536 RTE_EDEV_LOG_ERR("Could not delete event for"
1537 " Rx Queue %u err %d", rx_queue_id, err1);
1540 if (init_fd == INIT_FD) {
1541 close(rx_adapter->epd);
1542 rx_adapter->epd = -1;
1545 if (intr_queue == NULL)
1546 rte_free(dev_info->intr_queue);
1552 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1553 struct eth_device_info *dev_info,
1559 int shared_done = (dev_info->nb_shared_intr > 0);
1561 if (rx_queue_id != -1) {
1562 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1564 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1568 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1570 if (rxa_shared_intr(dev_info, i) && shared_done)
1573 err = rxa_config_intr(rx_adapter, dev_info, i);
1575 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1578 dev_info->shared_intr_enabled = 1;
1587 shared_done = (dev_info->nb_shared_intr > 0);
1588 for (j = 0; j < i; j++) {
1589 if (rxa_intr_queue(dev_info, j))
1591 if (rxa_shared_intr(dev_info, j) && si != j)
1593 err = rxa_disable_intr(rx_adapter, dev_info, j);
1604 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1607 struct rte_service_spec service;
1608 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1610 if (rx_adapter->service_inited)
1613 memset(&service, 0, sizeof(service));
1614 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1615 "rte_event_eth_rx_adapter_%d", id);
1616 service.socket_id = rx_adapter->socket_id;
1617 service.callback = rxa_service_func;
1618 service.callback_userdata = rx_adapter;
1619 /* Service function handles locking for queue add/del updates */
1620 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1621 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1623 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1628 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1629 &rx_adapter_conf, rx_adapter->conf_arg);
1631 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1635 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1636 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1637 rx_adapter->service_inited = 1;
1638 rx_adapter->epd = INIT_FD;
1642 rte_service_component_unregister(rx_adapter->service_id);
1647 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1648 struct eth_device_info *dev_info,
1649 int32_t rx_queue_id,
1652 struct eth_rx_queue_info *queue_info;
1656 if (dev_info->rx_queue == NULL)
1659 if (rx_queue_id == -1) {
1660 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1661 rxa_update_queue(rx_adapter, dev_info, i, add);
1663 queue_info = &dev_info->rx_queue[rx_queue_id];
1664 enabled = queue_info->queue_enabled;
1666 rx_adapter->nb_queues += !enabled;
1667 dev_info->nb_dev_queues += !enabled;
1669 rx_adapter->nb_queues -= enabled;
1670 dev_info->nb_dev_queues -= enabled;
1672 queue_info->queue_enabled = !!add;
1677 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1678 struct eth_device_info *dev_info,
1679 int32_t rx_queue_id)
1686 if (rx_adapter->nb_queues == 0)
1689 if (rx_queue_id == -1) {
1690 uint16_t nb_rx_queues;
1693 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1694 for (i = 0; i < nb_rx_queues; i++)
1695 rxa_sw_del(rx_adapter, dev_info, i);
1699 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1700 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1701 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1702 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1703 rx_adapter->num_rx_polled -= pollq;
1704 dev_info->nb_rx_poll -= pollq;
1705 rx_adapter->num_rx_intr -= intrq;
1706 dev_info->nb_rx_intr -= intrq;
1707 dev_info->nb_shared_intr -= intrq && sintrq;
1711 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1712 struct eth_device_info *dev_info,
1713 int32_t rx_queue_id,
1714 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1716 struct eth_rx_queue_info *queue_info;
1717 const struct rte_event *ev = &conf->ev;
1722 if (rx_queue_id == -1) {
1723 uint16_t nb_rx_queues;
1726 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1727 for (i = 0; i < nb_rx_queues; i++)
1728 rxa_add_queue(rx_adapter, dev_info, i, conf);
1732 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1733 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1734 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1736 queue_info = &dev_info->rx_queue[rx_queue_id];
1737 queue_info->event_queue_id = ev->queue_id;
1738 queue_info->sched_type = ev->sched_type;
1739 queue_info->priority = ev->priority;
1740 queue_info->wt = conf->servicing_weight;
1742 if (conf->rx_queue_flags &
1743 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1744 queue_info->flow_id = ev->flow_id;
1745 queue_info->flow_id_mask = ~0;
1748 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1749 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1750 rx_adapter->num_rx_polled += !pollq;
1751 dev_info->nb_rx_poll += !pollq;
1752 rx_adapter->num_rx_intr -= intrq;
1753 dev_info->nb_rx_intr -= intrq;
1754 dev_info->nb_shared_intr -= intrq && sintrq;
1757 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1758 rx_adapter->num_rx_polled -= pollq;
1759 dev_info->nb_rx_poll -= pollq;
1760 rx_adapter->num_rx_intr += !intrq;
1761 dev_info->nb_rx_intr += !intrq;
1762 dev_info->nb_shared_intr += !intrq && sintrq;
1763 if (dev_info->nb_shared_intr == 1) {
1764 if (dev_info->multi_intr_cap)
1765 dev_info->next_q_idx =
1766 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1768 dev_info->next_q_idx = 0;
1773 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1774 uint16_t eth_dev_id,
1776 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1778 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1779 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1781 struct eth_rx_poll_entry *rx_poll;
1782 struct eth_rx_queue_info *rx_queue;
1784 uint16_t nb_rx_queues;
1785 uint32_t nb_rx_poll, nb_wrr;
1786 uint32_t nb_rx_intr;
1790 if (queue_conf->servicing_weight == 0) {
1791 struct rte_eth_dev_data *data = dev_info->dev->data;
1793 temp_conf = *queue_conf;
1794 if (!data->dev_conf.intr_conf.rxq) {
1795 /* If Rx interrupts are disabled set wt = 1 */
1796 temp_conf.servicing_weight = 1;
1798 queue_conf = &temp_conf;
1801 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1802 rx_queue = dev_info->rx_queue;
1803 wt = queue_conf->servicing_weight;
1805 if (dev_info->rx_queue == NULL) {
1806 dev_info->rx_queue =
1807 rte_zmalloc_socket(rx_adapter->mem_name,
1809 sizeof(struct eth_rx_queue_info), 0,
1810 rx_adapter->socket_id);
1811 if (dev_info->rx_queue == NULL)
1817 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1818 queue_conf->servicing_weight,
1819 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1821 if (dev_info->dev->intr_handle)
1822 dev_info->multi_intr_cap =
1823 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1825 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1828 goto err_free_rxqueue;
1831 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1833 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1835 goto err_free_rxqueue;
1837 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1839 goto err_free_rxqueue;
1843 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1844 num_intr_vec = rxa_nb_intr_vect(dev_info,
1846 /* interrupt based queues are being converted to
1847 * poll mode queues, delete the interrupt configuration
1850 ret = rxa_del_intr_queue(rx_adapter,
1851 dev_info, rx_queue_id);
1853 goto err_free_rxqueue;
1857 if (nb_rx_intr == 0) {
1858 ret = rxa_free_intr_resources(rx_adapter);
1860 goto err_free_rxqueue;
1866 if (rx_queue_id == -1) {
1867 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1868 dev_info->intr_queue[i] = i;
1870 if (!rxa_intr_queue(dev_info, rx_queue_id))
1871 dev_info->intr_queue[nb_rx_intr - 1] =
1878 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1879 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1881 rte_free(rx_adapter->eth_rx_poll);
1882 rte_free(rx_adapter->wrr_sched);
1884 rx_adapter->eth_rx_poll = rx_poll;
1885 rx_adapter->wrr_sched = rx_wrr;
1886 rx_adapter->wrr_len = nb_wrr;
1887 rx_adapter->num_intr_vec += num_intr_vec;
1891 if (rx_queue == NULL) {
1892 rte_free(dev_info->rx_queue);
1893 dev_info->rx_queue = NULL;
1903 rxa_ctrl(uint8_t id, int start)
1905 struct rte_event_eth_rx_adapter *rx_adapter;
1906 struct rte_eventdev *dev;
1907 struct eth_device_info *dev_info;
1909 int use_service = 0;
1912 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1913 rx_adapter = rxa_id_to_adapter(id);
1914 if (rx_adapter == NULL)
1917 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1919 RTE_ETH_FOREACH_DEV(i) {
1920 dev_info = &rx_adapter->eth_devices[i];
1921 /* if start check for num dev queues */
1922 if (start && !dev_info->nb_dev_queues)
1924 /* if stop check if dev has been started */
1925 if (stop && !dev_info->dev_rx_started)
1927 use_service |= !dev_info->internal_event_port;
1928 dev_info->dev_rx_started = start;
1929 if (dev_info->internal_event_port == 0)
1931 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1932 &rte_eth_devices[i]) :
1933 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1934 &rte_eth_devices[i]);
1938 rte_spinlock_lock(&rx_adapter->rx_lock);
1939 rx_adapter->rxa_started = start;
1940 rte_service_runstate_set(rx_adapter->service_id, start);
1941 rte_spinlock_unlock(&rx_adapter->rx_lock);
1948 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1949 rte_event_eth_rx_adapter_conf_cb conf_cb,
1952 struct rte_event_eth_rx_adapter *rx_adapter;
1956 char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
1957 const uint8_t default_rss_key[] = {
1958 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1959 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1960 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1961 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1962 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1965 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1966 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1967 if (conf_cb == NULL)
1970 if (event_eth_rx_adapter == NULL) {
1971 ret = rte_event_eth_rx_adapter_init();
1976 rx_adapter = rxa_id_to_adapter(id);
1977 if (rx_adapter != NULL) {
1978 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1982 socket_id = rte_event_dev_socket_id(dev_id);
1983 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1984 "rte_event_eth_rx_adapter_%d",
1987 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1988 RTE_CACHE_LINE_SIZE, socket_id);
1989 if (rx_adapter == NULL) {
1990 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1994 rx_adapter->eventdev_id = dev_id;
1995 rx_adapter->socket_id = socket_id;
1996 rx_adapter->conf_cb = conf_cb;
1997 rx_adapter->conf_arg = conf_arg;
1998 rx_adapter->id = id;
1999 strcpy(rx_adapter->mem_name, mem_name);
2000 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2002 sizeof(struct eth_device_info), 0,
2004 rte_convert_rss_key((const uint32_t *)default_rss_key,
2005 (uint32_t *)rx_adapter->rss_key_be,
2006 RTE_DIM(default_rss_key));
2008 if (rx_adapter->eth_devices == NULL) {
2009 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2010 rte_free(rx_adapter);
2013 rte_spinlock_init(&rx_adapter->rx_lock);
2014 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2015 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2017 event_eth_rx_adapter[id] = rx_adapter;
2018 if (conf_cb == rxa_default_conf_cb)
2019 rx_adapter->default_cb_arg = 1;
2024 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2025 struct rte_event_port_conf *port_config)
2027 struct rte_event_port_conf *pc;
2030 if (port_config == NULL)
2032 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2034 pc = rte_malloc(NULL, sizeof(*pc), 0);
2038 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2039 rxa_default_conf_cb,
2047 rte_event_eth_rx_adapter_free(uint8_t id)
2049 struct rte_event_eth_rx_adapter *rx_adapter;
2051 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2053 rx_adapter = rxa_id_to_adapter(id);
2054 if (rx_adapter == NULL)
2057 if (rx_adapter->nb_queues) {
2058 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2059 rx_adapter->nb_queues);
2063 if (rx_adapter->default_cb_arg)
2064 rte_free(rx_adapter->conf_arg);
2065 rte_free(rx_adapter->eth_devices);
2066 rte_free(rx_adapter);
2067 event_eth_rx_adapter[id] = NULL;
2073 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2074 uint16_t eth_dev_id,
2075 int32_t rx_queue_id,
2076 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2080 struct rte_event_eth_rx_adapter *rx_adapter;
2081 struct rte_eventdev *dev;
2082 struct eth_device_info *dev_info;
2084 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2085 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2087 rx_adapter = rxa_id_to_adapter(id);
2088 if ((rx_adapter == NULL) || (queue_conf == NULL))
2091 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2092 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2096 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2097 " eth port %" PRIu16, id, eth_dev_id);
2101 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2102 && (queue_conf->rx_queue_flags &
2103 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2104 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2105 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2110 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2111 (rx_queue_id != -1)) {
2112 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2113 "event queue, eth port: %" PRIu16 " adapter id: %"
2114 PRIu8, eth_dev_id, id);
2118 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2119 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2120 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2121 (uint16_t)rx_queue_id);
2125 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2127 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2128 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2130 if (dev_info->rx_queue == NULL) {
2131 dev_info->rx_queue =
2132 rte_zmalloc_socket(rx_adapter->mem_name,
2133 dev_info->dev->data->nb_rx_queues *
2134 sizeof(struct eth_rx_queue_info), 0,
2135 rx_adapter->socket_id);
2136 if (dev_info->rx_queue == NULL)
2140 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2141 &rte_eth_devices[eth_dev_id],
2142 rx_queue_id, queue_conf);
2144 dev_info->internal_event_port = 1;
2145 rxa_update_queue(rx_adapter,
2146 &rx_adapter->eth_devices[eth_dev_id],
2151 rte_spinlock_lock(&rx_adapter->rx_lock);
2152 dev_info->internal_event_port = 0;
2153 ret = rxa_init_service(rx_adapter, id);
2155 uint32_t service_id = rx_adapter->service_id;
2156 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2158 rte_service_component_runstate_set(service_id,
2159 rxa_sw_adapter_queue_count(rx_adapter));
2161 rte_spinlock_unlock(&rx_adapter->rx_lock);
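/* A minimal usage sketch for the queue add call above; the event queue id,
 * scheduling type and servicing weight are placeholder values, and
 * rx_queue_id == -1 adds all Rx queues of the port:
 *
 *	struct rte_event_eth_rx_adapter_queue_conf queue_conf = {
 *		.rx_queue_flags = 0,
 *		.servicing_weight = 1,
 *		.ev = {
 *			.queue_id = 0,
 *			.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *			.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		},
 *	};
 *
 *	ret = rte_event_eth_rx_adapter_queue_add(id, eth_dev_id, -1,
 *						&queue_conf);
 */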
2171 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2172 int32_t rx_queue_id)
2175 struct rte_eventdev *dev;
2176 struct rte_event_eth_rx_adapter *rx_adapter;
2177 struct eth_device_info *dev_info;
2179 uint32_t nb_rx_poll = 0;
2180 uint32_t nb_wrr = 0;
2181 uint32_t nb_rx_intr;
2182 struct eth_rx_poll_entry *rx_poll = NULL;
2183 uint32_t *rx_wrr = NULL;
2186 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2187 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2189 rx_adapter = rxa_id_to_adapter(id);
2190 if (rx_adapter == NULL)
2193 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2194 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2200 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2201 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2202 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2203 (uint16_t)rx_queue_id);
2207 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2209 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2210 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2212 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2213 &rte_eth_devices[eth_dev_id],
2216 rxa_update_queue(rx_adapter,
2217 &rx_adapter->eth_devices[eth_dev_id],
2220 if (dev_info->nb_dev_queues == 0) {
2221 rte_free(dev_info->rx_queue);
2222 dev_info->rx_queue = NULL;
2226 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2227 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2229 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2234 rte_spinlock_lock(&rx_adapter->rx_lock);
2237 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2239 num_intr_vec = rxa_nb_intr_vect(dev_info,
2241 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2247 if (nb_rx_intr == 0) {
2248 ret = rxa_free_intr_resources(rx_adapter);
2253 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2254 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2256 rte_free(rx_adapter->eth_rx_poll);
2257 rte_free(rx_adapter->wrr_sched);
2259 if (nb_rx_intr == 0) {
2260 rte_free(dev_info->intr_queue);
2261 dev_info->intr_queue = NULL;
2264 rx_adapter->eth_rx_poll = rx_poll;
2265 rx_adapter->wrr_sched = rx_wrr;
2266 rx_adapter->wrr_len = nb_wrr;
2267 rx_adapter->num_intr_vec += num_intr_vec;
2269 if (dev_info->nb_dev_queues == 0) {
2270 rte_free(dev_info->rx_queue);
2271 dev_info->rx_queue = NULL;
2274 rte_spinlock_unlock(&rx_adapter->rx_lock);
2281 rte_service_component_runstate_set(rx_adapter->service_id,
2282 rxa_sw_adapter_queue_count(rx_adapter));
2289 rte_event_eth_rx_adapter_start(uint8_t id)
2291 return rxa_ctrl(id, 1);
2295 rte_event_eth_rx_adapter_stop(uint8_t id)
2297 return rxa_ctrl(id, 0);
2300 int __rte_experimental
2301 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2302 struct rte_event_eth_rx_adapter_stats *stats)
2304 struct rte_event_eth_rx_adapter *rx_adapter;
2305 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2306 struct rte_event_eth_rx_adapter_stats dev_stats;
2307 struct rte_eventdev *dev;
2308 struct eth_device_info *dev_info;
2312 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2314 rx_adapter = rxa_id_to_adapter(id);
2315 if (rx_adapter == NULL || stats == NULL)
2318 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2319 memset(stats, 0, sizeof(*stats));
2320 RTE_ETH_FOREACH_DEV(i) {
2321 dev_info = &rx_adapter->eth_devices[i];
2322 if (dev_info->internal_event_port == 0 ||
2323 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2325 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2326 &rte_eth_devices[i],
2330 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2331 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2334 if (rx_adapter->service_inited)
2335 *stats = rx_adapter->stats;
2337 stats->rx_packets += dev_stats_sum.rx_packets;
2338 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2343 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2345 struct rte_event_eth_rx_adapter *rx_adapter;
2346 struct rte_eventdev *dev;
2347 struct eth_device_info *dev_info;
2350 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2352 rx_adapter = rxa_id_to_adapter(id);
2353 if (rx_adapter == NULL)
2356 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2357 RTE_ETH_FOREACH_DEV(i) {
2358 dev_info = &rx_adapter->eth_devices[i];
2359 if (dev_info->internal_event_port == 0 ||
2360 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2362 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2363 &rte_eth_devices[i]);
2366 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2371 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2373 struct rte_event_eth_rx_adapter *rx_adapter;
2375 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2377 rx_adapter = rxa_id_to_adapter(id);
2378 if (rx_adapter == NULL || service_id == NULL)
2381 if (rx_adapter->service_inited)
2382 *service_id = rx_adapter->service_id;
2384 return rx_adapter->service_inited ? 0 : -ESRCH;
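/* When the adapter runs as an EAL service (no internal port capability),
 * the application is expected to map the returned service id to a service
 * lcore before starting the adapter. A minimal sketch, where service_lcore
 * is a placeholder for a core already set aside as a service core:
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0)
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *
 * followed by rte_event_eth_rx_adapter_start(id).
 */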
2387 int __rte_experimental
2388 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2389 uint16_t eth_dev_id,
2390 rte_event_eth_rx_adapter_cb_fn cb_fn,
2393 struct rte_event_eth_rx_adapter *rx_adapter;
2394 struct eth_device_info *dev_info;
2398 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2399 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2401 rx_adapter = rxa_id_to_adapter(id);
2402 if (rx_adapter == NULL)
2405 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2406 if (dev_info->rx_queue == NULL)
2409 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2413 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2414 " eth port %" PRIu16, id, eth_dev_id);
2418 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2419 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2420 PRIu16, eth_dev_id);
2424 rte_spinlock_lock(&rx_adapter->rx_lock);
2425 dev_info->cb_fn = cb_fn;
2426 dev_info->cb_arg = cb_arg;
2427 rte_spinlock_unlock(&rx_adapter->rx_lock);