/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation.
 */
#if defined(LINUX)
#include <sys/epoll.h>
#endif
#include <unistd.h>

#include <rte_cycles.h>
#include <rte_common.h>
#include <rte_dev.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_log.h>
#include <rte_malloc.h>
#include <rte_service_component.h>
#include <rte_thash.h>
#include <rte_interrupts.h>

#include "rte_eventdev.h"
#include "rte_eventdev_pmd.h"
#include "rte_event_eth_rx_adapter.h"

#define BATCH_SIZE 32
#define BLOCK_CNT_THRESHOLD 10
#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE)

#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32
#define ETH_RX_ADAPTER_MEM_NAME_LEN 32

#define RSS_KEY_SIZE 40
/* value written to intr thread pipe to signal thread exit */
#define ETH_BRIDGE_INTR_THREAD_EXIT 1
/* Sentinel value to detect initialized file handle */
#define INIT_FD -1

/*
 * Used to store port and queue ID of interrupting Rx queue
 */
union queue_data {
	RTE_STD_C11
	void *ptr;
	struct {
		uint16_t port;
		uint16_t queue;
	};
};

/*
 * There is an instance of this struct per polled Rx queue added to the
 * adapter
 */
struct eth_rx_poll_entry {
	/* Eth port to poll */
	uint16_t eth_dev_id;
	/* Eth rx queue to poll */
	uint16_t eth_rx_qid;
};

/* Instance per adapter */
struct rte_eth_event_enqueue_buffer {
	/* Count of events in this buffer */
	uint16_t count;
	/* Array of events in this buffer */
	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
};

struct rte_event_eth_rx_adapter {
	/* RSS key */
	uint8_t rss_key_be[RSS_KEY_SIZE];
	/* Event device identifier */
	uint8_t eventdev_id;
	/* Per ethernet device structure */
	struct eth_device_info *eth_devices;
	/* Event port identifier */
	uint8_t event_port_id;
	/* Lock to serialize config updates with service function */
	rte_spinlock_t rx_lock;
	/* Max mbufs processed in any service function invocation */
	uint32_t max_nb_rx;
	/* Receive queues that need to be polled */
	struct eth_rx_poll_entry *eth_rx_poll;
	/* Size of the eth_rx_poll array */
	uint16_t num_rx_polled;
	/* Weighted round robin schedule */
	uint32_t *wrr_sched;
	/* wrr_sched[] size */
	uint32_t wrr_len;
	/* Next entry in wrr[] to begin polling */
	uint32_t wrr_pos;
	/* Event burst buffer */
	struct rte_eth_event_enqueue_buffer event_enqueue_buffer;
	/* Per adapter stats */
	struct rte_event_eth_rx_adapter_stats stats;
	/* Block count, counts up to BLOCK_CNT_THRESHOLD */
	uint16_t enq_block_count;
	/* Block start ts */
	uint64_t rx_enq_block_start_ts;
	/* epoll fd used to wait for Rx interrupts */
	int epd;
	/* Num of interrupt driven Rx queues */
	uint32_t num_rx_intr;
	/* Used to send <dev id, queue id> of interrupting Rx queues from
	 * the interrupt thread to the Rx thread
	 */
	struct rte_ring *intr_ring;
	/* Rx Queue data (dev id, queue id) for the last non-empty
	 * queue polled
	 */
	union queue_data qd;
	/* queue_data is valid */
	int qd_valid;
	/* Interrupt ring lock, synchronizes Rx thread
	 * and interrupt thread
	 */
	rte_spinlock_t intr_ring_lock;
	/* event array passed to rte_epoll_wait */
	struct rte_epoll_event *epoll_events;
	/* Count of interrupt vectors in use */
	uint32_t num_intr_vec;
	/* Thread blocked on Rx interrupts */
	pthread_t rx_intr_thread;
	/* Configuration callback for rte_service configuration */
	rte_event_eth_rx_adapter_conf_cb conf_cb;
	/* Configuration callback argument */
	void *conf_arg;
	/* Set if default_cb is being used */
	int default_cb_arg;
	/* Service initialization state */
	uint8_t service_inited;
	/* Total count of Rx queues in adapter */
	uint32_t nb_queues;
	/* Memory allocation name */
	char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN];
	/* Socket identifier cached from eventdev */
	int socket_id;
	/* Per adapter EAL service */
	uint32_t service_id;
	/* Adapter started flag */
	uint8_t rxa_started;
	/* Adapter ID */
	uint8_t id;
} __rte_cache_aligned;

/* Per eth device */
struct eth_device_info {
	struct rte_eth_dev *dev;
	struct eth_rx_queue_info *rx_queue;
	/* Set if ethdev->eventdev packet transfer uses a
	 * hardware mechanism
	 */
	uint8_t internal_event_port;
	/* Set if the adapter is processing rx queues for
	 * this eth device and packet processing has been
	 * started, allows for the code to know if the PMD
	 * rx_adapter_stop callback needs to be invoked
	 */
	uint8_t dev_rx_started;
	/* Number of queues added for this device */
	uint16_t nb_dev_queues;
	/* Number of poll based queues
	 * If nb_rx_poll > 0, the start callback will
	 * be invoked if not already invoked
	 */
	uint16_t nb_rx_poll;
	/* Number of interrupt based queues
	 * If nb_rx_intr > 0, the start callback will
	 * be invoked if not already invoked.
	 */
	uint16_t nb_rx_intr;
	/* Number of queues that use the shared interrupt */
	uint16_t nb_shared_intr;
	/* sum(wrr(q)) for all queues within the device
	 * useful when deleting all device queues
	 */
	uint32_t wrr_len;
	/* Intr based queue index to start polling from, this is used
	 * if the number of shared interrupts is non-zero
	 */
	uint16_t next_q_idx;
	/* Intr based queue indices */
	uint16_t *intr_queue;
	/* device generates per Rx queue interrupts for queue indices
	 * < RTE_MAX_RXTX_INTR_VEC_ID - 1
	 */
	int multi_intr_cap;
	/* shared interrupt enabled */
	int shared_intr_enabled;
};

/* Per Rx queue */
struct eth_rx_queue_info {
	int queue_enabled;	/* True if added */
	int intr_enabled;	/* True if interrupt driven */
	uint16_t wt;		/* Polling weight */
	uint8_t event_queue_id;	/* Event queue to enqueue packets to */
	uint8_t sched_type;	/* Sched type for events */
	uint8_t priority;	/* Event priority */
	uint32_t flow_id;	/* App provided flow identifier */
	uint32_t flow_id_mask;	/* Set to ~0 if app provides flow id else 0 */
};

static struct rte_event_eth_rx_adapter **event_eth_rx_adapter;

static inline int
rxa_validate_id(uint8_t id)
{
	return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
}

#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \
	if (!rxa_validate_id(id)) { \
		RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \
		return retval; \
	} \
} while (0)

static inline int
rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr;
}

/* Greatest common divisor */
static uint16_t rxa_gcd_u16(uint16_t a, uint16_t b)
{
	uint16_t r = a % b;

	return r ? rxa_gcd_u16(b, r) : b;
}

/* Returns the next queue in the polling sequence
 *
 * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling
 */
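/* Worked example (illustrative, not part of the original source): with
 * two polled queues of weights {q0: 2, q1: 1}, max_wt = 2 and gcd = 1,
 * repeated calls starting from prev = -1, cw = -1 return the entries in
 * the order q0, q0, q1, q0, q0, q1, ... i.e. each queue is polled in
 * proportion to its servicing weight.
 */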
static int
rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter,
	 unsigned int n, int *cw,
	 struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt,
	 uint16_t gcd, int prev)
{
	int i = prev;
	uint16_t w;

	while (1) {
		uint16_t q;
		uint16_t d;

		i = (i + 1) % n;
		if (i == 0) {
			*cw = *cw - gcd;
			if (*cw <= 0)
				*cw = max_wt;
		}

		q = eth_rx_poll[i].eth_rx_qid;
		d = eth_rx_poll[i].eth_dev_id;
		w = rx_adapter->eth_devices[d].rx_queue[q].wt;

		if ((int)w >= *cw)
			return i;
	}
}

static inline int
rxa_shared_intr(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int multi_intr_cap;

	if (dev_info->dev->intr_handle == NULL)
		return 0;

	multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle);
	return !multi_intr_cap ||
		rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1;
}

static inline int
rxa_intr_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return dev_info->rx_queue &&
		!dev_info->internal_event_port &&
		queue_info->queue_enabled && queue_info->wt == 0;
}

static inline int
rxa_polled_queue(struct eth_device_info *dev_info,
	int rx_queue_id)
{
	struct eth_rx_queue_info *queue_info;

	queue_info = &dev_info->rx_queue[rx_queue_id];
	return !dev_info->internal_event_port &&
		dev_info->rx_queue &&
		queue_info->queue_enabled && queue_info->wt != 0;
}

/* Calculate change in number of vectors after Rx queue ID is add/deleted */
static int
rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add)
{
	uint16_t i;
	int n, s;
	uint16_t nbq;

	nbq = dev_info->dev->data->nb_rx_queues;
	n = 0; /* non shared count */
	s = 0; /* shared count */

	if (rx_queue_id == -1) {
		for (i = 0; i < nbq; i++) {
			if (!rxa_shared_intr(dev_info, i))
				n += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
			else
				s += add ? !rxa_intr_queue(dev_info, i) :
					rxa_intr_queue(dev_info, i);
		}

		if (s > 0) {
			if ((add && dev_info->nb_shared_intr == 0) ||
				(!add && dev_info->nb_shared_intr))
				n += 1;
		}
	} else {
		if (!rxa_shared_intr(dev_info, rx_queue_id))
			n = add ? !rxa_intr_queue(dev_info, rx_queue_id) :
				rxa_intr_queue(dev_info, rx_queue_id);
		else
			n = add ? !dev_info->nb_shared_intr :
				dev_info->nb_shared_intr == 1;
	}

	return add ? n : -n;
}

/* Calculate nb_rx_intr after deleting interrupt mode rx queues
 */
static void
rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_intr)
{
	uint32_t intr_diff;

	if (rx_queue_id == -1)
		intr_diff = dev_info->nb_rx_intr;
	else
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);

	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
}

/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added
 * interrupt queues could currently be poll mode Rx queues
 */
static void
rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_intr;
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		intr_diff = !rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_intr = rx_adapter->num_rx_intr + intr_diff;
	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate size of the eth_rx_poll and wrr_sched arrays
 * after deleting poll mode rx queues
 */
static void
rxa_calc_nb_post_poll_del(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint32_t *nb_rx_poll,
			uint32_t *nb_wrr)
{
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		poll_diff = dev_info->nb_rx_poll;
		wrr_len_diff = dev_info->wrr_len;
	} else {
		poll_diff = rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt :
					0;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled - poll_diff;
	*nb_wrr = rx_adapter->wrr_len - wrr_len_diff;
}

/* Calculate nb_rx_* after adding poll mode rx queues
 */
static void
rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			int rx_queue_id,
			uint16_t wt,
			uint32_t *nb_rx_poll,
			uint32_t *nb_rx_intr,
			uint32_t *nb_wrr)
{
	uint32_t intr_diff;
	uint32_t poll_diff;
	uint32_t wrr_len_diff;

	if (rx_queue_id == -1) {
		intr_diff = dev_info->nb_rx_intr;
		poll_diff = dev_info->dev->data->nb_rx_queues -
						dev_info->nb_rx_poll;
		wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues
				- dev_info->wrr_len;
	} else {
		intr_diff = rxa_intr_queue(dev_info, rx_queue_id);
		poll_diff = !rxa_polled_queue(dev_info, rx_queue_id);
		wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ?
				wt - dev_info->rx_queue[rx_queue_id].wt :
				wt;
	}

	*nb_rx_poll = rx_adapter->num_rx_polled + poll_diff;
	*nb_rx_intr = rx_adapter->num_rx_intr - intr_diff;
	*nb_wrr = rx_adapter->wrr_len + wrr_len_diff;
}

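/* Worked example (illustrative, not part of the original source): for a
 * device with 4 Rx queues, all currently in interrupt mode, re-added as
 * polled queues with wt = 2: intr_diff = 4, poll_diff = 4 - 0 = 4 and
 * wrr_len_diff = 2 * 4 - 0 = 8, so the adapter totals become
 * nb_rx_poll += 4, nb_rx_intr -= 4 and nb_wrr += 8.
 */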
/* Calculate nb_rx_* after adding rx_queue_id */
static void
rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint16_t wt,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	if (wt != 0)
		rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id,
					wt, nb_rx_poll, nb_rx_intr, nb_wrr);
	else
		rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id,
					nb_rx_poll, nb_rx_intr, nb_wrr);
}

/* Calculate nb_rx_* after deleting rx_queue_id */
static void
rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id,
		uint32_t *nb_rx_poll,
		uint32_t *nb_rx_intr,
		uint32_t *nb_wrr)
{
	rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll,
				nb_wrr);
	rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id,
				nb_rx_intr);
}

/*
 * Allocate the rx_poll array
 */
static struct eth_rx_poll_entry *
rxa_alloc_poll(struct rte_event_eth_rx_adapter *rx_adapter,
	uint32_t num_rx_polled)
{
	size_t len;

	len = RTE_ALIGN(num_rx_polled * sizeof(*rx_adapter->eth_rx_poll),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

/*
 * Allocate the WRR array
 */
static uint32_t *
rxa_alloc_wrr(struct rte_event_eth_rx_adapter *rx_adapter, int nb_wrr)
{
	size_t len;

	len = RTE_ALIGN(nb_wrr * sizeof(*rx_adapter->wrr_sched),
			RTE_CACHE_LINE_SIZE);
	return rte_zmalloc_socket(rx_adapter->mem_name,
				len,
				RTE_CACHE_LINE_SIZE,
				rx_adapter->socket_id);
}

static int
rxa_alloc_poll_arrays(struct rte_event_eth_rx_adapter *rx_adapter,
		uint32_t nb_poll,
		uint32_t nb_wrr,
		struct eth_rx_poll_entry **rx_poll,
		uint32_t **wrr_sched)
{
	if (nb_poll == 0) {
		*rx_poll = NULL;
		*wrr_sched = NULL;
		return 0;
	}

	*rx_poll = rxa_alloc_poll(rx_adapter, nb_poll);
	if (*rx_poll == NULL) {
		*wrr_sched = NULL;
		return -ENOMEM;
	}

	*wrr_sched = rxa_alloc_wrr(rx_adapter, nb_wrr);
	if (*wrr_sched == NULL) {
		rte_free(*rx_poll);
		return -ENOMEM;
	}

	return 0;
}

/* Precalculate WRR polling sequence for all queues in rx_adapter */
static void
rxa_calc_wrr_sequence(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_rx_poll_entry *rx_poll,
		uint32_t *rx_wrr)
{
	uint16_t d;
	uint16_t q;
	unsigned int i;
	int prev = -1;
	int cw = -1;

	/* Initialize variables for calculation of wrr schedule */
	uint16_t max_wrr_pos = 0;
	unsigned int poll_q = 0;
	uint16_t max_wt = 0;
	uint16_t gcd = 0;

	if (rx_poll == NULL)
		return;

	/* Generate array of all queues to poll, the size of this
	 * array is poll_q
	 */
	RTE_ETH_FOREACH_DEV(d) {
		uint16_t nb_rx_queues;
		struct eth_device_info *dev_info =
				&rx_adapter->eth_devices[d];
		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		if (dev_info->rx_queue == NULL)
			continue;
		if (dev_info->internal_event_port)
			continue;
		dev_info->wrr_len = 0;
		for (q = 0; q < nb_rx_queues; q++) {
			struct eth_rx_queue_info *queue_info =
				&dev_info->rx_queue[q];
			uint16_t wt;

			if (!rxa_polled_queue(dev_info, q))
				continue;
			wt = queue_info->wt;
			rx_poll[poll_q].eth_dev_id = d;
			rx_poll[poll_q].eth_rx_qid = q;
			max_wrr_pos += wt;
			dev_info->wrr_len += wt;
			max_wt = RTE_MAX(max_wt, wt);
			gcd = (gcd) ? rxa_gcd_u16(gcd, wt) : wt;
			poll_q++;
		}
	}

	/* Generate polling sequence based on weights */
	prev = -1;
	cw = -1;
	for (i = 0; i < max_wrr_pos; i++) {
		rx_wrr[i] = rxa_wrr_next(rx_adapter, poll_q, &cw,
				rx_poll, max_wt, gcd, prev);
		prev = rx_wrr[i];
	}
}

static inline void
rxa_mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr,
	struct ipv6_hdr **ipv6_hdr)
{
	struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *);
	struct vlan_hdr *vlan_hdr;

	*ipv4_hdr = NULL;
	*ipv6_hdr = NULL;

	switch (eth_hdr->ether_type) {
	case RTE_BE16(ETHER_TYPE_IPv4):
		*ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_IPv6):
		*ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1);
		break;

	case RTE_BE16(ETHER_TYPE_VLAN):
		vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1);
		switch (vlan_hdr->eth_proto) {
		case RTE_BE16(ETHER_TYPE_IPv4):
			*ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1);
			break;
		case RTE_BE16(ETHER_TYPE_IPv6):
			*ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1);
			break;
		default:
			break;
		}
		break;

	default:
		break;
	}
}

/* Calculate RSS hash for IPv4/6 */
static inline uint32_t
rxa_do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be)
{
	uint32_t input_len;
	void *tuple;
	struct rte_ipv4_tuple ipv4_tuple;
	struct rte_ipv6_tuple ipv6_tuple;
	struct ipv4_hdr *ipv4_hdr;
	struct ipv6_hdr *ipv6_hdr;

	rxa_mtoip(m, &ipv4_hdr, &ipv6_hdr);

	if (ipv4_hdr) {
		ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr);
		ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr);
		tuple = &ipv4_tuple;
		input_len = RTE_THASH_V4_L3_LEN;
	} else if (ipv6_hdr) {
		rte_thash_load_v6_addrs(ipv6_hdr,
					(union rte_thash_tuple *)&ipv6_tuple);
		tuple = &ipv6_tuple;
		input_len = RTE_THASH_V6_L3_LEN;
	} else
		return 0;

	return rte_softrss_be(tuple, input_len, rss_key_be);
}

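/* Note: the software RSS hash computed above is used only when the mbuf
 * carries no NIC-computed hash (PKT_RX_RSS_HASH unset) and the queue has
 * no application-provided flow ID (flow_id_mask == 0); see
 * rxa_buffer_mbufs() below, where the result is folded into ev->flow_id
 * through flow_id_mask.
 */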
static inline int
rxa_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter)
{
	return !!rx_adapter->enq_block_count;
}

static inline void
rxa_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->rx_enq_block_start_ts)
		return;

	rx_adapter->enq_block_count++;
	if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD)
		return;

	rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles();
}

static inline void
rxa_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event_eth_rx_adapter_stats *stats)
{
	if (unlikely(!stats->rx_enq_start_ts))
		stats->rx_enq_start_ts = rte_get_tsc_cycles();

	if (likely(!rxa_enq_blocked(rx_adapter)))
		return;

	rx_adapter->enq_block_count = 0;
	if (rx_adapter->rx_enq_block_start_ts) {
		stats->rx_enq_end_ts = rte_get_tsc_cycles();
		stats->rx_enq_block_cycles += stats->rx_enq_end_ts -
		    rx_adapter->rx_enq_block_start_ts;
		rx_adapter->rx_enq_block_start_ts = 0;
	}
}

/* Add event to buffer, free space check is done prior to calling
 * this function
 */
static inline void
rxa_buffer_event(struct rte_event_eth_rx_adapter *rx_adapter,
		struct rte_event *ev)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event));
}

/* Enqueue buffered events to event device */
static inline uint16_t
rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
{
	struct rte_eth_event_enqueue_buffer *buf =
	    &rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;

	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
					rx_adapter->event_port_id,
					buf->events,
					buf->count);
	if (n != buf->count) {
		memmove(buf->events,
			&buf->events[n],
			(buf->count - n) * sizeof(struct rte_event));
		stats->rx_enq_retry++;
	}

	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
		rxa_enq_block_start_ts(rx_adapter);

	buf->count -= n;
	stats->rx_enq_count += n;

	return n;
}

static inline void
rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		uint16_t rx_queue_id,
		struct rte_mbuf **mbufs,
		uint16_t num)
{
	uint32_t i;
	struct eth_device_info *eth_device_info =
					&rx_adapter->eth_devices[eth_dev_id];
	struct eth_rx_queue_info *eth_rx_queue_info =
				&eth_device_info->rx_queue[rx_queue_id];

	int32_t qid = eth_rx_queue_info->event_queue_id;
	uint8_t sched_type = eth_rx_queue_info->sched_type;
	uint8_t priority = eth_rx_queue_info->priority;
	uint32_t flow_id;
	struct rte_event events[BATCH_SIZE];
	struct rte_mbuf *m = mbufs[0];
	uint32_t rss_mask;
	uint32_t rss;
	int do_rss;
	uint64_t ts;

	/* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */
	rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
	do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;

	if ((m->ol_flags & PKT_RX_TIMESTAMP) == 0) {
		ts = rte_get_tsc_cycles();
		for (i = 0; i < num; i++) {
			m = mbufs[i];

			m->timestamp = ts;
			m->ol_flags |= PKT_RX_TIMESTAMP;
		}
	}

	for (i = 0; i < num; i++) {
		m = mbufs[i];
		struct rte_event *ev = &events[i];

		rss = do_rss ?
			rxa_do_softrss(m, rx_adapter->rss_key_be) :
			m->hash.rss;
		flow_id =
		    eth_rx_queue_info->flow_id &
				eth_rx_queue_info->flow_id_mask;
		flow_id |= rss & ~eth_rx_queue_info->flow_id_mask;
		ev->flow_id = flow_id;
		ev->op = RTE_EVENT_OP_NEW;
		ev->sched_type = sched_type;
		ev->queue_id = qid;
		ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER;
		ev->sub_event_type = 0;
		ev->priority = priority;
		ev->mbuf = m;

		rxa_buffer_event(rx_adapter, ev);
	}
}

/* Enqueue packets from <port, q> to event buffer */
static inline uint32_t
rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
	uint16_t port_id,
	uint16_t queue_id,
	uint32_t rx_count,
	uint32_t max_rx,
	int *rxq_empty)
{
	struct rte_mbuf *mbufs[BATCH_SIZE];
	struct rte_eth_event_enqueue_buffer *buf =
					&rx_adapter->event_enqueue_buffer;
	struct rte_event_eth_rx_adapter_stats *stats =
					&rx_adapter->stats;
	uint16_t n;
	uint32_t nb_rx = 0;

	if (rxq_empty)
		*rxq_empty = 0;
	/* Don't do a batch dequeue from the rx queue if there isn't
	 * enough space in the enqueue buffer.
	 */
	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);

		stats->rx_poll_count++;
		n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE);
		if (unlikely(!n)) {
			if (rxq_empty)
				*rxq_empty = 1;
			break;
		}
		rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n);
		nb_rx += n;
		if (rx_count + nb_rx > max_rx)
			break;
	}

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	return nb_rx;
}

static void
rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter,
		void *data)
{
	uint16_t port_id;
	uint16_t queue;
	int err;
	union queue_data qd;
	struct eth_device_info *dev_info;
	struct eth_rx_queue_info *queue_info;
	int *intr_enabled;

	qd.ptr = data;
	port_id = qd.port;
	queue = qd.queue;

	dev_info = &rx_adapter->eth_devices[port_id];
	queue_info = &dev_info->rx_queue[queue];
	rte_spinlock_lock(&rx_adapter->intr_ring_lock);
	if (rxa_shared_intr(dev_info, queue))
		intr_enabled = &dev_info->shared_intr_enabled;
	else
		intr_enabled = &queue_info->intr_enabled;

	if (*intr_enabled) {
		*intr_enabled = 0;
		err = rte_ring_enqueue(rx_adapter->intr_ring, data);
		/* Entry should always be available.
		 * The ring size equals the maximum number of interrupt
		 * vectors supported (an interrupt vector is shared in
		 * case of shared interrupts)
		 */
		if (err)
			RTE_EDEV_LOG_ERR("Failed to enqueue interrupt"
				" to ring: %s", strerror(-err));
		else
			rte_eth_dev_rx_intr_disable(port_id, queue);
	}

	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

static int
rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter,
			uint32_t num_intr_vec)
{
	if (rx_adapter->num_intr_vec + num_intr_vec >
				RTE_EVENT_ETH_INTR_RING_SIZE) {
		RTE_EDEV_LOG_ERR("Exceeded intr ring slots current"
		" %d needed %d limit %d", rx_adapter->num_intr_vec,
		num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE);
		return -ENOSPC;
	}

	return 0;
}

/* Delete entries for (dev, queue) from the interrupt ring */
static void
rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter,
			struct eth_device_info *dev_info,
			uint16_t rx_queue_id)
{
	int i, n;
	union queue_data qd;

	rte_spinlock_lock(&rx_adapter->intr_ring_lock);

	n = rte_ring_count(rx_adapter->intr_ring);
	for (i = 0; i < n; i++) {
		rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
		if (!rxa_shared_intr(dev_info, rx_queue_id)) {
			if (qd.port == dev_info->dev->data->port_id &&
				qd.queue == rx_queue_id)
				continue;
		} else {
			if (qd.port == dev_info->dev->data->port_id)
				continue;
		}
		rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr);
	}

	rte_spinlock_unlock(&rx_adapter->intr_ring_lock);
}

/* pthread callback handling interrupt mode receive queues
 * After receiving an Rx interrupt, it enqueues the port id and queue id of the
 * interrupting queue to the adapter's ring buffer for interrupt events.
 * These events are picked up by rxa_intr_ring_dequeue() which is invoked from
 * the adapter service function.
 */
static void *
rxa_intr_thread(void *arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter = arg;
	struct rte_epoll_event *epoll_events = rx_adapter->epoll_events;
	int n, i;

	while (1) {
		n = rte_epoll_wait(rx_adapter->epd, epoll_events,
				RTE_EVENT_ETH_INTR_RING_SIZE, -1);
		if (unlikely(n < 0))
			RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d",
					n);
		for (i = 0; i < n; i++) {
			rxa_intr_ring_enqueue(rx_adapter,
					epoll_events[i].epdata.data);
		}
	}

	return NULL;
}

/* Dequeue <port, q> from interrupt ring and enqueue received
 * mbufs to the event device
 */
static inline uint32_t
rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t n;
	uint32_t nb_rx = 0;
	int rxq_empty;
	struct rte_eth_event_enqueue_buffer *buf;
	rte_spinlock_t *ring_lock;
	uint8_t max_done = 0;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	if (rte_ring_count(rx_adapter->intr_ring) == 0
		&& !rx_adapter->qd_valid)
		return 0;

	buf = &rx_adapter->event_enqueue_buffer;
	ring_lock = &rx_adapter->intr_ring_lock;

	if (buf->count >= BATCH_SIZE)
		rxa_flush_event_buffer(rx_adapter);

	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
		struct eth_device_info *dev_info;
		uint16_t port;
		uint16_t queue;
		union queue_data qd = rx_adapter->qd;
		int err;

		if (!rx_adapter->qd_valid) {
			struct eth_rx_queue_info *queue_info;

			rte_spinlock_lock(ring_lock);
			err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr);
			if (err) {
				rte_spinlock_unlock(ring_lock);
				break;
			}

			port = qd.port;
			queue = qd.queue;
			rx_adapter->qd = qd;
			rx_adapter->qd_valid = 1;
			dev_info = &rx_adapter->eth_devices[port];
			if (rxa_shared_intr(dev_info, queue))
				dev_info->shared_intr_enabled = 1;
			else {
				queue_info = &dev_info->rx_queue[queue];
				queue_info->intr_enabled = 1;
			}
			rte_eth_dev_rx_intr_enable(port, queue);
			rte_spinlock_unlock(ring_lock);
		} else {
			port = qd.port;
			queue = qd.queue;

			dev_info = &rx_adapter->eth_devices[port];
		}

		if (rxa_shared_intr(dev_info, queue)) {
			uint16_t i;
			uint16_t nb_queues;

			nb_queues = dev_info->dev->data->nb_rx_queues;
			n = 0;
			for (i = dev_info->next_q_idx; i < nb_queues; i++) {
				uint8_t enq_buffer_full;

				if (!rxa_intr_queue(dev_info, i))
					continue;
				n = rxa_eth_rx(rx_adapter, port, i, nb_rx,
					rx_adapter->max_nb_rx,
					&rxq_empty);
				nb_rx += n;

				enq_buffer_full = !rxq_empty && n == 0;
				max_done = nb_rx > rx_adapter->max_nb_rx;

				if (enq_buffer_full || max_done) {
					dev_info->next_q_idx = i;
					goto done;
				}
			}

			rx_adapter->qd_valid = 0;

			/* Reinitialize for next interrupt */
			dev_info->next_q_idx = dev_info->multi_intr_cap ?
						RTE_MAX_RXTX_INTR_VEC_ID - 1 :
						0;
		} else {
			n = rxa_eth_rx(rx_adapter, port, queue, nb_rx,
				rx_adapter->max_nb_rx,
				&rxq_empty);
			rx_adapter->qd_valid = !rxq_empty;
			nb_rx += n;
			if (nb_rx > rx_adapter->max_nb_rx)
				break;
		}
	}

done:
	rx_adapter->stats.rx_intr_packets += nb_rx;
	return nb_rx;
}

/*
 * Polls receive queues added to the event adapter and enqueues received
 * packets to the event device.
 *
 * The receive code enqueues initially to a temporary buffer, the
 * temporary buffer is drained anytime it holds >= BATCH_SIZE packets
 *
 * If there isn't space available in the temporary buffer, packets from the
 * Rx queue aren't dequeued from the eth device, this back pressures the
 * eth device, in virtual device environments this back pressure is relayed to
 * the hypervisor's switching layer where adjustments can be made to deal with
 * it.
 */
static inline uint32_t
rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
{
	uint32_t num_queue;
	uint32_t nb_rx = 0;
	struct rte_eth_event_enqueue_buffer *buf;
	uint32_t wrr_pos;
	uint32_t max_nb_rx;
	struct rte_event_eth_rx_adapter_stats *stats;

	wrr_pos = rx_adapter->wrr_pos;
	max_nb_rx = rx_adapter->max_nb_rx;
	buf = &rx_adapter->event_enqueue_buffer;
	stats = &rx_adapter->stats;
	RTE_SET_USED(stats);

	/* Iterate through a WRR sequence */
	for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) {
		unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos];
		uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid;
		uint16_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id;

		/* Don't do a batch dequeue from the rx queue if there isn't
		 * enough space in the enqueue buffer.
		 */
		if (buf->count >= BATCH_SIZE)
			rxa_flush_event_buffer(rx_adapter);
		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
			rx_adapter->wrr_pos = wrr_pos;
			return nb_rx;
		}

		nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx,
				NULL);
		if (nb_rx > max_nb_rx) {
			rx_adapter->wrr_pos =
				    (wrr_pos + 1) % rx_adapter->wrr_len;
			break;
		}

		if (++wrr_pos == rx_adapter->wrr_len)
			wrr_pos = 0;
	}

	return nb_rx;
}

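/* Adapter service function: drains the interrupt ring first, then the
 * polled queues. The trylock below keeps the service non-blocking: if the
 * control path holds rx_lock for a queue add/delete, this invocation
 * returns immediately and the service core retries on its next run.
 */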
static int
rxa_service_func(void *args)
{
	struct rte_event_eth_rx_adapter *rx_adapter = args;
	struct rte_event_eth_rx_adapter_stats *stats;

	if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0)
		return 0;
	if (!rx_adapter->rxa_started) {
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		return 0;
	}

	stats = &rx_adapter->stats;
	stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter);
	stats->rx_packets += rxa_poll(rx_adapter);
	rte_spinlock_unlock(&rx_adapter->rx_lock);
	return 0;
}

static int
rte_event_eth_rx_adapter_init(void)
{
	const char *name = "rte_event_eth_rx_adapter_array";
	const struct rte_memzone *mz;
	unsigned int sz;

	sz = sizeof(*event_eth_rx_adapter) *
	    RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE;
	sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE);

	mz = rte_memzone_lookup(name);
	if (mz == NULL) {
		mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0,
						 RTE_CACHE_LINE_SIZE);
		if (mz == NULL) {
			RTE_EDEV_LOG_ERR("failed to reserve memzone err = %"
					PRId32, rte_errno);
			return -rte_errno;
		}
	}

	event_eth_rx_adapter = mz->addr;
	return 0;
}

static inline struct rte_event_eth_rx_adapter *
rxa_id_to_adapter(uint8_t id)
{
	return event_eth_rx_adapter ?
		event_eth_rx_adapter[id] : NULL;
}

static int
rxa_default_conf_cb(uint8_t id, uint8_t dev_id,
		struct rte_event_eth_rx_adapter_conf *conf, void *arg)
{
	int ret;
	struct rte_eventdev *dev;
	struct rte_event_dev_config dev_conf;
	int started;
	uint8_t port_id;
	struct rte_event_port_conf *port_conf = arg;
	struct rte_event_eth_rx_adapter *rx_adapter = rxa_id_to_adapter(id);

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	dev_conf = dev->data->dev_conf;

	started = dev->data->dev_started;
	if (started)
		rte_event_dev_stop(dev_id);
	port_id = dev_conf.nb_event_ports;
	dev_conf.nb_event_ports += 1;
	ret = rte_event_dev_configure(dev_id, &dev_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to configure event dev %u\n",
						dev_id);
		if (started) {
			if (rte_event_dev_start(dev_id))
				return -EIO;
		}
		return ret;
	}

	ret = rte_event_port_setup(dev_id, port_id, port_conf);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to setup event port %u\n",
					port_id);
		return ret;
	}

	conf->event_port_id = port_id;
	conf->max_nb_rx = 128;
	if (started)
		ret = rte_event_dev_start(dev_id);
	rx_adapter->default_cb_arg = 1;
	return ret;
}

static int
rxa_epoll_create1(void)
{
#if defined(LINUX)
	int fd;

	fd = epoll_create1(EPOLL_CLOEXEC);
	return fd < 0 ? -errno : fd;
#else
	return -ENOTSUP;
#endif
}

static int
rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter)
{
	if (rx_adapter->epd != INIT_FD)
		return 0;

	rx_adapter->epd = rxa_epoll_create1();
	if (rx_adapter->epd < 0) {
		int err = rx_adapter->epd;

		rx_adapter->epd = INIT_FD;
		RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err);
		return err;
	}

	return 0;
}

static int
rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;
	char thread_name[RTE_MAX_THREAD_NAME_LEN];

	if (rx_adapter->intr_ring)
		return 0;

	rx_adapter->intr_ring = rte_ring_create("intr_ring",
					RTE_EVENT_ETH_INTR_RING_SIZE,
					rte_socket_id(), 0);
	if (!rx_adapter->intr_ring)
		return -ENOMEM;

	rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name,
					RTE_EVENT_ETH_INTR_RING_SIZE *
					sizeof(struct rte_epoll_event),
					RTE_CACHE_LINE_SIZE,
					rx_adapter->socket_id);
	if (!rx_adapter->epoll_events) {
		err = -ENOMEM;
		goto error;
	}

	rte_spinlock_init(&rx_adapter->intr_ring_lock);

	snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN,
			"rx-intr-thread-%d", rx_adapter->id);

	err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name,
				NULL, rxa_intr_thread, rx_adapter);
	if (!err) {
		rte_thread_setname(rx_adapter->rx_intr_thread, thread_name);
		return 0;
	}

	RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err);
	rte_free(rx_adapter->epoll_events);
error:
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return err;
}

static int
rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int err;

	err = pthread_cancel(rx_adapter->rx_intr_thread);
	if (err)
		RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n",
				err);

	err = pthread_join(rx_adapter->rx_intr_thread, NULL);
	if (err)
		RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err);

	rte_free(rx_adapter->epoll_events);
	rte_ring_free(rx_adapter->intr_ring);
	rx_adapter->intr_ring = NULL;
	rx_adapter->epoll_events = NULL;
	return 0;
}

static int
rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter)
{
	int ret;

	if (rx_adapter->num_rx_intr == 0)
		return 0;

	ret = rxa_destroy_intr_thread(rx_adapter);
	if (ret)
		return ret;

	close(rx_adapter->epd);
	rx_adapter->epd = INIT_FD;

	return ret;
}

static int
rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u",
			rx_queue_id);
		return err;
	}

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err)
		RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err);

	if (!sintr)
		dev_info->rx_queue[rx_queue_id].intr_enabled = 0;
	else
		dev_info->shared_intr_enabled = 0;
	return err;
}

static int
rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int rx_queue_id)
{
	int err;
	int i;
	int s;

	if (dev_info->nb_rx_intr == 0)
		return 0;

	err = 0;
	if (rx_queue_id == -1) {
		s = dev_info->nb_shared_intr;
		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			int sintr;
			uint16_t q;

			q = dev_info->intr_queue[i];
			sintr = rxa_shared_intr(dev_info, q);
			s -= sintr;

			if (!sintr || s == 0) {
				err = rxa_disable_intr(rx_adapter, dev_info,
						q);
				if (err)
					return err;
				rxa_intr_ring_del_entries(rx_adapter, dev_info,
							q);
			}
		}
	} else {
		if (!rxa_intr_queue(dev_info, rx_queue_id))
			return 0;
		if (!rxa_shared_intr(dev_info, rx_queue_id) ||
				dev_info->nb_shared_intr == 1) {
			err = rxa_disable_intr(rx_adapter, dev_info,
					rx_queue_id);
			if (err)
				return err;
			rxa_intr_ring_del_entries(rx_adapter, dev_info,
						rx_queue_id);
		}

		for (i = 0; i < dev_info->nb_rx_intr; i++) {
			if (dev_info->intr_queue[i] == rx_queue_id) {
				for (; i < dev_info->nb_rx_intr - 1; i++)
					dev_info->intr_queue[i] =
						dev_info->intr_queue[i + 1];
				break;
			}
		}
	}

	return err;
}

static int
rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	uint16_t rx_queue_id)
{
	int err, err1;
	uint16_t eth_dev_id = dev_info->dev->data->port_id;
	union queue_data qd;
	int init_fd;
	uint16_t *intr_queue;
	int sintr = rxa_shared_intr(dev_info, rx_queue_id);

	if (rxa_intr_queue(dev_info, rx_queue_id))
		return 0;

	intr_queue = dev_info->intr_queue;
	if (dev_info->intr_queue == NULL) {
		size_t len =
			dev_info->dev->data->nb_rx_queues * sizeof(uint16_t);
		dev_info->intr_queue =
			rte_zmalloc_socket(
				rx_adapter->mem_name,
				len,
				0,
				rx_adapter->socket_id);
		if (dev_info->intr_queue == NULL)
			return -ENOMEM;
	}

	init_fd = rx_adapter->epd;
	err = rxa_init_epd(rx_adapter);
	if (err)
		goto err_free_queue;

	qd.port = eth_dev_id;
	qd.queue = rx_queue_id;

	err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_ADD,
					qd.ptr);
	if (err) {
		RTE_EDEV_LOG_ERR("Failed to add interrupt event for"
			" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_fd;
	}

	err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id);
	if (err) {
		RTE_EDEV_LOG_ERR("Could not enable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
		goto err_del_event;
	}

	err = rxa_create_intr_thread(rx_adapter);
	if (!err) {
		if (sintr)
			dev_info->shared_intr_enabled = 1;
		else
			dev_info->rx_queue[rx_queue_id].intr_enabled = 1;
		return 0;
	}

	err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id);
	if (err)
		RTE_EDEV_LOG_ERR("Could not disable interrupt for"
				" Rx Queue %u err %d", rx_queue_id, err);
err_del_event:
	err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id,
					rx_adapter->epd,
					RTE_INTR_EVENT_DEL,
					0);
	if (err1) {
		RTE_EDEV_LOG_ERR("Could not delete event for"
				" Rx Queue %u err %d", rx_queue_id, err1);
	}

err_del_fd:
	if (init_fd == INIT_FD) {
		close(rx_adapter->epd);
		rx_adapter->epd = INIT_FD;
	}

err_free_queue:
	if (intr_queue == NULL) {
		rte_free(dev_info->intr_queue);
		dev_info->intr_queue = NULL;
	}

	return err;
}

static int
rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int rx_queue_id)
{
	int i, j, err;
	int si = -1;
	int shared_done = (dev_info->nb_shared_intr > 0);

	if (rx_queue_id != -1) {
		if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done)
			return 0;
		return rxa_config_intr(rx_adapter, dev_info, rx_queue_id);
	}

	err = 0;
	for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) {

		if (rxa_shared_intr(dev_info, i) && shared_done)
			continue;

		err = rxa_config_intr(rx_adapter, dev_info, i);
		shared_done = err == 0 && rxa_shared_intr(dev_info, i);
		if (shared_done) {
			si = i;
			dev_info->shared_intr_enabled = 1;
		}
		if (err)
			break;
	}

	if (err == 0)
		return 0;

	shared_done = (dev_info->nb_shared_intr > 0);
	for (j = 0; j < i; j++) {
		if (rxa_intr_queue(dev_info, j))
			continue;
		if (rxa_shared_intr(dev_info, j) && si != j)
			continue;
		err = rxa_disable_intr(rx_adapter, dev_info, j);
		if (err)
			break;
	}

	return err;
}

static int
rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id)
{
	int ret;
	struct rte_service_spec service;
	struct rte_event_eth_rx_adapter_conf rx_adapter_conf;

	if (rx_adapter->service_inited)
		return 0;

	memset(&service, 0, sizeof(service));
	snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN,
		"rte_event_eth_rx_adapter_%d", id);
	service.socket_id = rx_adapter->socket_id;
	service.callback = rxa_service_func;
	service.callback_userdata = rx_adapter;
	/* Service function handles locking for queue add/del updates */
	service.capabilities = RTE_SERVICE_CAP_MT_SAFE;
	ret = rte_service_component_register(&service, &rx_adapter->service_id);
	if (ret) {
		RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32,
			service.name, ret);
		return ret;
	}

	ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id,
		&rx_adapter_conf, rx_adapter->conf_arg);
	if (ret) {
		RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32,
			ret);
		goto err_done;
	}
	rx_adapter->event_port_id = rx_adapter_conf.event_port_id;
	rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx;
	rx_adapter->service_inited = 1;
	rx_adapter->epd = INIT_FD;
	return 0;

err_done:
	rte_service_component_unregister(rx_adapter->service_id);
	return ret;
}

static void
rxa_update_queue(struct rte_event_eth_rx_adapter *rx_adapter,
		struct eth_device_info *dev_info,
		int32_t rx_queue_id,
		uint8_t add)
{
	struct eth_rx_queue_info *queue_info;
	int enabled;
	uint16_t i;

	if (dev_info->rx_queue == NULL)
		return;

	if (rx_queue_id == -1) {
		for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
			rxa_update_queue(rx_adapter, dev_info, i, add);
	} else {
		queue_info = &dev_info->rx_queue[rx_queue_id];
		enabled = queue_info->queue_enabled;
		if (add) {
			rx_adapter->nb_queues += !enabled;
			dev_info->nb_dev_queues += !enabled;
		} else {
			rx_adapter->nb_queues -= enabled;
			dev_info->nb_dev_queues -= enabled;
		}
		queue_info->queue_enabled = !!add;
	}
}

static void
rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id)
{
	int pollq;
	int intrq;
	int sintrq;

	if (rx_adapter->nb_queues == 0)
		return;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_sw_del(rx_adapter, dev_info, i);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);
	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0);
	rx_adapter->num_rx_polled -= pollq;
	dev_info->nb_rx_poll -= pollq;
	rx_adapter->num_rx_intr -= intrq;
	dev_info->nb_rx_intr -= intrq;
	dev_info->nb_shared_intr -= intrq && sintrq;
}

static void
rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter,
	struct eth_device_info *dev_info,
	int32_t rx_queue_id,
	const struct rte_event_eth_rx_adapter_queue_conf *conf)
{
	struct eth_rx_queue_info *queue_info;
	const struct rte_event *ev = &conf->ev;
	int pollq;
	int intrq;
	int sintrq;

	if (rx_queue_id == -1) {
		uint16_t nb_rx_queues;
		uint16_t i;

		nb_rx_queues = dev_info->dev->data->nb_rx_queues;
		for (i = 0; i < nb_rx_queues; i++)
			rxa_add_queue(rx_adapter, dev_info, i, conf);
		return;
	}

	pollq = rxa_polled_queue(dev_info, rx_queue_id);
	intrq = rxa_intr_queue(dev_info, rx_queue_id);
	sintrq = rxa_shared_intr(dev_info, rx_queue_id);

	queue_info = &dev_info->rx_queue[rx_queue_id];
	queue_info->event_queue_id = ev->queue_id;
	queue_info->sched_type = ev->sched_type;
	queue_info->priority = ev->priority;
	queue_info->wt = conf->servicing_weight;

	if (conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) {
		queue_info->flow_id = ev->flow_id;
		queue_info->flow_id_mask = ~0;
	}

	rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 1);
	if (rxa_polled_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled += !pollq;
		dev_info->nb_rx_poll += !pollq;
		rx_adapter->num_rx_intr -= intrq;
		dev_info->nb_rx_intr -= intrq;
		dev_info->nb_shared_intr -= intrq && sintrq;
	}

	if (rxa_intr_queue(dev_info, rx_queue_id)) {
		rx_adapter->num_rx_polled -= pollq;
		dev_info->nb_rx_poll -= pollq;
		rx_adapter->num_rx_intr += !intrq;
		dev_info->nb_rx_intr += !intrq;
		dev_info->nb_shared_intr += !intrq && sintrq;
		if (dev_info->nb_shared_intr == 1) {
			if (dev_info->multi_intr_cap)
				dev_info->next_q_idx =
					RTE_MAX_RXTX_INTR_VEC_ID - 1;
			else
				dev_info->next_q_idx = 0;
		}
	}
}

static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter,
		uint16_t eth_dev_id,
		int rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id];
	struct rte_event_eth_rx_adapter_queue_conf temp_conf;
	int ret;
	struct eth_rx_poll_entry *rx_poll;
	struct eth_rx_queue_info *rx_queue;
	uint32_t *rx_wrr;
	uint16_t nb_rx_queues;
	uint32_t nb_rx_poll, nb_wrr;
	uint32_t nb_rx_intr;
	int num_intr_vec;
	uint16_t wt;

	if (queue_conf->servicing_weight == 0) {
		struct rte_eth_dev_data *data = dev_info->dev->data;

		temp_conf = *queue_conf;
		if (!data->dev_conf.intr_conf.rxq) {
			/* If Rx interrupts are disabled set wt = 1 */
			temp_conf.servicing_weight = 1;
		}
		queue_conf = &temp_conf;
	}

	nb_rx_queues = dev_info->dev->data->nb_rx_queues;
	rx_queue = dev_info->rx_queue;
	wt = queue_conf->servicing_weight;

	if (dev_info->rx_queue == NULL) {
		dev_info->rx_queue =
		    rte_zmalloc_socket(rx_adapter->mem_name,
				       nb_rx_queues *
				       sizeof(struct eth_rx_queue_info), 0,
				       rx_adapter->socket_id);
		if (dev_info->rx_queue == NULL)
			return -ENOMEM;
	}
	rx_wrr = NULL;
	rx_poll = NULL;

	rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id,
			queue_conf->servicing_weight,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

	if (dev_info->dev->intr_handle)
		dev_info->multi_intr_cap =
			rte_intr_cap_multiple(dev_info->dev->intr_handle);

	ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
				&rx_poll, &rx_wrr);
	if (ret)
		goto err_free_rxqueue;

	if (wt == 0) {
		num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1);

		ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec);
		if (ret)
			goto err_free_rxqueue;

		ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id);
		if (ret)
			goto err_free_rxqueue;
	} else {
		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			/* interrupt based queues are being converted to
			 * poll mode queues, delete the interrupt configuration
			 * for those queues
			 */
			ret = rxa_del_intr_queue(rx_adapter,
						dev_info, rx_queue_id);
			if (ret)
				goto err_free_rxqueue;
		}

		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto err_free_rxqueue;
		}
	}

	if (wt == 0) {
		uint16_t i;

		/* intr_queue contains queue indices in interrupt mode */
		if (rx_queue_id == -1) {
			for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++)
				dev_info->intr_queue[i] = i;
		} else {
			if (!rxa_intr_queue(dev_info, rx_queue_id))
				dev_info->intr_queue[nb_rx_intr - 1] =
					rx_queue_id;
		}
	}

	rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf);
	rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

	rte_free(rx_adapter->eth_rx_poll);
	rte_free(rx_adapter->wrr_sched);

	rx_adapter->eth_rx_poll = rx_poll;
	rx_adapter->wrr_sched = rx_wrr;
	rx_adapter->wrr_len = nb_wrr;
	rx_adapter->num_intr_vec += num_intr_vec;
	return 0;

err_free_rxqueue:
	if (rx_queue == NULL) {
		rte_free(dev_info->rx_queue);
		dev_info->rx_queue = NULL;
	}

	rte_free(rx_poll);
	rte_free(rx_wrr);

	return ret;
}

static int
rxa_ctrl(uint8_t id, int start)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int use_service = 0;
	int stop = !start;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];

	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		/* if start check for num dev queues */
		if (start && !dev_info->nb_dev_queues)
			continue;
		/* if stop check if dev has been started */
		if (stop && !dev_info->dev_rx_started)
			continue;
		use_service |= !dev_info->internal_event_port;
		dev_info->dev_rx_started = start;
		if (dev_info->internal_event_port == 0)
			continue;
		start ? (*dev->dev_ops->eth_rx_adapter_start)(dev,
						&rte_eth_devices[i]) :
			(*dev->dev_ops->eth_rx_adapter_stop)(dev,
						&rte_eth_devices[i]);
	}

	if (use_service) {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		rx_adapter->rxa_started = start;
		rte_service_runstate_set(rx_adapter->service_id, start);
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return 0;
}

int
rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id,
				rte_event_eth_rx_adapter_conf_cb conf_cb,
				void *conf_arg)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	int ret;
	int socket_id;
	uint16_t i;
	char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN];
	const uint8_t default_rss_key[] = {
		0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2,
		0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0,
		0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4,
		0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c,
		0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa,
	};

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL);
	if (conf_cb == NULL)
		return -EINVAL;

	if (event_eth_rx_adapter == NULL) {
		ret = rte_event_eth_rx_adapter_init();
		if (ret)
			return ret;
	}

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter != NULL) {
		RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id);
		return -EEXIST;
	}

	socket_id = rte_event_dev_socket_id(dev_id);
	snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN,
		"rte_event_eth_rx_adapter_%d",
		id);

	rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter),
			RTE_CACHE_LINE_SIZE, socket_id);
	if (rx_adapter == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for rx adapter");
		return -ENOMEM;
	}

	rx_adapter->eventdev_id = dev_id;
	rx_adapter->socket_id = socket_id;
	rx_adapter->conf_cb = conf_cb;
	rx_adapter->conf_arg = conf_arg;
	rx_adapter->id = id;
	strcpy(rx_adapter->mem_name, mem_name);
	rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name,
					/* FIXME: incompatible with hotplug */
					rte_eth_dev_count_total() *
					sizeof(struct eth_device_info), 0,
					socket_id);
	rte_convert_rss_key((const uint32_t *)default_rss_key,
			(uint32_t *)rx_adapter->rss_key_be,
			    RTE_DIM(default_rss_key));

	if (rx_adapter->eth_devices == NULL) {
		RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n");
		rte_free(rx_adapter);
		return -ENOMEM;
	}
	rte_spinlock_init(&rx_adapter->rx_lock);
	RTE_ETH_FOREACH_DEV(i)
		rx_adapter->eth_devices[i].dev = &rte_eth_devices[i];

	event_eth_rx_adapter[id] = rx_adapter;
	if (conf_cb == rxa_default_conf_cb)
		rx_adapter->default_cb_arg = 1;
	return 0;
}

int
rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id,
		struct rte_event_port_conf *port_config)
{
	struct rte_event_port_conf *pc;
	int ret;

	if (port_config == NULL)
		return -EINVAL;
	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	pc = rte_malloc(NULL, sizeof(*pc), 0);
	if (pc == NULL)
		return -ENOMEM;
	*pc = *port_config;
	ret = rte_event_eth_rx_adapter_create_ext(id, dev_id,
					rxa_default_conf_cb,
					pc);
	if (ret)
		rte_free(pc);
	return ret;
}

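/* Typical usage (a hedged sketch, not part of the original source; the
 * adapter, eventdev and ethdev ids below are hypothetical):
 *
 *	struct rte_event_port_conf port_conf = {
 *		.new_event_threshold = 1024,
 *		.dequeue_depth = 32,
 *		.enqueue_depth = 32,
 *	};
 *	struct rte_event_eth_rx_adapter_queue_conf qconf = {
 *		.ev.queue_id = 0,
 *		.ev.sched_type = RTE_SCHED_TYPE_ATOMIC,
 *		.servicing_weight = 1,
 *	};
 *
 *	rte_event_eth_rx_adapter_create(0, 0, &port_conf);
 *	// rx_queue_id of -1 connects all Rx queues of ethdev port 0
 *	rte_event_eth_rx_adapter_queue_add(0, 0, -1, &qconf);
 *	rte_event_eth_rx_adapter_start(0);
 */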
int
rte_event_eth_rx_adapter_free(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	if (rx_adapter->nb_queues) {
		RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted",
				rx_adapter->nb_queues);
		return -EBUSY;
	}

	if (rx_adapter->default_cb_arg)
		rte_free(rx_adapter->conf_arg);
	rte_free(rx_adapter->eth_devices);
	rte_free(rx_adapter);
	event_eth_rx_adapter[id] = NULL;

	return 0;
}

int
rte_event_eth_rx_adapter_queue_add(uint8_t id,
		uint16_t eth_dev_id,
		int32_t rx_queue_id,
		const struct rte_event_eth_rx_adapter_queue_conf *queue_conf)
{
	int ret;
	uint32_t cap;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if ((rx_adapter == NULL) || (queue_conf == NULL))
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret) {
		RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8
			" eth port %" PRIu16, id, eth_dev_id);
		return ret;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0
		&& (queue_conf->rx_queue_flags &
			RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) {
		RTE_EDEV_LOG_ERR("Flow ID override is not supported,"
				" eth port: %" PRIu16 " adapter id: %" PRIu8,
				eth_dev_id, id);
		return -EINVAL;
	}

	if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 &&
		(rx_queue_id != -1)) {
		RTE_EDEV_LOG_ERR("Rx queues can only be connected to single "
			"event queue, eth port: %" PRIu16 " adapter id: %"
			PRIu8, eth_dev_id, id);
		return -EINVAL;
	}

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
			rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add,
					-ENOTSUP);
		if (dev_info->rx_queue == NULL) {
			dev_info->rx_queue =
			    rte_zmalloc_socket(rx_adapter->mem_name,
					dev_info->dev->data->nb_rx_queues *
					sizeof(struct eth_rx_queue_info), 0,
					rx_adapter->socket_id);
			if (dev_info->rx_queue == NULL)
				return -ENOMEM;
		}

		ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev,
				&rte_eth_devices[eth_dev_id],
				rx_queue_id, queue_conf);
		if (ret == 0) {
			dev_info->internal_event_port = 1;
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					1);
		}
	} else {
		rte_spinlock_lock(&rx_adapter->rx_lock);
		dev_info->internal_event_port = 0;
		ret = rxa_init_service(rx_adapter, id);
		if (ret == 0) {
			uint32_t service_id = rx_adapter->service_id;

			ret = rxa_sw_add(rx_adapter, eth_dev_id, rx_queue_id,
					queue_conf);
			rte_service_component_runstate_set(service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
		}
		rte_spinlock_unlock(&rx_adapter->rx_lock);
	}

	return ret;
}

int
rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id,
				int32_t rx_queue_id)
{
	int ret = 0;
	struct rte_eventdev *dev;
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct eth_device_info *dev_info;
	uint32_t cap;
	uint32_t nb_rx_poll = 0;
	uint32_t nb_wrr = 0;
	uint32_t nb_rx_intr;
	struct eth_rx_poll_entry *rx_poll = NULL;
	uint32_t *rx_wrr = NULL;
	int num_intr_vec;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);
	RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id,
						eth_dev_id,
						&cap);
	if (ret)
		return ret;

	if (rx_queue_id != -1 && (uint16_t)rx_queue_id >=
		rte_eth_devices[eth_dev_id].data->nb_rx_queues) {
		RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16,
			 (uint16_t)rx_queue_id);
		return -EINVAL;
	}

	dev_info = &rx_adapter->eth_devices[eth_dev_id];

	if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) {
		RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del,
					-ENOTSUP);
		ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev,
						&rte_eth_devices[eth_dev_id],
						rx_queue_id);
		if (ret == 0) {
			rxa_update_queue(rx_adapter,
					&rx_adapter->eth_devices[eth_dev_id],
					rx_queue_id,
					0);
			if (dev_info->nb_dev_queues == 0) {
				rte_free(dev_info->rx_queue);
				dev_info->rx_queue = NULL;
			}
		}
	} else {
		rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id,
			&nb_rx_poll, &nb_rx_intr, &nb_wrr);

		ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr,
			&rx_poll, &rx_wrr);
		if (ret)
			return ret;

		rte_spinlock_lock(&rx_adapter->rx_lock);

		num_intr_vec = 0;
		if (rx_adapter->num_rx_intr > nb_rx_intr) {
			num_intr_vec = rxa_nb_intr_vect(dev_info,
						rx_queue_id, 0);
			ret = rxa_del_intr_queue(rx_adapter, dev_info,
					rx_queue_id);
			if (ret)
				goto unlock_ret;
		}

		if (nb_rx_intr == 0) {
			ret = rxa_free_intr_resources(rx_adapter);
			if (ret)
				goto unlock_ret;
		}

		rxa_sw_del(rx_adapter, dev_info, rx_queue_id);
		rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr);

		rte_free(rx_adapter->eth_rx_poll);
		rte_free(rx_adapter->wrr_sched);

		if (nb_rx_intr == 0) {
			rte_free(dev_info->intr_queue);
			dev_info->intr_queue = NULL;
		}

		rx_adapter->eth_rx_poll = rx_poll;
		rx_adapter->wrr_sched = rx_wrr;
		rx_adapter->wrr_len = nb_wrr;
		rx_adapter->num_intr_vec += num_intr_vec;

		if (dev_info->nb_dev_queues == 0) {
			rte_free(dev_info->rx_queue);
			dev_info->rx_queue = NULL;
		}
unlock_ret:
		rte_spinlock_unlock(&rx_adapter->rx_lock);
		if (ret) {
			rte_free(rx_poll);
			rte_free(rx_wrr);
			return ret;
		}

		rte_service_component_runstate_set(rx_adapter->service_id,
				rxa_sw_adapter_queue_count(rx_adapter));
	}

	return ret;
}

int
rte_event_eth_rx_adapter_start(uint8_t id)
{
	return rxa_ctrl(id, 1);
}

int
rte_event_eth_rx_adapter_stop(uint8_t id)
{
	return rxa_ctrl(id, 0);
}

int
rte_event_eth_rx_adapter_stats_get(uint8_t id,
			       struct rte_event_eth_rx_adapter_stats *stats)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 };
	struct rte_event_eth_rx_adapter_stats dev_stats;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;
	int ret;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || stats == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	memset(stats, 0, sizeof(*stats));
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_get == NULL)
			continue;
		ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev,
						&rte_eth_devices[i],
						&dev_stats);
		if (ret)
			continue;
		dev_stats_sum.rx_packets += dev_stats.rx_packets;
		dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count;
	}

	if (rx_adapter->service_inited)
		*stats = rx_adapter->stats;

	stats->rx_packets += dev_stats_sum.rx_packets;
	stats->rx_enq_count += dev_stats_sum.rx_enq_count;
	return 0;
}

int
rte_event_eth_rx_adapter_stats_reset(uint8_t id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;
	struct rte_eventdev *dev;
	struct eth_device_info *dev_info;
	uint32_t i;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL)
		return -EINVAL;

	dev = &rte_eventdevs[rx_adapter->eventdev_id];
	RTE_ETH_FOREACH_DEV(i) {
		dev_info = &rx_adapter->eth_devices[i];
		if (dev_info->internal_event_port == 0 ||
			dev->dev_ops->eth_rx_adapter_stats_reset == NULL)
			continue;
		(*dev->dev_ops->eth_rx_adapter_stats_reset)(dev,
							&rte_eth_devices[i]);
	}

	memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats));
	return 0;
}

int
rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id)
{
	struct rte_event_eth_rx_adapter *rx_adapter;

	RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL);

	rx_adapter = rxa_id_to_adapter(id);
	if (rx_adapter == NULL || service_id == NULL)
		return -EINVAL;

	if (rx_adapter->service_inited)
		*service_id = rx_adapter->service_id;

	return rx_adapter->service_inited ? 0 : -ESRCH;
}

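/* Usage note (a hedged sketch, not part of the original source): when the
 * adapter uses an EAL service (i.e. the eventdev lacks the INTERNAL_PORT
 * capability), the application must map the service to a service core for
 * the adapter to make progress, e.g. with a hypothetical service_lcore:
 *
 *	uint32_t service_id;
 *	if (rte_event_eth_rx_adapter_service_id_get(0, &service_id) == 0) {
 *		rte_service_map_lcore_set(service_id, service_lcore, 1);
 *		rte_service_lcore_start(service_lcore);
 *	}
 */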