/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#include <ctype.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/epoll.h>
#include <unistd.h>

#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>
#include <rte_interrupts.h>

#include "rte_eventdev.h"
#include "eventdev_pmd.h"
#include "rte_eventdev_trace.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)
#define MAX_VECTOR_SIZE 1024
#define MIN_VECTOR_SIZE 4
#define MAX_VECTOR_NS 1E9
#define MIN_VECTOR_NS 1E5

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32

#define RSS_KEY_SIZE 40
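/* RSS_KEY_SIZE is the 40-byte Toeplitz hash key length expected by the
 * rte_thash soft RSS helpers; the adapter stores the key byte-swapped
 * (rss_key_be) so it can be passed directly to rte_softrss_be().
 */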
/* Value written to the intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT 1
/* Sentinel value to detect an uninitialized file handle */
#define INIT_FD -1

/*
 * Used to store port and queue ID of interrupting Rx queue
 */
union queue_data {
	RTE_STD_C11
	void *ptr;
	struct {
		uint16_t port;
		uint16_t queue;
	};
};

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

struct eth_rx_vector_data {
	TAILQ_ENTRY(eth_rx_vector_data) next;
	uint16_t port;
	uint16_t queue;
	uint16_t max_vector_count;
	uint64_t event;
	uint64_t ts;
	uint64_t vector_timeout_ticks;
	struct rte_mempool *vector_pool;
	struct rte_event_vector *vector_ev;
} __rte_cache_aligned;

TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Vector enable flag */
	uint8_t ena_vector;
	/* Timestamp of previous vector expiry list traversal */
	uint64_t prev_expiry_ts;
	/* Minimum ticks to wait before traversing expiry list */
	uint64_t vector_tmo_ticks;
	/* List of partially filled event vectors */
	struct eth_rx_vector_data_list vector_list;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start timestamp */
	uint64_t rx_enq_block_start_ts;
	/* epoll fd used to wait for Rx interrupts */
	int epd;
	/* Num of interrupt driven Rx queues */
	uint32_t num_rx_intr;
	/* Used to send <dev id, queue id> of interrupting Rx queues from
	 * the interrupt thread to the Rx thread
	 */
	struct rte_ring *intr_ring;
	/* Rx Queue data (dev id, queue id) for the last non-empty
	 * queue polled
	 */
	union queue_data qd;
	/* queue_data is valid */
	int qd_valid;
	/* Interrupt ring lock, synchronizes Rx thread
	 * and interrupt thread
	 */
	rte_spinlock_t intr_ring_lock;
	/* Event array passed to rte_epoll_wait */
	struct rte_epoll_event *epoll_events;
	/* Count of interrupt vectors in use */
	uint32_t num_intr_vec;
	/* Thread blocked on Rx interrupts */
	pthread_t rx_intr_thread;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
	/* Adapter ID */
	uint8_t id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Rx callback */
	rte_event_eth_rx_adapter_cb_fn cb_fn;
	/* Rx callback argument */
	void *cb_arg;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing Rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* Number of poll based queues
	 * If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* Number of interrupt based queues
	 * If nb_rx_intr > 0, the start callback will
	 * be invoked if not already invoked.
	 */
	uint16_t nb_rx_intr;
	/* Number of queues that use the shared interrupt */
	uint16_t nb_shared_intr;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
	/* Intr based queue index to start polling from, this is used
	 * if the number of shared interrupts is non-zero
	 */
	uint16_t next_q_idx;
	/* Intr based queue indices */
	uint16_t *intr_queue;
	/* Device generates per Rx queue interrupts for queue indices
	 * < RTE_MAX_RXTX_INTR_VEC_ID - 1
	 */
	int multi_intr_cap;
	/* Shared interrupt enabled */
	int shared_intr_enabled;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	int intr_enabled;	/* True if interrupt driven */
	uint8_t ena_vector;	/* True if event vectorization is enabled */
	uint16_t wt;		/* Polling weight */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
	uint64_t event;		/* Event template for this queue */
	struct eth_rx_vector_data vector_data;
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
rxa_validate_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d", id); \
		return retval; \
	} \
} while (0)

static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
}

/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? rxa_gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
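/* Worked example (illustrative weights, not taken from any device):
 * queues q0..q2 with weights {4, 2, 1} give max_wt = 4 and gcd = 1;
 * repeated calls then produce the 7-slot cycle
 *	q0 q0 q0 q1 q0 q1 q2
 * i.e. each queue appears wt times per cycle of sum(wt) slots, with
 * higher weights served first as the current weight *cw steps down.
 */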
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	unsigned int n, int *cw,
	struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint16_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}

static inline int
rxa_shared_intr(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int multi_intr_cap;

	if (dev_info->dev->intr_handle == NULL)
		return 0;

	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
	return !multi_intr_cap ||
		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
}

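/* Per the function above, a queue falls back to the single shared
 * interrupt when the device lacks multi-vector support or when the
 * queue index is beyond RTE_MAX_RXTX_INTR_VEC_ID - 1, the last usable
 * per-queue interrupt vector id.
 */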
static inline int
rxa_intr_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return dev_info->rx_queue &&
		!dev_info->internal_event_port &&
		queue_info->queue_enabled && queue_info->wt == 0;
}

static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return !dev_info->internal_event_port &&
		dev_info->rx_queue &&
		queue_info->queue_enabled && queue_info->wt != 0;
}

/* Calculate the change in the number of interrupt vectors after an Rx
 * queue ID is added or deleted
 */
static int
rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
{
	uint16_t i;
	int n, s;
	uint16_t nbq;

	nbq = dev_info->dev->data->nb_rx_queues;
	n = 0; /* non shared count */
	s = 0; /* shared count */

	if (rx_queue_id == -1) {
		for (i = 0; i < nbq; i++) {
			if (!rxa_shared_intr(dev_info, i))
				n += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
			else
				s += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
		}

		if (s > 0) {
			if ((add && dev_info->nb_shared_intr == 0) ||
				(!add && dev_info->nb_shared_intr))
				n += 1;
		}
	} else {
		if (!rxa_shared_intr(dev_info, rx_queue_id))
			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
				rxa_intr_queue(dev_info, rx_queue_id);
		else
			n = add ? !dev_info->nb_shared_intr :
				dev_info->nb_shared_intr == 1;
	}

	return add ? n : -n;
}

/* Calculate nb_rx_intr after deleting interrupt mode Rx queues
 */
static void
rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_intr)
{
	uint32_t intr_diff;

	if (rx_queue_id == -1)
		intr_diff = dev_info->nb_rx_intr;
	else
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);

	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
}

/* Calculate nb_rx_* after adding interrupt mode Rx queues; newly added
 * interrupt queues could currently be poll mode Rx queues
 */
static void
rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_intr;
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate the size of the eth_rx_poll and wrr_sched arrays
 * after deleting poll mode Rx queues
 */
static void
rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_wrr)
{
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate nb_rx_* after adding poll mode Rx queues
 */
static void
rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint16_t wt,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->nb_rx_intr;
		poll_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_poll;
		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
				- dev_info->wrr_len;
	} else {
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
				wt - dev_info->rx_queue[rx_queue_id].wt :
				wt;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}

/* Calculate nb_rx_* after adding rx_queue_id */
static void
rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint16_t wt,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	if (wt != 0)
		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
	else
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
}

/* Calculate nb_rx_* after deleting rx_queue_id */
static void
rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
				nb_wrr);
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_intr);
}

/*
 * Allocate the rx_poll array
 */
static struct eth_rx_poll_entry *
rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
	uint32_t num_rx_polled)
{
	size_t len;

	len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

/*
 * Allocate the WRR array
 */
static uint32_t *
rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
{
	size_t len;

	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

static int
rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
		uint32_t nb_poll,
		uint32_t nb_wrr,
		struct eth_rx_poll_entry **rx_poll,
		uint32_t **wrr_sched)
{
	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		*rx_poll = NULL;
		return -ENOMEM;
	}

	return 0;
}

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static void
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_poll_entry *rx_poll,
		uint32_t *rx_wrr)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	if (rx_poll == NULL)
		return;

	/* Generate array of all queues to poll, the size of this
	 * array is poll_q
	 */
	RTE_ETH_FOREACH_DEV(d) {
		uint16_t nb_rx_queues;
		struct eth_device_info *dev_info =
				&rx_adapter->eth_devices[d];
		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		if (dev_info->rx_queue == NULL)
			continue;
		if (dev_info->internal_event_port)
			continue;
		dev_info->wrr_len = 0;
		for (q = 0; q < nb_rx_queues; q++) {
			struct eth_rx_queue_info *queue_info =
				&dev_info->rx_queue[q];
			uint16_t wt;

			if (!rxa_polled_queue(dev_info, q))
				continue;
			wt = queue_info->wt;
			rx_poll[poll_q].eth_dev_id = d;
			rx_poll[poll_q].eth_rx_qid = q;
			max_wrr_pos += wt;
			dev_info->wrr_len += wt;
			max_wt = RTE_MAX(max_wt, wt);
			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
			poll_q++;
		}
	}

	/* Generate polling sequence based on weights */
	int prev = -1;
	int cw = -1;
	for (i = 0; i < max_wrr_pos; i++) {
		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
				rx_poll, max_wt, gcd, prev);
		prev = rx_wrr[i];
	}
}

static inline void
rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
	struct rte_ipv6_hdr **ipv6_hdr)
{
	struct rte_ether_hdr *eth_hdr =
		rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
	struct rte_vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(RTE_ETHER_TYPE_IPV4):
		*ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(RTE_ETHER_TYPE_IPV6):
		*ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(RTE_ETHER_TYPE_VLAN):
		vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(RTE_ETHER_TYPE_IPV4):
			*ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(RTE_ETHER_TYPE_IPV6):
			*ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct rte_ipv4_hdr *ipv4_hdr;
	struct rte_ipv6_hdr *ipv6_hdr;

	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}

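/* Blocked-cycle accounting for the two helpers below: once enqueues to
 * the event device make no progress BLOCK_CNT_THRESHOLD times in a row,
 * a block start timestamp is recorded; rxa_enq_block_end_ts() adds the
 * elapsed cycles to stats->rx_enq_block_cycles when progress resumes.
 */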
static inline int
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rxa_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
			rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}

/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
		&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	if (!buf->count)
		return 0;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
		rxa_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}

static inline void
rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_vector_data *vec)
{
	vec->vector_ev->nb_elem = 0;
	vec->vector_ev->port = vec->port;
	vec->vector_ev->queue = vec->queue;
	vec->vector_ev->attr_valid = true;
	TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
}

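/* The function below aggregates mbufs into the per-queue event vector.
 * A full vector is emitted into the enqueue buffer immediately; a
 * partially filled one stays on vector_list until it either fills up on
 * a later burst or the service function expires it after
 * vector_timeout_ticks.
 */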
static inline uint16_t
rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_rx_queue_info *queue_info,
			struct rte_eth_event_enqueue_buffer *buf,
			struct rte_mbuf **mbufs, uint16_t num)
{
	struct rte_event *ev = &buf->events[buf->count];
	struct eth_rx_vector_data *vec;
	uint16_t filled, space, sz;

	filled = 0;
	vec = &queue_info->vector_data;

	while (num) {
		if (vec->vector_ev == NULL) {
			if (rte_mempool_get(vec->vector_pool,
					(void **)&vec->vector_ev) < 0) {
				rte_pktmbuf_free_bulk(mbufs, num);
				return filled;
			}
			rxa_init_vector(rx_adapter, vec);
		}

		if (vec->vector_ev->nb_elem == vec->max_vector_count) {
			/* Event ready. */
			ev->event = vec->event;
			ev->vec = vec->vector_ev;
			ev++;
			filled++;
			vec->vector_ev = NULL;
			TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
			if (rte_mempool_get(vec->vector_pool,
					(void **)&vec->vector_ev) < 0) {
				rte_pktmbuf_free_bulk(mbufs, num);
				return filled;
			}
			rxa_init_vector(rx_adapter, vec);
		}

		space = vec->max_vector_count - vec->vector_ev->nb_elem;
		sz = num > space ? space : num;
		memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
			sizeof(void *) * sz);
		vec->vector_ev->nb_elem += sz;
		num -= sz;
		mbufs += sz;
		vec->ts = rte_rdtsc();
	}

	if (vec->vector_ev->nb_elem == vec->max_vector_count) {
		ev->event = vec->event;
		ev->vec = vec->vector_ev;
		ev++;
		filled++;
		vec->vector_ev = NULL;
		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
	}

	return filled;
}

static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		uint16_t rx_queue_id,
		struct rte_mbuf **mbufs,
		uint16_t num)
{
	uint32_t i;
	struct eth_device_info *dev_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
					&dev_info->rx_queue[rx_queue_id];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	struct rte_event *ev = &buf->events[buf->count];
	uint64_t event = eth_rx_queue_info->event;
	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint16_t nb_cb;
	uint16_t dropped;

	if (!eth_rx_queue_info->ena_vector) {
		/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
		for (i = 0; i < num; i++) {
			m = mbufs[i];

			rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
				: m->hash.rss;
			ev->event = event;
			ev->flow_id = (rss & ~flow_id_mask) |
				(ev->flow_id & flow_id_mask);
			ev->mbuf = m;
			ev++;
		}
	} else {
		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
					buf, mbufs, num);
	}

	if (num && dev_info->cb_fn) {

		dropped = 0;
		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
					ETH_EVENT_BUFFER_SIZE, buf->count,
					&buf->events[buf->count], num,
					dev_info->cb_arg, &dropped);
		if (unlikely(nb_cb > num))
			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
				nb_cb, num);
		else
			num = nb_cb;
		if (dropped)
			rx_adapter->stats.rx_dropped += dropped;
	}

	buf->count += num;
}

/* Enqueue packets from <port, q> to event buffer */
static inline uint32_t
rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t port_id,
	uint16_t queue_id,
	uint32_t rx_count,
	uint32_t max_rx,
	int *rxq_empty)
{
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats =
					&rx_adapter->stats;
	uint16_t n;
	uint32_t nb_rx = 0;

	if (rxq_empty)
		*rxq_empty = 0;
	/* Don't do a batch dequeue from the rx queue if there isn't
	 * enough space in the enqueue buffer.
	 */
	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
		if (unlikely(!n)) {
			if (rxq_empty)
				*rxq_empty = 1;
			break;
		}
		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
		nb_rx += n;
		if (rx_count + nb_rx > max_rx)
			break;
	}

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	return nb_rx;
}

static void
rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, void *data)
{
	uint16_t port_id;
	uint16_t queue;
	int err;
	union queue_data qd;
	struct eth_device_info *dev_info;
	struct eth_rx_queue_info *queue_info;
	int *intr_enabled;

	qd.ptr = data;
	port_id = qd.port;
	queue = qd.queue;

	dev_info = &rx_adapter->eth_devices[port_id];
	queue_info = &dev_info->rx_queue[queue];
	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
	if (rxa_shared_intr(dev_info, queue))
		intr_enabled = &dev_info->shared_intr_enabled;
	else
		intr_enabled = &queue_info->intr_enabled;

	if (*intr_enabled) {
		*intr_enabled = 0;
		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
		/* Entry should always be available.
		 * The ring size equals the maximum number of interrupt
		 * vectors supported (an interrupt vector is shared in
		 * case of shared interrupts)
		 */
		if (err)
			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
				" to ring: %s", strerror(-err));
		else
			rte_eth_dev_rx_intr_disable(port_id, queue);
	}
	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

static int
rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
			uint32_t num_intr_vec)
{
	if (rx_adapter->num_intr_vec + num_intr_vec >
				RTE_EVENT_ETH_INTR_RING_SIZE) {
		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
			" %d needed %d limit %d", rx_adapter->num_intr_vec,
			num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
		return -ENOSPC;
	}

	return 0;
}

/* Delete entries for (dev, queue) from the interrupt ring */
static void
rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			uint16_t rx_queue_id)
{
	int i, n;
	union queue_data qd;

	rte_spinlock_lock(&rx_adapter->intr_ring_lock);

	n = rte_ring_count(rx_adapter->intr_ring);
	for (i = 0; i < n; i++) {
		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
			if (qd.port == dev_info->dev->data->port_id &&
				qd.queue == rx_queue_id)
				continue;
		} else {
			if (qd.port == dev_info->dev->data->port_id)
				continue;
		}
		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
	}

	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

/* pthread callback handling interrupt mode receive queues
 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
 * interrupting queue to the adapter's ring buffer for interrupt events.
 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
 * the adapter service function.
 */
static void *
rxa_intr_thread(void *arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter = arg;
	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
	int n, i;

	while (1) {
		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
		if (unlikely(n < 0))
			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
					n);
		for (i = 0; i < n; i++) {
			rxa_intr_ring_enqueue(rx_adapter,
					epoll_events[i].epdata.data);
		}
	}

	return NULL;
}

/* Dequeue <port, q> from interrupt ring and enqueue received
 * mbufs to eventdev
 */
static inline uint32_t
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t n;
	uint32_t nb_rx = 0;
	int rxq_empty;
	struct rte_eth_event_enqueue_buffer *buf;
	rte_spinlock_t *ring_lock;
	uint8_t max_done = 0;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	if (rte_ring_count(rx_adapter->intr_ring) == 0
		&& !rx_adapter->qd_valid)
		return 0;

	buf = &rx_adapter->event_enqueue_buffer;
	ring_lock = &rx_adapter->intr_ring_lock;

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
		struct eth_device_info *dev_info;
		uint16_t port;
		uint16_t queue;
		union queue_data qd = rx_adapter->qd;
		int err;

		if (!rx_adapter->qd_valid) {
			struct eth_rx_queue_info *queue_info;

			rte_spinlock_lock(ring_lock);
			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
			if (err) {
				rte_spinlock_unlock(ring_lock);
				break;
			}

			port = qd.port;
			queue = qd.queue;
			rx_adapter->qd = qd;
			rx_adapter->qd_valid = 1;
			dev_info = &rx_adapter->eth_devices[port];
			if (rxa_shared_intr(dev_info, queue))
				dev_info->shared_intr_enabled = 1;
			else {
				queue_info = &dev_info->rx_queue[queue];
				queue_info->intr_enabled = 1;
			}
			rte_eth_dev_rx_intr_enable(port, queue);
			rte_spinlock_unlock(ring_lock);
		} else {
			port = qd.port;
			queue = qd.queue;

			dev_info = &rx_adapter->eth_devices[port];
		}

		if (rxa_shared_intr(dev_info, queue)) {
			uint16_t i;
			uint16_t nb_queues;

			nb_queues = dev_info->dev->data->nb_rx_queues;
			n = 0;
			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
				uint8_t enq_buffer_full;

				if (!rxa_intr_queue(dev_info, i))
					continue;
				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
					rx_adapter->max_nb_rx,
					&rxq_empty);
				nb_rx += n;

				enq_buffer_full = !rxq_empty && n == 0;
				max_done = nb_rx > rx_adapter->max_nb_rx;

				if (enq_buffer_full || max_done) {
					dev_info->next_q_idx = i;
					goto done;
				}
			}

			rx_adapter->qd_valid = 0;

			/* Reinitialize for next interrupt */
			dev_info->next_q_idx = dev_info->multi_intr_cap ?
						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
						0;
		} else {
			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
				rx_adapter->max_nb_rx,
				&rxq_empty);
			rx_adapter->qd_valid = !rxq_empty;
			nb_rx += n;
			if (nb_rx > rx_adapter->max_nb_rx)
				break;
		}
	}

done:
	rx_adapter->stats.rx_intr_packets += nb_rx;
	return nb_rx;
}

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint32_t nb_rx = 0;
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return nb_rx;
		}

		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
				NULL);
		if (nb_rx > max_nb_rx) {
			rx_adapter->wrr_pos =
				(wrr_pos + 1) % rx_adapter->wrr_len;
			break;
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	return nb_rx;
}

static void
rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter = arg;
	struct rte_eth_event_enqueue_buffer *buf =
				&rx_adapter->event_enqueue_buffer;
	struct rte_event *ev;

	if (buf->count)
		rxa_flush_event_buffer(rx_adapter);

	if (vec->vector_ev->nb_elem == 0)
		return;
	ev = &buf->events[buf->count];

	/* Event ready. */
	ev->event = vec->event;
	ev->vec = vec->vector_ev;
	buf->count++;

	vec->vector_ev = NULL;
	vec->ts = 0;
}

static int
rxa_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;
	struct rte_event_eth_rx_adapter_stats *stats;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}

	if (rx_adapter->ena_vector) {
		if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
		    rx_adapter->vector_tmo_ticks) {
			struct eth_rx_vector_data *vec;

			TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
				uint64_t elapsed_time = rte_rdtsc() - vec->ts;

				if (elapsed_time >= vec->vector_timeout_ticks) {
					rxa_vector_expire(vec, rx_adapter);
					TAILQ_REMOVE(&rx_adapter->vector_list,
						vec, next);
				}
			}
			rx_adapter->prev_expiry_ts = rte_rdtsc();
		}
	}

	stats = &rx_adapter->stats;
	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
	stats->rx_packets += rxa_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}

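/* The service function above is registered RTE_SERVICE_CAP_MT_SAFE (see
 * rxa_init_service()): rx_lock serializes it against control-path
 * updates, and the trylock lets a second service lcore back off instead
 * of spinning.
 */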
int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}

static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}

static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u",
					port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}

static int
rxa_epoll_create1(void)
{
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
}

static int
rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->epd != INIT_FD)
		return 0;

	rx_adapter->epd = rxa_epoll_create1();
	if (rx_adapter->epd < 0) {
		int err = rx_adapter->epd;

		rx_adapter->epd = INIT_FD;
		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
		return err;
	}

	return 0;
}

static int
rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;
	char thread_name[RTE_MAX_THREAD_NAME_LEN];

	if (rx_adapter->intr_ring)
		return 0;

	rx_adapter->intr_ring = rte_ring_create("intr_ring",
					RTE_EVENT_ETH_INTR_RING_SIZE,
					rte_socket_id(), 0);
	if (!rx_adapter->intr_ring)
		return -ENOMEM;

	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_EVENT_ETH_INTR_RING_SIZE *
					sizeof(struct rte_epoll_event),
					RTE_CACHE_LINE_SIZE,
					rx_adapter->socket_id);
	if (!rx_adapter->epoll_events) {
		err = -ENOMEM;
		goto error;
	}

	rte_spinlock_init(&rx_adapter->intr_ring_lock);

	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
			"rx-intr-thread-%d", rx_adapter->id);

	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
				NULL, rxa_intr_thread, rx_adapter);
	if (!err)
		return 0;

	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d", err);
	rte_free(rx_adapter->epoll_events);
error:
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return err;
}

static int
rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;

	err = pthread_cancel(rx_adapter->rx_intr_thread);
	if (err)
		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d",
				err);

	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
	if (err)
		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d", err);

	rte_free(rx_adapter->epoll_events);
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return 0;
}

static int
rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int ret;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	ret = rxa_destroy_intr_thread(rx_adapter);
	if (ret)
		return ret;

	close(rx_adapter->epd);
	rx_adapter->epd = INIT_FD;

	return ret;
}

static int
rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
			rx_queue_id);
		return err;
	}

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err)
		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);

	if (!sintr)
		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
	else
		dev_info->shared_intr_enabled = 0;
	return err;
}

static int
rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id)
{
	int err;
	int i;
	int s;

	if (dev_info->nb_rx_intr == 0)
		return 0;

	err = 0;
	if (rx_queue_id == -1) {
		s = dev_info->nb_shared_intr;
		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			int sintr;
			uint16_t q;

			q = dev_info->intr_queue[i];
			sintr = rxa_shared_intr(dev_info, q);
			s -= sintr;

			if (!sintr || s == 0) {

				err = rxa_disable_intr(rx_adapter, dev_info,
						q);
				if (err)
					return err;
				rxa_intr_ring_del_entries(rx_adapter, dev_info,
							q);
			}
		}
	} else {
		if (!rxa_intr_queue(dev_info, rx_queue_id))
			return 0;
		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
				dev_info->nb_shared_intr == 1) {
			err = rxa_disable_intr(rx_adapter, dev_info,
					rx_queue_id);
			if (err)
				return err;
			rxa_intr_ring_del_entries(rx_adapter, dev_info,
						rx_queue_id);
		}

		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			if (dev_info->intr_queue[i] == rx_queue_id) {
				for (; i < dev_info->nb_rx_intr - 1; i++)
					dev_info->intr_queue[i] =
						dev_info->intr_queue[i + 1];
				break;
			}
		}
	}

	return err;
}

static int
rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err, err1;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	union queue_data qd;
	int init_fd;
	uint16_t *intr_queue;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	if (rxa_intr_queue(dev_info, rx_queue_id))
		return 0;

	intr_queue = dev_info->intr_queue;
	if (dev_info->intr_queue == NULL) {
		size_t len =
			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
		dev_info->intr_queue =
			rte_zmalloc_socket(
				rx_adapter->mem_name,
				len,
				0,
				rx_adapter->socket_id);
		if (dev_info->intr_queue == NULL)
			return -ENOMEM;
	}

	init_fd = rx_adapter->epd;
	err = rxa_init_epd(rx_adapter);
	if (err)
		goto err_free_queue;

	qd.port = eth_dev_id;
	qd.queue = rx_queue_id;

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_ADD,
					qd.ptr);
	if (err) {
		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
			" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_fd;
	}

	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_event;
	}

	err = rxa_create_intr_thread(rx_adapter);
	if (!err) {
		if (sintr)
			dev_info->shared_intr_enabled = 1;
		else
			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
		return 0;
	}

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err)
		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
err_del_event:
	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err1) {
		RTE_EDEV_LOG_ERR("Could not delete event for"
				" Rx Queue %u err %d", rx_queue_id, err1);
	}

err_del_fd:
	if (init_fd == INIT_FD) {
		close(rx_adapter->epd);
		rx_adapter->epd = INIT_FD;
	}

err_free_queue:
	if (intr_queue == NULL)
		rte_free(dev_info->intr_queue);

	return err;
}

static int
rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int i, j, err;
	int si = -1;
	int shared_done = (dev_info->nb_shared_intr > 0);

	if (rx_queue_id != -1) {
		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
			return 0;
		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
	}

	err = 0;
	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {

		if (rxa_shared_intr(dev_info, i) && shared_done)
			continue;

		err = rxa_config_intr(rx_adapter, dev_info, i);

		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
		if (shared_done) {
			si = i;
			dev_info->shared_intr_enabled = 1;
		}
		if (err)
			break;
	}

	if (err == 0)
		return 0;

	shared_done = (dev_info->nb_shared_intr > 0);
	for (j = 0; j < i; j++) {
		if (rxa_intr_queue(dev_info, j))
			continue;
		if (rxa_shared_intr(dev_info, j) && si != j)
			continue;
		err = rxa_disable_intr(rx_adapter, dev_info, j);
		if (err)
			break;
	}

	return err;
}

static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = rxa_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	rx_adapter->epd = INIT_FD;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}

static void
rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_update_queue(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}

static void
rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
		uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
		uint16_t port_id)
{
#define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
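	/* Example (assumed clock rate): with rte_get_timer_hz() == 2 GHz,
	 * a vector_ns of 100000 (100 us) converts to 200000 timer ticks.
	 */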
	struct eth_rx_vector_data *vector_data;
	uint32_t flow_id;

	vector_data = &queue_info->vector_data;
	vector_data->max_vector_count = vector_count;
	vector_data->port = port_id;
	vector_data->queue = qid;
	vector_data->vector_pool = mp;
	vector_data->vector_timeout_ticks =
		NSEC2TICK(vector_ns, rte_get_timer_hz());
	vector_data->ts = 0;
	flow_id = queue_info->event & 0xFFFFF;
	flow_id =
		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
	vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
}

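/* When the application does not supply a flow id, rxa_set_vector_data()
 * synthesizes the 20-bit event flow id from the source: bits 19:12 carry
 * the low 8 bits of the eth port id and bits 11:0 the Rx queue id, so
 * vectors from different <port, queue> pairs map to different flows.
 */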
static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id)
{
	struct eth_rx_vector_data *vec;
	int pollq;
	int intrq;
	int sintrq;

	if (rx_adapter->nb_queues == 0)
		return;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_sw_del(rx_adapter, dev_info, i);
		return;
	}

	/* Push all the partial event vectors to event device. */
	TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
		if (vec->queue != rx_queue_id)
			continue;
		rxa_vector_expire(vec, rx_adapter);
		TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
	rx_adapter->num_rx_polled -= pollq;
	dev_info->nb_rx_poll -= pollq;
	rx_adapter->num_rx_intr -= intrq;
	dev_info->nb_rx_intr -= intrq;
	dev_info->nb_shared_intr -= intrq && sintrq;
}

static void
rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id,
	const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;
	int pollq;
	int intrq;
	int sintrq;
	struct rte_event *qi_ev;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_add_queue(rx_adapter, dev_info, i, conf);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->wt = conf->servicing_weight;

	qi_ev = (struct rte_event *)&queue_info->event;
	qi_ev->event = ev->event;
	qi_ev->op = RTE_EVENT_OP_NEW;
	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
	qi_ev->sub_event_type = 0;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id_mask = ~0;
	} else
		qi_ev->flow_id = 0;

	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
	if (rxa_polled_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled += !pollq;
		dev_info->nb_rx_poll += !pollq;
		rx_adapter->num_rx_intr -= intrq;
		dev_info->nb_rx_intr -= intrq;
		dev_info->nb_shared_intr -= intrq && sintrq;
	}

	if (rxa_intr_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled -= pollq;
		dev_info->nb_rx_poll -= pollq;
		rx_adapter->num_rx_intr += !intrq;
		dev_info->nb_rx_intr += !intrq;
		dev_info->nb_shared_intr += !intrq && sintrq;
		if (dev_info->nb_shared_intr == 1) {
			if (dev_info->multi_intr_cap)
				dev_info->next_q_idx =
					RTE_MAX_RXTX_INTR_VEC_ID - 1;
			else
				dev_info->next_q_idx = 0;
		}
	}
}

static void
rxa_sw_event_vector_configure(
	struct rte_event_eth_rx_adapter *rx_adapter, uint16_t eth_dev_id,
	int rx_queue_id,
	const struct rte_event_eth_rx_adapter_event_vector_config *config)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *queue_info;
	struct rte_event *qi_ev;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_sw_event_vector_configure(rx_adapter, eth_dev_id, i,
						config);
		return;
	}

	queue_info = &dev_info->rx_queue[rx_queue_id];
	qi_ev = (struct rte_event *)&queue_info->event;
	queue_info->ena_vector = 1;
	qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
	rxa_set_vector_data(queue_info, config->vector_sz,
			config->vector_timeout_ns, config->vector_mp,
			rx_queue_id, dev_info->dev->data->port_id);
	rx_adapter->ena_vector = 1;
	rx_adapter->vector_tmo_ticks =
		rx_adapter->vector_tmo_ticks ?
			RTE_MIN(config->vector_timeout_ns >> 1,
				rx_adapter->vector_tmo_ticks) :
			config->vector_timeout_ns >> 1;
}

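/* The adapter-wide expiry scan interval is kept at half of the smallest
 * configured per-queue vector timeout, so the service function checks
 * the expiry list at least twice per timeout period.
 */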
static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;
	uint32_t nb_rx_intr;
	int num_intr_vec;
	uint16_t wt;

	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;

		temp_conf = *queue_conf;
		if (!data->dev_conf.intr_conf.rxq) {
			/* If Rx interrupts are disabled set wt = 1 */
			temp_conf.servicing_weight = 1;
		}
		queue_conf = &temp_conf;
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	rx_queue = dev_info->rx_queue;
	wt = queue_conf->servicing_weight;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
			rte_zmalloc_socket(rx_adapter->mem_name,
				nb_rx_queues *
				sizeof(struct eth_rx_queue_info), 0,
				rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}
	rx_wrr = NULL;
	rx_poll = NULL;

	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

	if (dev_info->dev->intr_handle)
		dev_info->multi_intr_cap =
			rte_intr_cap_multiple(dev_info->dev->intr_handle);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	if (wt == 0) {
		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);

		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
		if (ret)
			goto err_free_rxqueue;

		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
		if (ret)
			goto err_free_rxqueue;
	} else {

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			/* interrupt based queues are being converted to
			 * poll mode queues, delete the interrupt configuration
			 * for those queues
			 */
			ret = rxa_del_intr_queue(rx_adapter,
						dev_info, rx_queue_id);
			if (ret)
				goto err_free_rxqueue;
		}
	}

	if (nb_rx_intr == 0) {
		ret = rxa_free_intr_resources(rx_adapter);
		if (ret)
			goto err_free_rxqueue;
	}

	if (wt == 0) {
		uint16_t i;

		/* Use interrupt instead of polling */
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				dev_info->intr_queue[i] = i;
		} else {
			if (!rxa_intr_queue(dev_info, rx_queue_id))
				dev_info->intr_queue[nb_rx_intr - 1] =
					rx_queue_id;
		}
	}

	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	rx_adapter->num_intr_vec += num_intr_vec;
	return 0;

err_free_rxqueue:
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	rte_free(rx_poll);
	rte_free(rx_wrr);

	return ret;
}

static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	rx_adapter->id = id;
	TAILQ_INIT(&rx_adapter->vector_list);
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_MAX_ETHPORTS *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices");
		rte_free(rx_adapter);
		return -ENOMEM;
	}

	rte_spinlock_init(&rx_adapter->rx_lock);

	for (i = 0; i < RTE_MAX_ETHPORTS; i++)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;

	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;

	rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
		conf_arg);
	return 0;
}

int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					rxa_default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}

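/* A minimal usage sketch (application code; ids, queue and depths are
 * assumed, not taken from this file): create the adapter, connect all
 * Rx queues of an ethdev, then start it.
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 4096,
 *		.dequeue_depth = 16,
 *		.enqueue_depth = 16,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.queue_id = ev_qid,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(rx_adp_id, evdev_id, &pconf);
 *	rte_event_eth_rx_adapter_queue_add(rx_adp_id, eth_port, -1, &qconf);
 *	rte_event_eth_rx_adapter_start(rx_adp_id);
 */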
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	rte_eventdev_trace_eth_rx_adapter_free(id);
	return 0;
}

int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0 &&
	    (queue_conf->rx_queue_flags &
	     RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR)) {
		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			(uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
				rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0) {
			uint32_t service_id = rx_adapter->service_id;
			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
			rte_service_component_runstate_set(service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	if (ret)
		return ret;

	rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
		rx_queue_id, queue_conf, ret);
	return 0;
}

static int
rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
{
	limits->max_sz = MAX_VECTOR_SIZE;
	limits->min_sz = MIN_VECTOR_SIZE;
	limits->max_timeout_ns = MAX_VECTOR_NS;
	limits->min_timeout_ns = MIN_VECTOR_NS;

	return 0;
}

int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	uint32_t nb_rx_intr;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;
	int num_intr_vec;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			(uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
					-ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {

			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			ret = rxa_del_intr_queue(rx_adapter, dev_info,
					rx_queue_id);
			if (ret)
				goto unlock_ret;
		}

		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto unlock_ret;
		}

		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		if (nb_rx_intr == 0) {
			rte_free(dev_info->intr_queue);
			dev_info->intr_queue = NULL;
		}

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;
		rx_adapter->num_intr_vec += num_intr_vec;

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
unlock_ret:
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret) {
			rte_free(rx_poll);
			rte_free(rx_wrr);
			return ret;
		}

		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
		rx_queue_id, ret);
	return ret;
}

int
rte_event_eth_rx_adapter_queue_event_vector_config(
	uint8_t id, uint16_t eth_dev_id, int32_t rx_queue_id,
	struct rte_event_eth_rx_adapter_event_vector_config *config)
{
	struct rte_event_eth_rx_adapter_vector_limits limits;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	uint32_t cap;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (config == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id, &cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
				"eth port %" PRIu16,
				id, eth_dev_id);
		return ret;
	}

	if (!(cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR)) {
		RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	ret = rte_event_eth_rx_adapter_vector_limits_get(
		rx_adapter->eventdev_id, eth_dev_id, &limits);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get vector limits edev %" PRIu8
				"eth port %" PRIu16,
				rx_adapter->eventdev_id, eth_dev_id);
		return ret;
	}

	if (config->vector_sz < limits.min_sz ||
	    config->vector_sz > limits.max_sz ||
	    config->vector_timeout_ns < limits.min_timeout_ns ||
	    config->vector_timeout_ns > limits.max_timeout_ns ||
	    config->vector_mp == NULL) {
		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}
	if (config->vector_mp->elt_size <
	    (sizeof(struct rte_event_vector) +
	     (sizeof(uintptr_t) * config->vector_sz))) {
		RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(
			*dev->dev_ops->eth_rx_adapter_event_vector_config,
			-ENOTSUP);
		ret = dev->dev_ops->eth_rx_adapter_event_vector_config(
			dev, &rte_eth_devices[eth_dev_id], rx_queue_id, config);
	} else {
		rxa_sw_event_vector_configure(rx_adapter, eth_dev_id,
					rx_queue_id, config);
	}

	return ret;
}

int
rte_event_eth_rx_adapter_vector_limits_get(
	uint8_t dev_id, uint16_t eth_port_id,
	struct rte_event_eth_rx_adapter_vector_limits *limits)
{
	struct rte_eventdev *dev;
	uint32_t cap;
	int ret;

	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);

	if (limits == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[dev_id];

	ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
				"eth port %" PRIu16,
				dev_id, eth_port_id);
		return ret;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(
			*dev->dev_ops->eth_rx_adapter_vector_limits_get,
			-ENOTSUP);
		ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
			dev, &rte_eth_devices[eth_port_id], limits);
	} else {
		ret = rxa_sw_vector_limits(limits);
	}

	return ret;
}

int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_start(id);
	return rxa_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	rte_eventdev_trace_eth_rx_adapter_stop(id);
	return rxa_ctrl(id, 0);
}

int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}

int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
						&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}

int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}

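/* A minimal usage sketch (application code; service_lcore_id is an
 * assumed, already reserved lcore): the adapter's service must be
 * mapped to a service core by the application before packets flow.
 *
 *	uint32_t service_id;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &service_id) == 0) {
 *		rte_service_lcore_add(service_lcore_id);
 *		rte_service_map_lcore_set(service_id, service_lcore_id, 1);
 *		rte_service_lcore_start(service_lcore_id);
 *	}
 */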
int
rte_event_eth_rx_adapter_cb_register(uint8_t id,
				uint16_t eth_dev_id,
				rte_event_eth_rx_adapter_cb_fn cb_fn,
				void *cb_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev_info = &rx_adapter->eth_devices[eth_dev_id];
	if (dev_info->rx_queue == NULL)
		return -EINVAL;

	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			"eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
				PRIu16, eth_dev_id);
		return -EINVAL;
	}

	rte_spinlock_lock(&rx_adapter->rx_lock);
	dev_info->cb_fn = cb_fn;
	dev_info->cb_arg = cb_arg;
	rte_spinlock_unlock(&rx_adapter->rx_lock);

	return 0;
}