/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#include <stdlib.h>
#if defined(LINUX)
#include <sys/epoll.h>
#endif
#include <unistd.h>

#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_errno.h>
#include <ethdev_driver.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>
#include <rte_interrupts.h>
#include <rte_mbuf_dyn.h>

#include "rte_eventdev.h"
#include "eventdev_pmd.h"
#include "rte_eventdev_trace.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
#define MAX_VECTOR_SIZE 1024
#define MIN_VECTOR_SIZE 4
#define MAX_VECTOR_NS 1E9
#define MIN_VECTOR_NS 1E5

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32

#define RSS_KEY_SIZE 40
/* value written to intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT 1
/* Sentinel value to detect an uninitialized file handle */
#define INIT_FD -1

#define RXA_ADAPTER_ARRAY "rte_event_eth_rx_adapter_array"

/*
 * Used to store port and queue ID of interrupting Rx queue
 */
union queue_data {
	RTE_STD_C11
	void *ptr;
	struct {
		uint16_t port;
		uint16_t queue;
	};
};

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

struct eth_rx_vector_data {
	TAILQ_ENTRY(eth_rx_vector_data) next;
	uint16_t port;
	uint16_t queue;
	uint16_t max_vector_count;
	uint64_t event;
	uint64_t ts;
	uint64_t vector_timeout_ticks;
	struct rte_mempool *vector_pool;
	struct rte_event_vector *vector_ev;
} __rte_cache_aligned;

TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event *events;
	/* Size of event buffer */
	uint16_t events_size;
	/* Event enqueue happens from head */
	uint16_t head;
	/* New packets from rte_eth_rx_burst are enqueued from tail */
	uint16_t tail;
	/* Last element in the buffer before rollover */
	uint16_t last;
	/* Mask to acquire the buffer wrap point */
	uint16_t last_mask;
};

struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Vector enable flag */
	uint8_t ena_vector;
	/* Timestamp of previous vector expiry list traversal */
	uint64_t prev_expiry_ts;
	/* Minimum ticks to wait before traversing expiry list */
	uint64_t vector_tmo_ticks;
	/* vector list */
	struct eth_rx_vector_data_list vector_list;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start timestamp */
	uint64_t rx_enq_block_start_ts;
	/* epoll fd used to wait for Rx interrupts */
	int epd;
	/* Number of interrupt driven Rx queues */
	uint32_t num_rx_intr;
	/* Used to send <dev id, queue id> of interrupting Rx queues from
	 * the interrupt thread to the Rx thread
	 */
	struct rte_ring *intr_ring;
	/* Rx Queue data (dev id, queue id) for the last non-empty
	 * queue polled
	 */
	union queue_data qd;
	/* queue_data is valid */
	int qd_valid;
	/* Interrupt ring lock, synchronizes Rx thread
	 * and interrupt thread
	 */
	rte_spinlock_t intr_ring_lock;
	/* event array passed to rte_epoll_wait */
	struct rte_epoll_event *epoll_events;
	/* Count of interrupt vectors in use */
	uint32_t num_intr_vec;
	/* Thread blocked on Rx interrupts */
	pthread_t rx_intr_thread;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
	/* Adapter ID */
	uint8_t id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Rx callback */
	rte_event_eth_rx_adapter_cb_fn cb_fn;
	/* Rx callback argument */
	void *cb_arg;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* Number of poll based queues
	 * If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* Number of interrupt based queues
	 * If nb_rx_intr > 0, the start callback will
	 * be invoked if not already invoked.
	 */
	uint16_t nb_rx_intr;
	/* Number of queues that use the shared interrupt */
	uint16_t nb_shared_intr;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
	/* Intr based queue index to start polling from, this is used
	 * if the number of shared interrupts is non-zero
	 */
	uint16_t next_q_idx;
	/* Intr based queue indices */
	uint16_t *intr_queue;
	/* device generates per Rx queue interrupt for queue index
	 * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
	 */
	int multi_intr_cap;
	/* shared interrupt enabled */
	int shared_intr_enabled;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	int intr_enabled;
	uint8_t ena_vector;
	uint16_t wt;		/* Polling weight */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
	uint64_t event;
	struct eth_rx_vector_data vector_data;
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

/* Enable dynamic timestamp field in mbuf */
static uint64_t event_eth_rx_timestamp_dynflag;
static int event_eth_rx_timestamp_dynfield_offset = -1;

static inline rte_mbuf_timestamp_t *
rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
{
	return RTE_MBUF_DYNFIELD(mbuf,
		event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
}

static inline int
rxa_validate_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)

static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
}

/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? rxa_gcd_u16(b, r) : b;
}

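/*
 * Illustrative note (not in the original source): the recursion above is
 * Euclid's algorithm, e.g. rxa_gcd_u16(12, 8) -> rxa_gcd_u16(8, 4) -> 4.
 * The GCD of all polling weights is the step by which the current weight
 * is decremented in the WRR schedule generated below.
 */
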
/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	unsigned int n, int *cw,
	struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint16_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}

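/*
 * Worked example (illustrative, not part of the original code): with two
 * polled queues where wt(q0) = 2 and wt(q1) = 1, so max_wt = 2 and gcd = 1,
 * starting from cw = -1 and prev = -1 successive calls return the index
 * sequence 0, 0, 1, 0, 0, 1, ... i.e. q0 is polled twice for every poll of
 * q1, matching the 2:1 weight ratio.
 */
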
static inline int
rxa_shared_intr(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int multi_intr_cap;

	if (dev_info->dev->intr_handle == NULL)
		return 0;

	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
	return !multi_intr_cap ||
		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
}

static inline int
rxa_intr_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return dev_info->rx_queue &&
		!dev_info->internal_event_port &&
		queue_info->queue_enabled && queue_info->wt == 0;
}

static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return !dev_info->internal_event_port &&
		dev_info->rx_queue &&
		queue_info->queue_enabled && queue_info->wt != 0;
}

/* Calculate change in number of vectors after Rx queue ID is added/deleted */
static int
rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
{
	uint16_t i;
	int n, s;
	uint16_t nbq;

	nbq = dev_info->dev->data->nb_rx_queues;
	n = 0; /* non shared count */
	s = 0; /* shared count */

	if (rx_queue_id == -1) {
		for (i = 0; i < nbq; i++) {
			if (!rxa_shared_intr(dev_info, i))
				n += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
			else
				s += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
		}

		if (s > 0) {
			if ((add && dev_info->nb_shared_intr == 0) ||
				(!add && dev_info->nb_shared_intr))
				n += 1;
		}
	} else {
		if (!rxa_shared_intr(dev_info, rx_queue_id))
			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
				rxa_intr_queue(dev_info, rx_queue_id);
		else
			n = add ? !dev_info->nb_shared_intr :
				dev_info->nb_shared_intr == 1;
	}

	return add ? n : -n;
}

/* Calculate nb_rx_intr after deleting interrupt mode rx queues
 */
static void
rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_intr)
{
	uint32_t intr_diff;

	if (rx_queue_id == -1)
		intr_diff = dev_info->nb_rx_intr;
	else
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);

	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
}

/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
 * interrupt queues could currently be poll mode Rx queues
 */
static void
rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_intr;
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate size of the eth_rx_poll and wrr_sched arrays
 * after deleting poll mode rx queues
 */
static void
rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_wrr)
{
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate nb_rx_* after adding poll mode rx queues
 */
static void
rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint16_t wt,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->nb_rx_intr;
		poll_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_poll;
		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
				- dev_info->wrr_len;
	} else {
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
				wt - dev_info->rx_queue[rx_queue_id].wt :
				wt;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}

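/*
 * Illustrative example (assumed numbers, not from the original source):
 * if a device has 4 Rx queues, all currently in interrupt mode
 * (nb_rx_intr = 4, nb_rx_poll = 0, wrr_len = 0), adding all queues
 * (rx_queue_id == -1) with weight wt = 2 gives intr_diff = 4,
 * poll_diff = 4 and wrr_len_diff = 8, i.e. the adapter ends up with four
 * polled queues occupying eight WRR slots.
 */
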
/* Calculate nb_rx_* after adding rx_queue_id */
static void
rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint16_t wt,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	if (wt != 0)
		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
	else
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
}

/* Calculate nb_rx_* after deleting rx_queue_id */
static void
rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
				nb_wrr);
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_intr);
}

/*
 * Allocate the rx_poll array
 */
static struct eth_rx_poll_entry *
rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
	uint32_t num_rx_polled)
{
	size_t len;

	len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

/*
 * Allocate the WRR array
 */
static uint32_t *
rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
{
	size_t len;

	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

static int
rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
		uint32_t nb_poll,
		uint32_t nb_wrr,
		struct eth_rx_poll_entry **rx_poll,
		uint32_t **wrr_sched)
{
	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		return -ENOMEM;
	}

	return 0;
}

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static void
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_poll_entry *rx_poll,
		uint32_t *rx_wrr)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;
	int prev;
	int cw;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	if (rx_poll == NULL)
		return;

	/* Generate array of all queues to poll, the size of this
	 * array is poll_q
	 */
	RTE_ETH_FOREACH_DEV(d) {
		uint16_t nb_rx_queues;
		struct eth_device_info *dev_info =
				&rx_adapter->eth_devices[d];
		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		if (dev_info->rx_queue == NULL)
			continue;
		if (dev_info->internal_event_port)
			continue;
		dev_info->wrr_len = 0;
		for (q = 0; q < nb_rx_queues; q++) {
			struct eth_rx_queue_info *queue_info =
				&dev_info->rx_queue[q];
			uint16_t wt;

			if (!rxa_polled_queue(dev_info, q))
				continue;
			wt = queue_info->wt;
			rx_poll[poll_q].eth_dev_id = d;
			rx_poll[poll_q].eth_rx_qid = q;
			max_wrr_pos += wt;
			dev_info->wrr_len += wt;
			max_wt = RTE_MAX(max_wt, wt);
			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
			poll_q++;
		}
	}

	/* Generate polling sequence based on weights */
	prev = -1;
	cw = -1;
	for (i = 0; i < max_wrr_pos; i++) {
		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
				rx_poll, max_wt, gcd, prev);
		prev = rx_wrr[i];
	}
}

static inline void
rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
	struct rte_ipv6_hdr **ipv6_hdr)
{
	struct rte_ether_hdr *eth_hdr =
		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
	struct rte_vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct rte_ipv4_hdr *ipv4_hdr;
	struct rte_ipv6_hdr *ipv6_hdr;

	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}

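/*
 * Illustrative note (not in the original source): the soft RSS path is only
 * taken when the PMD did not compute an RSS hash and the application did
 * not supply a flow id. For an IPv4 packet, the (src_addr, dst_addr) tuple
 * is hashed with the adapter's byte-swapped Toeplitz key to synthesize
 * ev->flow_id for the enqueued event.
 */
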
static inline bool
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rxa_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
			rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}

/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
		&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
	uint16_t count = buf->last ? buf->last - buf->head : buf->count;

	if (!count)
		return 0;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					&buf->events[buf->head],
					count);
	if (n != count)
		stats->rx_enq_retry++;

	buf->head += n;

	if (buf->last && n == count) {
		uint16_t n1;

		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					&buf->events[0],
					buf->tail);

		if (n1 != buf->tail)
			stats->rx_enq_retry++;

		buf->last = 0;
		buf->head = n1;
		buf->last_mask = 0;
		n += n1;
	}

	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
		rxa_enq_block_start_ts(rx_adapter);

	stats->rx_enq_count += n;

	return n;
}

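/*
 * Illustrative example (assumed numbers, not from the original source):
 * with events_size = 192, head = 100, tail = 20 and last = 190, the buffer
 * has wrapped; the first enqueue burst sends events [100, 190)
 * (count = last - head = 90) and, if all of them are accepted, a second
 * burst sends the wrapped events [0, 20) and resets last to 0.
 */
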
static inline void
rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_vector_data *vec)
{
	vec->vector_ev->nb_elem = 0;
	vec->vector_ev->port = vec->port;
	vec->vector_ev->queue = vec->queue;
	vec->vector_ev->attr_valid = true;
	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
}

static inline uint16_t
rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_rx_queue_info *queue_info,
			struct rte_eth_event_enqueue_buffer *buf,
			struct rte_mbuf **mbufs, uint16_t num)
{
	struct rte_event *ev = &buf->events[buf->count];
	struct eth_rx_vector_data *vec;
	uint16_t filled, space, sz;

	filled = 0;
	vec = &queue_info->vector_data;

	while (num) {
		if (vec->vector_ev == NULL) {
			if (rte_mempool_get(vec->vector_pool,
					(void **)&vec->vector_ev) < 0) {
				rte_pktmbuf_free_bulk(mbufs, num);
				return filled;
			}
			rxa_init_vector(rx_adapter, vec);
		}

		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
			/* Event ready. */
			ev->event = vec->event;
			ev->vec = vec->vector_ev;
			ev++;
			filled++;
			vec->vector_ev = NULL;
			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
			if (rte_mempool_get(vec->vector_pool,
					(void **)&vec->vector_ev) < 0) {
				rte_pktmbuf_free_bulk(mbufs, num);
				return filled;
			}
			rxa_init_vector(rx_adapter, vec);
		}

		space = vec->max_vector_count - vec->vector_ev->nb_elem;
		sz = num > space ? space : num;
		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
			sizeof(void *) * sz);
		vec->vector_ev->nb_elem += sz;
		num -= sz;
		mbufs += sz;
		vec->ts = rte_rdtsc();
	}

	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
		ev->event = vec->event;
		ev->vec = vec->vector_ev;
		ev++;
		filled++;
		vec->vector_ev = NULL;
		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
	}

	return filled;
}

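/*
 * Illustrative example (assumed numbers, not from the original source):
 * with max_vector_count = 8, an existing aggregation of 6 mbufs and a
 * burst of num = 10, the first memcpy() copies sz = 2 mbufs to complete
 * the vector, the completed vector is emitted as one event, and the
 * remaining 8 mbufs exactly fill the next vector, which is emitted by the
 * final check after the loop.
 */
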
static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		uint16_t rx_queue_id,
		struct rte_mbuf **mbufs,
		uint16_t num)
{
	uint32_t i;
	struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
					&dev_info->rx_queue[rx_queue_id];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	uint16_t new_tail = buf->tail;
	uint64_t event = eth_rx_queue_info->event;
	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint16_t nb_cb;
	uint16_t dropped;
	uint64_t ts, ts_mask;

	if (!eth_rx_queue_info->ena_vector) {
		ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
						0 : rte_get_tsc_cycles();

		/* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
		 * otherwise 0
		 */
		ts_mask = (uint64_t)(!(m->ol_flags &
				event_eth_rx_timestamp_dynflag)) - 1ULL;

		/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
		for (i = 0; i < num; i++) {
			struct rte_event *ev;

			m = mbufs[i];
			*rxa_timestamp_dynfield(m) = ts |
					(*rxa_timestamp_dynfield(m) & ts_mask);

			ev = &buf->events[new_tail];

			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
				: m->hash.rss;
			ev->event = event;
			ev->flow_id = (rss & ~flow_id_mask) |
					(ev->flow_id & flow_id_mask);
			ev->mbuf = m;
			new_tail++;
		}
	} else {
		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
					buf, mbufs, num);
	}

	if (num && dev_info->cb_fn) {

		dropped = 0;
		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
					buf->last |
					(buf->events_size & ~buf->last_mask),
					buf->count >= BATCH_SIZE ?
						buf->count - BATCH_SIZE : 0,
					&buf->events[buf->tail],
					num,
					dev_info->cb_arg,
					&dropped);
		if (unlikely(nb_cb > num))
			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
				nb_cb, num);
		else
			num = nb_cb;
		if (dropped)
			rx_adapter->stats.rx_dropped += dropped;
	}

	buf->count += num;
	buf->tail += num;
}

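/*
 * Illustrative note (not in the original source) on the branchless masks
 * above: ts_mask relies on unsigned wraparound. When the timestamp flag is
 * set, !(ol_flags & flag) == 0 and 0 - 1ULL == 0xffffffffffffffff, so the
 * PMD-provided timestamp is kept; otherwise the mask is 0 and the
 * adapter's rte_get_tsc_cycles() value is written instead.
 */
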
static inline bool
rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
{
	uint32_t nb_req = buf->tail + BATCH_SIZE;

	if (!buf->last) {
		if (nb_req <= buf->events_size)
			return true;

		if (buf->head >= BATCH_SIZE) {
			buf->last_mask = ~0;
			buf->last = buf->tail;
			buf->tail = 0;
			return true;
		}
	}

	return nb_req <= buf->head;
}

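/*
 * Illustrative example (assumed numbers, not from the original source):
 * with events_size = 192, tail = 180 and head = 100, a further BATCH_SIZE
 * (32) events would not fit at the tail (180 + 32 > 192), but since at
 * least BATCH_SIZE events before head have already been flushed, the
 * buffer wraps: last is set to 180, tail restarts at 0 and the next burst
 * lands in the [0, head) region.
 */
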
/* Enqueue packets from <port, q> to event buffer */
static inline uint32_t
rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t port_id,
	uint16_t queue_id,
	uint32_t rx_count,
	uint32_t max_rx,
	int *rxq_empty)
{
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats =
					&rx_adapter->stats;
	uint16_t n;
	uint32_t nb_rx = 0;

	if (rxq_empty)
		*rxq_empty = 0;
	/* Don't do a batch dequeue from the rx queue if there isn't
	 * enough space in the enqueue buffer.
	 */
	while (rxa_pkt_buf_available(buf)) {
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
		if (unlikely(!n)) {
			if (rxq_empty)
				*rxq_empty = 1;
			break;
		}
		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
		nb_rx += n;
		if (rx_count + nb_rx > max_rx)
			break;
	}

	if (buf->count > 0)
		rxa_flush_event_buffer(rx_adapter);

	return nb_rx;
}

static void
rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
		void *data)
{
	uint16_t port_id;
	uint16_t queue;
	int err;
	union queue_data qd;
	struct eth_device_info *dev_info;
	struct eth_rx_queue_info *queue_info;
	int *intr_enabled;

	qd.ptr = data;
	port_id = qd.port;
	queue = qd.queue;

	dev_info = &rx_adapter->eth_devices[port_id];
	queue_info = &dev_info->rx_queue[queue];
	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
	if (rxa_shared_intr(dev_info, queue))
		intr_enabled = &dev_info->shared_intr_enabled;
	else
		intr_enabled = &queue_info->intr_enabled;

	if (*intr_enabled) {
		*intr_enabled = 0;
		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
		/* Entry should always be available.
		 * The ring size equals the maximum number of interrupt
		 * vectors supported (an interrupt vector is shared in
		 * case of shared interrupts)
		 */
		if (err)
			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
				" to ring: %s", strerror(-err));
		else
			rte_eth_dev_rx_intr_disable(port_id, queue);
	}
	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

static int
rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
			uint32_t num_intr_vec)
{
	if (rx_adapter->num_intr_vec + num_intr_vec >
				RTE_EVENT_ETH_INTR_RING_SIZE) {
		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
		" %d needed %d limit %d", rx_adapter->num_intr_vec,
		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
		return -ENOSPC;
	}

	return 0;
}

/* Delete entries for (dev, queue) from the interrupt ring */
static void
rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			uint16_t rx_queue_id)
{
	int i, n;
	union queue_data qd;

	rte_spinlock_lock(&rx_adapter->intr_ring_lock);

	n = rte_ring_count(rx_adapter->intr_ring);
	for (i = 0; i < n; i++) {
		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
			if (qd.port == dev_info->dev->data->port_id &&
				qd.queue == rx_queue_id)
				continue;
		} else {
			if (qd.port == dev_info->dev->data->port_id)
				continue;
		}
		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
	}

	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

/* pthread callback handling interrupt mode receive queues
 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
 * interrupting queue to the adapter's ring buffer for interrupt events.
 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
 * the adapter service function.
 */
static void *
rxa_intr_thread(void *arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter = arg;
	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
	int n, i;

	while (1) {
		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
		if (unlikely(n < 0))
			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
					n);
		for (i = 0; i < n; i++) {
			rxa_intr_ring_enqueue(rx_adapter,
					epoll_events[i].epdata.data);
		}
	}

	return NULL;
}

/* Dequeue <port, q> from interrupt ring and enqueue received
 * mbufs to eventdev
 */
static inline uint32_t
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t n;
	uint32_t nb_rx = 0;
	int rxq_empty;
	struct rte_eth_event_enqueue_buffer *buf;
	rte_spinlock_t *ring_lock;
	uint8_t max_done = 0;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	if (rte_ring_count(rx_adapter->intr_ring) == 0
		&& !rx_adapter->qd_valid)
		return 0;

	buf = &rx_adapter->event_enqueue_buffer;
	ring_lock = &rx_adapter->intr_ring_lock;

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	while (rxa_pkt_buf_available(buf)) {
		struct eth_device_info *dev_info;
		uint16_t port;
		uint16_t queue;
		union queue_data qd = rx_adapter->qd;
		int err;

		if (!rx_adapter->qd_valid) {
			struct eth_rx_queue_info *queue_info;

			rte_spinlock_lock(ring_lock);
			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
			if (err) {
				rte_spinlock_unlock(ring_lock);
				break;
			}

			port = qd.port;
			queue = qd.queue;
			rx_adapter->qd = qd;
			rx_adapter->qd_valid = 1;
			dev_info = &rx_adapter->eth_devices[port];
			if (rxa_shared_intr(dev_info, queue))
				dev_info->shared_intr_enabled = 1;
			else {
				queue_info = &dev_info->rx_queue[queue];
				queue_info->intr_enabled = 1;
			}
			rte_eth_dev_rx_intr_enable(port, queue);
			rte_spinlock_unlock(ring_lock);
		} else {
			port = qd.port;
			queue = qd.queue;

			dev_info = &rx_adapter->eth_devices[port];
		}

		if (rxa_shared_intr(dev_info, queue)) {
			uint16_t i;
			uint16_t nb_queues;

			nb_queues = dev_info->dev->data->nb_rx_queues;
			n = 0;
			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
				uint8_t enq_buffer_full;

				if (!rxa_intr_queue(dev_info, i))
					continue;
				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
					rx_adapter->max_nb_rx,
					&rxq_empty);
				nb_rx += n;

				enq_buffer_full = !rxq_empty && n == 0;
				max_done = nb_rx > rx_adapter->max_nb_rx;

				if (enq_buffer_full || max_done) {
					dev_info->next_q_idx = i;
					goto done;
				}
			}

			rx_adapter->qd_valid = 0;

			/* Reinitialize for next interrupt */
			dev_info->next_q_idx = dev_info->multi_intr_cap ?
						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
						0;
		} else {
			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
				rx_adapter->max_nb_rx,
				&rxq_empty);
			rx_adapter->qd_valid = !rxq_empty;
			nb_rx += n;
			if (nb_rx > rx_adapter->max_nb_rx)
				break;
		}
	}

done:
	rx_adapter->stats.rx_intr_packets += nb_rx;
	return nb_rx;
}

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * the situation.
 */
static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint32_t nb_rx = 0;
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);
		if (!rxa_pkt_buf_available(buf)) {
			rx_adapter->wrr_pos = wrr_pos;
			return nb_rx;
		}

		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
				NULL);
		if (nb_rx > max_nb_rx) {
			rx_adapter->wrr_pos =
				(wrr_pos + 1) % rx_adapter->wrr_len;
			break;
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}
	return nb_rx;
}

static void
rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter = arg;
	struct rte_eth_event_enqueue_buffer *buf =
				&rx_adapter->event_enqueue_buffer;
	struct rte_event *ev;

	if (buf->count)
		rxa_flush_event_buffer(rx_adapter);

	if (vec->vector_ev->nb_elem == 0)
		return;
	ev = &buf->events[buf->count];

	/* Event ready. */
	ev->event = vec->event;
	ev->vec = vec->vector_ev;
	buf->count++;

	vec->vector_ev = NULL;
	vec->ts = 0;
}

static int
rxa_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;
	struct rte_event_eth_rx_adapter_stats *stats;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}

	if (rx_adapter->ena_vector) {
		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
		    rx_adapter->vector_tmo_ticks) {
			struct eth_rx_vector_data *vec;

			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
				uint64_t elapsed_time = rte_rdtsc() - vec->ts;

				if (elapsed_time >= vec->vector_timeout_ticks) {
					rxa_vector_expire(vec, rx_adapter);
					TAILQ_REMOVE(&rx_adapter->vector_list,
						vec, next);
				}
			}
			rx_adapter->prev_expiry_ts = rte_rdtsc();
		}
	}

	stats = &rx_adapter->stats;
	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
	stats->rx_packets += rxa_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}

static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = RXA_ADAPTER_ARRAY;
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}

static int
rxa_memzone_lookup(void)
{
	const struct rte_memzone *mz;

	if (event_eth_rx_adapter == NULL) {
		mz = rte_memzone_lookup(RXA_ADAPTER_ARRAY);
		if (mz == NULL)
			return -ENOMEM;
		event_eth_rx_adapter = mz->addr;
	}

	return 0;
}

static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}

static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
					port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}

static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
#elif defined(BSD)
	return -ENOTSUP;
#endif
}

static int
rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->epd != INIT_FD)
		return 0;

	rx_adapter->epd = rxa_epoll_create1();
	if (rx_adapter->epd < 0) {
		int err = rx_adapter->epd;
		rx_adapter->epd = INIT_FD;
		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
		return err;
	}

	return 0;
}

static int
rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;
	char thread_name[RTE_MAX_THREAD_NAME_LEN];

	if (rx_adapter->intr_ring)
		return 0;

	rx_adapter->intr_ring = rte_ring_create("intr_ring",
					RTE_EVENT_ETH_INTR_RING_SIZE,
					rte_socket_id(), 0);
	if (!rx_adapter->intr_ring)
		return -ENOMEM;

	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_EVENT_ETH_INTR_RING_SIZE *
					sizeof(struct rte_epoll_event),
					RTE_CACHE_LINE_SIZE,
					rx_adapter->socket_id);
	if (!rx_adapter->epoll_events) {
		err = -ENOMEM;
		goto error;
	}

	rte_spinlock_init(&rx_adapter->intr_ring_lock);

	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
			"rx-intr-thread-%d", rx_adapter->id);

	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
				NULL, rxa_intr_thread, rx_adapter);
	if (!err)
		return 0;

	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
	rte_free(rx_adapter->epoll_events);
error:
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return err;
}

static int
rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;

	err = pthread_cancel(rx_adapter->rx_intr_thread);
	if (err)
		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
				err);

	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
	if (err)
		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);

	rte_free(rx_adapter->epoll_events);
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return 0;
}

static int
rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int ret;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	ret = rxa_destroy_intr_thread(rx_adapter);
	if (ret)
		return ret;

	close(rx_adapter->epd);
	rx_adapter->epd = INIT_FD;

	return ret;
}

static int
rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
			rx_queue_id);
		return err;
	}

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err)
		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);

	if (!sintr)
		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
	else
		dev_info->shared_intr_enabled = 0;
	return err;
}

static int
rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id)
{
	int err;
	int i;
	int s;

	if (dev_info->nb_rx_intr == 0)
		return 0;

	err = 0;
	if (rx_queue_id == -1) {
		s = dev_info->nb_shared_intr;
		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			int sintr;
			uint16_t q;

			q = dev_info->intr_queue[i];
			sintr = rxa_shared_intr(dev_info, q);
			s -= sintr;

			if (!sintr || s == 0) {

				err = rxa_disable_intr(rx_adapter, dev_info,
						q);
				if (err)
					return err;
				rxa_intr_ring_del_entries(rx_adapter, dev_info,
							q);
			}
		}
	} else {
		if (!rxa_intr_queue(dev_info, rx_queue_id))
			return 0;
		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
				dev_info->nb_shared_intr == 1) {
			err = rxa_disable_intr(rx_adapter, dev_info,
					rx_queue_id);
			if (err)
				return err;
			rxa_intr_ring_del_entries(rx_adapter, dev_info,
						rx_queue_id);
		}

		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			if (dev_info->intr_queue[i] == rx_queue_id) {
				for (; i < dev_info->nb_rx_intr - 1; i++)
					dev_info->intr_queue[i] =
						dev_info->intr_queue[i + 1];
				break;
			}
		}
	}

	return err;
}

static int
rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err, err1;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	union queue_data qd;
	int init_fd;
	uint16_t *intr_queue;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	if (rxa_intr_queue(dev_info, rx_queue_id))
		return 0;

	intr_queue = dev_info->intr_queue;
	if (dev_info->intr_queue == NULL) {
		size_t len =
			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
		dev_info->intr_queue =
			rte_zmalloc_socket(
				rx_adapter->mem_name,
				len,
				0,
				rx_adapter->socket_id);
		if (dev_info->intr_queue == NULL)
			return -ENOMEM;
	}

	init_fd = rx_adapter->epd;
	err = rxa_init_epd(rx_adapter);
	if (err)
		goto err_free_queue;

	qd.port = eth_dev_id;
	qd.queue = rx_queue_id;

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_ADD,
					qd.ptr);
	if (err) {
		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
			" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_fd;
	}

	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_event;
	}

	err = rxa_create_intr_thread(rx_adapter);
	if (!err) {
		if (sintr)
			dev_info->shared_intr_enabled = 1;
		else
			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
		return 0;
	}

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err)
		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
err_del_event:
	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err1) {
		RTE_EDEV_LOG_ERR("Could not delete event for"
				" Rx Queue %u err %d", rx_queue_id, err1);
	}
err_del_fd:
	if (init_fd == INIT_FD) {
		close(rx_adapter->epd);
		rx_adapter->epd = -1;
	}
err_free_queue:
	if (intr_queue == NULL)
		rte_free(dev_info->intr_queue);

	return err;
}

static int
rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int i, j, err;
	int si = -1;
	int shared_done = (dev_info->nb_shared_intr > 0);

	if (rx_queue_id != -1) {
		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
			return 0;
		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
	}

	err = 0;
	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {

		if (rxa_shared_intr(dev_info, i) && shared_done)
			continue;

		err = rxa_config_intr(rx_adapter, dev_info, i);

		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
		if (shared_done) {
			si = i;
			dev_info->shared_intr_enabled = 1;
		}
		if (err)
			break;
	}

	if (err == 0)
		return 0;

	shared_done = (dev_info->nb_shared_intr > 0);
	for (j = 0; j < i; j++) {
		if (rxa_intr_queue(dev_info, j))
			continue;
		if (rxa_shared_intr(dev_info, j) && si != j)
			continue;
		err = rxa_disable_intr(rx_adapter, dev_info, j);
		if (err)
			break;
	}

	return err;
}

static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = rxa_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	rx_adapter->epd = INIT_FD;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}

static void
rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_update_queue(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}

static void
rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
		uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
		uint16_t port_id)
{
#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
	struct eth_rx_vector_data *vector_data;
	uint32_t flow_id;

	vector_data = &queue_info->vector_data;
	vector_data->max_vector_count = vector_count;
	vector_data->port = port_id;
	vector_data->queue = qid;
	vector_data->vector_pool = mp;
	vector_data->vector_timeout_ticks =
		NSEC2TICK(vector_ns, rte_get_timer_hz());
	vector_data->ts = 0;
	flow_id = queue_info->event & 0xFFFFF;
	flow_id =
		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
}

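/*
 * Illustrative example (assumed numbers, not from the original source):
 * with vector_ns = 1E6 (1 ms) and rte_get_timer_hz() = 2.5 GHz, NSEC2TICK()
 * yields 2,500,000 timeout ticks. If the application left the 20-bit flow
 * id as 0, a synthetic flow id is composed from the queue and port, e.g.
 * port 1, queue 3 gives (3 & 0xFFF) | (1 << 12) = 0x1003.
 */
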
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id)
{
	struct eth_rx_vector_data *vec;
	int pollq;
	int intrq;
	int sintrq;

	if (rx_adapter->nb_queues == 0)
		return;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_sw_del(rx_adapter, dev_info, i);
		return;
	}

	/* Push all the partial event vectors to event device. */
	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
		if (vec->queue != rx_queue_id)
			continue;
		rxa_vector_expire(vec, rx_adapter);
		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
	rx_adapter->num_rx_polled -= pollq;
	dev_info->nb_rx_poll -= pollq;
	rx_adapter->num_rx_intr -= intrq;
	dev_info->nb_rx_intr -= intrq;
	dev_info->nb_shared_intr -= intrq && sintrq;
}

static void
rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id,
	const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;
	int pollq;
	int intrq;
	int sintrq;
	struct rte_event *qi_ev;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_add_queue(rx_adapter, dev_info, i, conf);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->wt = conf->servicing_weight;

	qi_ev = (struct rte_event *)&queue_info->event;
	qi_ev->event = ev->event;
	qi_ev->op = RTE_EVENT_OP_NEW;
	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
	qi_ev->sub_event_type = 0;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id_mask = ~0;
	} else
		qi_ev->flow_id = 0;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
		queue_info->ena_vector = 1;
		qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
		rxa_set_vector_data(queue_info, conf->vector_sz,
				conf->vector_timeout_ns, conf->vector_mp,
				rx_queue_id, dev_info->dev->data->port_id);
		rx_adapter->ena_vector = 1;
		rx_adapter->vector_tmo_ticks =
			rx_adapter->vector_tmo_ticks ?
				RTE_MIN(queue_info->vector_data
						.vector_timeout_ticks >> 1,
					rx_adapter->vector_tmo_ticks) :
				queue_info->vector_data
						.vector_timeout_ticks >> 1;
	}

	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
	if (rxa_polled_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled += !pollq;
		dev_info->nb_rx_poll += !pollq;
		rx_adapter->num_rx_intr -= intrq;
		dev_info->nb_rx_intr -= intrq;
		dev_info->nb_shared_intr -= intrq && sintrq;
	}

	if (rxa_intr_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled -= pollq;
		dev_info->nb_rx_poll -= pollq;
		rx_adapter->num_rx_intr += !intrq;
		dev_info->nb_rx_intr += !intrq;
		dev_info->nb_shared_intr += !intrq && sintrq;
		if (dev_info->nb_shared_intr == 1) {
			if (dev_info->multi_intr_cap)
				dev_info->next_q_idx =
					RTE_MAX_RXTX_INTR_VEC_ID - 1;
			else
				dev_info->next_q_idx = 0;
		}
	}
}

static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;
	uint32_t nb_rx_intr;
	int num_intr_vec;
	uint16_t wt;

	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;

		temp_conf = *queue_conf;
		if (!data->dev_conf.intr_conf.rxq) {
			/* If Rx interrupts are disabled set wt = 1 */
			temp_conf.servicing_weight = 1;
		}
		queue_conf = &temp_conf;
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	rx_queue = dev_info->rx_queue;
	wt = queue_conf->servicing_weight;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
			rte_zmalloc_socket(rx_adapter->mem_name,
				nb_rx_queues *
				sizeof(struct eth_rx_queue_info), 0,
				rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}
	rx_wrr = NULL;
	rx_poll = NULL;

	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

	if (dev_info->dev->intr_handle)
		dev_info->multi_intr_cap =
			rte_intr_cap_multiple(dev_info->dev->intr_handle);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	if (wt == 0) {
		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);

		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
		if (ret)
			goto err_free_rxqueue;

		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
		if (ret)
			goto err_free_rxqueue;
	} else {

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			/* interrupt based queues are being converted to
			 * poll mode queues, delete the interrupt configuration
			 * for those queues
			 */
			ret = rxa_del_intr_queue(rx_adapter,
						dev_info, rx_queue_id);
			if (ret)
				goto err_free_rxqueue;
		}
	}

	if (nb_rx_intr == 0) {
		ret = rxa_free_intr_resources(rx_adapter);
		if (ret)
			goto err_free_rxqueue;
	}

	if (wt == 0) {
		uint16_t i;

		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				dev_info->intr_queue[i] = i;
		} else {
			if (!rxa_intr_queue(dev_info, rx_queue_id))
				dev_info->intr_queue[nb_rx_intr - 1] =
					rx_queue_id;
		}
	}

	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	rx_adapter->num_intr_vec += num_intr_vec;
	return 0;

err_free_rxqueue:
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	rte_free(rx_poll);
	rte_free(rx_wrr);

	return ret;
}

static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}

static int
rxa_create(uint8_t id, uint8_t dev_id,
	struct rte_event_eth_rx_adapter_params *rxa_params,
	rte_event_eth_rx_adapter_conf_cb conf_cb,
	void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eth_event_enqueue_buffer *buf;
	struct rte_event *events;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);

	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	rx_adapter->id = id;
	TAILQ_INIT(&rx_adapter->vector_list);
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_MAX_ETHPORTS *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}

	rte_spinlock_init(&rx_adapter->rx_lock);

	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	/* Rx adapter event buffer allocation */
	buf = &rx_adapter->event_enqueue_buffer;
	buf->events_size = rxa_params->event_buf_size;

	events = rte_zmalloc_socket(rx_adapter->mem_name,
			buf->events_size * sizeof(*events),
			0, socket_id);
	if (events == NULL) {
		RTE_EDEV_LOG_ERR("Failed to allocate mem for event buffer\n");
		rte_free(rx_adapter->eth_devices);
		rte_free(rx_adapter);
		return -ENOMEM;
	}

	rx_adapter->event_enqueue_buffer.events = events;

	event_eth_rx_adapter[id] = rx_adapter;

	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;

	if (rte_mbuf_dyn_rx_timestamp_register(
			&event_eth_rx_timestamp_dynfield_offset,
			&event_eth_rx_timestamp_dynflag) != 0) {
		RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
		return -rte_errno;
	}

	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
		conf_arg);
	return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter_params rxa_params = {0};

	/* use default values for adapter params */
	rxa_params.event_buf_size = ETH_EVENT_BUFFER_SIZE;

	return rxa_create(id, dev_id, &rxa_params, conf_cb, conf_arg);
}

int
rte_event_eth_rx_adapter_create_with_params(uint8_t id, uint8_t dev_id,
			struct rte_event_port_conf *port_config,
			struct rte_event_eth_rx_adapter_params *rxa_params)
{
	struct rte_event_port_conf *pc;
	int ret;
	struct rte_event_eth_rx_adapter_params temp_params = {0};

	if (port_config == NULL)
		return -EINVAL;

	/* use default values if rxa_params is NULL */
	if (rxa_params == NULL) {
		rxa_params = &temp_params;
		rxa_params->event_buf_size = ETH_EVENT_BUFFER_SIZE;
	}

	if (rxa_params->event_buf_size == 0)
		return -EINVAL;

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;

	*pc = *port_config;

	/* adjust event buff size with BATCH_SIZE used for fetching packets
	 * from NIC rx queues to get full buffer utilization and prevent
	 * unnecessary rollovers.
	 */
	rxa_params->event_buf_size = RTE_ALIGN(rxa_params->event_buf_size,
						BATCH_SIZE);
	rxa_params->event_buf_size += (BATCH_SIZE + BATCH_SIZE);

	ret = rxa_create(id, dev_id, rxa_params, rxa_default_conf_cb, pc);
	if (ret)
		rte_free(pc);

	return ret;
}

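/*
 * Illustrative example (assumed numbers, not from the original source):
 * a requested event_buf_size of 1000 is first rounded up to a multiple of
 * BATCH_SIZE (32), giving 1024, and then grown by 2 * BATCH_SIZE to 1088,
 * so a full rte_eth_rx_burst() batch always fits without an early rollover.
 */
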
2377 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2378 struct rte_event_port_conf *port_config)
2380 struct rte_event_port_conf *pc;
2383 if (port_config == NULL)
2386 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2388 pc = rte_malloc(NULL, sizeof(*pc), 0);
2393 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2394 rxa_default_conf_cb,
2402 rte_event_eth_rx_adapter_free(uint8_t id)
2404 struct rte_event_eth_rx_adapter *rx_adapter;
2406 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2408 rx_adapter = rxa_id_to_adapter(id);
2409 if (rx_adapter == NULL)
2412 if (rx_adapter->nb_queues) {
2413 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2414 rx_adapter->nb_queues);
2418 if (rx_adapter->default_cb_arg)
2419 rte_free(rx_adapter->conf_arg);
2420 rte_free(rx_adapter->eth_devices);
2421 rte_free(rx_adapter->event_enqueue_buffer.events);
2422 rte_free(rx_adapter);
2423 event_eth_rx_adapter[id] = NULL;
2425 rte_eventdev_trace_eth_rx_adapter_free(id);
int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	struct rte_event_eth_rx_adapter_vector_limits limits;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if (queue_conf->rx_queue_flags &
	    RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
		if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
			RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
					 " eth port: %" PRIu16
					 " adapter id: %" PRIu8,
					 eth_dev_id, id);
			return -EINVAL;
		}

		ret = rte_event_eth_rx_adapter_vector_limits_get(
			rx_adapter->eventdev_id, eth_dev_id, &limits);
		if (ret < 0) {
			RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
					 " eth port: %" PRIu16
					 " adapter id: %" PRIu8,
					 eth_dev_id, id);
			return -EINVAL;
		}

		/* The vector size and timeout must fall within the limits
		 * advertised by the event device, and a mempool must be
		 * supplied.
		 */
		if (queue_conf->vector_sz < limits.min_sz ||
		    queue_conf->vector_sz > limits.max_sz ||
		    queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
		    queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
		    queue_conf->vector_mp == NULL) {
			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
					 " eth port: %" PRIu16
					 " adapter id: %" PRIu8,
					 eth_dev_id, id);
			return -EINVAL;
		}

		/* Each mempool element must hold the vector header plus
		 * vector_sz mbuf pointers.
		 */
		if (queue_conf->vector_mp->elt_size <
		    (sizeof(struct rte_event_vector) +
		     (sizeof(uintptr_t) * queue_conf->vector_sz))) {
			RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
					 " eth port: %" PRIu16
					 " adapter id: %" PRIu8,
					 eth_dev_id, id);
			return -EINVAL;
		}
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			(uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0) {
			uint32_t service_id = rx_adapter->service_id;

			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
			rte_service_component_runstate_set(service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
		rx_queue_id, queue_conf, ret);
	if (ret)
		return ret;

	return 0;
}
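/*
 * Illustrative usage sketch, not part of this file: adding all Rx queues
 * of an eth port with event vectorization enabled. The ids (evdev_id,
 * eth_port, ev_queue_id, id), the pool sizing and the error handling are
 * assumptions for the example, not values defined here.
 *
 *	struct rte_event_eth_rx_adapter_vector_limits limits;
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = { 0 };
 *	struct rte_mempool *vec_pool;
 *	int ret;
 *
 *	rte_event_eth_rx_adapter_vector_limits_get(evdev_id, eth_port,
 *						   &limits);
 *	vec_pool = rte_event_vector_pool_create("rx_vec_pool", 8192, 0,
 *						limits.min_sz,
 *						rte_socket_id());
 *	qconf.rx_queue_flags = RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR;
 *	qconf.ev.queue_id = ev_queue_id;
 *	qconf.ev.sched_type = RTE_SCHED_TYPE_ATOMIC;
 *	qconf.servicing_weight = 1;
 *	qconf.vector_sz = limits.min_sz;
 *	qconf.vector_timeout_ns = limits.min_timeout_ns;
 *	qconf.vector_mp = vec_pool;
 *	ret = rte_event_eth_rx_adapter_queue_add(id, eth_port, -1, &qconf);
 */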
static int
rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
{
	limits->max_sz = MAX_VECTOR_SIZE;
	limits->min_sz = MIN_VECTOR_SIZE;
	limits->max_timeout_ns = MAX_VECTOR_NS;
	limits->min_timeout_ns = MIN_VECTOR_NS;

	return 0;
}
int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	uint32_t nb_rx_intr;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;
	int num_intr_vec;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			(uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
				 -ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			ret = rxa_del_intr_queue(rx_adapter, dev_info,
					rx_queue_id);
			if (ret)
				goto unlock_ret;
		}

		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto unlock_ret;
		}

		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		if (nb_rx_intr == 0) {
			rte_free(dev_info->intr_queue);
			dev_info->intr_queue = NULL;
		}

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;
		rx_adapter->num_intr_vec += num_intr_vec;

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
unlock_ret:
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret) {
			rte_free(rx_poll);
			rte_free(rx_wrr);
			return ret;
		}

		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
		rx_queue_id, ret);
	return ret;
}
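/*
 * Illustrative usage sketch (id and eth_port are assumed): as with
 * queue_add, passing rx_queue_id = -1 applies the operation to every
 * queue of the eth port that was added to the adapter.
 *
 *	int ret = rte_event_eth_rx_adapter_queue_del(id, eth_port, -1);
 *
 *	if (ret)
 *		RTE_EDEV_LOG_ERR("queue del failed: %d", ret);
 */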
int
rte_event_eth_rx_adapter_vector_limits_get(
	uint8_t dev_id, uint16_t eth_port_id,
	struct rte_event_eth_rx_adapter_vector_limits *limits)
{
	struct rte_eventdev *dev;
	uint32_t cap;
	int ret;

	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);

	if (limits == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[dev_id];

	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
				 " eth port %" PRIu16,
				 dev_id, eth_port_id);
		return ret;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(
			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
			-ENOTSUP);
		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
			dev, &rte_eth_devices[eth_port_id], limits);
	} else {
		ret = rxa_sw_vector_limits(limits);
	}

	return ret;
}
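/*
 * Illustrative sketch (evdev_id, eth_port and vector_sz are assumed):
 * clamping an application-chosen vector size into the advertised range
 * before building the queue conf.
 *
 *	struct rte_event_eth_rx_adapter_vector_limits limits;
 *
 *	if (rte_event_eth_rx_adapter_vector_limits_get(evdev_id, eth_port,
 *						       &limits) == 0)
 *		vector_sz = RTE_MAX(limits.min_sz,
 *				    RTE_MIN(vector_sz, limits.max_sz));
 */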
int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_start(id);
	return rxa_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_stop(id);
	return rxa_ctrl(id, 0);
}
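/*
 * Illustrative ordering sketch (ids are assumed): per the adapter
 * documentation, the event device and eth device are started before the
 * Rx adapter, so events enqueued by the adapter can be scheduled.
 *
 *	rte_event_dev_start(evdev_id);
 *	rte_eth_dev_start(eth_port);
 *	rte_event_eth_rx_adapter_start(id);
 */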
int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	if (rxa_memzone_lookup())
		return -ENOMEM;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));

	/* Sum the stats of every eth device that uses an internal event
	 * port and whose driver implements the stats_get op.
	 */
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	/* Add in the SW service stats, if the service was created */
	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;

	return 0;
}
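/*
 * Illustrative sketch (id is assumed): periodic stats polling.
 *
 *	struct rte_event_eth_rx_adapter_stats stats;
 *
 *	if (rte_event_eth_rx_adapter_stats_get(id, &stats) == 0)
 *		printf("rx %" PRIu64 " enq %" PRIu64 "\n",
 *		       stats.rx_packets, stats.rx_enq_count);
 */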
int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	if (rxa_memzone_lookup())
		return -ENOMEM;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));

	return 0;
}
int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	if (rxa_memzone_lookup())
		return -ENOMEM;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}
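/*
 * Illustrative sketch (service_lcore is assumed): a SW adapter, i.e. one
 * without the INTERNAL_PORT capability, only moves packets once its
 * service is mapped to a running service lcore; -ESRCH here means no
 * service was created, so there is nothing to map.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(service_lcore);
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *		rte_service_lcore_start(service_lcore);
 *	}
 */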
int
rte_event_eth_rx_adapter_cb_register(uint8_t id,
					uint16_t eth_dev_id,
					rte_event_eth_rx_adapter_cb_fn cb_fn,
					void *cb_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev_info = &rx_adapter->eth_devices[eth_dev_id];
	if (dev_info->rx_queue == NULL)
		return -EINVAL;

	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
				PRIu16, eth_dev_id);
		return -EINVAL;
	}

	/* Take the lock so the service function never sees a partially
	 * updated (cb_fn, cb_arg) pair.
	 */
	rte_spinlock_lock(&rx_adapter->rx_lock);
	dev_info->cb_fn = cb_fn;
	dev_info->cb_arg = cb_arg;
	rte_spinlock_unlock(&rx_adapter->rx_lock);

	return 0;
}
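/*
 * Illustrative sketch: registering a filter callback. my_rx_cb and
 * my_ctx are assumed application symbols; my_rx_cb must match the
 * rte_event_eth_rx_adapter_cb_fn typedef (see
 * rte_event_eth_rx_adapter.h for the exact signature) and runs in the
 * adapter's service context, so it must be non-blocking.
 *
 *	int ret = rte_event_eth_rx_adapter_cb_register(id, eth_port,
 *						       my_rx_cb, my_ctx);
 */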
int
rte_event_eth_rx_adapter_queue_conf_get(uint8_t id,
			uint16_t eth_dev_id,
			uint16_t rx_queue_id,
			struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	struct eth_rx_queue_info *queue_info;
	struct rte_event *qi_ev;
	int ret;

	if (rxa_memzone_lookup())
		return -ENOMEM;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	if (rx_queue_id >= rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %u", rx_queue_id);
		return -EINVAL;
	}

	if (queue_conf == NULL) {
		RTE_EDEV_LOG_ERR("Rx queue conf struct cannot be NULL");
		return -EINVAL;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev_info = &rx_adapter->eth_devices[eth_dev_id];
	if (dev_info->rx_queue == NULL ||
	    !dev_info->rx_queue[rx_queue_id].queue_enabled) {
		RTE_EDEV_LOG_ERR("Rx queue %u not added", rx_queue_id);
		return -EINVAL;
	}

	queue_info = &dev_info->rx_queue[rx_queue_id];
	qi_ev = (struct rte_event *)&queue_info->event;

	/* Rebuild the queue conf from the stored per-queue state */
	memset(queue_conf, 0, sizeof(*queue_conf));
	queue_conf->rx_queue_flags = 0;
	if (queue_info->flow_id_mask != 0)
		queue_conf->rx_queue_flags |=
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID;
	queue_conf->servicing_weight = queue_info->wt;

	memcpy(&queue_conf->ev, qi_ev, sizeof(*qi_ev));

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	if (dev->dev_ops->eth_rx_adapter_queue_conf_get != NULL) {
		ret = (*dev->dev_ops->eth_rx_adapter_queue_conf_get)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id,
						queue_conf);
		return ret;
	}

	return 0;
}
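/*
 * Illustrative sketch (ids are assumed): reading back the conf of a
 * previously added queue, e.g. to inspect its servicing weight.
 *
 *	struct rte_event_eth_rx_adapter_queue_conf qconf;
 *
 *	if (rte_event_eth_rx_adapter_queue_conf_get(id, eth_port, 0,
 *						    &qconf) == 0)
 *		printf("weight %u\n", qconf.servicing_weight);
 */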