1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2017 Intel Corporation.
10 #include <rte_cycles.h>
11 #include <rte_common.h>
13 #include <rte_errno.h>
14 #include <ethdev_driver.h>
16 #include <rte_malloc.h>
17 #include <rte_service_component.h>
18 #include <rte_thash.h>
19 #include <rte_interrupts.h>
20 #include <rte_mbuf_dyn.h>
22 #include "rte_eventdev.h"
23 #include "eventdev_pmd.h"
24 #include "rte_eventdev_trace.h"
25 #include "rte_event_eth_rx_adapter.h"
28 #define BLOCK_CNT_THRESHOLD 10
29 #define ETH_EVENT_BUFFER_SIZE (6*BATCH_SIZE)
30 #define MAX_VECTOR_SIZE 1024
31 #define MIN_VECTOR_SIZE 4
32 #define MAX_VECTOR_NS 1E9
33 #define MIN_VECTOR_NS 1E5
35 #define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
36 #define ETH_RX_ADAPTER_MEM_NAME_LEN 32
38 #define RSS_KEY_SIZE 40
39 /* value written to intr thread pipe to signal thread exit */
40 #define ETH_BRIDGE_INTR_THREAD_EXIT 1
41 /* Sentinel value to detect an uninitialized file handle */
45 * Used to store port and queue ID of interrupting Rx queue
57  * There is an instance of this struct per polled Rx queue added to the
58  * adapter
59  */
60 struct eth_rx_poll_entry {
61 /* Eth port to poll */
63 /* Eth rx queue to poll */
67 struct eth_rx_vector_data {
68 TAILQ_ENTRY(eth_rx_vector_data) next;
71 uint16_t max_vector_count;
74 uint64_t vector_timeout_ticks;
75 struct rte_mempool *vector_pool;
76 struct rte_event_vector *vector_ev;
77 } __rte_cache_aligned;
79 TAILQ_HEAD(eth_rx_vector_data_list, eth_rx_vector_data);
81 /* Instance per adapter */
82 struct rte_eth_event_enqueue_buffer {
83 	/* Count of events in this buffer */
84 	uint16_t count;
85 	/* Array of events in this buffer */
86 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
87 	/* Event enqueue happens from head */
88 	uint16_t head;
89 	/* New packets from rte_eth_rx_burst are enqueued from the tail */
90 	uint16_t tail;
91 	/* Last element in the buffer before rollover */
92 	uint16_t last;
93 	uint16_t last_mask; /* set to ~0 once the buffer has rolled over */
94 };
96 struct rte_event_eth_rx_adapter {
98 uint8_t rss_key_be[RSS_KEY_SIZE];
99 /* Event device identifier */
101 /* Per ethernet device structure */
102 struct eth_device_info *eth_devices;
103 /* Event port identifier */
104 uint8_t event_port_id;
105 /* Lock to serialize config updates with service function */
106 rte_spinlock_t rx_lock;
107 /* Max mbufs processed in any service function invocation */
109 /* Receive queues that need to be polled */
110 struct eth_rx_poll_entry *eth_rx_poll;
111 /* Size of the eth_rx_poll array */
112 uint16_t num_rx_polled;
113 /* Weighted round robin schedule */
115 /* wrr_sched[] size */
117 /* Next entry in wrr[] to begin polling */
119 /* Event burst buffer */
120 struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
121 /* Vector enable flag */
123 /* Timestamp of previous vector expiry list traversal */
124 uint64_t prev_expiry_ts;
125 /* Minimum ticks to wait before traversing expiry list */
126 uint64_t vector_tmo_ticks;
128 struct eth_rx_vector_data_list vector_list;
129 /* Per adapter stats */
130 struct rte_event_eth_rx_adapter_stats stats;
131 /* Block count, counts up to BLOCK_CNT_THRESHOLD */
132 uint16_t enq_block_count;
134 uint64_t rx_enq_block_start_ts;
135 /* epoll fd used to wait for Rx interrupts */
137 	/* Number of interrupt driven Rx queues */
138 uint32_t num_rx_intr;
139 /* Used to send <dev id, queue id> of interrupting Rx queues from
140 * the interrupt thread to the Rx thread
142 struct rte_ring *intr_ring;
143 	/* Rx Queue data (dev id, queue id) for the last non-empty
144 	 * queue polled
145 	 */
146 	union queue_data qd;
147 	/* queue_data is valid */
148 	int qd_valid;
149 /* Interrupt ring lock, synchronizes Rx thread
150 * and interrupt thread
152 rte_spinlock_t intr_ring_lock;
153 	/* Event array passed to rte_epoll_wait */
154 struct rte_epoll_event *epoll_events;
155 /* Count of interrupt vectors in use */
156 uint32_t num_intr_vec;
157 /* Thread blocked on Rx interrupts */
158 pthread_t rx_intr_thread;
159 /* Configuration callback for rte_service configuration */
160 rte_event_eth_rx_adapter_conf_cb conf_cb;
161 /* Configuration callback argument */
163 /* Set if default_cb is being used */
165 /* Service initialization state */
166 uint8_t service_inited;
167 /* Total count of Rx queues in adapter */
169 /* Memory allocation name */
170 char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
171 /* Socket identifier cached from eventdev */
173 /* Per adapter EAL service */
175 /* Adapter started flag */
179 } __rte_cache_aligned;
182 struct eth_device_info {
183 struct rte_eth_dev *dev;
184 struct eth_rx_queue_info *rx_queue;
186 rte_event_eth_rx_adapter_cb_fn cb_fn;
187 /* Rx callback argument */
189 	/* Set if ethdev->eventdev packet transfer uses a
190 	 * hardware mechanism
191 	 */
192 uint8_t internal_event_port;
193 	/* Set if the adapter is processing Rx queues for
194 	 * this eth device and packet processing has been
195 	 * started; this lets the code know if the PMD
196 	 * rx_adapter_stop callback needs to be invoked
198 uint8_t dev_rx_started;
199 /* Number of queues added for this device */
200 uint16_t nb_dev_queues;
201 /* Number of poll based queues
202 * If nb_rx_poll > 0, the start callback will
203 * be invoked if not already invoked
206 /* Number of interrupt based queues
207 * If nb_rx_intr > 0, the start callback will
208 * be invoked if not already invoked.
211 /* Number of queues that use the shared interrupt */
212 uint16_t nb_shared_intr;
213 /* sum(wrr(q)) for all queues within the device
214 * useful when deleting all device queues
217 	/* Intr based queue index to start polling from; this is used
218 	 * if the number of shared interrupts is non-zero
221 /* Intr based queue indices */
222 uint16_t *intr_queue;
223 	/* Device generates a per Rx queue interrupt for queue
224 	 * indices < RTE_MAX_RXTX_INTR_VEC_ID - 1
225 	 */
226 	int multi_intr_cap;
227 /* shared interrupt enabled */
228 int shared_intr_enabled;
232 struct eth_rx_queue_info {
233 int queue_enabled; /* True if added */
236 uint16_t wt; /* Polling weight */
237 uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */
239 struct eth_rx_vector_data vector_data;
242 static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;
244 /* Enable dynamic timestamp field in mbuf */
245 static uint64_t event_eth_rx_timestamp_dynflag;
246 static int event_eth_rx_timestamp_dynfield_offset = -1;
248 static inline rte_mbuf_timestamp_t *
249 rxa_timestamp_dynfield(struct rte_mbuf *mbuf)
251 return RTE_MBUF_DYNFIELD(mbuf,
252 event_eth_rx_timestamp_dynfield_offset, rte_mbuf_timestamp_t *);
256 rxa_validate_id(uint8_t id)
258 return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
261 #define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
262 if (!rxa_validate_id(id)) { \
263 RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
269 rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
271 return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
274 /* Greatest common divisor */
275 static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
276 {
277 	uint16_t r = a % b;
278 
279 	return r ? rxa_gcd_u16(b, r) : b;
280 }
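/*
 * Editor's worked example (illustrative, not part of the original file):
 * for servicing weights 6 and 4,
 *	rxa_gcd_u16(6, 4): r = 6 % 4 = 2 -> rxa_gcd_u16(4, 2)
 *	rxa_gcd_u16(4, 2): r = 4 % 2 = 0 -> returns 2
 * The WRR scheduler below steps its current weight down in multiples of
 * this gcd, per the algorithm referenced in the next comment.
 */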
282 /* Returns the next queue in the polling sequence
284 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
287 rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
288 unsigned int n, int *cw,
289 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
290 uint16_t gcd, int prev)
306 q = eth_rx_poll[i].eth_rx_qid;
307 d = eth_rx_poll[i].eth_dev_id;
308 w = rx_adapter->eth_devices[d].rx_queue[q].wt;
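/*
 * Editor's sketch (illustrative) of the interleaved WRR selection this
 * function performs, following the LVS wiki page cited above; a reading
 * aid, not necessarily the exact elided body:
 *
 *	int i = prev;
 *	uint16_t w, q, d;
 *
 *	do {
 *		i = (i + 1) % n;
 *		if (i == 0) {
 *			*cw = *cw - gcd;
 *			if (*cw <= 0)
 *				*cw = max_wt;
 *		}
 *		q = eth_rx_poll[i].eth_rx_qid;
 *		d = eth_rx_poll[i].eth_dev_id;
 *		w = rx_adapter->eth_devices[d].rx_queue[q].wt;
 *	} while (w < *cw);
 *
 *	return i;
 */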
316 rxa_shared_intr(struct eth_device_info *dev_info,
321 	if (dev_info->dev->intr_handle == NULL)
322 		return 0;
324 multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
325 return !multi_intr_cap ||
326 rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
330 rxa_intr_queue(struct eth_device_info *dev_info,
333 struct eth_rx_queue_info *queue_info;
335 queue_info = &dev_info->rx_queue[rx_queue_id];
336 return dev_info->rx_queue &&
337 !dev_info->internal_event_port &&
338 queue_info->queue_enabled && queue_info->wt == 0;
342 rxa_polled_queue(struct eth_device_info *dev_info,
345 struct eth_rx_queue_info *queue_info;
347 queue_info = &dev_info->rx_queue[rx_queue_id];
348 return !dev_info->internal_event_port &&
349 dev_info->rx_queue &&
350 queue_info->queue_enabled && queue_info->wt != 0;
353 /* Calculate change in number of vectors after Rx queue ID is added/deleted */
355 rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
361 nbq = dev_info->dev->data->nb_rx_queues;
362 n = 0; /* non shared count */
363 s = 0; /* shared count */
365 if (rx_queue_id == -1) {
366 for (i = 0; i < nbq; i++) {
367 if (!rxa_shared_intr(dev_info, i))
368 n += add ? !rxa_intr_queue(dev_info, i) :
369 rxa_intr_queue(dev_info, i);
371 s += add ? !rxa_intr_queue(dev_info, i) :
372 rxa_intr_queue(dev_info, i);
376 if ((add && dev_info->nb_shared_intr == 0) ||
377 (!add && dev_info->nb_shared_intr))
381 if (!rxa_shared_intr(dev_info, rx_queue_id))
382 n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
383 rxa_intr_queue(dev_info, rx_queue_id);
385 n = add ? !dev_info->nb_shared_intr :
386 dev_info->nb_shared_intr == 1;
392 /* Calculate nb_rx_intr after deleting interrupt mode rx queues
395 rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
396 struct eth_device_info *dev_info,
398 uint32_t *nb_rx_intr)
402 if (rx_queue_id == -1)
403 intr_diff = dev_info->nb_rx_intr;
405 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
407 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
410 /* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
411 * interrupt queues could currently be poll mode Rx queues
414 rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
415 struct eth_device_info *dev_info,
417 uint32_t *nb_rx_poll,
418 uint32_t *nb_rx_intr,
423 uint32_t wrr_len_diff;
425 if (rx_queue_id == -1) {
426 intr_diff = dev_info->dev->data->nb_rx_queues -
427 dev_info->nb_rx_intr;
428 poll_diff = dev_info->nb_rx_poll;
429 wrr_len_diff = dev_info->wrr_len;
431 intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
432 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
433 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
437 *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
438 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
439 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
442 /* Calculate size of the eth_rx_poll and wrr_sched arrays
443 * after deleting poll mode rx queues
446 rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
447 struct eth_device_info *dev_info,
449 uint32_t *nb_rx_poll,
453 uint32_t wrr_len_diff;
455 if (rx_queue_id == -1) {
456 poll_diff = dev_info->nb_rx_poll;
457 wrr_len_diff = dev_info->wrr_len;
459 poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
460 wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
464 *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
465 *nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
468 /* Calculate nb_rx_* after adding poll mode rx queues
471 rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
472 struct eth_device_info *dev_info,
475 uint32_t *nb_rx_poll,
476 uint32_t *nb_rx_intr,
481 uint32_t wrr_len_diff;
483 if (rx_queue_id == -1) {
484 intr_diff = dev_info->nb_rx_intr;
485 poll_diff = dev_info->dev->data->nb_rx_queues -
486 dev_info->nb_rx_poll;
487 		wrr_len_diff = wt * dev_info->dev->data->nb_rx_queues
490 intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
491 poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
492 wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
493 wt - dev_info->rx_queue[rx_queue_id].wt :
497 *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
498 *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
499 *nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
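/*
 * Editor's worked example (illustrative): adding a single queue with
 * servicing weight wt = 3 that is currently an interrupt mode queue gives
 * intr_diff = 1, poll_diff = 1 and wrr_len_diff = 3, i.e. one fewer
 * interrupt queue, one more polled queue and three more WRR schedule slots.
 */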
502 /* Calculate nb_rx_* after adding rx_queue_id */
504 rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
505 struct eth_device_info *dev_info,
508 uint32_t *nb_rx_poll,
509 uint32_t *nb_rx_intr,
513 rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
514 wt, nb_rx_poll, nb_rx_intr, nb_wrr);
516 rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
517 nb_rx_poll, nb_rx_intr, nb_wrr);
520 /* Calculate nb_rx_* after deleting rx_queue_id */
522 rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
523 struct eth_device_info *dev_info,
525 uint32_t *nb_rx_poll,
526 uint32_t *nb_rx_intr,
529 rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
531 rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
536 * Allocate the rx_poll array
538 static struct eth_rx_poll_entry *
539 rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
540 uint32_t num_rx_polled)
544 len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
545 RTE_CACHE_LINE_SIZE);
546 return rte_zmalloc_socket(rx_adapter->mem_name,
549 rx_adapter->socket_id);
553 * Allocate the WRR array
556 rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
560 len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
561 RTE_CACHE_LINE_SIZE);
562 return rte_zmalloc_socket(rx_adapter->mem_name,
565 rx_adapter->socket_id);
569 rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
572 struct eth_rx_poll_entry **rx_poll,
573 uint32_t **wrr_sched)
582 *rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
583 if (*rx_poll == NULL) {
588 *wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
589 if (*wrr_sched == NULL) {
596 /* Precalculate WRR polling sequence for all queues in rx_adapter */
598 rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
599 struct eth_rx_poll_entry *rx_poll,
608 /* Initialize variables for calculation of wrr schedule */
609 uint16_t max_wrr_pos = 0;
610 unsigned int poll_q = 0;
617 /* Generate array of all queues to poll, the size of this
620 RTE_ETH_FOREACH_DEV(d) {
621 uint16_t nb_rx_queues;
622 struct eth_device_info *dev_info =
623 &rx_adapter->eth_devices[d];
624 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
625 if (dev_info->rx_queue == NULL)
627 if (dev_info->internal_event_port)
629 dev_info->wrr_len = 0;
630 for (q = 0; q < nb_rx_queues; q++) {
631 struct eth_rx_queue_info *queue_info =
632 &dev_info->rx_queue[q];
635 if (!rxa_polled_queue(dev_info, q))
638 rx_poll[poll_q].eth_dev_id = d;
639 rx_poll[poll_q].eth_rx_qid = q;
641 dev_info->wrr_len += wt;
642 max_wt = RTE_MAX(max_wt, wt);
643 gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
648 /* Generate polling sequence based on weights */
651 for (i = 0; i < max_wrr_pos; i++) {
652 rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
653 rx_poll, max_wt, gcd, prev);
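/*
 * Editor's worked example (illustrative): two polled queues with servicing
 * weights {q0: 2, q1: 1} give max_wt = 2, gcd = 1 and a schedule of
 * sum(wt) = 3 slots; successive rxa_wrr_next() calls fill rx_wrr[] with
 * the repeating sequence q0, q0, q1.
 */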
659 rxa_mtoip(struct rte_mbuf *m, struct rte_ipv4_hdr **ipv4_hdr,
660 struct rte_ipv6_hdr **ipv6_hdr)
662 struct rte_ether_hdr *eth_hdr =
663 rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
664 struct rte_vlan_hdr *vlan_hdr;
669 switch (eth_hdr->ether_type) {
670 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
671 *ipv4_hdr = (struct rte_ipv4_hdr *)(eth_hdr + 1);
674 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
675 *ipv6_hdr = (struct rte_ipv6_hdr *)(eth_hdr + 1);
678 case RTE_BE16(RTE_ETHER_TYPE_VLAN):
679 vlan_hdr = (struct rte_vlan_hdr *)(eth_hdr + 1);
680 switch (vlan_hdr->eth_proto) {
681 case RTE_BE16(RTE_ETHER_TYPE_IPV4):
682 *ipv4_hdr = (struct rte_ipv4_hdr *)(vlan_hdr + 1);
684 case RTE_BE16(RTE_ETHER_TYPE_IPV6):
685 *ipv6_hdr = (struct rte_ipv6_hdr *)(vlan_hdr + 1);
697 /* Calculate RSS hash for IPv4/6 */
698 static inline uint32_t
699 rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
703 struct rte_ipv4_tuple ipv4_tuple;
704 struct rte_ipv6_tuple ipv6_tuple;
705 struct rte_ipv4_hdr *ipv4_hdr;
706 struct rte_ipv6_hdr *ipv6_hdr;
708 rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);
711 ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
712 ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
714 input_len = RTE_THASH_V4_L3_LEN;
715 } else if (ipv6_hdr) {
716 rte_thash_load_v6_addrs(ipv6_hdr,
717 (union rte_thash_tuple *)&ipv6_tuple);
719 input_len = RTE_THASH_V6_L3_LEN;
723 return rte_softrss_be(tuple, input_len, rss_key_be);
727 rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
729 return !!rx_adapter->enq_block_count;
733 rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
735 	if (rx_adapter->rx_enq_block_start_ts)
736 		return;
737 
738 	rx_adapter->enq_block_count++;
739 	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
740 		return;
741 
742 	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
746 rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
747 struct rte_event_eth_rx_adapter_stats *stats)
749 if (unlikely(!stats->rx_enq_start_ts))
750 stats->rx_enq_start_ts = rte_get_tsc_cycles();
752 	if (likely(!rxa_enq_blocked(rx_adapter)))
753 		return;
755 rx_adapter->enq_block_count = 0;
756 if (rx_adapter->rx_enq_block_start_ts) {
757 stats->rx_enq_end_ts = rte_get_tsc_cycles();
758 stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
759 rx_adapter->rx_enq_block_start_ts;
760 rx_adapter->rx_enq_block_start_ts = 0;
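/*
 * Editor's note (illustrative): the two helpers above account time spent
 * blocked on a full event device. After BLOCK_CNT_THRESHOLD (10)
 * consecutive enqueue attempts that move no events,
 * rxa_enq_block_start_ts() latches the TSC; the next successful enqueue
 * makes rxa_enq_block_end_ts() add the elapsed cycles to
 * stats->rx_enq_block_cycles. E.g. blocking detected at TSC 1000 and
 * cleared at TSC 5000 adds 4000 cycles to the statistic.
 */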
764 /* Enqueue buffered events to event device */
765 static inline uint16_t
766 rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
768 struct rte_eth_event_enqueue_buffer *buf =
769 &rx_adapter->event_enqueue_buffer;
770 struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
771 uint16_t count = buf->last ? buf->last - buf->head : buf->count;
776 uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
777 rx_adapter->event_port_id,
778 &buf->events[buf->head],
781 stats->rx_enq_retry++;
785 if (buf->last && n == count) {
788 n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
789 rx_adapter->event_port_id,
794 stats->rx_enq_retry++;
802 n ? rxa_enq_block_end_ts(rx_adapter, stats) :
803 rxa_enq_block_start_ts(rx_adapter);
806 stats->rx_enq_count += n;
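/*
 * Editor's worked example (illustrative): with head = 180, last = 190 and
 * tail = 20, the buffer holds the runs events[180..189] and events[0..19].
 * The first enqueue above pushes the 10 events before the rollover point;
 * only if all of them are accepted (n == count) does the second burst push
 * the wrapped run starting at index 0.
 */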
812 rxa_init_vector(struct rte_event_eth_rx_adapter *rx_adapter,
813 struct eth_rx_vector_data *vec)
815 vec->vector_ev->nb_elem = 0;
816 vec->vector_ev->port = vec->port;
817 vec->vector_ev->queue = vec->queue;
818 vec->vector_ev->attr_valid = true;
819 TAILQ_INSERT_TAIL(&rx_adapter->vector_list, vec, next);
822 static inline uint16_t
823 rxa_create_event_vector(struct rte_event_eth_rx_adapter *rx_adapter,
824 struct eth_rx_queue_info *queue_info,
825 struct rte_eth_event_enqueue_buffer *buf,
826 struct rte_mbuf **mbufs, uint16_t num)
828 struct rte_event *ev = &buf->events[buf->count];
829 struct eth_rx_vector_data *vec;
830 uint16_t filled, space, sz;
833 vec = &queue_info->vector_data;
835 if (vec->vector_ev == NULL) {
836 if (rte_mempool_get(vec->vector_pool,
837 (void **)&vec->vector_ev) < 0) {
838 rte_pktmbuf_free_bulk(mbufs, num);
841 rxa_init_vector(rx_adapter, vec);
844 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
846 ev->event = vec->event;
847 ev->vec = vec->vector_ev;
850 vec->vector_ev = NULL;
851 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
852 if (rte_mempool_get(vec->vector_pool,
853 (void **)&vec->vector_ev) < 0) {
854 rte_pktmbuf_free_bulk(mbufs, num);
857 rxa_init_vector(rx_adapter, vec);
860 space = vec->max_vector_count - vec->vector_ev->nb_elem;
861 sz = num > space ? space : num;
862 memcpy(vec->vector_ev->mbufs + vec->vector_ev->nb_elem, mbufs,
863 sizeof(void *) * sz);
864 vec->vector_ev->nb_elem += sz;
867 vec->ts = rte_rdtsc();
870 if (vec->vector_ev->nb_elem == vec->max_vector_count) {
871 ev->event = vec->event;
872 ev->vec = vec->vector_ev;
875 vec->vector_ev = NULL;
876 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
883 rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
885 uint16_t rx_queue_id,
886 struct rte_mbuf **mbufs,
890 struct eth_device_info *dev_info =
891 &rx_adapter->eth_devices[eth_dev_id];
892 struct eth_rx_queue_info *eth_rx_queue_info =
893 &dev_info->rx_queue[rx_queue_id];
894 struct rte_eth_event_enqueue_buffer *buf =
895 &rx_adapter->event_enqueue_buffer;
896 uint16_t new_tail = buf->tail;
897 uint64_t event = eth_rx_queue_info->event;
898 uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
899 struct rte_mbuf *m = mbufs[0];
905 uint64_t ts, ts_mask;
907 if (!eth_rx_queue_info->ena_vector) {
908 ts = m->ol_flags & event_eth_rx_timestamp_dynflag ?
909 0 : rte_get_tsc_cycles();
911 		/* 0xffff ffff ffff ffff if PKT_RX_TIMESTAMP is set,
912 		 * otherwise 0
913 		 */
914 ts_mask = (uint64_t)(!(m->ol_flags &
915 event_eth_rx_timestamp_dynflag)) - 1ULL;
917 /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
918 rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
919 do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
920 for (i = 0; i < num; i++) {
921 struct rte_event *ev;
924 *rxa_timestamp_dynfield(m) = ts |
925 (*rxa_timestamp_dynfield(m) & ts_mask);
927 ev = &buf->events[new_tail];
929 rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
932 ev->flow_id = (rss & ~flow_id_mask) |
933 (ev->flow_id & flow_id_mask);
938 num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
942 if (num && dev_info->cb_fn) {
945 nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
947 (RTE_DIM(buf->events) & ~buf->last_mask),
948 buf->count >= BATCH_SIZE ?
949 buf->count - BATCH_SIZE : 0,
950 &buf->events[buf->tail],
954 if (unlikely(nb_cb > num))
955 RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
960 rx_adapter->stats.rx_dropped += dropped;
968 rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
970 uint32_t nb_req = buf->tail + BATCH_SIZE;
973 if (nb_req <= RTE_DIM(buf->events))
976 if (buf->head >= BATCH_SIZE) {
978 buf->last = buf->tail;
984 return nb_req <= buf->head;
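/*
 * Editor's worked example (assuming BATCH_SIZE is 32, so the buffer has
 * 6 * 32 = 192 event slots): with tail = 170, nb_req = 202 > 192 and a
 * full batch no longer fits at the end of the array. If head >= 32, the
 * current tail is recorded in buf->last as the rollover point and new
 * events are written from index 0; otherwise dequeuing pauses until the
 * consumer drains at least a batch from the head.
 */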
987 /* Enqueue packets from <port, q> to event buffer */
988 static inline uint32_t
989 rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
996 struct rte_mbuf *mbufs[BATCH_SIZE];
997 struct rte_eth_event_enqueue_buffer *buf =
998 &rx_adapter->event_enqueue_buffer;
999 struct rte_event_eth_rx_adapter_stats *stats =
1006 /* Don't do a batch dequeue from the rx queue if there isn't
1007 * enough space in the enqueue buffer.
1009 while (rxa_pkt_buf_available(buf)) {
1010 if (buf->count >= BATCH_SIZE)
1011 rxa_flush_event_buffer(rx_adapter);
1013 stats->rx_poll_count++;
1014 n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
1020 rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
1022 if (rx_count + nb_rx > max_rx)
1027 rxa_flush_event_buffer(rx_adapter);
1033 rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
1039 union queue_data qd;
1040 struct eth_device_info *dev_info;
1041 struct eth_rx_queue_info *queue_info;
1048 dev_info = &rx_adapter->eth_devices[port_id];
1049 queue_info = &dev_info->rx_queue[queue];
1050 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1051 if (rxa_shared_intr(dev_info, queue))
1052 intr_enabled = &dev_info->shared_intr_enabled;
1054 intr_enabled = &queue_info->intr_enabled;
1056 if (*intr_enabled) {
1058 err = rte_ring_enqueue(rx_adapter->intr_ring, data);
1059 /* Entry should always be available.
1060 * The ring size equals the maximum number of interrupt
1061 * vectors supported (an interrupt vector is shared in
1062 * case of shared interrupts)
1065 RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
1066 " to ring: %s", strerror(-err));
1068 rte_eth_dev_rx_intr_disable(port_id, queue);
1070 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1074 rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
1075 uint32_t num_intr_vec)
1077 if (rx_adapter->num_intr_vec + num_intr_vec >
1078 RTE_EVENT_ETH_INTR_RING_SIZE) {
1079 RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
1080 " %d needed %d limit %d", rx_adapter->num_intr_vec,
1081 num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
1088 /* Delete entries for (dev, queue) from the interrupt ring */
1090 rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
1091 struct eth_device_info *dev_info,
1092 uint16_t rx_queue_id)
1095 union queue_data qd;
1097 rte_spinlock_lock(&rx_adapter->intr_ring_lock);
1099 n = rte_ring_count(rx_adapter->intr_ring);
1100 for (i = 0; i < n; i++) {
1101 rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1102 if (!rxa_shared_intr(dev_info, rx_queue_id)) {
1103 if (qd.port == dev_info->dev->data->port_id &&
1104 qd.queue == rx_queue_id)
1107 if (qd.port == dev_info->dev->data->port_id)
1110 rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
1113 rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
1116 /* pthread callback handling interrupt mode receive queues
1117 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
1118 * interrupting queue to the adapter's ring buffer for interrupt events.
1119 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
1120 * the adapter service function.
1123 rxa_intr_thread(void *arg)
1125 struct rte_event_eth_rx_adapter *rx_adapter = arg;
1126 struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
1130 n = rte_epoll_wait(rx_adapter->epd, epoll_events,
1131 RTE_EVENT_ETH_INTR_RING_SIZE, -1);
1132 if (unlikely(n < 0))
1133 RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
1135 for (i = 0; i < n; i++) {
1136 rxa_intr_ring_enqueue(rx_adapter,
1137 epoll_events[i].epdata.data);
1144 /* Dequeue <port, q> from interrupt ring and enqueue received
1145  * packets to the event buffer
1146  */
1147 static inline uint32_t
1148 rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
1153 struct rte_eth_event_enqueue_buffer *buf;
1154 rte_spinlock_t *ring_lock;
1155 uint8_t max_done = 0;
1157 if (rx_adapter->num_rx_intr == 0)
1160 if (rte_ring_count(rx_adapter->intr_ring) == 0
1161 && !rx_adapter->qd_valid)
1164 buf = &rx_adapter->event_enqueue_buffer;
1165 ring_lock = &rx_adapter->intr_ring_lock;
1167 if (buf->count >= BATCH_SIZE)
1168 rxa_flush_event_buffer(rx_adapter);
1170 while (rxa_pkt_buf_available(buf)) {
1171 struct eth_device_info *dev_info;
1174 union queue_data qd = rx_adapter->qd;
1177 if (!rx_adapter->qd_valid) {
1178 struct eth_rx_queue_info *queue_info;
1180 rte_spinlock_lock(ring_lock);
1181 err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
1183 rte_spinlock_unlock(ring_lock);
1189 rx_adapter->qd = qd;
1190 rx_adapter->qd_valid = 1;
1191 dev_info = &rx_adapter->eth_devices[port];
1192 if (rxa_shared_intr(dev_info, queue))
1193 dev_info->shared_intr_enabled = 1;
1195 queue_info = &dev_info->rx_queue[queue];
1196 queue_info->intr_enabled = 1;
1198 rte_eth_dev_rx_intr_enable(port, queue);
1199 rte_spinlock_unlock(ring_lock);
1204 dev_info = &rx_adapter->eth_devices[port];
1207 if (rxa_shared_intr(dev_info, queue)) {
1211 nb_queues = dev_info->dev->data->nb_rx_queues;
1213 for (i = dev_info->next_q_idx; i < nb_queues; i++) {
1214 uint8_t enq_buffer_full;
1216 if (!rxa_intr_queue(dev_info, i))
1218 n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
1219 rx_adapter->max_nb_rx,
1223 enq_buffer_full = !rxq_empty && n == 0;
1224 max_done = nb_rx > rx_adapter->max_nb_rx;
1226 if (enq_buffer_full || max_done) {
1227 dev_info->next_q_idx = i;
1232 rx_adapter->qd_valid = 0;
1234 /* Reinitialize for next interrupt */
1235 dev_info->next_q_idx = dev_info->multi_intr_cap ?
1236 RTE_MAX_RXTX_INTR_VEC_ID - 1 :
1239 n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
1240 rx_adapter->max_nb_rx,
1242 rx_adapter->qd_valid = !rxq_empty;
1244 if (nb_rx > rx_adapter->max_nb_rx)
1250 rx_adapter->stats.rx_intr_packets += nb_rx;
1255 * Polls receive queues added to the event adapter and enqueues received
1256 * packets to the event device.
1258  * The receive code enqueues initially to a temporary buffer; the
1259  * temporary buffer is drained anytime it holds >= BATCH_SIZE packets.
1260  *
1261  * If there isn't space available in the temporary buffer, packets from the
1262  * Rx queue aren't dequeued from the eth device; this back-pressures the
1263  * eth device. In virtual device environments, the back pressure is relayed
1264  * to the hypervisor's switching layer where adjustments can be made to
1265  * deal with it.
1266  */
1267 static inline uint32_t
1268 rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
1272 struct rte_eth_event_enqueue_buffer *buf;
1276 wrr_pos = rx_adapter->wrr_pos;
1277 max_nb_rx = rx_adapter->max_nb_rx;
1278 buf = &rx_adapter->event_enqueue_buffer;
1280 /* Iterate through a WRR sequence */
1281 for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
1282 unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
1283 uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
1284 uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;
1286 /* Don't do a batch dequeue from the rx queue if there isn't
1287 * enough space in the enqueue buffer.
1289 if (buf->count >= BATCH_SIZE)
1290 rxa_flush_event_buffer(rx_adapter);
1291 if (!rxa_pkt_buf_available(buf)) {
1292 rx_adapter->wrr_pos = wrr_pos;
1296 nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
1298 if (nb_rx > max_nb_rx) {
1299 rx_adapter->wrr_pos =
1300 (wrr_pos + 1) % rx_adapter->wrr_len;
1304 if (++wrr_pos == rx_adapter->wrr_len)
1311 rxa_vector_expire(struct eth_rx_vector_data *vec, void *arg)
1313 struct rte_event_eth_rx_adapter *rx_adapter = arg;
1314 struct rte_eth_event_enqueue_buffer *buf =
1315 &rx_adapter->event_enqueue_buffer;
1316 struct rte_event *ev;
1319 rxa_flush_event_buffer(rx_adapter);
1321 if (vec->vector_ev->nb_elem == 0)
1323 ev = &buf->events[buf->count];
1326 ev->event = vec->event;
1327 ev->vec = vec->vector_ev;
1330 vec->vector_ev = NULL;
1335 rxa_service_func(void *args)
1337 struct rte_event_eth_rx_adapter *rx_adapter = args;
1338 struct rte_event_eth_rx_adapter_stats *stats;
1340 if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
1342 if (!rx_adapter->rxa_started) {
1343 rte_spinlock_unlock(&rx_adapter->rx_lock);
1347 if (rx_adapter->ena_vector) {
1348 if ((rte_rdtsc() - rx_adapter->prev_expiry_ts) >=
1349 rx_adapter->vector_tmo_ticks) {
1350 struct eth_rx_vector_data *vec;
1352 TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1353 uint64_t elapsed_time = rte_rdtsc() - vec->ts;
1355 if (elapsed_time >= vec->vector_timeout_ticks) {
1356 rxa_vector_expire(vec, rx_adapter);
1357 TAILQ_REMOVE(&rx_adapter->vector_list,
1361 rx_adapter->prev_expiry_ts = rte_rdtsc();
1365 stats = &rx_adapter->stats;
1366 stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
1367 stats->rx_packets += rxa_poll(rx_adapter);
1368 rte_spinlock_unlock(&rx_adapter->rx_lock);
1373 rte_event_eth_rx_adapter_init(void)
1375 const char *name = "rte_event_eth_rx_adapter_array";
1376 const struct rte_memzone *mz;
1379 sz = sizeof(*event_eth_rx_adapter) *
1380 RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
1381 sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);
1383 mz = rte_memzone_lookup(name);
1385 mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
1386 RTE_CACHE_LINE_SIZE);
1388 RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
1394 event_eth_rx_adapter = mz->addr;
1398 static inline struct rte_event_eth_rx_adapter *
1399 rxa_id_to_adapter(uint8_t id)
1401 return event_eth_rx_adapter ?
1402 event_eth_rx_adapter[id] : NULL;
1406 rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
1407 struct rte_event_eth_rx_adapter_conf *conf, void *arg)
1410 struct rte_eventdev *dev;
1411 struct rte_event_dev_config dev_conf;
1414 struct rte_event_port_conf *port_conf = arg;
1415 struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);
1417 dev = &rte_eventdevs[rx_adapter->eventdev_id];
1418 dev_conf = dev->data->dev_conf;
1420 started = dev->data->dev_started;
1422 rte_event_dev_stop(dev_id);
1423 port_id = dev_conf.nb_event_ports;
1424 dev_conf.nb_event_ports += 1;
1425 ret = rte_event_dev_configure(dev_id, &dev_conf);
1427 RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
1430 if (rte_event_dev_start(dev_id))
1436 ret = rte_event_port_setup(dev_id, port_id, port_conf);
1438 RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
1443 conf->event_port_id = port_id;
1444 conf->max_nb_rx = 128;
1446 ret = rte_event_dev_start(dev_id);
1447 rx_adapter->default_cb_arg = 1;
1452 rxa_epoll_create1(void)
1456 fd = epoll_create1(EPOLL_CLOEXEC);
1457 return fd < 0 ? -errno : fd;
1464 rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
1466 if (rx_adapter->epd != INIT_FD)
1469 rx_adapter->epd = rxa_epoll_create1();
1470 if (rx_adapter->epd < 0) {
1471 int err = rx_adapter->epd;
1472 rx_adapter->epd = INIT_FD;
1473 RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
1481 rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1484 char thread_name[RTE_MAX_THREAD_NAME_LEN];
1486 if (rx_adapter->intr_ring)
1489 rx_adapter->intr_ring = rte_ring_create("intr_ring",
1490 RTE_EVENT_ETH_INTR_RING_SIZE,
1491 rte_socket_id(), 0);
1492 if (!rx_adapter->intr_ring)
1495 rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
1496 RTE_EVENT_ETH_INTR_RING_SIZE *
1497 sizeof(struct rte_epoll_event),
1498 RTE_CACHE_LINE_SIZE,
1499 rx_adapter->socket_id);
1500 if (!rx_adapter->epoll_events) {
1505 rte_spinlock_init(&rx_adapter->intr_ring_lock);
1507 snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
1508 "rx-intr-thread-%d", rx_adapter->id);
1510 err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
1511 NULL, rxa_intr_thread, rx_adapter);
1515 RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
1516 rte_free(rx_adapter->epoll_events);
1518 rte_ring_free(rx_adapter->intr_ring);
1519 rx_adapter->intr_ring = NULL;
1520 rx_adapter->epoll_events = NULL;
1525 rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
1529 err = pthread_cancel(rx_adapter->rx_intr_thread);
1531 RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
1534 err = pthread_join(rx_adapter->rx_intr_thread, NULL);
1536 RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);
1538 rte_free(rx_adapter->epoll_events);
1539 rte_ring_free(rx_adapter->intr_ring);
1540 rx_adapter->intr_ring = NULL;
1541 rx_adapter->epoll_events = NULL;
1546 rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
1550 if (rx_adapter->num_rx_intr == 0)
1553 ret = rxa_destroy_intr_thread(rx_adapter);
1557 close(rx_adapter->epd);
1558 rx_adapter->epd = INIT_FD;
1564 rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1565 struct eth_device_info *dev_info,
1566 uint16_t rx_queue_id)
1569 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1570 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1572 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1574 RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
1579 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1584 RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);
1587 dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
1589 dev_info->shared_intr_enabled = 0;
1594 rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1595 struct eth_device_info *dev_info,
1602 if (dev_info->nb_rx_intr == 0)
1606 if (rx_queue_id == -1) {
1607 s = dev_info->nb_shared_intr;
1608 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1612 q = dev_info->intr_queue[i];
1613 sintr = rxa_shared_intr(dev_info, q);
1616 if (!sintr || s == 0) {
1618 err = rxa_disable_intr(rx_adapter, dev_info,
1622 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1627 if (!rxa_intr_queue(dev_info, rx_queue_id))
1629 if (!rxa_shared_intr(dev_info, rx_queue_id) ||
1630 dev_info->nb_shared_intr == 1) {
1631 err = rxa_disable_intr(rx_adapter, dev_info,
1635 rxa_intr_ring_del_entries(rx_adapter, dev_info,
1639 for (i = 0; i < dev_info->nb_rx_intr; i++) {
1640 if (dev_info->intr_queue[i] == rx_queue_id) {
1641 for (; i < dev_info->nb_rx_intr - 1; i++)
1642 dev_info->intr_queue[i] =
1643 dev_info->intr_queue[i + 1];
1653 rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
1654 struct eth_device_info *dev_info,
1655 uint16_t rx_queue_id)
1658 uint16_t eth_dev_id = dev_info->dev->data->port_id;
1659 union queue_data qd;
1661 uint16_t *intr_queue;
1662 int sintr = rxa_shared_intr(dev_info, rx_queue_id);
1664 if (rxa_intr_queue(dev_info, rx_queue_id))
1667 intr_queue = dev_info->intr_queue;
1668 if (dev_info->intr_queue == NULL) {
1670 dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
1671 dev_info->intr_queue =
1673 rx_adapter->mem_name,
1676 rx_adapter->socket_id);
1677 if (dev_info->intr_queue == NULL)
1681 init_fd = rx_adapter->epd;
1682 err = rxa_init_epd(rx_adapter);
1684 goto err_free_queue;
1686 qd.port = eth_dev_id;
1687 qd.queue = rx_queue_id;
1689 err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1694 RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
1695 " Rx Queue %u err %d", rx_queue_id, err);
1699 err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
1701 RTE_EDEV_LOG_ERR("Could not enable interrupt for"
1702 " Rx Queue %u err %d", rx_queue_id, err);
1707 err = rxa_create_intr_thread(rx_adapter);
1710 dev_info->shared_intr_enabled = 1;
1712 dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
1717 err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
1719 RTE_EDEV_LOG_ERR("Could not disable interrupt for"
1720 " Rx Queue %u err %d", rx_queue_id, err);
1722 err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
1727 RTE_EDEV_LOG_ERR("Could not delete event for"
1728 " Rx Queue %u err %d", rx_queue_id, err1);
1731 if (init_fd == INIT_FD) {
1732 close(rx_adapter->epd);
1733 		rx_adapter->epd = INIT_FD;
1736 if (intr_queue == NULL)
1737 rte_free(dev_info->intr_queue);
1743 rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1744 struct eth_device_info *dev_info,
1750 int shared_done = (dev_info->nb_shared_intr > 0);
1752 if (rx_queue_id != -1) {
1753 if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
1755 return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
1759 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {
1761 if (rxa_shared_intr(dev_info, i) && shared_done)
1764 err = rxa_config_intr(rx_adapter, dev_info, i);
1766 shared_done = err == 0 && rxa_shared_intr(dev_info, i);
1769 dev_info->shared_intr_enabled = 1;
1778 shared_done = (dev_info->nb_shared_intr > 0);
1779 for (j = 0; j < i; j++) {
1780 if (rxa_intr_queue(dev_info, j))
1782 if (rxa_shared_intr(dev_info, j) && si != j)
1784 err = rxa_disable_intr(rx_adapter, dev_info, j);
1795 rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
1798 struct rte_service_spec service;
1799 struct rte_event_eth_rx_adapter_conf rx_adapter_conf;
1801 if (rx_adapter->service_inited)
1804 memset(&service, 0, sizeof(service));
1805 snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
1806 "rte_event_eth_rx_adapter_%d", id);
1807 service.socket_id = rx_adapter->socket_id;
1808 service.callback = rxa_service_func;
1809 service.callback_userdata = rx_adapter;
1810 /* Service function handles locking for queue add/del updates */
1811 service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
1812 ret = rte_service_component_register(&service, &rx_adapter->service_id);
1814 RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
1819 ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
1820 &rx_adapter_conf, rx_adapter->conf_arg);
1822 RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
1826 rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
1827 rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
1828 rx_adapter->service_inited = 1;
1829 rx_adapter->epd = INIT_FD;
1833 rte_service_component_unregister(rx_adapter->service_id);
1838 rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1839 struct eth_device_info *dev_info,
1840 int32_t rx_queue_id,
1843 struct eth_rx_queue_info *queue_info;
1847 if (dev_info->rx_queue == NULL)
1850 if (rx_queue_id == -1) {
1851 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
1852 rxa_update_queue(rx_adapter, dev_info, i, add);
1854 queue_info = &dev_info->rx_queue[rx_queue_id];
1855 enabled = queue_info->queue_enabled;
1857 rx_adapter->nb_queues += !enabled;
1858 dev_info->nb_dev_queues += !enabled;
1860 rx_adapter->nb_queues -= enabled;
1861 dev_info->nb_dev_queues -= enabled;
1863 queue_info->queue_enabled = !!add;
1868 rxa_set_vector_data(struct eth_rx_queue_info *queue_info, uint16_t vector_count,
1869 uint64_t vector_ns, struct rte_mempool *mp, uint32_t qid,
1872 #define NSEC2TICK(__ns, __freq) (((__ns) * (__freq)) / 1E9)
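	/*
	 * Editor's worked example (illustrative): with a 2 GHz timer,
	 * rte_get_timer_hz() == 2E9, so the minimum vector timeout
	 * MIN_VECTOR_NS = 1E5 ns (100 us) converts to
	 * NSEC2TICK(1E5, 2E9) = (1E5 * 2E9) / 1E9 = 200000 ticks.
	 */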
1873 struct eth_rx_vector_data *vector_data;
1876 vector_data = &queue_info->vector_data;
1877 vector_data->max_vector_count = vector_count;
1878 vector_data->port = port_id;
1879 vector_data->queue = qid;
1880 vector_data->vector_pool = mp;
1881 vector_data->vector_timeout_ticks =
1882 NSEC2TICK(vector_ns, rte_get_timer_hz());
1883 vector_data->ts = 0;
1884 flow_id = queue_info->event & 0xFFFFF;
1885 	flow_id =
1886 		flow_id == 0 ? (qid & 0xFFF) | (port_id & 0xFF) << 12 : flow_id;
1887 vector_data->event = (queue_info->event & ~0xFFFFF) | flow_id;
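	/*
	 * Editor's worked example (illustrative): if the application left the
	 * 20-bit flow id at 0, port_id = 3 and qid = 5 synthesize
	 * flow_id = (5 & 0xFFF) | ((3 & 0xFF) << 12) = 0x3005, keeping vector
	 * events from different <port, queue> pairs distinct.
	 */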
1891 rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
1892 struct eth_device_info *dev_info,
1893 int32_t rx_queue_id)
1895 struct eth_rx_vector_data *vec;
1901 if (rx_adapter->nb_queues == 0)
1904 if (rx_queue_id == -1) {
1905 uint16_t nb_rx_queues;
1908 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1909 for (i = 0; i < nb_rx_queues; i++)
1910 rxa_sw_del(rx_adapter, dev_info, i);
1914 /* Push all the partial event vectors to event device. */
1915 TAILQ_FOREACH(vec, &rx_adapter->vector_list, next) {
1916 if (vec->queue != rx_queue_id)
1918 rxa_vector_expire(vec, rx_adapter);
1919 TAILQ_REMOVE(&rx_adapter->vector_list, vec, next);
1922 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1923 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1924 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1925 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
1926 rx_adapter->num_rx_polled -= pollq;
1927 dev_info->nb_rx_poll -= pollq;
1928 rx_adapter->num_rx_intr -= intrq;
1929 dev_info->nb_rx_intr -= intrq;
1930 dev_info->nb_shared_intr -= intrq && sintrq;
1934 rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
1935 struct eth_device_info *dev_info,
1936 int32_t rx_queue_id,
1937 const struct rte_event_eth_rx_adapter_queue_conf *conf)
1939 struct eth_rx_queue_info *queue_info;
1940 const struct rte_event *ev = &conf->ev;
1944 struct rte_event *qi_ev;
1946 if (rx_queue_id == -1) {
1947 uint16_t nb_rx_queues;
1950 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
1951 for (i = 0; i < nb_rx_queues; i++)
1952 rxa_add_queue(rx_adapter, dev_info, i, conf);
1956 pollq = rxa_polled_queue(dev_info, rx_queue_id);
1957 intrq = rxa_intr_queue(dev_info, rx_queue_id);
1958 sintrq = rxa_shared_intr(dev_info, rx_queue_id);
1960 queue_info = &dev_info->rx_queue[rx_queue_id];
1961 queue_info->wt = conf->servicing_weight;
1963 qi_ev = (struct rte_event *)&queue_info->event;
1964 qi_ev->event = ev->event;
1965 qi_ev->op = RTE_EVENT_OP_NEW;
1966 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
1967 qi_ev->sub_event_type = 0;
1969 if (conf->rx_queue_flags &
1970 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
1971 queue_info->flow_id_mask = ~0;
1975 if (conf->rx_queue_flags &
1976 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
1977 queue_info->ena_vector = 1;
1978 qi_ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER_VECTOR;
1979 rxa_set_vector_data(queue_info, conf->vector_sz,
1980 conf->vector_timeout_ns, conf->vector_mp,
1981 rx_queue_id, dev_info->dev->data->port_id);
1982 rx_adapter->ena_vector = 1;
1983 rx_adapter->vector_tmo_ticks =
1984 rx_adapter->vector_tmo_ticks ?
1985 RTE_MIN(queue_info->vector_data
1986 .vector_timeout_ticks >>
1988 rx_adapter->vector_tmo_ticks) :
1989 queue_info->vector_data.vector_timeout_ticks >>
1993 rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
1994 if (rxa_polled_queue(dev_info, rx_queue_id)) {
1995 rx_adapter->num_rx_polled += !pollq;
1996 dev_info->nb_rx_poll += !pollq;
1997 rx_adapter->num_rx_intr -= intrq;
1998 dev_info->nb_rx_intr -= intrq;
1999 dev_info->nb_shared_intr -= intrq && sintrq;
2002 if (rxa_intr_queue(dev_info, rx_queue_id)) {
2003 rx_adapter->num_rx_polled -= pollq;
2004 dev_info->nb_rx_poll -= pollq;
2005 rx_adapter->num_rx_intr += !intrq;
2006 dev_info->nb_rx_intr += !intrq;
2007 dev_info->nb_shared_intr += !intrq && sintrq;
2008 if (dev_info->nb_shared_intr == 1) {
2009 if (dev_info->multi_intr_cap)
2010 dev_info->next_q_idx =
2011 RTE_MAX_RXTX_INTR_VEC_ID - 1;
2013 dev_info->next_q_idx = 0;
2018 static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
2019 uint16_t eth_dev_id,
2021 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2023 struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
2024 struct rte_event_eth_rx_adapter_queue_conf temp_conf;
2026 struct eth_rx_poll_entry *rx_poll;
2027 struct eth_rx_queue_info *rx_queue;
2029 uint16_t nb_rx_queues;
2030 uint32_t nb_rx_poll, nb_wrr;
2031 uint32_t nb_rx_intr;
2035 if (queue_conf->servicing_weight == 0) {
2036 struct rte_eth_dev_data *data = dev_info->dev->data;
2038 temp_conf = *queue_conf;
2039 if (!data->dev_conf.intr_conf.rxq) {
2040 /* If Rx interrupts are disabled set wt = 1 */
2041 temp_conf.servicing_weight = 1;
2043 queue_conf = &temp_conf;
2046 nb_rx_queues = dev_info->dev->data->nb_rx_queues;
2047 rx_queue = dev_info->rx_queue;
2048 wt = queue_conf->servicing_weight;
2050 if (dev_info->rx_queue == NULL) {
2051 dev_info->rx_queue =
2052 rte_zmalloc_socket(rx_adapter->mem_name,
2054 sizeof(struct eth_rx_queue_info), 0,
2055 rx_adapter->socket_id);
2056 if (dev_info->rx_queue == NULL)
2062 rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
2063 queue_conf->servicing_weight,
2064 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2066 if (dev_info->dev->intr_handle)
2067 dev_info->multi_intr_cap =
2068 rte_intr_cap_multiple(dev_info->dev->intr_handle);
2070 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2073 goto err_free_rxqueue;
2076 num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);
2078 ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
2080 goto err_free_rxqueue;
2082 ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
2084 goto err_free_rxqueue;
2088 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2089 num_intr_vec = rxa_nb_intr_vect(dev_info,
2091 			/* interrupt based queues are being converted to
2092 			 * poll mode queues; delete the interrupt configuration
2095 ret = rxa_del_intr_queue(rx_adapter,
2096 dev_info, rx_queue_id);
2098 goto err_free_rxqueue;
2102 if (nb_rx_intr == 0) {
2103 ret = rxa_free_intr_resources(rx_adapter);
2105 goto err_free_rxqueue;
2111 if (rx_queue_id == -1) {
2112 for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
2113 dev_info->intr_queue[i] = i;
2115 if (!rxa_intr_queue(dev_info, rx_queue_id))
2116 dev_info->intr_queue[nb_rx_intr - 1] =
2123 rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
2124 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2126 rte_free(rx_adapter->eth_rx_poll);
2127 rte_free(rx_adapter->wrr_sched);
2129 rx_adapter->eth_rx_poll = rx_poll;
2130 rx_adapter->wrr_sched = rx_wrr;
2131 rx_adapter->wrr_len = nb_wrr;
2132 rx_adapter->num_intr_vec += num_intr_vec;
2136 if (rx_queue == NULL) {
2137 rte_free(dev_info->rx_queue);
2138 dev_info->rx_queue = NULL;
2148 rxa_ctrl(uint8_t id, int start)
2150 struct rte_event_eth_rx_adapter *rx_adapter;
2151 struct rte_eventdev *dev;
2152 struct eth_device_info *dev_info;
2154 int use_service = 0;
2157 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2158 rx_adapter = rxa_id_to_adapter(id);
2159 if (rx_adapter == NULL)
2162 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2164 RTE_ETH_FOREACH_DEV(i) {
2165 dev_info = &rx_adapter->eth_devices[i];
2166 /* if start check for num dev queues */
2167 if (start && !dev_info->nb_dev_queues)
2169 /* if stop check if dev has been started */
2170 if (stop && !dev_info->dev_rx_started)
2172 use_service |= !dev_info->internal_event_port;
2173 dev_info->dev_rx_started = start;
2174 if (dev_info->internal_event_port == 0)
2176 start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
2177 &rte_eth_devices[i]) :
2178 (*dev->dev_ops->eth_rx_adapter_stop)(dev,
2179 &rte_eth_devices[i]);
2183 rte_spinlock_lock(&rx_adapter->rx_lock);
2184 rx_adapter->rxa_started = start;
2185 rte_service_runstate_set(rx_adapter->service_id, start);
2186 rte_spinlock_unlock(&rx_adapter->rx_lock);
2193 rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
2194 rte_event_eth_rx_adapter_conf_cb conf_cb,
2197 struct rte_event_eth_rx_adapter *rx_adapter;
2201 char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
2202 const uint8_t default_rss_key[] = {
2203 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
2204 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
2205 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
2206 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
2207 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
2210 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2211 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2212 if (conf_cb == NULL)
2215 if (event_eth_rx_adapter == NULL) {
2216 ret = rte_event_eth_rx_adapter_init();
2221 rx_adapter = rxa_id_to_adapter(id);
2222 if (rx_adapter != NULL) {
2223 RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
2227 socket_id = rte_event_dev_socket_id(dev_id);
2228 snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
2229 "rte_event_eth_rx_adapter_%d",
2232 rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
2233 RTE_CACHE_LINE_SIZE, socket_id);
2234 if (rx_adapter == NULL) {
2235 RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
2239 rx_adapter->eventdev_id = dev_id;
2240 rx_adapter->socket_id = socket_id;
2241 rx_adapter->conf_cb = conf_cb;
2242 rx_adapter->conf_arg = conf_arg;
2243 rx_adapter->id = id;
2244 TAILQ_INIT(&rx_adapter->vector_list);
2245 strcpy(rx_adapter->mem_name, mem_name);
2246 rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
2248 sizeof(struct eth_device_info), 0,
2250 rte_convert_rss_key((const uint32_t *)default_rss_key,
2251 (uint32_t *)rx_adapter->rss_key_be,
2252 RTE_DIM(default_rss_key));
2254 if (rx_adapter->eth_devices == NULL) {
2255 RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
2256 rte_free(rx_adapter);
2259 rte_spinlock_init(&rx_adapter->rx_lock);
2260 for (i = 0; i < RTE_MAX_ETHPORTS; i++)
2261 rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];
2263 event_eth_rx_adapter[id] = rx_adapter;
2264 if (conf_cb == rxa_default_conf_cb)
2265 rx_adapter->default_cb_arg = 1;
2267 if (rte_mbuf_dyn_rx_timestamp_register(
2268 &event_eth_rx_timestamp_dynfield_offset,
2269 &event_eth_rx_timestamp_dynflag) != 0) {
2270 RTE_EDEV_LOG_ERR("Error registering timestamp field in mbuf\n");
2274 rte_eventdev_trace_eth_rx_adapter_create(id, dev_id, conf_cb,
2280 rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
2281 struct rte_event_port_conf *port_config)
2283 struct rte_event_port_conf *pc;
2286 if (port_config == NULL)
2288 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2290 pc = rte_malloc(NULL, sizeof(*pc), 0);
2294 ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
2295 rxa_default_conf_cb,
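/*
 * Editor's usage sketch (illustrative; assumes eventdev 0 and eth port 0
 * are already configured, and omits error handling):
 *
 *	struct rte_event_port_conf pconf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.ev.priority = RTE_EVENT_DEV_PRIORITY_NORMAL,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &pconf);
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *	rte_event_eth_rx_adapter_start(0);
 */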
2303 rte_event_eth_rx_adapter_free(uint8_t id)
2305 struct rte_event_eth_rx_adapter *rx_adapter;
2307 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2309 rx_adapter = rxa_id_to_adapter(id);
2310 if (rx_adapter == NULL)
2313 if (rx_adapter->nb_queues) {
2314 RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
2315 rx_adapter->nb_queues);
2319 if (rx_adapter->default_cb_arg)
2320 rte_free(rx_adapter->conf_arg);
2321 rte_free(rx_adapter->eth_devices);
2322 rte_free(rx_adapter);
2323 event_eth_rx_adapter[id] = NULL;
2325 rte_eventdev_trace_eth_rx_adapter_free(id);
2330 rte_event_eth_rx_adapter_queue_add(uint8_t id,
2331 uint16_t eth_dev_id,
2332 int32_t rx_queue_id,
2333 const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
2337 struct rte_event_eth_rx_adapter *rx_adapter;
2338 struct rte_eventdev *dev;
2339 struct eth_device_info *dev_info;
2340 struct rte_event_eth_rx_adapter_vector_limits limits;
2342 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2343 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2345 rx_adapter = rxa_id_to_adapter(id);
2346 if ((rx_adapter == NULL) || (queue_conf == NULL))
2349 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2350 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2354 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2355 			" eth port %" PRIu16, id, eth_dev_id);
2359 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
2360 && (queue_conf->rx_queue_flags &
2361 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
2362 RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
2363 " eth port: %" PRIu16 " adapter id: %" PRIu8,
2368 if (queue_conf->rx_queue_flags &
2369 RTE_EVENT_ETH_RX_ADAPTER_QUEUE_EVENT_VECTOR) {
2371 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_EVENT_VECTOR) == 0) {
2372 RTE_EDEV_LOG_ERR("Event vectorization is not supported,"
2373 " eth port: %" PRIu16
2374 " adapter id: %" PRIu8,
2379 ret = rte_event_eth_rx_adapter_vector_limits_get(
2380 rx_adapter->eventdev_id, eth_dev_id, &limits);
2382 RTE_EDEV_LOG_ERR("Failed to get event device vector limits,"
2383 " eth port: %" PRIu16
2384 " adapter id: %" PRIu8,
2388 if (queue_conf->vector_sz < limits.min_sz ||
2389 queue_conf->vector_sz > limits.max_sz ||
2390 queue_conf->vector_timeout_ns < limits.min_timeout_ns ||
2391 queue_conf->vector_timeout_ns > limits.max_timeout_ns ||
2392 queue_conf->vector_mp == NULL) {
2393 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2394 " eth port: %" PRIu16
2395 " adapter id: %" PRIu8,
2399 if (queue_conf->vector_mp->elt_size <
2400 (sizeof(struct rte_event_vector) +
2401 (sizeof(uintptr_t) * queue_conf->vector_sz))) {
2402 RTE_EDEV_LOG_ERR("Invalid event vector configuration,"
2403 " eth port: %" PRIu16
2404 " adapter id: %" PRIu8,
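		/*
		 * Editor's worked example (illustrative, 64-bit pointers):
		 * for queue_conf->vector_sz = 64 the mempool element must
		 * hold the rte_event_vector header plus 64 * 8 = 512 bytes
		 * of mbuf pointers, i.e. elt_size >=
		 * sizeof(struct rte_event_vector) + 512.
		 */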
2410 if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
2411 (rx_queue_id != -1)) {
2412 		RTE_EDEV_LOG_ERR("Rx queues can only be connected to a single "
2413 "event queue, eth port: %" PRIu16 " adapter id: %"
2414 PRIu8, eth_dev_id, id);
2418 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2419 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2420 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2421 (uint16_t)rx_queue_id);
2425 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2427 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2428 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
2430 if (dev_info->rx_queue == NULL) {
2431 dev_info->rx_queue =
2432 rte_zmalloc_socket(rx_adapter->mem_name,
2433 dev_info->dev->data->nb_rx_queues *
2434 sizeof(struct eth_rx_queue_info), 0,
2435 rx_adapter->socket_id);
2436 if (dev_info->rx_queue == NULL)
2440 ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
2441 &rte_eth_devices[eth_dev_id],
2442 rx_queue_id, queue_conf);
2444 dev_info->internal_event_port = 1;
2445 rxa_update_queue(rx_adapter,
2446 &rx_adapter->eth_devices[eth_dev_id],
2451 rte_spinlock_lock(&rx_adapter->rx_lock);
2452 dev_info->internal_event_port = 0;
2453 ret = rxa_init_service(rx_adapter, id);
2455 uint32_t service_id = rx_adapter->service_id;
2456 ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
2458 rte_service_component_runstate_set(service_id,
2459 rxa_sw_adapter_queue_count(rx_adapter));
2461 rte_spinlock_unlock(&rx_adapter->rx_lock);
2464 rte_eventdev_trace_eth_rx_adapter_queue_add(id, eth_dev_id,
2465 rx_queue_id, queue_conf, ret);
2473 rxa_sw_vector_limits(struct rte_event_eth_rx_adapter_vector_limits *limits)
2475 limits->max_sz = MAX_VECTOR_SIZE;
2476 limits->min_sz = MIN_VECTOR_SIZE;
2477 limits->max_timeout_ns = MAX_VECTOR_NS;
2478 limits->min_timeout_ns = MIN_VECTOR_NS;
2484 rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
2485 int32_t rx_queue_id)
2488 struct rte_eventdev *dev;
2489 struct rte_event_eth_rx_adapter *rx_adapter;
2490 struct eth_device_info *dev_info;
2492 uint32_t nb_rx_poll = 0;
2493 uint32_t nb_wrr = 0;
2494 uint32_t nb_rx_intr;
2495 struct eth_rx_poll_entry *rx_poll = NULL;
2496 uint32_t *rx_wrr = NULL;
2499 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2500 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2502 rx_adapter = rxa_id_to_adapter(id);
2503 if (rx_adapter == NULL)
2506 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2507 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2513 if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
2514 rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
2515 RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
2516 (uint16_t)rx_queue_id);
2520 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2522 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2523 RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
2525 ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
2526 &rte_eth_devices[eth_dev_id],
2529 rxa_update_queue(rx_adapter,
2530 &rx_adapter->eth_devices[eth_dev_id],
2533 if (dev_info->nb_dev_queues == 0) {
2534 rte_free(dev_info->rx_queue);
2535 dev_info->rx_queue = NULL;
2539 rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
2540 &nb_rx_poll, &nb_rx_intr, &nb_wrr);
2542 ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
2547 rte_spinlock_lock(&rx_adapter->rx_lock);
2550 if (rx_adapter->num_rx_intr > nb_rx_intr) {
2552 num_intr_vec = rxa_nb_intr_vect(dev_info,
2554 ret = rxa_del_intr_queue(rx_adapter, dev_info,
2560 if (nb_rx_intr == 0) {
2561 ret = rxa_free_intr_resources(rx_adapter);
2566 rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
2567 rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);
2569 rte_free(rx_adapter->eth_rx_poll);
2570 rte_free(rx_adapter->wrr_sched);
2572 if (nb_rx_intr == 0) {
2573 rte_free(dev_info->intr_queue);
2574 dev_info->intr_queue = NULL;
2577 rx_adapter->eth_rx_poll = rx_poll;
2578 rx_adapter->wrr_sched = rx_wrr;
2579 rx_adapter->wrr_len = nb_wrr;
2580 rx_adapter->num_intr_vec += num_intr_vec;
2582 if (dev_info->nb_dev_queues == 0) {
2583 rte_free(dev_info->rx_queue);
2584 dev_info->rx_queue = NULL;
2587 rte_spinlock_unlock(&rx_adapter->rx_lock);
2594 rte_service_component_runstate_set(rx_adapter->service_id,
2595 rxa_sw_adapter_queue_count(rx_adapter));
2598 rte_eventdev_trace_eth_rx_adapter_queue_del(id, eth_dev_id,
2604 rte_event_eth_rx_adapter_vector_limits_get(
2605 uint8_t dev_id, uint16_t eth_port_id,
2606 struct rte_event_eth_rx_adapter_vector_limits *limits)
2608 struct rte_eventdev *dev;
2612 RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
2613 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_port_id, -EINVAL);
2618 dev = &rte_eventdevs[dev_id];
2620 ret = rte_event_eth_rx_adapter_caps_get(dev_id, eth_port_id, &cap);
2622 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2623 			" eth port %" PRIu16,
2624 dev_id, eth_port_id);
2628 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2629 RTE_FUNC_PTR_OR_ERR_RET(
2630 *dev->dev_ops->eth_rx_adapter_vector_limits_get,
2632 ret = dev->dev_ops->eth_rx_adapter_vector_limits_get(
2633 dev, &rte_eth_devices[eth_port_id], limits);
2635 ret = rxa_sw_vector_limits(limits);
2642 rte_event_eth_rx_adapter_start(uint8_t id)
2644 rte_eventdev_trace_eth_rx_adapter_start(id);
2645 return rxa_ctrl(id, 1);
2649 rte_event_eth_rx_adapter_stop(uint8_t id)
2651 rte_eventdev_trace_eth_rx_adapter_stop(id);
2652 return rxa_ctrl(id, 0);
2656 rte_event_eth_rx_adapter_stats_get(uint8_t id,
2657 struct rte_event_eth_rx_adapter_stats *stats)
2659 struct rte_event_eth_rx_adapter *rx_adapter;
2660 struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
2661 struct rte_event_eth_rx_adapter_stats dev_stats;
2662 struct rte_eventdev *dev;
2663 struct eth_device_info *dev_info;
2667 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2669 rx_adapter = rxa_id_to_adapter(id);
2670 if (rx_adapter == NULL || stats == NULL)
2673 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2674 memset(stats, 0, sizeof(*stats));
2675 RTE_ETH_FOREACH_DEV(i) {
2676 dev_info = &rx_adapter->eth_devices[i];
2677 if (dev_info->internal_event_port == 0 ||
2678 dev->dev_ops->eth_rx_adapter_stats_get == NULL)
2680 ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
2681 &rte_eth_devices[i],
2685 dev_stats_sum.rx_packets += dev_stats.rx_packets;
2686 dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
2689 if (rx_adapter->service_inited)
2690 *stats = rx_adapter->stats;
2692 stats->rx_packets += dev_stats_sum.rx_packets;
2693 stats->rx_enq_count += dev_stats_sum.rx_enq_count;
2698 rte_event_eth_rx_adapter_stats_reset(uint8_t id)
2700 struct rte_event_eth_rx_adapter *rx_adapter;
2701 struct rte_eventdev *dev;
2702 struct eth_device_info *dev_info;
2705 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2707 rx_adapter = rxa_id_to_adapter(id);
2708 if (rx_adapter == NULL)
2711 dev = &rte_eventdevs[rx_adapter->eventdev_id];
2712 RTE_ETH_FOREACH_DEV(i) {
2713 dev_info = &rx_adapter->eth_devices[i];
2714 if (dev_info->internal_event_port == 0 ||
2715 dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
2717 (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
2718 &rte_eth_devices[i]);
2721 memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
2726 rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
2728 struct rte_event_eth_rx_adapter *rx_adapter;
2730 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2732 rx_adapter = rxa_id_to_adapter(id);
2733 if (rx_adapter == NULL || service_id == NULL)
2736 if (rx_adapter->service_inited)
2737 *service_id = rx_adapter->service_id;
2739 return rx_adapter->service_inited ? 0 : -ESRCH;
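/*
 * Editor's usage sketch (illustrative): for service based adapters, the
 * returned service id is typically mapped to a service lcore so the
 * adapter's service function actually runs, e.g.
 *
 *	uint32_t sid;
 *
 *	if (rte_event_eth_rx_adapter_service_id_get(id, &sid) == 0) {
 *		rte_service_lcore_add(lcore_id);
 *		rte_service_map_lcore_set(sid, lcore_id, 1);
 *		rte_service_lcore_start(lcore_id);
 *	}
 */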
2743 rte_event_eth_rx_adapter_cb_register(uint8_t id,
2744 uint16_t eth_dev_id,
2745 rte_event_eth_rx_adapter_cb_fn cb_fn,
2748 struct rte_event_eth_rx_adapter *rx_adapter;
2749 struct eth_device_info *dev_info;
2753 RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
2754 RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);
2756 rx_adapter = rxa_id_to_adapter(id);
2757 if (rx_adapter == NULL)
2760 dev_info = &rx_adapter->eth_devices[eth_dev_id];
2761 if (dev_info->rx_queue == NULL)
2764 ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
2768 RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
2769 			" eth port %" PRIu16, id, eth_dev_id);
2773 if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
2774 RTE_EDEV_LOG_ERR("Rx callback not supported for eth port %"
2775 PRIu16, eth_dev_id);
2779 rte_spinlock_lock(&rx_adapter->rx_lock);
2780 dev_info->cb_fn = cb_fn;
2781 dev_info->cb_arg = cb_arg;
2782 rte_spinlock_unlock(&rx_adapter->rx_lock);