1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <rte_ethdev.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
21 #include "rte_eventdev.h"
22 #include "rte_eventdev_pmd.h"
23 #include "rte_event_eth_rx_adapter.h"
26 #define BLOCK_CNT_THRESHOLD 10
27 #define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
29 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
30 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
32 #define RSS_KEY_SIZE 40
33 /* value written to intr thread pipe to signal thread exit */
34 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
35 /* Sentinel value to detect an uninitialized file handle */
39 * Used to store port and queue ID of interrupting Rx queue
51 * There is an instance of this struct per polled Rx queue added to the
54 struct eth_rx_poll_entry {
55 /* Eth port to poll */
57 /* Eth rx queue to poll */
61 /* Instance per adapter */
62 struct rte_eth_event_enqueue_buffer {
63 /* Count of events in this buffer */
65 /* Array of events in this buffer */
66 struct rte_event events[ETH_EVENT_BUFFER_SIZE];
69 struct rte_event_eth_rx_adapter {
71 uint8_t rss_key_be[RSS_KEY_SIZE];
72 /* Event device identifier */
74 /* Per ethernet device structure */
75 struct eth_device_info *eth_devices;
76 /* Event port identifier */
77 uint8_t event_port_id;
78 /* Lock to serialize config updates with service function */
79 rte_spinlock_t rx_lock;
80 /* Max mbufs processed in any service function invocation */
82 /* Receive queues that need to be polled */
83 struct eth_rx_poll_entry *eth_rx_poll;
84 /* Size of the eth_rx_poll array */
85 uint16_t num_rx_polled;
86 /* Weighted round robin schedule */
88 /* wrr_sched[] size */
90 /* Next entry in wrr_sched[] to begin polling */
92 /* Event burst buffer */
93 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
94 /* Per adapter stats */
95 struct rte_event_eth_rx_adapter_stats stats;
96 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
97 uint16_t enq_block_count;
99 uint64_t rx_enq_block_start_ts;
100 /* epoll fd used to wait for Rx interrupts */
102 /* Number of interrupt driven Rx queues */
103 uint32_t num_rx_intr;
104 /* Used to send <dev id, queue id> of interrupting Rx queues from
105 * the interrupt thread to the Rx thread
107 struct rte_ring *intr_ring;
108 /* Rx Queue data (dev id, queue id) for the last non-empty
112 /* queue_data is valid */
114 /* Interrupt ring lock, synchronizes Rx thread
115 * and interrupt thread
117 rte_spinlock_t intr_ring_lock;
118 /* event array passed to rte_epoll_wait */
119 struct rte_epoll_event *epoll_events;
120 /* Count of interrupt vectors in use */
121 uint32_t num_intr_vec;
122 /* Thread blocked on Rx interrupts */
123 pthread_t rx_intr_thread;
124 /* Configuration callback for rte_service configuration */
125 rte_event_eth_rx_adapter_conf_cb conf_cb;
126 /* Configuration callback argument */
128 /* Set if default_cb is being used */
130 /* Service initialization state */
131 uint8_t service_inited;
132 /* Total count of Rx queues in adapter */
134 /* Memory allocation name */
135 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
136 /* Socket identifier cached from eventdev */
138 /* Per adapter EAL service */
140 /* Adapter started flag */
144 } __rte_cache_aligned;
147 struct eth_device_info {
148 struct rte_eth_dev *dev;
149 struct eth_rx_queue_info *rx_queue;
151 rte_event_eth_rx_adapter_cb_fn cb_fn;
152 /* Rx callback argument */
154 /* Set if ethdev->eventdev packet transfer uses a
157 uint8_t internal_event_port;
158 /* Set if the adapter is processing rx queues for
159 * this eth device and packet processing has been
160 * started; this allows the code to know whether the PMD
161 * rx_adapter_stop callback needs to be invoked
163 uint8_t dev_rx_started;
164 /* Number of queues added for this device */
165 uint16_t nb_dev_queues;
166 /* Number of poll based queues
167 * If nb_rx_poll > 0, the start callback will
168 * be invoked if not already invoked
171 /* Number of interrupt based queues
172 * If nb_rx_intr > 0, the start callback will
173 * be invoked if not already invoked.
176 /* Number of queues that use the shared interrupt */
177 uint16_t nb_shared_intr;
178 /* sum(wrr(q)) for all queues within the device;
179 * useful when deleting all device queues
182 /* Intr based queue index to start polling from; this is used
183 * if the number of shared interrupts is non-zero
186 /* Intr based queue indices */
187 uint16_t *intr_queue;
188 /* device generates a per Rx queue interrupt for
189 * queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
192 /* shared interrupt enabled */
193 int shared_intr_enabled;
197 struct eth_rx_queue_info {
198 int queue_enabled; /* True if added */
200 uint16_t wt; /* Polling weight */
201 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
205 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
208 rxa_validate_id(uint8_t id)
210 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
213 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
214 if (!rxa_validate_id(id)) { \
215 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
221 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
223 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
226 /* Greatest common divisor */
227 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
231 return r ? rxa_gcd_u16(b, r) : b;
234 /* Returns the next queue in the polling sequence
236 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
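 *
 * Illustrative example (weights chosen for this comment, not taken from the
 * code): with three polled queues q0, q1, q2 of weights 4, 3 and 2
 * (gcd = 1, max weight = 4), repeated calls yield the sequence
 * q0 q0 q1 q0 q1 q2 q0 q1 q2, i.e. each queue appears as many times per
 * cycle as its weight.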
239 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
240 unsigned int n, int *cw,
241 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
242 uint16_t gcd, int prev)
258 q = eth_rx_poll[i].eth_rx_qid;
259 d = eth_rx_poll[i].eth_dev_id;
260 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
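/* Returns true if the queue must use the device's shared interrupt:
 * either the device cannot generate multiple interrupt vectors, or the
 * queue index lies beyond the per-queue vector range.
 */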
268 rxa_shared_intr(struct eth_device_info *dev_info,
273 if (dev_info->dev->intr_handle == NULL)
276 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
277 return !multi_intr_cap ||
278 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
282 rxa_intr_queue(struct eth_device_info *dev_info,
285 struct eth_rx_queue_info *queue_info;
287 queue_info = &dev_info->rx_queue[rx_queue_id];
288 return dev_info->rx_queue &&
289 !dev_info->internal_event_port &&
290 queue_info->queue_enabled && queue_info->wt == 0;
294 rxa_polled_queue(struct eth_device_info *dev_info,
297 struct eth_rx_queue_info *queue_info;
299 queue_info = &dev_info->rx_queue[rx_queue_id];
300 return !dev_info->internal_event_port &&
301 dev_info->rx_queue &&
302 queue_info->queue_enabled && queue_info->wt != 0;
305 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
307 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
313 nbq = dev_info->dev->data->nb_rx_queues;
314 n = 0; /* non shared count */
315 s = 0; /* shared count */
317 if (rx_queue_id == -1) {
318 for (i = 0; i < nbq; i++) {
319 if (!rxa_shared_intr(dev_info, i))
320 n += add ? !rxa_intr_queue(dev_info, i) :
321 rxa_intr_queue(dev_info, i);
323 s += add ? !rxa_intr_queue(dev_info, i) :
324 rxa_intr_queue(dev_info, i);
328 if ((add && dev_info->nb_shared_intr == 0) ||
329 (!add && dev_info->nb_shared_intr))
333 if (!rxa_shared_intr(dev_info, rx_queue_id))
334 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
335 rxa_intr_queue(dev_info, rx_queue_id);
337 n = add ? !dev_info->nb_shared_intr :
338 dev_info->nb_shared_intr == 1;
344 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
347 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
348 struct eth_device_info *dev_info,
350 uint32_t *nb_rx_intr)
354 if (rx_queue_id == -1)
355 intr_diff = dev_info->nb_rx_intr;
357 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
359 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
362 /* Calculate nb_rx_* after adding interrupt mode rx queues; the newly added
363 * interrupt queues could currently be poll mode Rx queues
366 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
367 struct eth_device_info *dev_info,
369 uint32_t *nb_rx_poll,
370 uint32_t *nb_rx_intr,
375 uint32_t wrr_len_diff;
377 if (rx_queue_id == -1) {
378 intr_diff = dev_info->dev->data->nb_rx_queues -
379 dev_info->nb_rx_intr;
380 poll_diff = dev_info->nb_rx_poll;
381 wrr_len_diff = dev_info->wrr_len;
383 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
384 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
385 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
389 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
390 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
391 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
394 /* Calculate size of the eth_rx_poll and wrr_sched arrays
395 * after deleting poll mode rx queues
398 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
399 struct eth_device_info *dev_info,
401 uint32_t *nb_rx_poll,
405 uint32_t wrr_len_diff;
407 if (rx_queue_id == -1) {
408 poll_diff = dev_info->nb_rx_poll;
409 wrr_len_diff = dev_info->wrr_len;
411 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
412 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
416 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
417 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
420 /* Calculate nb_rx_* after adding poll mode rx queues
423 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
424 struct eth_device_info *dev_info,
427 uint32_t *nb_rx_poll,
428 uint32_t *nb_rx_intr,
433 uint32_t wrr_len_diff;
435 if (rx_queue_id == -1) {
436 intr_diff = dev_info->nb_rx_intr;
437 poll_diff = dev_info->dev->data->nb_rx_queues -
438 dev_info->nb_rx_poll;
439 wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
442 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
443 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
444 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
445 wt - dev_info->rx_queue[rx_queue_id].wt :
449 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
450 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
451 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
454 /* Calculate nb_rx_* after adding rx_queue_id */
456 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
457 struct eth_device_info *dev_info,
460 uint32_t *nb_rx_poll,
461 uint32_t *nb_rx_intr,
465 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
466 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
468 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
469 nb_rx_poll, nb_rx_intr, nb_wrr);
472 /* Calculate nb_rx_* after deleting rx_queue_id */
474 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
475 struct eth_device_info *dev_info,
477 uint32_t *nb_rx_poll,
478 uint32_t *nb_rx_intr,
481 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
483 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
488 * Allocate the rx_poll array
490 static struct eth_rx_poll_entry *
491 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
492 uint32_t num_rx_polled)
496 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
497 RTE_CACHE_LINE_SIZE);
498 return rte_zmalloc_socket(rx_adapter->mem_name,
501 rx_adapter->socket_id);
505 * Allocate the WRR array
508 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
512 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
513 RTE_CACHE_LINE_SIZE);
514 return rte_zmalloc_socket(rx_adapter->mem_name,
517 rx_adapter->socket_id);
521 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
524 struct eth_rx_poll_entry **rx_poll,
525 uint32_t **wrr_sched)
534 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
535 if (*rx_poll == NULL) {
540 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
541 if (*wrr_sched == NULL) {
548 /* Precalculate WRR polling sequence for all queues in rx_adapter */
550 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
551 struct eth_rx_poll_entry *rx_poll,
560 /* Initialize variables for calculation of wrr schedule */
561 uint16_t max_wrr_pos = 0;
562 unsigned int poll_q = 0;
569 /* Generate array of all queues to poll, the size of this
572 RTE_ETH_FOREACH_DEV(d) {
573 uint16_t nb_rx_queues;
574 struct eth_device_info *dev_info =
575 &rx_adapter->eth_devices[d];
576 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
577 if (dev_info->rx_queue == NULL)
579 if (dev_info->internal_event_port)
581 dev_info->wrr_len = 0;
582 for (q = 0; q < nb_rx_queues; q++) {
583 struct eth_rx_queue_info *queue_info =
584 &dev_info->rx_queue[q];
587 if (!rxa_polled_queue(dev_info, q))
590 rx_poll[poll_q].eth_dev_id = d;
591 rx_poll[poll_q].eth_rx_qid = q;
593 dev_info->wrr_len += wt;
594 max_wt = RTE_MAX(max_wt, wt);
595 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
600 /* Generate polling sequence based on weights */
603 for (i = 0; i < max_wrr_pos; i++) {
604 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
605 rx_poll, max_wt, gcd, prev);
611 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
612 struct rte_ipv6_hdr **ipv6_hdr)
614 struct rte_ether_hdr *eth_hdr =
615 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
616 struct rte_vlan_hdr *vlan_hdr;
621 switch (eth_hdr->ether_type) {
622 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
623 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
626 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
627 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
630 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
631 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
632 switch (vlan_hdr->eth_proto) {
633 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
634 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
636 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
637 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
649 /* Calculate RSS hash for IPv4/6 */
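/* rss_key_be holds the Toeplitz key already converted with
 * rte_convert_rss_key() at adapter creation, as required by
 * rte_softrss_be().
 */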
650 static inline uint32_t
651 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
655 struct rte_ipv4_tuple ipv4_tuple;
656 struct rte_ipv6_tuple ipv6_tuple;
657 struct rte_ipv4_hdr *ipv4_hdr;
658 struct rte_ipv6_hdr *ipv6_hdr;
660 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
663 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
664 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
666 input_len = RTE_THASH_V4_L3_LEN;
667 } else if (ipv6_hdr) {
668 rte_thash_load_v6_addrs(ipv6_hdr,
669 (union rte_thash_tuple *)&ipv6_tuple);
671 input_len = RTE_THASH_V6_L3_LEN;
675 return rte_softrss_be(tuple, input_len, rss_key_be);
679 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
681 return !!rx_adapter->enq_block_count;
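/* Record the time at which event enqueues started to get blocked: the
 * timestamp is taken only after BLOCK_CNT_THRESHOLD consecutive blocked
 * attempts; rxa_enq_block_end_ts() accounts the blocked cycles once an
 * enqueue succeeds again.
 */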
685 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
687 if (rx_adapter->rx_enq_block_start_ts)
690 rx_adapter->enq_block_count++;
691 if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
694 rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
698 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
699 struct rte_event_eth_rx_adapter_stats *stats)
701 if (unlikely(!stats->rx_enq_start_ts))
702 stats->rx_enq_start_ts = rte_get_tsc_cycles();
704 if (likely(!rxa_enq_blocked(rx_adapter)))
707 rx_adapter->enq_block_count = 0;
708 if (rx_adapter->rx_enq_block_start_ts) {
709 stats->rx_enq_end_ts = rte_get_tsc_cycles();
710 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
711 rx_adapter->rx_enq_block_start_ts;
712 rx_adapter->rx_enq_block_start_ts = 0;
716 /* Enqueue buffered events to event device */
717 static inline uint16_t
718 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
720 struct rte_eth_event_enqueue_buffer *buf =
721 &rx_adapter->event_enqueue_buffer;
722 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
724 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
725 rx_adapter->event_port_id,
728 if (n != buf->count) {
731 (buf->count - n) * sizeof(struct rte_event));
732 stats->rx_enq_retry++;
735 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
736 rxa_enq_block_start_ts(rx_adapter);
739 stats->rx_enq_count += n;
745 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
747 uint16_t rx_queue_id,
748 struct rte_mbuf **mbufs,
752 struct eth_device_info *dev_info =
753 &rx_adapter->eth_devices[eth_dev_id];
754 struct eth_rx_queue_info *eth_rx_queue_info =
755 &dev_info->rx_queue[rx_queue_id];
756 struct rte_eth_event_enqueue_buffer *buf =
757 &rx_adapter->event_enqueue_buffer;
758 struct rte_event *ev = &buf->events[buf->count];
759 uint64_t event = eth_rx_queue_info->event;
760 uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
761 struct rte_mbuf *m = mbufs[0];
766 struct rte_mbuf *cb_mbufs[BATCH_SIZE];
769 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
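/* ((flag != 0) - 1) is 0 when the flag is set and all ones otherwise;
 * the bitwise NOT turns that into the mask described above.
 */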
770 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
771 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
773 if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
774 ts = rte_get_tsc_cycles();
775 for (i = 0; i < num; i++) {
779 m->ol_flags |= PKT_RX_TIMESTAMP;
784 nb_cb = dev_info->cb_fn ? dev_info->cb_fn(eth_dev_id, rx_queue_id,
785 ETH_EVENT_BUFFER_SIZE,
796 for (i = 0; i < num; i++) {
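/* Use the RSS hash (software computed or mbuf provided) as the event
 * flow id unless the application supplied one, in which case
 * flow_id_mask is ~0 and the configured flow id is preserved.
 */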
800 rxa_do_softrss(m, rx_adapter->rss_key_be) :
803 ev->flow_id = (rss & ~flow_id_mask) |
804 (ev->flow_id & flow_id_mask);
812 /* Enqueue packets from <port, q> to event buffer */
813 static inline uint32_t
814 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
821 struct rte_mbuf *mbufs[BATCH_SIZE];
822 struct rte_eth_event_enqueue_buffer *buf =
823 &rx_adapter->event_enqueue_buffer;
824 struct rte_event_eth_rx_adapter_stats *stats =
831 /* Don't do a batch dequeue from the rx queue if there isn't
832 * enough space in the enqueue buffer.
834 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
835 if (buf->count >= BATCH_SIZE)
836 rxa_flush_event_buffer(rx_adapter);
838 stats->rx_poll_count++;
839 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
845 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
847 if (rx_count + nb_rx > max_rx)
852 rxa_flush_event_buffer(rx_adapter);
858 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
865 struct eth_device_info *dev_info;
866 struct eth_rx_queue_info *queue_info;
873 dev_info = &rx_adapter->eth_devices[port_id];
874 queue_info = &dev_info->rx_queue[queue];
875 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
876 if (rxa_shared_intr(dev_info, queue))
877 intr_enabled = &dev_info->shared_intr_enabled;
879 intr_enabled = &queue_info->intr_enabled;
883 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
884 /* Entry should always be available.
885 * The ring size equals the maximum number of interrupt
886 * vectors supported (an interrupt vector is shared in
887 * case of shared interrupts)
890 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
891 " to ring: %s", strerror(-err));
893 rte_eth_dev_rx_intr_disable(port_id, queue);
895 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
899 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
900 uint32_t num_intr_vec)
902 if (rx_adapter->num_intr_vec + num_intr_vec >
903 RTE_EVENT_ETH_INTR_RING_SIZE) {
904 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
905 " %d needed %d limit %d", rx_adapter->num_intr_vec,
906 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
913 /* Delete entries for (dev, queue) from the interrupt ring */
915 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
916 struct eth_device_info *dev_info,
917 uint16_t rx_queue_id)
922 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
924 n = rte_ring_count(rx_adapter->intr_ring);
925 for (i = 0; i < n; i++) {
926 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
927 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
928 if (qd.port == dev_info->dev->data->port_id &&
929 qd.queue == rx_queue_id)
932 if (qd.port == dev_info->dev->data->port_id)
935 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
938 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
941 /* pthread callback handling interrupt mode receive queues
942 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
943 * interrupting queue to the adapter's ring buffer for interrupt events.
944 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
945 * the adapter service function.
948 rxa_intr_thread(void *arg)
950 struct rte_event_eth_rx_adapter *rx_adapter = arg;
951 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
955 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
956 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
958 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
960 for (i = 0; i < n; i++) {
961 rxa_intr_ring_enqueue(rx_adapter,
962 epoll_events[i].epdata.data);
969 /* Dequeue <port, q> from interrupt ring and enqueue received
972 static inline uint32_t
973 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
978 struct rte_eth_event_enqueue_buffer *buf;
979 rte_spinlock_t *ring_lock;
980 uint8_t max_done = 0;
982 if (rx_adapter->num_rx_intr == 0)
985 if (rte_ring_count(rx_adapter->intr_ring) == 0
986 && !rx_adapter->qd_valid)
989 buf = &rx_adapter->event_enqueue_buffer;
990 ring_lock = &rx_adapter->intr_ring_lock;
992 if (buf->count >= BATCH_SIZE)
993 rxa_flush_event_buffer(rx_adapter);
995 while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
996 struct eth_device_info *dev_info;
999 union queue_data qd = rx_adapter->qd;
1002 if (!rx_adapter->qd_valid) {
1003 struct eth_rx_queue_info *queue_info;
1005 rte_spinlock_lock(ring_lock);
1006 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1008 rte_spinlock_unlock(ring_lock);
1014 rx_adapter->qd = qd;
1015 rx_adapter->qd_valid = 1;
1016 dev_info = &rx_adapter->eth_devices[port];
1017 if (rxa_shared_intr(dev_info, queue))
1018 dev_info->shared_intr_enabled = 1;
1020 queue_info = &dev_info->rx_queue[queue];
1021 queue_info->intr_enabled = 1;
1023 rte_eth_dev_rx_intr_enable(port, queue);
1024 rte_spinlock_unlock(ring_lock);
1029 dev_info = &rx_adapter->eth_devices[port];
1032 if (rxa_shared_intr(dev_info, queue)) {
1036 nb_queues = dev_info->dev->data->nb_rx_queues;
1038 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1039 uint8_t enq_buffer_full;
1041 if (!rxa_intr_queue(dev_info, i))
1043 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1044 rx_adapter->max_nb_rx,
1048 enq_buffer_full = !rxq_empty && n == 0;
1049 max_done = nb_rx > rx_adapter->max_nb_rx;
1051 if (enq_buffer_full || max_done) {
1052 dev_info->next_q_idx = i;
1057 rx_adapter->qd_valid = 0;
1059 /* Reinitialize for next interrupt */
1060 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1061 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1064 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1065 rx_adapter->max_nb_rx,
1067 rx_adapter->qd_valid = !rxq_empty;
1069 if (nb_rx > rx_adapter->max_nb_rx)
1075 rx_adapter->stats.rx_intr_packets += nb_rx;
1080 * Polls receive queues added to the event adapter and enqueues received
1081 * packets to the event device.
1083 * The receive code enqueues initially to a temporary buffer; the
1084 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
1086 * If there isn't space available in the temporary buffer, packets from the
1087 * Rx queue aren't dequeued from the eth device; this back pressures the
1088 * eth device. In virtual device environments this back pressure is relayed to
1089 * the hypervisor's switching layer where adjustments can be made to deal with
1092 static inline uint32_t
1093 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1097 struct rte_eth_event_enqueue_buffer *buf;
1101 wrr_pos = rx_adapter->wrr_pos;
1102 max_nb_rx = rx_adapter->max_nb_rx;
1103 buf = &rx_adapter->event_enqueue_buffer;
1105 /* Iterate through a WRR sequence */
1106 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1107 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1108 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1109 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1111 /* Don't do a batch dequeue from the rx queue if there isn't
1112 * enough space in the enqueue buffer.
1114 if (buf->count >= BATCH_SIZE)
1115 rxa_flush_event_buffer(rx_adapter);
1116 if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
1117 rx_adapter->wrr_pos = wrr_pos;
1121 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1123 if (nb_rx > max_nb_rx) {
1124 rx_adapter->wrr_pos =
1125 (wrr_pos + 1) % rx_adapter->wrr_len;
1129 if (++wrr_pos == rx_adapter->wrr_len)
1136 rxa_service_func(void *args)
1138 struct rte_event_eth_rx_adapter *rx_adapter = args;
1139 struct rte_event_eth_rx_adapter_stats *stats;
1141 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1143 if (!rx_adapter->rxa_started) {
1144 rte_spinlock_unlock(&rx_adapter->rx_lock);
1148 stats = &rx_adapter->stats;
1149 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1150 stats->rx_packets += rxa_poll(rx_adapter);
1151 rte_spinlock_unlock(&rx_adapter->rx_lock);
1156 rte_event_eth_rx_adapter_init(void)
1158 const char *name = "rte_event_eth_rx_adapter_array";
1159 const struct rte_memzone *mz;
1162 sz = sizeof(*event_eth_rx_adapter) *
1163 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1164 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1166 mz = rte_memzone_lookup(name);
1168 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1169 RTE_CACHE_LINE_SIZE);
1171 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1177 event_eth_rx_adapter = mz->addr;
1181 static inline struct rte_event_eth_rx_adapter *
1182 rxa_id_to_adapter(uint8_t id)
1184 return event_eth_rx_adapter ?
1185 event_eth_rx_adapter[id] : NULL;
1189 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1190 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1193 struct rte_eventdev *dev;
1194 struct rte_event_dev_config dev_conf;
1197 struct rte_event_port_conf *port_conf = arg;
1198 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1200 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1201 dev_conf = dev->data->dev_conf;
1203 started = dev->data->dev_started;
1205 rte_event_dev_stop(dev_id);
1206 port_id = dev_conf.nb_event_ports;
1207 dev_conf.nb_event_ports += 1;
1208 ret = rte_event_dev_configure(dev_id, &dev_conf);
1210 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1213 if (rte_event_dev_start(dev_id))
1219 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1221 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1226 conf->event_port_id = port_id;
1227 conf->max_nb_rx = 128;
1229 ret = rte_event_dev_start(dev_id);
1230 rx_adapter->default_cb_arg = 1;
1235 rxa_epoll_create1(void)
1239 fd = epoll_create1(EPOLL_CLOEXEC);
1240 return fd < 0 ? -errno : fd;
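/* Lazily create the epoll file descriptor; rx_adapter->epd remains
 * INIT_FD until the first interrupt mode queue is configured.
 */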
1247 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1249 if (rx_adapter->epd != INIT_FD)
1252 rx_adapter->epd = rxa_epoll_create1();
1253 if (rx_adapter->epd < 0) {
1254 int err = rx_adapter->epd;
1255 rx_adapter->epd = INIT_FD;
1256 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1264 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1267 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1269 if (rx_adapter->intr_ring)
1272 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1273 RTE_EVENT_ETH_INTR_RING_SIZE,
1274 rte_socket_id(), 0);
1275 if (!rx_adapter->intr_ring)
1278 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1279 RTE_EVENT_ETH_INTR_RING_SIZE *
1280 sizeof(struct rte_epoll_event),
1281 RTE_CACHE_LINE_SIZE,
1282 rx_adapter->socket_id);
1283 if (!rx_adapter->epoll_events) {
1288 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1290 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1291 "rx-intr-thread-%d", rx_adapter->id);
1293 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1294 NULL, rxa_intr_thread, rx_adapter);
1296 rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
1300 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1302 rte_ring_free(rx_adapter->intr_ring);
1303 rx_adapter->intr_ring = NULL;
1304 rx_adapter->epoll_events = NULL;
1309 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1313 err = pthread_cancel(rx_adapter->rx_intr_thread);
1315 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1318 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1320 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1322 rte_free(rx_adapter->epoll_events);
1323 rte_ring_free(rx_adapter->intr_ring);
1324 rx_adapter->intr_ring = NULL;
1325 rx_adapter->epoll_events = NULL;
1330 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1334 if (rx_adapter->num_rx_intr == 0)
1337 ret = rxa_destroy_intr_thread(rx_adapter);
1341 close(rx_adapter->epd);
1342 rx_adapter->epd = INIT_FD;
1348 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1349 struct eth_device_info *dev_info,
1350 uint16_t rx_queue_id)
1353 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1354 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1356 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1358 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1363 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1368 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1371 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1373 dev_info->shared_intr_enabled = 0;
1378 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1379 struct eth_device_info *dev_info,
1386 if (dev_info->nb_rx_intr == 0)
1390 if (rx_queue_id == -1) {
1391 s = dev_info->nb_shared_intr;
1392 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1396 q = dev_info->intr_queue[i];
1397 sintr = rxa_shared_intr(dev_info, q);
1400 if (!sintr || s == 0) {
1402 err = rxa_disable_intr(rx_adapter, dev_info,
1406 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1411 if (!rxa_intr_queue(dev_info, rx_queue_id))
1413 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1414 dev_info->nb_shared_intr == 1) {
1415 err = rxa_disable_intr(rx_adapter, dev_info,
1419 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1423 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1424 if (dev_info->intr_queue[i] == rx_queue_id) {
1425 for (; i < dev_info->nb_rx_intr - 1; i++)
1426 dev_info->intr_queue[i] =
1427 dev_info->intr_queue[i + 1];
1437 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1438 struct eth_device_info *dev_info,
1439 uint16_t rx_queue_id)
1442 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1443 union queue_data qd;
1445 uint16_t *intr_queue;
1446 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1448 if (rxa_intr_queue(dev_info, rx_queue_id))
1451 intr_queue = dev_info->intr_queue;
1452 if (dev_info->intr_queue == NULL) {
1454 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1455 dev_info->intr_queue =
1457 rx_adapter->mem_name,
1460 rx_adapter->socket_id);
1461 if (dev_info->intr_queue == NULL)
1465 init_fd = rx_adapter->epd;
1466 err = rxa_init_epd(rx_adapter);
1468 goto err_free_queue;
1470 qd.port = eth_dev_id;
1471 qd.queue = rx_queue_id;
1473 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1478 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1479 " Rx Queue %u err %d", rx_queue_id, err);
1483 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1485 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1486 " Rx Queue %u err %d", rx_queue_id, err);
1491 err = rxa_create_intr_thread(rx_adapter);
1494 dev_info->shared_intr_enabled = 1;
1496 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1501 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1503 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1504 " Rx Queue %u err %d", rx_queue_id, err);
1506 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1511 RTE_EDEV_LOG_ERR("Could not delete event for"
1512 " Rx Queue %u err %d", rx_queue_id, err1);
1515 if (init_fd == INIT_FD) {
1516 close(rx_adapter->epd);
1517 rx_adapter->epd = -1;
1520 if (intr_queue == NULL)
1521 rte_free(dev_info->intr_queue);
1527 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1528 struct eth_device_info *dev_info,
1534 int shared_done = (dev_info->nb_shared_intr > 0);
1536 if (rx_queue_id != -1) {
1537 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1539 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1543 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1545 if (rxa_shared_intr(dev_info, i) && shared_done)
1548 err = rxa_config_intr(rx_adapter, dev_info, i);
1550 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1553 dev_info->shared_intr_enabled = 1;
1562 shared_done = (dev_info->nb_shared_intr > 0);
1563 for (j = 0; j < i; j++) {
1564 if (rxa_intr_queue(dev_info, j))
1566 if (rxa_shared_intr(dev_info, j) && si != j)
1568 err = rxa_disable_intr(rx_adapter, dev_info, j);
1579 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1582 struct rte_service_spec service;
1583 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1585 if (rx_adapter->service_inited)
1588 memset(&service, 0, sizeof(service));
1589 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1590 "rte_event_eth_rx_adapter_%d", id);
1591 service.socket_id = rx_adapter->socket_id;
1592 service.callback = rxa_service_func;
1593 service.callback_userdata = rx_adapter;
1594 /* Service function handles locking for queue add/del updates */
1595 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1596 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1598 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1603 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1604 &rx_adapter_conf, rx_adapter->conf_arg);
1606 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1610 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1611 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1612 rx_adapter->service_inited = 1;
1613 rx_adapter->epd = INIT_FD;
1617 rte_service_component_unregister(rx_adapter->service_id);
1622 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1623 struct eth_device_info *dev_info,
1624 int32_t rx_queue_id,
1627 struct eth_rx_queue_info *queue_info;
1631 if (dev_info->rx_queue == NULL)
1634 if (rx_queue_id == -1) {
1635 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1636 rxa_update_queue(rx_adapter, dev_info, i, add);
1638 queue_info = &dev_info->rx_queue[rx_queue_id];
1639 enabled = queue_info->queue_enabled;
1641 rx_adapter->nb_queues += !enabled;
1642 dev_info->nb_dev_queues += !enabled;
1644 rx_adapter->nb_queues -= enabled;
1645 dev_info->nb_dev_queues -= enabled;
1647 queue_info->queue_enabled = !!add;
1652 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1653 struct eth_device_info *dev_info,
1654 int32_t rx_queue_id)
1661 if (rx_adapter->nb_queues == 0)
1664 if (rx_queue_id == -1) {
1665 uint16_t nb_rx_queues;
1668 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1669 for (i = 0; i < nb_rx_queues; i++)
1670 rxa_sw_del(rx_adapter, dev_info, i);
1674 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1675 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1676 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1677 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1678 rx_adapter->num_rx_polled -= pollq;
1679 dev_info->nb_rx_poll -= pollq;
1680 rx_adapter->num_rx_intr -= intrq;
1681 dev_info->nb_rx_intr -= intrq;
1682 dev_info->nb_shared_intr -= intrq && sintrq;
1686 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1687 struct eth_device_info *dev_info,
1688 int32_t rx_queue_id,
1689 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1691 struct eth_rx_queue_info *queue_info;
1692 const struct rte_event *ev = &conf->ev;
1696 struct rte_event *qi_ev;
1698 if (rx_queue_id == -1) {
1699 uint16_t nb_rx_queues;
1702 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1703 for (i = 0; i < nb_rx_queues; i++)
1704 rxa_add_queue(rx_adapter, dev_info, i, conf);
1708 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1709 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1710 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1712 queue_info = &dev_info->rx_queue[rx_queue_id];
1713 queue_info->wt = conf->servicing_weight;
1715 qi_ev = (struct rte_event *)&queue_info->event;
1716 qi_ev->event = ev->event;
1717 qi_ev->op = RTE_EVENT_OP_NEW;
1718 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1719 qi_ev->sub_event_type = 0;
1721 if (conf->rx_queue_flags &
1722 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1723 queue_info->flow_id_mask = ~0;
1727 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1728 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1729 rx_adapter->num_rx_polled += !pollq;
1730 dev_info->nb_rx_poll += !pollq;
1731 rx_adapter->num_rx_intr -= intrq;
1732 dev_info->nb_rx_intr -= intrq;
1733 dev_info->nb_shared_intr -= intrq && sintrq;
1736 if (rxa_intr_queue(dev_info, rx_queue_id)) {
1737 rx_adapter->num_rx_polled -= pollq;
1738 dev_info->nb_rx_poll -= pollq;
1739 rx_adapter->num_rx_intr += !intrq;
1740 dev_info->nb_rx_intr += !intrq;
1741 dev_info->nb_shared_intr += !intrq && sintrq;
1742 if (dev_info->nb_shared_intr == 1) {
1743 if (dev_info->multi_intr_cap)
1744 dev_info->next_q_idx =
1745 RTE_MAX_RXTX_INTR_VEC_ID - 1;
1747 dev_info->next_q_idx = 0;
1752 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
1753 uint16_t eth_dev_id,
1755 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
1757 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
1758 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
1760 struct eth_rx_poll_entry *rx_poll;
1761 struct eth_rx_queue_info *rx_queue;
1763 uint16_t nb_rx_queues;
1764 uint32_t nb_rx_poll, nb_wrr;
1765 uint32_t nb_rx_intr;
1769 if (queue_conf->servicing_weight == 0) {
1770 struct rte_eth_dev_data *data = dev_info->dev->data;
1772 temp_conf = *queue_conf;
1773 if (!data->dev_conf.intr_conf.rxq) {
1774 /* If Rx interrupts are disabled set wt = 1 */
1775 temp_conf.servicing_weight = 1;
1777 queue_conf = &temp_conf;
1780 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1781 rx_queue = dev_info->rx_queue;
1782 wt = queue_conf->servicing_weight;
1784 if (dev_info->rx_queue == NULL) {
1785 dev_info->rx_queue =
1786 rte_zmalloc_socket(rx_adapter->mem_name,
1788 sizeof(struct eth_rx_queue_info), 0,
1789 rx_adapter->socket_id);
1790 if (dev_info->rx_queue == NULL)
1796 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
1797 queue_conf->servicing_weight,
1798 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
1800 if (dev_info->dev->intr_handle)
1801 dev_info->multi_intr_cap =
1802 rte_intr_cap_multiple(dev_info->dev->intr_handle);
1804 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
1807 goto err_free_rxqueue;
1810 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
1812 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
1814 goto err_free_rxqueue;
1816 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
1818 goto err_free_rxqueue;
1822 if (rx_adapter->num_rx_intr > nb_rx_intr) {
1823 num_intr_vec = rxa_nb_intr_vect(dev_info,
1825 /* interrupt based queues are being converted to
1826 * poll mode queues; delete the interrupt configuration
1829 ret = rxa_del_intr_queue(rx_adapter,
1830 dev_info, rx_queue_id);
1832 goto err_free_rxqueue;
1836 if (nb_rx_intr == 0) {
1837 ret = rxa_free_intr_resources(rx_adapter);
1839 goto err_free_rxqueue;
1845 if (rx_queue_id == -1) {
1846 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1847 dev_info->intr_queue[i] = i;
1849 if (!rxa_intr_queue(dev_info, rx_queue_id))
1850 dev_info->intr_queue[nb_rx_intr - 1] =
1857 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
1858 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
1860 rte_free(rx_adapter->eth_rx_poll);
1861 rte_free(rx_adapter->wrr_sched);
1863 rx_adapter->eth_rx_poll = rx_poll;
1864 rx_adapter->wrr_sched = rx_wrr;
1865 rx_adapter->wrr_len = nb_wrr;
1866 rx_adapter->num_intr_vec += num_intr_vec;
1870 if (rx_queue == NULL) {
1871 rte_free(dev_info->rx_queue);
1872 dev_info->rx_queue = NULL;
1882 rxa_ctrl(uint8_t id, int start)
1884 struct rte_event_eth_rx_adapter *rx_adapter;
1885 struct rte_eventdev *dev;
1886 struct eth_device_info *dev_info;
1888 int use_service = 0;
1891 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1892 rx_adapter = rxa_id_to_adapter(id);
1893 if (rx_adapter == NULL)
1896 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1898 RTE_ETH_FOREACH_DEV(i) {
1899 dev_info = &rx_adapter->eth_devices[i];
1900 /* if starting, skip devices with no queues added */
1901 if (start && !dev_info->nb_dev_queues)
1903 /* if stopping, skip devices that have not been started */
1904 if (stop && !dev_info->dev_rx_started)
1906 use_service |= !dev_info->internal_event_port;
1907 dev_info->dev_rx_started = start;
1908 if (dev_info->internal_event_port == 0)
1910 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
1911 &rte_eth_devices[i]) :
1912 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
1913 &rte_eth_devices[i]);
1917 rte_spinlock_lock(&rx_adapter->rx_lock);
1918 rx_adapter->rxa_started = start;
1919 rte_service_runstate_set(rx_adapter->service_id, start);
1920 rte_spinlock_unlock(&rx_adapter->rx_lock);
1927 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
1928 rte_event_eth_rx_adapter_conf_cb conf_cb,
1931 struct rte_event_eth_rx_adapter *rx_adapter;
1935 char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
1936 const uint8_t default_rss_key[] = {
1937 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
1938 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
1939 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
1940 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
1941 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
1944 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
1945 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
1946 if (conf_cb == NULL)
1949 if (event_eth_rx_adapter == NULL) {
1950 ret = rte_event_eth_rx_adapter_init();
1955 rx_adapter = rxa_id_to_adapter(id);
1956 if (rx_adapter != NULL) {
1957 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
1961 socket_id = rte_event_dev_socket_id(dev_id);
1962 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
1963 "rte_event_eth_rx_adapter_%d",
1966 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
1967 RTE_CACHE_LINE_SIZE, socket_id);
1968 if (rx_adapter == NULL) {
1969 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
1973 rx_adapter->eventdev_id = dev_id;
1974 rx_adapter->socket_id = socket_id;
1975 rx_adapter->conf_cb = conf_cb;
1976 rx_adapter->conf_arg = conf_arg;
1977 rx_adapter->id = id;
1978 strcpy(rx_adapter->mem_name, mem_name);
1979 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
1981 sizeof(struct eth_device_info), 0,
1983 rte_convert_rss_key((const uint32_t *)default_rss_key,
1984 (uint32_t *)rx_adapter->rss_key_be,
1985 RTE_DIM(default_rss_key));
1987 if (rx_adapter->eth_devices == NULL) {
1988 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
1989 rte_free(rx_adapter);
1992 rte_spinlock_init(&rx_adapter->rx_lock);
1993 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
1994 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
1996 event_eth_rx_adapter[id] = rx_adapter;
1997 if (conf_cb == rxa_default_conf_cb)
1998 rx_adapter->default_cb_arg = 1;
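/* Typical application usage of the adapter create/queue add/start API
 * (illustrative sketch only; ev_queue_id, eth_port_id and the numeric
 * values below are placeholders):
 *
 *	struct rte_event_port_conf pc = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 128,
 *		.enqueue_depth = 128,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qc = {
 *		.ev.queue_id = ev_queue_id,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(id, dev_id, &pc);
 *	rte_event_eth_rx_adapter_queue_add(id, eth_port_id, -1, &qc);
 *	rte_event_eth_rx_adapter_start(id);
 */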
2003 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2004 struct rte_event_port_conf *port_config)
2006 struct rte_event_port_conf *pc;
2009 if (port_config == NULL)
2011 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2013 pc = rte_malloc(NULL, sizeof(*pc), 0);
2017 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2018 rxa_default_conf_cb,
2026 rte_event_eth_rx_adapter_free(uint8_t id)
2028 struct rte_event_eth_rx_adapter *rx_adapter;
2030 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2032 rx_adapter = rxa_id_to_adapter(id);
2033 if (rx_adapter == NULL)
2036 if (rx_adapter->nb_queues) {
2037 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2038 rx_adapter->nb_queues);
2042 if (rx_adapter->default_cb_arg)
2043 rte_free(rx_adapter->conf_arg);
2044 rte_free(rx_adapter->eth_devices);
2045 rte_free(rx_adapter);
2046 event_eth_rx_adapter[id] = NULL;
2052 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2053 uint16_t eth_dev_id,
2054 int32_t rx_queue_id,
2055 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2059 struct rte_event_eth_rx_adapter *rx_adapter;
2060 struct rte_eventdev *dev;
2061 struct eth_device_info *dev_info;
2063 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2064 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2066 rx_adapter = rxa_id_to_adapter(id);
2067 if ((rx_adapter == NULL) || (queue_conf == NULL))
2070 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2071 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2075 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2076 "eth port %" PRIu16, id, eth_dev_id);
2080 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2081 && (queue_conf->rx_queue_flags &
2082 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2083 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2084 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2089 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2090 (rx_queue_id != -1)) {
2091 RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2092 "event queue, eth port: %" PRIu16 " adapter id: %"
2093 PRIu8, eth_dev_id, id);
2097 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2098 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2099 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2100 (uint16_t)rx_queue_id);
2104 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2106 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2107 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2109 if (dev_info->rx_queue == NULL) {
2110 dev_info->rx_queue =
2111 rte_zmalloc_socket(rx_adapter->mem_name,
2112 dev_info->dev->data->nb_rx_queues *
2113 sizeof(struct eth_rx_queue_info), 0,
2114 rx_adapter->socket_id);
2115 if (dev_info->rx_queue == NULL)
2119 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2120 &rte_eth_devices[eth_dev_id],
2121 rx_queue_id, queue_conf);
2123 dev_info->internal_event_port = 1;
2124 rxa_update_queue(rx_adapter,
2125 &rx_adapter->eth_devices[eth_dev_id],
2130 rte_spinlock_lock(&rx_adapter->rx_lock);
2131 dev_info->internal_event_port = 0;
2132 ret = rxa_init_service(rx_adapter, id);
2134 uint32_t service_id = rx_adapter->service_id;
2135 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2137 rte_service_component_runstate_set(service_id,
2138 rxa_sw_adapter_queue_count(rx_adapter));
2140 rte_spinlock_unlock(&rx_adapter->rx_lock);
2150 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2151 int32_t rx_queue_id)
2154 struct rte_eventdev *dev;
2155 struct rte_event_eth_rx_adapter *rx_adapter;
2156 struct eth_device_info *dev_info;
2158 uint32_t nb_rx_poll = 0;
2159 uint32_t nb_wrr = 0;
2160 uint32_t nb_rx_intr;
2161 struct eth_rx_poll_entry *rx_poll = NULL;
2162 uint32_t *rx_wrr = NULL;
2165 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2166 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2168 rx_adapter = rxa_id_to_adapter(id);
2169 if (rx_adapter == NULL)
2172 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2173 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2179 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2180 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2181 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2182 (uint16_t)rx_queue_id);
2186 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2188 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2189 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2191 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2192 &rte_eth_devices[eth_dev_id],
2195 rxa_update_queue(rx_adapter,
2196 &rx_adapter->eth_devices[eth_dev_id],
2199 if (dev_info->nb_dev_queues == 0) {
2200 rte_free(dev_info->rx_queue);
2201 dev_info->rx_queue = NULL;
2205 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2206 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2208 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2213 rte_spinlock_lock(&rx_adapter->rx_lock);
2216 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2218 num_intr_vec = rxa_nb_intr_vect(dev_info,
2220 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2226 if (nb_rx_intr == 0) {
2227 ret = rxa_free_intr_resources(rx_adapter);
2232 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2233 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2235 rte_free(rx_adapter->eth_rx_poll);
2236 rte_free(rx_adapter->wrr_sched);
2238 if (nb_rx_intr == 0) {
2239 rte_free(dev_info->intr_queue);
2240 dev_info->intr_queue = NULL;
2243 rx_adapter->eth_rx_poll = rx_poll;
2244 rx_adapter->wrr_sched = rx_wrr;
2245 rx_adapter->wrr_len = nb_wrr;
2246 rx_adapter->num_intr_vec += num_intr_vec;
2248 if (dev_info->nb_dev_queues == 0) {
2249 rte_free(dev_info->rx_queue);
2250 dev_info->rx_queue = NULL;
2253 rte_spinlock_unlock(&rx_adapter->rx_lock);
2260 rte_service_component_runstate_set(rx_adapter->service_id,
2261 rxa_sw_adapter_queue_count(rx_adapter));
2268 rte_event_eth_rx_adapter_start(uint8_t id)
2270 return rxa_ctrl(id, 1);
2274 rte_event_eth_rx_adapter_stop(uint8_t id)
2276 return rxa_ctrl(id, 0);
2280 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2281 struct rte_event_eth_rx_adapter_stats *stats)
2283 struct rte_event_eth_rx_adapter *rx_adapter;
2284 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2285 struct rte_event_eth_rx_adapter_stats dev_stats;
2286 struct rte_eventdev *dev;
2287 struct eth_device_info *dev_info;
2291 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2293 rx_adapter = rxa_id_to_adapter(id);
2294 if (rx_adapter == NULL || stats == NULL)
2297 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2298 memset(stats, 0, sizeof(*stats));
2299 RTE_ETH_FOREACH_DEV(i) {
2300 dev_info = &rx_adapter->eth_devices[i];
2301 if (dev_info->internal_event_port == 0 ||
2302 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2304 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2305 &rte_eth_devices[i],
2309 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2310 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2313 if (rx_adapter->service_inited)
2314 *stats = rx_adapter->stats;
2316 stats->rx_packets += dev_stats_sum.rx_packets;
2317 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2322 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2324 struct rte_event_eth_rx_adapter *rx_adapter;
2325 struct rte_eventdev *dev;
2326 struct eth_device_info *dev_info;
2329 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2331 rx_adapter = rxa_id_to_adapter(id);
2332 if (rx_adapter == NULL)
2335 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2336 RTE_ETH_FOREACH_DEV(i) {
2337 dev_info = &rx_adapter->eth_devices[i];
2338 if (dev_info->internal_event_port == 0 ||
2339 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2341 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2342 &rte_eth_devices[i]);
2345 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2350 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2352 struct rte_event_eth_rx_adapter *rx_adapter;
2354 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2356 rx_adapter = rxa_id_to_adapter(id);
2357 if (rx_adapter == NULL || service_id == NULL)
2360 if (rx_adapter->service_inited)
2361 *service_id = rx_adapter->service_id;
2363 return rx_adapter->service_inited ? 0 : -ESRCH;
2367 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2368 uint16_t eth_dev_id,
2369 rte_event_eth_rx_adapter_cb_fn cb_fn,
2372 struct rte_event_eth_rx_adapter *rx_adapter;
2373 struct eth_device_info *dev_info;
2377 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2378 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2380 rx_adapter = rxa_id_to_adapter(id);
2381 if (rx_adapter == NULL)
2384 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2385 if (dev_info->rx_queue == NULL)
2388 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2392 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2393 "eth port %" PRIu16, id, eth_dev_id);
2397 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2398 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2399 PRIu16, eth_dev_id);
2403 rte_spinlock_lock(&rx_adapter->rx_lock);
2404 dev_info->cb_fn = cb_fn;
2405 dev_info->cb_arg = cb_arg;
2406 rte_spinlock_unlock(&rx_adapter->rx_lock);