From: Nikhil Rao Date: Mon, 2 Jul 2018 09:11:13 +0000 (+0530) Subject: eventdev: add interrupt driven queues to Rx adapter X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=3810ae435783;p=dpdk.git eventdev: add interrupt driven queues to Rx adapter Add support for interrupt driven queues when eth device is configured for rxq interrupts and servicing weight for the queue is configured to be zero. A interrupt driven packet received counter has been added to rte_event_eth_rx_adapter_stats. Signed-off-by: Nikhil Rao --- diff --git a/config/common_base b/config/common_base index e4241db169..c305a7725c 100644 --- a/config/common_base +++ b/config/common_base @@ -575,6 +575,7 @@ CONFIG_RTE_LIBRTE_EVENTDEV_DEBUG=n CONFIG_RTE_EVENT_MAX_DEVS=16 CONFIG_RTE_EVENT_MAX_QUEUES_PER_DEV=64 CONFIG_RTE_EVENT_TIMER_ADAPTER_NUM_MAX=32 +CONFIG_RTE_EVENT_ETH_INTR_RING_SIZE=1024 CONFIG_RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE=32 # diff --git a/config/rte_config.h b/config/rte_config.h index 8a3432acaa..28f04b41da 100644 --- a/config/rte_config.h +++ b/config/rte_config.h @@ -64,6 +64,7 @@ #define RTE_EVENT_MAX_DEVS 16 #define RTE_EVENT_MAX_QUEUES_PER_DEV 64 #define RTE_EVENT_TIMER_ADAPTER_NUM_MAX 32 +#define RTE_EVENT_ETH_INTR_RING_SIZE 1024 #define RTE_EVENT_CRYPTO_ADAPTER_MAX_INSTANCE 32 /* rawdev defines */ diff --git a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst index 319e4f0557..810dfc9d95 100644 --- a/doc/guides/prog_guide/event_ethernet_rx_adapter.rst +++ b/doc/guides/prog_guide/event_ethernet_rx_adapter.rst @@ -144,3 +144,27 @@ enqueued event counts are a sum of the counts from the eventdev PMD callbacks if the callback is supported, and the counts maintained by the service function, if one exists. The service function also maintains a count of cycles for which it was not able to enqueue to the event device. + +Interrupt Based Rx Queues +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The service core function is typically set up to poll ethernet Rx queues for +packets. Certain queues may have low packet rates and it would be more +efficient to enable the Rx queue interrupt and read packets after receiving +the interrupt. + +The servicing_weight member of struct rte_event_eth_rx_adapter_queue_conf +is applicable when the adapter uses a service core function. The application +has to enable Rx queue interrupts when configuring the ethernet device +using the ``rte_eth_dev_configure()`` function and then use a servicing_weight +of zero when addding the Rx queue to the adapter. + +The adapter creates a thread blocked on the interrupt, on an interrupt this +thread enqueues the port id and the queue id to a ring buffer. The adapter +service function dequeues the port id and queue id from the ring buffer, +invokes the ``rte_eth_rx_burst()`` to receive packets on the queue and +converts the received packets to events in the same manner as packets +received on a polled Rx queue. The interrupt thread is affinitized to the same +CPUs as the lcores of the Rx adapter service function, if the Rx adapter +service function has not been mapped to any lcores, the interrupt thread +is mapped to the master lcore. diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile index b3e25464b5..47f599a6bf 100644 --- a/lib/librte_eventdev/Makefile +++ b/lib/librte_eventdev/Makefile @@ -8,14 +8,19 @@ include $(RTE_SDK)/mk/rte.vars.mk LIB = librte_eventdev.a # library version -LIBABIVER := 4 +LIBABIVER := 5 # build flags CFLAGS += -DALLOW_EXPERIMENTAL_API CFLAGS += -O3 CFLAGS += $(WERROR_FLAGS) +ifeq ($(CONFIG_RTE_EXEC_ENV_LINUXAPP),y) +CFLAGS += -DLINUX +else +CFLAGS += -DBSD +endif LDLIBS += -lrte_eal -lrte_ring -lrte_ethdev -lrte_hash -lrte_mempool -lrte_timer -LDLIBS += -lrte_mbuf -lrte_cryptodev +LDLIBS += -lrte_mbuf -lrte_cryptodev -lpthread # library source files SRCS-y += rte_eventdev.c diff --git a/lib/librte_eventdev/meson.build b/lib/librte_eventdev/meson.build index bd138bd571..3cbaf2982d 100644 --- a/lib/librte_eventdev/meson.build +++ b/lib/librte_eventdev/meson.build @@ -1,8 +1,15 @@ # SPDX-License-Identifier: BSD-3-Clause # Copyright(c) 2017 Intel Corporation -version = 4 +version = 5 allow_experimental_apis = true + +if host_machine.system() == 'linux' + cflags += '-DLINUX' +else + cflags += '-DBSD' +endif + sources = files('rte_eventdev.c', 'rte_event_ring.c', 'rte_event_eth_rx_adapter.c', diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c index 8fe037f16b..42dd7f80ef 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.c +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c @@ -2,6 +2,11 @@ * Copyright(c) 2017 Intel Corporation. * All rights reserved. */ +#if defined(LINUX) +#include +#endif +#include + #include #include #include @@ -11,6 +16,7 @@ #include #include #include +#include #include "rte_eventdev.h" #include "rte_eventdev_pmd.h" @@ -24,6 +30,22 @@ #define ETH_RX_ADAPTER_MEM_NAME_LEN 32 #define RSS_KEY_SIZE 40 +/* value written to intr thread pipe to signal thread exit */ +#define ETH_BRIDGE_INTR_THREAD_EXIT 1 +/* Sentinel value to detect initialized file handle */ +#define INIT_FD -1 + +/* + * Used to store port and queue ID of interrupting Rx queue + */ +union queue_data { + RTE_STD_C11 + void *ptr; + struct { + uint16_t port; + uint16_t queue; + }; +}; /* * There is an instance of this struct per polled Rx queue added to the @@ -75,6 +97,30 @@ struct rte_event_eth_rx_adapter { uint16_t enq_block_count; /* Block start ts */ uint64_t rx_enq_block_start_ts; + /* epoll fd used to wait for Rx interrupts */ + int epd; + /* Num of interrupt driven interrupt queues */ + uint32_t num_rx_intr; + /* Used to send of interrupting Rx queues from + * the interrupt thread to the Rx thread + */ + struct rte_ring *intr_ring; + /* Rx Queue data (dev id, queue id) for the last non-empty + * queue polled + */ + union queue_data qd; + /* queue_data is valid */ + int qd_valid; + /* Interrupt ring lock, synchronizes Rx thread + * and interrupt thread + */ + rte_spinlock_t intr_ring_lock; + /* event array passed to rte_poll_wait */ + struct rte_epoll_event *epoll_events; + /* Count of interrupt vectors in use */ + uint32_t num_intr_vec; + /* Thread blocked on Rx interrupts */ + pthread_t rx_intr_thread; /* Configuration callback for rte_service configuration */ rte_event_eth_rx_adapter_conf_cb conf_cb; /* Configuration callback argument */ @@ -93,6 +139,8 @@ struct rte_event_eth_rx_adapter { uint32_t service_id; /* Adapter started flag */ uint8_t rxa_started; + /* Adapter ID */ + uint8_t id; } __rte_cache_aligned; /* Per eth device */ @@ -111,19 +159,40 @@ struct eth_device_info { uint8_t dev_rx_started; /* Number of queues added for this device */ uint16_t nb_dev_queues; - /* If nb_rx_poll > 0, the start callback will + /* Number of poll based queues + * If nb_rx_poll > 0, the start callback will * be invoked if not already invoked */ uint16_t nb_rx_poll; + /* Number of interrupt based queues + * If nb_rx_intr > 0, the start callback will + * be invoked if not already invoked. + */ + uint16_t nb_rx_intr; + /* Number of queues that use the shared interrupt */ + uint16_t nb_shared_intr; /* sum(wrr(q)) for all queues within the device * useful when deleting all device queues */ uint32_t wrr_len; + /* Intr based queue index to start polling from, this is used + * if the number of shared interrupts is non-zero + */ + uint16_t next_q_idx; + /* Intr based queue indices */ + uint16_t *intr_queue; + /* device generates per Rx queue interrupt for queue index + * for queue indices < RTE_MAX_RXTX_INTR_VEC_ID - 1 + */ + int multi_intr_cap; + /* shared interrupt enabled */ + int shared_intr_enabled; }; /* Per Rx queue */ struct eth_rx_queue_info { int queue_enabled; /* True if added */ + int intr_enabled; uint16_t wt; /* Polling weight */ uint8_t event_queue_id; /* Event queue to enqueue packets to */ uint8_t sched_type; /* Sched type for events */ @@ -150,7 +219,7 @@ rxa_validate_id(uint8_t id) static inline int rxa_sw_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter) { - return rx_adapter->num_rx_polled; + return rx_adapter->num_rx_polled + rx_adapter->num_rx_intr; } /* Greatest common divisor */ @@ -194,6 +263,32 @@ rxa_wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, } } +static inline int +rxa_shared_intr(struct eth_device_info *dev_info, + int rx_queue_id) +{ + int multi_intr_cap; + + if (dev_info->dev->intr_handle == NULL) + return 0; + + multi_intr_cap = rte_intr_cap_multiple(dev_info->dev->intr_handle); + return !multi_intr_cap || + rx_queue_id >= RTE_MAX_RXTX_INTR_VEC_ID - 1; +} + +static inline int +rxa_intr_queue(struct eth_device_info *dev_info, + int rx_queue_id) +{ + struct eth_rx_queue_info *queue_info; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + return dev_info->rx_queue && + !dev_info->internal_event_port && + queue_info->queue_enabled && queue_info->wt == 0; +} + static inline int rxa_polled_queue(struct eth_device_info *dev_info, int rx_queue_id) @@ -206,6 +301,95 @@ rxa_polled_queue(struct eth_device_info *dev_info, queue_info->queue_enabled && queue_info->wt != 0; } +/* Calculate change in number of vectors after Rx queue ID is add/deleted */ +static int +rxa_nb_intr_vect(struct eth_device_info *dev_info, int rx_queue_id, int add) +{ + uint16_t i; + int n, s; + uint16_t nbq; + + nbq = dev_info->dev->data->nb_rx_queues; + n = 0; /* non shared count */ + s = 0; /* shared count */ + + if (rx_queue_id == -1) { + for (i = 0; i < nbq; i++) { + if (!rxa_shared_intr(dev_info, i)) + n += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + else + s += add ? !rxa_intr_queue(dev_info, i) : + rxa_intr_queue(dev_info, i); + } + + if (s > 0) { + if ((add && dev_info->nb_shared_intr == 0) || + (!add && dev_info->nb_shared_intr)) + n += 1; + } + } else { + if (!rxa_shared_intr(dev_info, rx_queue_id)) + n = add ? !rxa_intr_queue(dev_info, rx_queue_id) : + rxa_intr_queue(dev_info, rx_queue_id); + else + n = add ? !dev_info->nb_shared_intr : + dev_info->nb_shared_intr == 1; + } + + return add ? n : -n; +} + +/* Calculate nb_rx_intr after deleting interrupt mode rx queues + */ +static void +rxa_calc_nb_post_intr_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_intr) +{ + uint32_t intr_diff; + + if (rx_queue_id == -1) + intr_diff = dev_info->nb_rx_intr; + else + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); + + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; +} + +/* Calculate nb_rx_* after adding interrupt mode rx queues, newly added + * interrupt queues could currently be poll mode Rx queues + */ +static void +rxa_calc_nb_post_add_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id, + uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, + uint32_t *nb_wrr) +{ + uint32_t intr_diff; + uint32_t poll_diff; + uint32_t wrr_len_diff; + + if (rx_queue_id == -1) { + intr_diff = dev_info->dev->data->nb_rx_queues - + dev_info->nb_rx_intr; + poll_diff = dev_info->nb_rx_poll; + wrr_len_diff = dev_info->wrr_len; + } else { + intr_diff = !rxa_intr_queue(dev_info, rx_queue_id); + poll_diff = rxa_polled_queue(dev_info, rx_queue_id); + wrr_len_diff = poll_diff ? dev_info->rx_queue[rx_queue_id].wt : + 0; + } + + *nb_rx_intr = rx_adapter->num_rx_intr + intr_diff; + *nb_rx_poll = rx_adapter->num_rx_polled - poll_diff; + *nb_wrr = rx_adapter->wrr_len - wrr_len_diff; +} + /* Calculate size of the eth_rx_poll and wrr_sched arrays * after deleting poll mode rx queues */ @@ -240,17 +424,21 @@ rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter, int rx_queue_id, uint16_t wt, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { + uint32_t intr_diff; uint32_t poll_diff; uint32_t wrr_len_diff; if (rx_queue_id == -1) { + intr_diff = dev_info->nb_rx_intr; poll_diff = dev_info->dev->data->nb_rx_queues - dev_info->nb_rx_poll; wrr_len_diff = wt*dev_info->dev->data->nb_rx_queues - dev_info->wrr_len; } else { + intr_diff = rxa_intr_queue(dev_info, rx_queue_id); poll_diff = !rxa_polled_queue(dev_info, rx_queue_id); wrr_len_diff = rxa_polled_queue(dev_info, rx_queue_id) ? wt - dev_info->rx_queue[rx_queue_id].wt : @@ -258,6 +446,7 @@ rxa_calc_nb_post_add_poll(struct rte_event_eth_rx_adapter *rx_adapter, } *nb_rx_poll = rx_adapter->num_rx_polled + poll_diff; + *nb_rx_intr = rx_adapter->num_rx_intr - intr_diff; *nb_wrr = rx_adapter->wrr_len + wrr_len_diff; } @@ -268,10 +457,15 @@ rxa_calc_nb_post_add(struct rte_event_eth_rx_adapter *rx_adapter, int rx_queue_id, uint16_t wt, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { - rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id, - wt, nb_rx_poll, nb_wrr); + if (wt != 0) + rxa_calc_nb_post_add_poll(rx_adapter, dev_info, rx_queue_id, + wt, nb_rx_poll, nb_rx_intr, nb_wrr); + else + rxa_calc_nb_post_add_intr(rx_adapter, dev_info, rx_queue_id, + nb_rx_poll, nb_rx_intr, nb_wrr); } /* Calculate nb_rx_* after deleting rx_queue_id */ @@ -280,10 +474,13 @@ rxa_calc_nb_post_del(struct rte_event_eth_rx_adapter *rx_adapter, struct eth_device_info *dev_info, int rx_queue_id, uint32_t *nb_rx_poll, + uint32_t *nb_rx_intr, uint32_t *nb_wrr) { rxa_calc_nb_post_poll_del(rx_adapter, dev_info, rx_queue_id, nb_rx_poll, nb_wrr); + rxa_calc_nb_post_intr_del(rx_adapter, dev_info, rx_queue_id, + nb_rx_intr); } /* @@ -622,7 +819,8 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, uint16_t port_id, uint16_t queue_id, uint32_t rx_count, - uint32_t max_rx) + uint32_t max_rx, + int *rxq_empty) { struct rte_mbuf *mbufs[BATCH_SIZE]; struct rte_eth_event_enqueue_buffer *buf = @@ -632,6 +830,8 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, uint16_t n; uint32_t nb_rx = 0; + if (rxq_empty) + *rxq_empty = 0; /* Don't do a batch dequeue from the rx queue if there isn't * enough space in the enqueue buffer. */ @@ -641,8 +841,11 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, stats->rx_poll_count++; n = rte_eth_rx_burst(port_id, queue_id, mbufs, BATCH_SIZE); - if (unlikely(!n)) + if (unlikely(!n)) { + if (rxq_empty) + *rxq_empty = 1; break; + } rxa_buffer_mbufs(rx_adapter, port_id, queue_id, mbufs, n); nb_rx += n; if (rx_count + nb_rx > max_rx) @@ -655,6 +858,228 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, return nb_rx; } +static inline void +rxa_intr_ring_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, + void *data) +{ + uint16_t port_id; + uint16_t queue; + int err; + union queue_data qd; + struct eth_device_info *dev_info; + struct eth_rx_queue_info *queue_info; + int *intr_enabled; + + qd.ptr = data; + port_id = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port_id]; + queue_info = &dev_info->rx_queue[queue]; + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + if (rxa_shared_intr(dev_info, queue)) + intr_enabled = &dev_info->shared_intr_enabled; + else + intr_enabled = &queue_info->intr_enabled; + + if (*intr_enabled) { + *intr_enabled = 0; + err = rte_ring_enqueue(rx_adapter->intr_ring, data); + /* Entry should always be available. + * The ring size equals the maximum number of interrupt + * vectors supported (an interrupt vector is shared in + * case of shared interrupts) + */ + if (err) + RTE_EDEV_LOG_ERR("Failed to enqueue interrupt" + " to ring: %s", strerror(err)); + else + rte_eth_dev_rx_intr_disable(port_id, queue); + } + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +static int +rxa_intr_ring_check_avail(struct rte_event_eth_rx_adapter *rx_adapter, + uint32_t num_intr_vec) +{ + if (rx_adapter->num_intr_vec + num_intr_vec > + RTE_EVENT_ETH_INTR_RING_SIZE) { + RTE_EDEV_LOG_ERR("Exceeded intr ring slots current" + " %d needed %d limit %d", rx_adapter->num_intr_vec, + num_intr_vec, RTE_EVENT_ETH_INTR_RING_SIZE); + return -ENOSPC; + } + + return 0; +} + +/* Delete entries for (dev, queue) from the interrupt ring */ +static void +rxa_intr_ring_del_entries(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int i, n; + union queue_data qd; + + rte_spinlock_lock(&rx_adapter->intr_ring_lock); + + n = rte_ring_count(rx_adapter->intr_ring); + for (i = 0; i < n; i++) { + rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (!rxa_shared_intr(dev_info, rx_queue_id)) { + if (qd.port == dev_info->dev->data->port_id && + qd.queue == rx_queue_id) + continue; + } else { + if (qd.port == dev_info->dev->data->port_id) + continue; + } + rte_ring_enqueue(rx_adapter->intr_ring, qd.ptr); + } + + rte_spinlock_unlock(&rx_adapter->intr_ring_lock); +} + +/* pthread callback handling interrupt mode receive queues + * After receiving an Rx interrupt, it enqueues the port id and queue id of the + * interrupting queue to the adapter's ring buffer for interrupt events. + * These events are picked up by rxa_intr_ring_dequeue() which is invoked from + * the adapter service function. + */ +static void * +rxa_intr_thread(void *arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter = arg; + struct rte_epoll_event *epoll_events = rx_adapter->epoll_events; + int n, i; + + while (1) { + n = rte_epoll_wait(rx_adapter->epd, epoll_events, + RTE_EVENT_ETH_INTR_RING_SIZE, -1); + if (unlikely(n < 0)) + RTE_EDEV_LOG_ERR("rte_epoll_wait returned error %d", + n); + for (i = 0; i < n; i++) { + rxa_intr_ring_enqueue(rx_adapter, + epoll_events[i].epdata.data); + } + } + + return NULL; +} + +/* Dequeue from interrupt ring and enqueue received + * mbufs to eventdev + */ +static inline uint32_t +rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter) +{ + uint32_t n; + uint32_t nb_rx = 0; + int rxq_empty; + struct rte_eth_event_enqueue_buffer *buf; + rte_spinlock_t *ring_lock; + uint8_t max_done = 0; + + if (rx_adapter->num_rx_intr == 0) + return 0; + + if (rte_ring_count(rx_adapter->intr_ring) == 0 + && !rx_adapter->qd_valid) + return 0; + + buf = &rx_adapter->event_enqueue_buffer; + ring_lock = &rx_adapter->intr_ring_lock; + + if (buf->count >= BATCH_SIZE) + rxa_flush_event_buffer(rx_adapter); + + while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) { + struct eth_device_info *dev_info; + uint16_t port; + uint16_t queue; + union queue_data qd = rx_adapter->qd; + int err; + + if (!rx_adapter->qd_valid) { + struct eth_rx_queue_info *queue_info; + + rte_spinlock_lock(ring_lock); + err = rte_ring_dequeue(rx_adapter->intr_ring, &qd.ptr); + if (err) { + rte_spinlock_unlock(ring_lock); + break; + } + + port = qd.port; + queue = qd.queue; + rx_adapter->qd = qd; + rx_adapter->qd_valid = 1; + dev_info = &rx_adapter->eth_devices[port]; + if (rxa_shared_intr(dev_info, queue)) + dev_info->shared_intr_enabled = 1; + else { + queue_info = &dev_info->rx_queue[queue]; + queue_info->intr_enabled = 1; + } + rte_eth_dev_rx_intr_enable(port, queue); + rte_spinlock_unlock(ring_lock); + } else { + port = qd.port; + queue = qd.queue; + + dev_info = &rx_adapter->eth_devices[port]; + } + + if (rxa_shared_intr(dev_info, queue)) { + uint16_t i; + uint16_t nb_queues; + + nb_queues = dev_info->dev->data->nb_rx_queues; + n = 0; + for (i = dev_info->next_q_idx; i < nb_queues; i++) { + uint8_t enq_buffer_full; + + if (!rxa_intr_queue(dev_info, i)) + continue; + n = rxa_eth_rx(rx_adapter, port, i, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + nb_rx += n; + + enq_buffer_full = !rxq_empty && n == 0; + max_done = nb_rx > rx_adapter->max_nb_rx; + + if (enq_buffer_full || max_done) { + dev_info->next_q_idx = i; + goto done; + } + } + + rx_adapter->qd_valid = 0; + + /* Reinitialize for next interrupt */ + dev_info->next_q_idx = dev_info->multi_intr_cap ? + RTE_MAX_RXTX_INTR_VEC_ID - 1 : + 0; + } else { + n = rxa_eth_rx(rx_adapter, port, queue, nb_rx, + rx_adapter->max_nb_rx, + &rxq_empty); + rx_adapter->qd_valid = !rxq_empty; + nb_rx += n; + if (nb_rx > rx_adapter->max_nb_rx) + break; + } + } + +done: + rx_adapter->stats.rx_intr_packets += nb_rx; + return nb_rx; +} + /* * Polls receive queues added to the event adapter and enqueues received * packets to the event device. @@ -668,7 +1093,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter, * the hypervisor's switching layer where adjustments can be made to deal with * it. */ -static inline void +static inline uint32_t rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) { uint32_t num_queue; @@ -676,7 +1101,6 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) struct rte_eth_event_enqueue_buffer *buf; uint32_t wrr_pos; uint32_t max_nb_rx; - struct rte_event_eth_rx_adapter_stats *stats; wrr_pos = rx_adapter->wrr_pos; max_nb_rx = rx_adapter->max_nb_rx; @@ -696,10 +1120,11 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) rxa_flush_event_buffer(rx_adapter); if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) { rx_adapter->wrr_pos = wrr_pos; - break; + return nb_rx; } - nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx); + nb_rx += rxa_eth_rx(rx_adapter, d, qid, nb_rx, max_nb_rx, + NULL); if (nb_rx > max_nb_rx) { rx_adapter->wrr_pos = (wrr_pos + 1) % rx_adapter->wrr_len; @@ -709,14 +1134,14 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter) if (++wrr_pos == rx_adapter->wrr_len) wrr_pos = 0; } - - stats->rx_packets += nb_rx; + return nb_rx; } static int rxa_service_func(void *args) { struct rte_event_eth_rx_adapter *rx_adapter = args; + struct rte_event_eth_rx_adapter_stats *stats; if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0) return 0; @@ -724,7 +1149,10 @@ rxa_service_func(void *args) return 0; rte_spinlock_unlock(&rx_adapter->rx_lock); } - rxa_poll(rx_adapter); + + stats = &rx_adapter->stats; + stats->rx_packets += rxa_intr_ring_dequeue(rx_adapter); + stats->rx_packets += rxa_poll(rx_adapter); rte_spinlock_unlock(&rx_adapter->rx_lock); return 0; } @@ -808,6 +1236,350 @@ rxa_default_conf_cb(uint8_t id, uint8_t dev_id, return ret; } +static int +rxa_epoll_create1(void) +{ +#if defined(LINUX) + int fd; + fd = epoll_create1(EPOLL_CLOEXEC); + return fd < 0 ? -errno : fd; +#elif defined(BSD) + return -ENOTSUP; +#endif +} + +static int +rxa_init_epd(struct rte_event_eth_rx_adapter *rx_adapter) +{ + if (rx_adapter->epd != INIT_FD) + return 0; + + rx_adapter->epd = rxa_epoll_create1(); + if (rx_adapter->epd < 0) { + int err = rx_adapter->epd; + rx_adapter->epd = INIT_FD; + RTE_EDEV_LOG_ERR("epoll_create1() failed, err %d", err); + return err; + } + + return 0; +} + +static int +rxa_create_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + char thread_name[RTE_MAX_THREAD_NAME_LEN]; + + if (rx_adapter->intr_ring) + return 0; + + rx_adapter->intr_ring = rte_ring_create("intr_ring", + RTE_EVENT_ETH_INTR_RING_SIZE, + rte_socket_id(), 0); + if (!rx_adapter->intr_ring) + return -ENOMEM; + + rx_adapter->epoll_events = rte_zmalloc_socket(rx_adapter->mem_name, + RTE_EVENT_ETH_INTR_RING_SIZE * + sizeof(struct rte_epoll_event), + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); + if (!rx_adapter->epoll_events) { + err = -ENOMEM; + goto error; + } + + rte_spinlock_init(&rx_adapter->intr_ring_lock); + + snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, + "rx-intr-thread-%d", rx_adapter->id); + + err = rte_ctrl_thread_create(&rx_adapter->rx_intr_thread, thread_name, + NULL, rxa_intr_thread, rx_adapter); + if (!err) { + rte_thread_setname(rx_adapter->rx_intr_thread, thread_name); + return 0; + } + + RTE_EDEV_LOG_ERR("Failed to create interrupt thread err = %d\n", err); +error: + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return err; +} + +static int +rxa_destroy_intr_thread(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int err; + + err = pthread_cancel(rx_adapter->rx_intr_thread); + if (err) + RTE_EDEV_LOG_ERR("Can't cancel interrupt thread err = %d\n", + err); + + err = pthread_join(rx_adapter->rx_intr_thread, NULL); + if (err) + RTE_EDEV_LOG_ERR("Can't join interrupt thread err = %d\n", err); + + rte_free(rx_adapter->epoll_events); + rte_ring_free(rx_adapter->intr_ring); + rx_adapter->intr_ring = NULL; + rx_adapter->epoll_events = NULL; + return 0; +} + +static int +rxa_free_intr_resources(struct rte_event_eth_rx_adapter *rx_adapter) +{ + int ret; + + if (rx_adapter->num_rx_intr == 0) + return 0; + + ret = rxa_destroy_intr_thread(rx_adapter); + if (ret) + return ret; + + close(rx_adapter->epd); + rx_adapter->epd = INIT_FD; + + return ret; +} + +static int +rxa_disable_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int err; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not disable interrupt for Rx queue %u", + rx_queue_id); + return err; + } + + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err) + RTE_EDEV_LOG_ERR("Interrupt event deletion failed %d", err); + + if (sintr) + dev_info->rx_queue[rx_queue_id].intr_enabled = 0; + else + dev_info->shared_intr_enabled = 0; + return err; +} + +static int +rxa_del_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) +{ + int err; + int i; + int s; + + if (dev_info->nb_rx_intr == 0) + return 0; + + err = 0; + if (rx_queue_id == -1) { + s = dev_info->nb_shared_intr; + for (i = 0; i < dev_info->nb_rx_intr; i++) { + int sintr; + uint16_t q; + + q = dev_info->intr_queue[i]; + sintr = rxa_shared_intr(dev_info, q); + s -= sintr; + + if (!sintr || s == 0) { + + err = rxa_disable_intr(rx_adapter, dev_info, + q); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + q); + } + } + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + if (!rxa_shared_intr(dev_info, rx_queue_id) || + dev_info->nb_shared_intr == 1) { + err = rxa_disable_intr(rx_adapter, dev_info, + rx_queue_id); + if (err) + return err; + rxa_intr_ring_del_entries(rx_adapter, dev_info, + rx_queue_id); + } + + for (i = 0; i < dev_info->nb_rx_intr; i++) { + if (dev_info->intr_queue[i] == rx_queue_id) { + for (; i < dev_info->nb_rx_intr - 1; i++) + dev_info->intr_queue[i] = + dev_info->intr_queue[i + 1]; + break; + } + } + } + + return err; +} + +static int +rxa_config_intr(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + int err, err1; + uint16_t eth_dev_id = dev_info->dev->data->port_id; + union queue_data qd; + int init_fd; + uint16_t *intr_queue; + int sintr = rxa_shared_intr(dev_info, rx_queue_id); + + if (rxa_intr_queue(dev_info, rx_queue_id)) + return 0; + + intr_queue = dev_info->intr_queue; + if (dev_info->intr_queue == NULL) { + size_t len = + dev_info->dev->data->nb_rx_queues * sizeof(uint16_t); + dev_info->intr_queue = + rte_zmalloc_socket( + rx_adapter->mem_name, + len, + 0, + rx_adapter->socket_id); + if (dev_info->intr_queue == NULL) + return -ENOMEM; + } + + init_fd = rx_adapter->epd; + err = rxa_init_epd(rx_adapter); + if (err) + goto err_free_queue; + + qd.port = eth_dev_id; + qd.queue = rx_queue_id; + + err = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_ADD, + qd.ptr); + if (err) { + RTE_EDEV_LOG_ERR("Failed to add interrupt event for" + " Rx Queue %u err %d", rx_queue_id, err); + goto err_del_fd; + } + + err = rte_eth_dev_rx_intr_enable(eth_dev_id, rx_queue_id); + if (err) { + RTE_EDEV_LOG_ERR("Could not enable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); + + goto err_del_event; + } + + err = rxa_create_intr_thread(rx_adapter); + if (!err) { + if (sintr) + dev_info->shared_intr_enabled = 1; + else + dev_info->rx_queue[rx_queue_id].intr_enabled = 1; + return 0; + } + + + err = rte_eth_dev_rx_intr_disable(eth_dev_id, rx_queue_id); + if (err) + RTE_EDEV_LOG_ERR("Could not disable interrupt for" + " Rx Queue %u err %d", rx_queue_id, err); +err_del_event: + err1 = rte_eth_dev_rx_intr_ctl_q(eth_dev_id, rx_queue_id, + rx_adapter->epd, + RTE_INTR_EVENT_DEL, + 0); + if (err1) { + RTE_EDEV_LOG_ERR("Could not delete event for" + " Rx Queue %u err %d", rx_queue_id, err1); + } +err_del_fd: + if (init_fd == INIT_FD) { + close(rx_adapter->epd); + rx_adapter->epd = -1; + } +err_free_queue: + if (intr_queue == NULL) + rte_free(dev_info->intr_queue); + + return err; +} + +static int +rxa_add_intr_queue(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int rx_queue_id) + +{ + int i, j, err; + int si = -1; + int shared_done = (dev_info->nb_shared_intr > 0); + + if (rx_queue_id != -1) { + if (rxa_shared_intr(dev_info, rx_queue_id) && shared_done) + return 0; + return rxa_config_intr(rx_adapter, dev_info, rx_queue_id); + } + + err = 0; + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) { + + if (rxa_shared_intr(dev_info, i) && shared_done) + continue; + + err = rxa_config_intr(rx_adapter, dev_info, i); + + shared_done = err == 0 && rxa_shared_intr(dev_info, i); + if (shared_done) { + si = i; + dev_info->shared_intr_enabled = 1; + } + if (err) + break; + } + + if (err == 0) + return 0; + + shared_done = (dev_info->nb_shared_intr > 0); + for (j = 0; j < i; j++) { + if (rxa_intr_queue(dev_info, j)) + continue; + if (rxa_shared_intr(dev_info, j) && si != j) + continue; + err = rxa_disable_intr(rx_adapter, dev_info, j); + if (err) + break; + + } + + return err; +} + + static int rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) { @@ -843,6 +1615,7 @@ rxa_init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) rx_adapter->event_port_id = rx_adapter_conf.event_port_id; rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx; rx_adapter->service_inited = 1; + rx_adapter->epd = INIT_FD; return 0; err_done: @@ -886,6 +1659,9 @@ rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter, int32_t rx_queue_id) { int pollq; + int intrq; + int sintrq; + if (rx_adapter->nb_queues == 0) return; @@ -901,9 +1677,14 @@ rxa_sw_del(struct rte_event_eth_rx_adapter *rx_adapter, } pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); rxa_update_queue(rx_adapter, dev_info, rx_queue_id, 0); rx_adapter->num_rx_polled -= pollq; dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; } static void @@ -915,6 +1696,8 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, struct eth_rx_queue_info *queue_info; const struct rte_event *ev = &conf->ev; int pollq; + int intrq; + int sintrq; if (rx_queue_id == -1) { uint16_t nb_rx_queues; @@ -927,6 +1710,8 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, } pollq = rxa_polled_queue(dev_info, rx_queue_id); + intrq = rxa_intr_queue(dev_info, rx_queue_id); + sintrq = rxa_shared_intr(dev_info, rx_queue_id); queue_info = &dev_info->rx_queue[rx_queue_id]; queue_info->event_queue_id = ev->queue_id; @@ -944,6 +1729,24 @@ rxa_add_queue(struct rte_event_eth_rx_adapter *rx_adapter, if (rxa_polled_queue(dev_info, rx_queue_id)) { rx_adapter->num_rx_polled += !pollq; dev_info->nb_rx_poll += !pollq; + rx_adapter->num_rx_intr -= intrq; + dev_info->nb_rx_intr -= intrq; + dev_info->nb_shared_intr -= intrq && sintrq; + } + + if (rxa_intr_queue(dev_info, rx_queue_id)) { + rx_adapter->num_rx_polled -= pollq; + dev_info->nb_rx_poll -= pollq; + rx_adapter->num_rx_intr += !intrq; + dev_info->nb_rx_intr += !intrq; + dev_info->nb_shared_intr += !intrq && sintrq; + if (dev_info->nb_shared_intr == 1) { + if (dev_info->multi_intr_cap) + dev_info->next_q_idx = + RTE_MAX_RXTX_INTR_VEC_ID - 1; + else + dev_info->next_q_idx = 0; + } } } @@ -960,24 +1763,24 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, uint32_t *rx_wrr; uint16_t nb_rx_queues; uint32_t nb_rx_poll, nb_wrr; + uint32_t nb_rx_intr; + int num_intr_vec; + uint16_t wt; if (queue_conf->servicing_weight == 0) { - struct rte_eth_dev_data *data = dev_info->dev->data; - if (data->dev_conf.intr_conf.rxq) { - RTE_EDEV_LOG_ERR("Interrupt driven queues" - " not supported"); - return -ENOTSUP; - } - temp_conf = *queue_conf; - /* If Rx interrupts are disabled set wt = 1 */ - temp_conf.servicing_weight = 1; + temp_conf = *queue_conf; + if (!data->dev_conf.intr_conf.rxq) { + /* If Rx interrupts are disabled set wt = 1 */ + temp_conf.servicing_weight = 1; + } queue_conf = &temp_conf; } nb_rx_queues = dev_info->dev->data->nb_rx_queues; rx_queue = dev_info->rx_queue; + wt = queue_conf->servicing_weight; if (dev_info->rx_queue == NULL) { dev_info->rx_queue = @@ -993,13 +1796,65 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, rxa_calc_nb_post_add(rx_adapter, dev_info, rx_queue_id, queue_conf->servicing_weight, - &nb_rx_poll, &nb_wrr); + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + + if (dev_info->dev->intr_handle) + dev_info->multi_intr_cap = + rte_intr_cap_multiple(dev_info->dev->intr_handle); ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, &rx_poll, &rx_wrr); if (ret) goto err_free_rxqueue; + if (wt == 0) { + num_intr_vec = rxa_nb_intr_vect(dev_info, rx_queue_id, 1); + + ret = rxa_intr_ring_check_avail(rx_adapter, num_intr_vec); + if (ret) + goto err_free_rxqueue; + + ret = rxa_add_intr_queue(rx_adapter, dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; + } else { + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + /* interrupt based queues are being converted to + * poll mode queues, delete the interrupt configuration + * for those. + */ + ret = rxa_del_intr_queue(rx_adapter, + dev_info, rx_queue_id); + if (ret) + goto err_free_rxqueue; + } + } + + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto err_free_rxqueue; + } + + if (wt == 0) { + uint16_t i; + + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + dev_info->intr_queue[i] = i; + } else { + if (!rxa_intr_queue(dev_info, rx_queue_id)) + dev_info->intr_queue[nb_rx_intr - 1] = + rx_queue_id; + } + } + + + rxa_add_queue(rx_adapter, dev_info, rx_queue_id, queue_conf); rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); @@ -1009,6 +1864,7 @@ static int rxa_sw_add(struct rte_event_eth_rx_adapter *rx_adapter, rx_adapter->eth_rx_poll = rx_poll; rx_adapter->wrr_sched = rx_wrr; rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; return 0; err_free_rxqueue: @@ -1119,6 +1975,7 @@ rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, rx_adapter->socket_id = socket_id; rx_adapter->conf_cb = conf_cb; rx_adapter->conf_arg = conf_arg; + rx_adapter->id = id; strcpy(rx_adapter->mem_name, mem_name); rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name, /* FIXME: incompatible with hotplug */ @@ -1302,8 +2159,10 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id, uint32_t cap; uint32_t nb_rx_poll = 0; uint32_t nb_wrr = 0; + uint32_t nb_rx_intr; struct eth_rx_poll_entry *rx_poll = NULL; uint32_t *rx_wrr = NULL; + int num_intr_vec; RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); @@ -1346,29 +2205,59 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id, } } else { rxa_calc_nb_post_del(rx_adapter, dev_info, rx_queue_id, - &nb_rx_poll, &nb_wrr); + &nb_rx_poll, &nb_rx_intr, &nb_wrr); + ret = rxa_alloc_poll_arrays(rx_adapter, nb_rx_poll, nb_wrr, &rx_poll, &rx_wrr); if (ret) return ret; rte_spinlock_lock(&rx_adapter->rx_lock); + + num_intr_vec = 0; + if (rx_adapter->num_rx_intr > nb_rx_intr) { + + num_intr_vec = rxa_nb_intr_vect(dev_info, + rx_queue_id, 0); + ret = rxa_del_intr_queue(rx_adapter, dev_info, + rx_queue_id); + if (ret) + goto unlock_ret; + } + + if (nb_rx_intr == 0) { + ret = rxa_free_intr_resources(rx_adapter); + if (ret) + goto unlock_ret; + } + rxa_sw_del(rx_adapter, dev_info, rx_queue_id); rxa_calc_wrr_sequence(rx_adapter, rx_poll, rx_wrr); rte_free(rx_adapter->eth_rx_poll); rte_free(rx_adapter->wrr_sched); + if (nb_rx_intr == 0) { + rte_free(dev_info->intr_queue); + dev_info->intr_queue = NULL; + } + rx_adapter->eth_rx_poll = rx_poll; - rx_adapter->num_rx_polled = nb_rx_poll; rx_adapter->wrr_sched = rx_wrr; rx_adapter->wrr_len = nb_wrr; + rx_adapter->num_intr_vec += num_intr_vec; if (dev_info->nb_dev_queues == 0) { rte_free(dev_info->rx_queue); dev_info->rx_queue = NULL; } +unlock_ret: rte_spinlock_unlock(&rx_adapter->rx_lock); + if (ret) { + rte_free(rx_poll); + rte_free(rx_wrr); + return ret; + } rte_service_component_runstate_set(rx_adapter->service_id, rxa_sw_adapter_queue_count(rx_adapter)); @@ -1377,7 +2266,6 @@ rte_event_eth_rx_adapter_queue_del(uint8_t id, uint16_t eth_dev_id, return ret; } - int rte_event_eth_rx_adapter_start(uint8_t id) { diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.h b/lib/librte_eventdev/rte_event_eth_rx_adapter.h index 307b2b5081..97f25e9097 100644 --- a/lib/librte_eventdev/rte_event_eth_rx_adapter.h +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.h @@ -64,8 +64,7 @@ * the service function ID of the adapter in this case. * * Note: - * 1) Interrupt driven receive queues are currently unimplemented. - * 2) Devices created after an instance of rte_event_eth_rx_adapter_create + * 1) Devices created after an instance of rte_event_eth_rx_adapter_create * should be added to a new instance of the rx adapter. */ @@ -199,6 +198,8 @@ struct rte_event_eth_rx_adapter_stats { * block cycles can be used to compute the percentage of * cycles the service is blocked by the event device. */ + uint64_t rx_intr_packets; + /**< Received packet count for interrupt mode Rx queues */ }; /**