From: Nikhil Rao Date: Tue, 10 Oct 2017 22:21:36 +0000 (+0530) Subject: eventdev: add eth Rx adapter implementation X-Git-Tag: spdx-start~1616 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=9c38b704d280ac128003238d7d80bf07fa556a7d;p=dpdk.git eventdev: add eth Rx adapter implementation The adapter implementation uses eventdev PMDs to configure the packet transfer if HW support is available and if not, it uses an EAL service function that reads packets from ethernet Rx queues and injects these as events into the event device. Signed-off-by: Gage Eads Signed-off-by: Abhinandan Gujjar Signed-off-by: Nikhil Rao Acked-by: Jerin Jacob --- diff --git a/lib/Makefile b/lib/Makefile index 76e3688d63..c5d032c9eb 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -51,7 +51,7 @@ DIRS-$(CONFIG_RTE_LIBRTE_CRYPTODEV) += librte_cryptodev DEPDIRS-librte_cryptodev := librte_eal librte_mempool librte_ring librte_mbuf DEPDIRS-librte_cryptodev += librte_kvargs DIRS-$(CONFIG_RTE_LIBRTE_EVENTDEV) += librte_eventdev -DEPDIRS-librte_eventdev := librte_eal librte_ring librte_ether +DEPDIRS-librte_eventdev := librte_eal librte_ring librte_ether librte_hash DIRS-$(CONFIG_RTE_LIBRTE_VHOST) += librte_vhost DEPDIRS-librte_vhost := librte_eal librte_mempool librte_mbuf librte_ether DIRS-$(CONFIG_RTE_LIBRTE_HASH) += librte_hash diff --git a/lib/librte_eventdev/Makefile b/lib/librte_eventdev/Makefile index 285653384f..daa4ef1d98 100644 --- a/lib/librte_eventdev/Makefile +++ b/lib/librte_eventdev/Makefile @@ -43,6 +43,7 @@ CFLAGS += $(WERROR_FLAGS) # library source files SRCS-y += rte_eventdev.c SRCS-y += rte_event_ring.c +SRCS-y += rte_event_eth_rx_adapter.c # export include files SYMLINK-y-include += rte_eventdev.h diff --git a/lib/librte_eventdev/rte_event_eth_rx_adapter.c b/lib/librte_eventdev/rte_event_eth_rx_adapter.c new file mode 100644 index 0000000000..1e42dde739 --- /dev/null +++ b/lib/librte_eventdev/rte_event_eth_rx_adapter.c @@ -0,0 +1,1240 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rte_eventdev.h" +#include "rte_eventdev_pmd.h" +#include "rte_event_eth_rx_adapter.h" + +#define BATCH_SIZE 32 +#define BLOCK_CNT_THRESHOLD 10 +#define ETH_EVENT_BUFFER_SIZE (4*BATCH_SIZE) + +#define ETH_RX_ADAPTER_SERVICE_NAME_LEN 32 +#define ETH_RX_ADAPTER_MEM_NAME_LEN 32 + +#define RSS_KEY_SIZE 40 + +/* + * There is an instance of this struct per polled Rx queue added to the + * adapter + */ +struct eth_rx_poll_entry { + /* Eth port to poll */ + uint8_t eth_dev_id; + /* Eth rx queue to poll */ + uint16_t eth_rx_qid; +}; + +/* Instance per adapter */ +struct rte_eth_event_enqueue_buffer { + /* Count of events in this buffer */ + uint16_t count; + /* Array of events in this buffer */ + struct rte_event events[ETH_EVENT_BUFFER_SIZE]; +}; + +struct rte_event_eth_rx_adapter { + /* RSS key */ + uint8_t rss_key_be[RSS_KEY_SIZE]; + /* Event device identifier */ + uint8_t eventdev_id; + /* Per ethernet device structure */ + struct eth_device_info *eth_devices; + /* Event port identifier */ + uint8_t event_port_id; + /* Lock to serialize config updates with service function */ + rte_spinlock_t rx_lock; + /* Max mbufs processed in any service function invocation */ + uint32_t max_nb_rx; + /* Receive queues that need to be polled */ + struct eth_rx_poll_entry *eth_rx_poll; + /* Size of the eth_rx_poll array */ + uint16_t num_rx_polled; + /* Weighted round robin schedule */ + uint32_t *wrr_sched; + /* wrr_sched[] size */ + uint32_t wrr_len; + /* Next entry in wrr[] to begin polling */ + uint32_t wrr_pos; + /* Event burst buffer */ + struct rte_eth_event_enqueue_buffer event_enqueue_buffer; + /* Per adapter stats */ + struct rte_event_eth_rx_adapter_stats stats; + /* Block count, counts upto BLOCK_CNT_THRESHOLD */ + uint16_t enq_block_count; + /* Block start ts */ + uint64_t rx_enq_block_start_ts; + /* Configuration callback for rte_service configuration */ + rte_event_eth_rx_adapter_conf_cb conf_cb; + /* Configuration callback argument */ + void *conf_arg; + /* Set if default_cb is being used */ + int default_cb_arg; + /* Service initialization state */ + uint8_t service_inited; + /* Total count of Rx queues in adapter */ + uint32_t nb_queues; + /* Memory allocation name */ + char mem_name[ETH_RX_ADAPTER_MEM_NAME_LEN]; + /* Socket identifier cached from eventdev */ + int socket_id; + /* Per adapter EAL service */ + uint32_t service_id; +} __rte_cache_aligned; + +/* Per eth device */ +struct eth_device_info { + struct rte_eth_dev *dev; + struct eth_rx_queue_info *rx_queue; + /* Set if ethdev->eventdev packet transfer uses a + * hardware mechanism + */ + uint8_t internal_event_port; + /* Set if the adapter is processing rx queues for + * this eth device and packet processing has been + * started, allows for the code to know if the PMD + * rx_adapter_stop callback needs to be invoked + */ + uint8_t dev_rx_started; + /* If nb_dev_queues > 0, the start callback will + * be invoked if not already invoked + */ + uint16_t nb_dev_queues; +}; + +/* Per Rx queue */ +struct eth_rx_queue_info { + int queue_enabled; /* True if added */ + uint16_t wt; /* Polling weight */ + uint8_t event_queue_id; /* Event queue to enqueue packets to */ + uint8_t sched_type; /* Sched type for events */ + uint8_t priority; /* Event priority */ + uint32_t flow_id; /* App provided flow identifier */ + uint32_t flow_id_mask; /* Set to ~0 if app provides flow id else 0 */ +}; + +static struct rte_event_eth_rx_adapter **event_eth_rx_adapter; + +static inline int +valid_id(uint8_t id) +{ + return id < RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; +} + +#define RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, retval) do { \ + if (!valid_id(id)) { \ + RTE_EDEV_LOG_ERR("Invalid eth Rx adapter id = %d\n", id); \ + return retval; \ + } \ +} while (0) + +static inline int +sw_rx_adapter_queue_count(struct rte_event_eth_rx_adapter *rx_adapter) +{ + return rx_adapter->num_rx_polled; +} + +/* Greatest common divisor */ +static uint16_t gcd_u16(uint16_t a, uint16_t b) +{ + uint16_t r = a % b; + + return r ? gcd_u16(b, r) : b; +} + +/* Returns the next queue in the polling sequence + * + * http://kb.linuxvirtualserver.org/wiki/Weighted_Round-Robin_Scheduling + */ +static int +wrr_next(struct rte_event_eth_rx_adapter *rx_adapter, + unsigned int n, int *cw, + struct eth_rx_poll_entry *eth_rx_poll, uint16_t max_wt, + uint16_t gcd, int prev) +{ + int i = prev; + uint16_t w; + + while (1) { + uint16_t q; + uint8_t d; + + i = (i + 1) % n; + if (i == 0) { + *cw = *cw - gcd; + if (*cw <= 0) + *cw = max_wt; + } + + q = eth_rx_poll[i].eth_rx_qid; + d = eth_rx_poll[i].eth_dev_id; + w = rx_adapter->eth_devices[d].rx_queue[q].wt; + + if ((int)w >= *cw) + return i; + } +} + +/* Precalculate WRR polling sequence for all queues in rx_adapter */ +static int +eth_poll_wrr_calc(struct rte_event_eth_rx_adapter *rx_adapter) +{ + uint8_t d; + uint16_t q; + unsigned int i; + + /* Initialize variables for calculation of wrr schedule */ + uint16_t max_wrr_pos = 0; + unsigned int poll_q = 0; + uint16_t max_wt = 0; + uint16_t gcd = 0; + + struct eth_rx_poll_entry *rx_poll = NULL; + uint32_t *rx_wrr = NULL; + + if (rx_adapter->num_rx_polled) { + size_t len = RTE_ALIGN(rx_adapter->num_rx_polled * + sizeof(*rx_adapter->eth_rx_poll), + RTE_CACHE_LINE_SIZE); + rx_poll = rte_zmalloc_socket(rx_adapter->mem_name, + len, + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); + if (rx_poll == NULL) + return -ENOMEM; + + /* Generate array of all queues to poll, the size of this + * array is poll_q + */ + for (d = 0; d < rte_eth_dev_count(); d++) { + uint16_t nb_rx_queues; + struct eth_device_info *dev_info = + &rx_adapter->eth_devices[d]; + nb_rx_queues = dev_info->dev->data->nb_rx_queues; + if (dev_info->rx_queue == NULL) + continue; + for (q = 0; q < nb_rx_queues; q++) { + struct eth_rx_queue_info *queue_info = + &dev_info->rx_queue[q]; + if (queue_info->queue_enabled == 0) + continue; + + uint16_t wt = queue_info->wt; + rx_poll[poll_q].eth_dev_id = d; + rx_poll[poll_q].eth_rx_qid = q; + max_wrr_pos += wt; + max_wt = RTE_MAX(max_wt, wt); + gcd = (gcd) ? gcd_u16(gcd, wt) : wt; + poll_q++; + } + } + + len = RTE_ALIGN(max_wrr_pos * sizeof(*rx_wrr), + RTE_CACHE_LINE_SIZE); + rx_wrr = rte_zmalloc_socket(rx_adapter->mem_name, + len, + RTE_CACHE_LINE_SIZE, + rx_adapter->socket_id); + if (rx_wrr == NULL) { + rte_free(rx_poll); + return -ENOMEM; + } + + /* Generate polling sequence based on weights */ + int prev = -1; + int cw = -1; + for (i = 0; i < max_wrr_pos; i++) { + rx_wrr[i] = wrr_next(rx_adapter, poll_q, &cw, + rx_poll, max_wt, gcd, prev); + prev = rx_wrr[i]; + } + } + + rte_free(rx_adapter->eth_rx_poll); + rte_free(rx_adapter->wrr_sched); + + rx_adapter->eth_rx_poll = rx_poll; + rx_adapter->wrr_sched = rx_wrr; + rx_adapter->wrr_len = max_wrr_pos; + + return 0; +} + +static inline void +mtoip(struct rte_mbuf *m, struct ipv4_hdr **ipv4_hdr, + struct ipv6_hdr **ipv6_hdr) +{ + struct ether_hdr *eth_hdr = rte_pktmbuf_mtod(m, struct ether_hdr *); + struct vlan_hdr *vlan_hdr; + + *ipv4_hdr = NULL; + *ipv6_hdr = NULL; + + switch (eth_hdr->ether_type) { + case RTE_BE16(ETHER_TYPE_IPv4): + *ipv4_hdr = (struct ipv4_hdr *)(eth_hdr + 1); + break; + + case RTE_BE16(ETHER_TYPE_IPv6): + *ipv6_hdr = (struct ipv6_hdr *)(eth_hdr + 1); + break; + + case RTE_BE16(ETHER_TYPE_VLAN): + vlan_hdr = (struct vlan_hdr *)(eth_hdr + 1); + switch (vlan_hdr->eth_proto) { + case RTE_BE16(ETHER_TYPE_IPv4): + *ipv4_hdr = (struct ipv4_hdr *)(vlan_hdr + 1); + break; + case RTE_BE16(ETHER_TYPE_IPv6): + *ipv6_hdr = (struct ipv6_hdr *)(vlan_hdr + 1); + break; + default: + break; + } + break; + + default: + break; + } +} + +/* Calculate RSS hash for IPv4/6 */ +static inline uint32_t +do_softrss(struct rte_mbuf *m, const uint8_t *rss_key_be) +{ + uint32_t input_len; + void *tuple; + struct rte_ipv4_tuple ipv4_tuple; + struct rte_ipv6_tuple ipv6_tuple; + struct ipv4_hdr *ipv4_hdr; + struct ipv6_hdr *ipv6_hdr; + + mtoip(m, &ipv4_hdr, &ipv6_hdr); + + if (ipv4_hdr) { + ipv4_tuple.src_addr = rte_be_to_cpu_32(ipv4_hdr->src_addr); + ipv4_tuple.dst_addr = rte_be_to_cpu_32(ipv4_hdr->dst_addr); + tuple = &ipv4_tuple; + input_len = RTE_THASH_V4_L3_LEN; + } else if (ipv6_hdr) { + rte_thash_load_v6_addrs(ipv6_hdr, + (union rte_thash_tuple *)&ipv6_tuple); + tuple = &ipv6_tuple; + input_len = RTE_THASH_V6_L3_LEN; + } else + return 0; + + return rte_softrss_be(tuple, input_len, rss_key_be); +} + +static inline int +rx_enq_blocked(struct rte_event_eth_rx_adapter *rx_adapter) +{ + return !!rx_adapter->enq_block_count; +} + +static inline void +rx_enq_block_start_ts(struct rte_event_eth_rx_adapter *rx_adapter) +{ + if (rx_adapter->rx_enq_block_start_ts) + return; + + rx_adapter->enq_block_count++; + if (rx_adapter->enq_block_count < BLOCK_CNT_THRESHOLD) + return; + + rx_adapter->rx_enq_block_start_ts = rte_get_tsc_cycles(); +} + +static inline void +rx_enq_block_end_ts(struct rte_event_eth_rx_adapter *rx_adapter, + struct rte_event_eth_rx_adapter_stats *stats) +{ + if (unlikely(!stats->rx_enq_start_ts)) + stats->rx_enq_start_ts = rte_get_tsc_cycles(); + + if (likely(!rx_enq_blocked(rx_adapter))) + return; + + rx_adapter->enq_block_count = 0; + if (rx_adapter->rx_enq_block_start_ts) { + stats->rx_enq_end_ts = rte_get_tsc_cycles(); + stats->rx_enq_block_cycles += stats->rx_enq_end_ts - + rx_adapter->rx_enq_block_start_ts; + rx_adapter->rx_enq_block_start_ts = 0; + } +} + +/* Add event to buffer, free space check is done prior to calling + * this function + */ +static inline void +buf_event_enqueue(struct rte_event_eth_rx_adapter *rx_adapter, + struct rte_event *ev) +{ + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; + rte_memcpy(&buf->events[buf->count++], ev, sizeof(struct rte_event)); +} + +/* Enqueue buffered events to event device */ +static inline uint16_t +flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter) +{ + struct rte_eth_event_enqueue_buffer *buf = + &rx_adapter->event_enqueue_buffer; + struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; + + uint16_t n = rte_event_enqueue_burst(rx_adapter->eventdev_id, + rx_adapter->event_port_id, + buf->events, + buf->count); + if (n != buf->count) { + memmove(buf->events, + &buf->events[n], + (buf->count - n) * sizeof(struct rte_event)); + stats->rx_enq_retry++; + } + + n ? rx_enq_block_end_ts(rx_adapter, stats) : + rx_enq_block_start_ts(rx_adapter); + + buf->count -= n; + stats->rx_enq_count += n; + + return n; +} + +static inline void +fill_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter, + uint8_t dev_id, + uint16_t rx_queue_id, + struct rte_mbuf **mbufs, + uint16_t num) +{ + uint32_t i; + struct eth_device_info *eth_device_info = + &rx_adapter->eth_devices[dev_id]; + struct eth_rx_queue_info *eth_rx_queue_info = + ð_device_info->rx_queue[rx_queue_id]; + + int32_t qid = eth_rx_queue_info->event_queue_id; + uint8_t sched_type = eth_rx_queue_info->sched_type; + uint8_t priority = eth_rx_queue_info->priority; + uint32_t flow_id; + struct rte_event events[BATCH_SIZE]; + struct rte_mbuf *m = mbufs[0]; + uint32_t rss_mask; + uint32_t rss; + int do_rss; + + /* 0xffff ffff if PKT_RX_RSS_HASH is set, otherwise 0 */ + rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1); + do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask; + + for (i = 0; i < num; i++) { + m = mbufs[i]; + struct rte_event *ev = &events[i]; + + rss = do_rss ? + do_softrss(m, rx_adapter->rss_key_be) : m->hash.rss; + flow_id = + eth_rx_queue_info->flow_id & + eth_rx_queue_info->flow_id_mask; + flow_id |= rss & ~eth_rx_queue_info->flow_id_mask; + + ev->flow_id = flow_id; + ev->op = RTE_EVENT_OP_NEW; + ev->sched_type = sched_type; + ev->queue_id = qid; + ev->event_type = RTE_EVENT_TYPE_ETH_RX_ADAPTER; + ev->sub_event_type = 0; + ev->priority = priority; + ev->mbuf = m; + + buf_event_enqueue(rx_adapter, ev); + } +} + +/* + * Polls receive queues added to the event adapter and enqueues received + * packets to the event device. + * + * The receive code enqueues initially to a temporary buffer, the + * temporary buffer is drained anytime it holds >= BATCH_SIZE packets + * + * If there isn't space available in the temporary buffer, packets from the + * Rx queue aren't dequeued from the eth device, this back pressures the + * eth device, in virtual device environments this back pressure is relayed to + * the hypervisor's switching layer where adjustments can be made to deal with + * it. + */ +static inline uint32_t +eth_rx_poll(struct rte_event_eth_rx_adapter *rx_adapter) +{ + uint32_t num_queue; + uint16_t n; + uint32_t nb_rx = 0; + struct rte_mbuf *mbufs[BATCH_SIZE]; + struct rte_eth_event_enqueue_buffer *buf; + uint32_t wrr_pos; + uint32_t max_nb_rx; + + wrr_pos = rx_adapter->wrr_pos; + max_nb_rx = rx_adapter->max_nb_rx; + buf = &rx_adapter->event_enqueue_buffer; + struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats; + + /* Iterate through a WRR sequence */ + for (num_queue = 0; num_queue < rx_adapter->wrr_len; num_queue++) { + unsigned int poll_idx = rx_adapter->wrr_sched[wrr_pos]; + uint16_t qid = rx_adapter->eth_rx_poll[poll_idx].eth_rx_qid; + uint8_t d = rx_adapter->eth_rx_poll[poll_idx].eth_dev_id; + + /* Don't do a batch dequeue from the rx queue if there isn't + * enough space in the enqueue buffer. + */ + if (buf->count >= BATCH_SIZE) + flush_event_buffer(rx_adapter); + if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) + break; + + stats->rx_poll_count++; + n = rte_eth_rx_burst(d, qid, mbufs, BATCH_SIZE); + + if (n) { + stats->rx_packets += n; + /* The check before rte_eth_rx_burst() ensures that + * all n mbufs can be buffered + */ + fill_event_buffer(rx_adapter, d, qid, mbufs, n); + nb_rx += n; + if (nb_rx > max_nb_rx) { + rx_adapter->wrr_pos = + (wrr_pos + 1) % rx_adapter->wrr_len; + return nb_rx; + } + } + + if (++wrr_pos == rx_adapter->wrr_len) + wrr_pos = 0; + } + + return nb_rx; +} + +static int +event_eth_rx_adapter_service_func(void *args) +{ + struct rte_event_eth_rx_adapter *rx_adapter = args; + struct rte_eth_event_enqueue_buffer *buf; + + buf = &rx_adapter->event_enqueue_buffer; + if (rte_spinlock_trylock(&rx_adapter->rx_lock) == 0) + return 0; + if (eth_rx_poll(rx_adapter) == 0 && buf->count) + flush_event_buffer(rx_adapter); + rte_spinlock_unlock(&rx_adapter->rx_lock); + return 0; +} + +static int +rte_event_eth_rx_adapter_init(void) +{ + const char *name = "rte_event_eth_rx_adapter_array"; + const struct rte_memzone *mz; + unsigned int sz; + + sz = sizeof(*event_eth_rx_adapter) * + RTE_EVENT_ETH_RX_ADAPTER_MAX_INSTANCE; + sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); + + mz = rte_memzone_lookup(name); + if (mz == NULL) { + mz = rte_memzone_reserve_aligned(name, sz, rte_socket_id(), 0, + RTE_CACHE_LINE_SIZE); + if (mz == NULL) { + RTE_EDEV_LOG_ERR("failed to reserve memzone err = %" + PRId32, rte_errno); + return -rte_errno; + } + } + + event_eth_rx_adapter = mz->addr; + return 0; +} + +static inline struct rte_event_eth_rx_adapter * +id_to_rx_adapter(uint8_t id) +{ + return event_eth_rx_adapter ? + event_eth_rx_adapter[id] : NULL; +} + +static int +default_conf_cb(uint8_t id, uint8_t dev_id, + struct rte_event_eth_rx_adapter_conf *conf, void *arg) +{ + int ret; + struct rte_eventdev *dev; + struct rte_event_dev_config dev_conf; + int started; + uint8_t port_id; + struct rte_event_port_conf *port_conf = arg; + struct rte_event_eth_rx_adapter *rx_adapter = id_to_rx_adapter(id); + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + dev_conf = dev->data->dev_conf; + + started = dev->data->dev_started; + if (started) + rte_event_dev_stop(dev_id); + port_id = dev_conf.nb_event_ports; + dev_conf.nb_event_ports += 1; + ret = rte_event_dev_configure(dev_id, &dev_conf); + if (ret) { + RTE_EDEV_LOG_ERR("failed to configure event dev %u\n", + dev_id); + if (started) + rte_event_dev_start(dev_id); + return ret; + } + + ret = rte_event_port_setup(dev_id, port_id, port_conf); + if (ret) { + RTE_EDEV_LOG_ERR("failed to setup event port %u\n", + port_id); + return ret; + } + + conf->event_port_id = port_id; + conf->max_nb_rx = 128; + if (started) + rte_event_dev_start(dev_id); + rx_adapter->default_cb_arg = 1; + return ret; +} + +static int +init_service(struct rte_event_eth_rx_adapter *rx_adapter, uint8_t id) +{ + int ret; + struct rte_service_spec service; + struct rte_event_eth_rx_adapter_conf rx_adapter_conf; + + if (rx_adapter->service_inited) + return 0; + + memset(&service, 0, sizeof(service)); + snprintf(service.name, ETH_RX_ADAPTER_SERVICE_NAME_LEN, + "rte_event_eth_rx_adapter_%d", id); + service.socket_id = rx_adapter->socket_id; + service.callback = event_eth_rx_adapter_service_func; + service.callback_userdata = rx_adapter; + /* Service function handles locking for queue add/del updates */ + service.capabilities = RTE_SERVICE_CAP_MT_SAFE; + ret = rte_service_component_register(&service, &rx_adapter->service_id); + if (ret) { + RTE_EDEV_LOG_ERR("failed to register service %s err = %" PRId32, + service.name, ret); + return ret; + } + + ret = rx_adapter->conf_cb(id, rx_adapter->eventdev_id, + &rx_adapter_conf, rx_adapter->conf_arg); + if (ret) { + RTE_EDEV_LOG_ERR("configuration callback failed err = %" PRId32, + ret); + goto err_done; + } + rx_adapter->event_port_id = rx_adapter_conf.event_port_id; + rx_adapter->max_nb_rx = rx_adapter_conf.max_nb_rx; + rx_adapter->service_inited = 1; + return 0; + +err_done: + rte_service_component_unregister(rx_adapter->service_id); + return ret; +} + + +static void +update_queue_info(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + int32_t rx_queue_id, + uint8_t add) +{ + struct eth_rx_queue_info *queue_info; + int enabled; + uint16_t i; + + if (dev_info->rx_queue == NULL) + return; + + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + update_queue_info(rx_adapter, dev_info, i, add); + } else { + queue_info = &dev_info->rx_queue[rx_queue_id]; + enabled = queue_info->queue_enabled; + if (add) { + rx_adapter->nb_queues += !enabled; + dev_info->nb_dev_queues += !enabled; + } else { + rx_adapter->nb_queues -= enabled; + dev_info->nb_dev_queues -= enabled; + } + queue_info->queue_enabled = !!add; + } +} + +static int +event_eth_rx_adapter_queue_del(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id) +{ + struct eth_rx_queue_info *queue_info; + + if (rx_adapter->nb_queues == 0) + return 0; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + rx_adapter->num_rx_polled -= queue_info->queue_enabled; + update_queue_info(rx_adapter, dev_info, rx_queue_id, 0); + return 0; +} + +static void +event_eth_rx_adapter_queue_add(struct rte_event_eth_rx_adapter *rx_adapter, + struct eth_device_info *dev_info, + uint16_t rx_queue_id, + const struct rte_event_eth_rx_adapter_queue_conf *conf) + +{ + struct eth_rx_queue_info *queue_info; + const struct rte_event *ev = &conf->ev; + + queue_info = &dev_info->rx_queue[rx_queue_id]; + queue_info->event_queue_id = ev->queue_id; + queue_info->sched_type = ev->sched_type; + queue_info->priority = ev->priority; + queue_info->wt = conf->servicing_weight; + + if (conf->rx_queue_flags & + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID) { + queue_info->flow_id = ev->flow_id; + queue_info->flow_id_mask = ~0; + } + + /* The same queue can be added more than once */ + rx_adapter->num_rx_polled += !queue_info->queue_enabled; + update_queue_info(rx_adapter, dev_info, rx_queue_id, 1); +} + +static int add_rx_queue(struct rte_event_eth_rx_adapter *rx_adapter, + uint8_t eth_dev_id, + int rx_queue_id, + const struct rte_event_eth_rx_adapter_queue_conf *queue_conf) +{ + struct eth_device_info *dev_info = &rx_adapter->eth_devices[eth_dev_id]; + struct rte_event_eth_rx_adapter_queue_conf temp_conf; + uint32_t i; + int ret; + + if (queue_conf->servicing_weight == 0) { + + struct rte_eth_dev_data *data = dev_info->dev->data; + if (data->dev_conf.intr_conf.rxq) { + RTE_EDEV_LOG_ERR("Interrupt driven queues" + " not supported"); + return -ENOTSUP; + } + temp_conf = *queue_conf; + + /* If Rx interrupts are disabled set wt = 1 */ + temp_conf.servicing_weight = 1; + queue_conf = &temp_conf; + } + + if (dev_info->rx_queue == NULL) { + dev_info->rx_queue = + rte_zmalloc_socket(rx_adapter->mem_name, + dev_info->dev->data->nb_rx_queues * + sizeof(struct eth_rx_queue_info), 0, + rx_adapter->socket_id); + if (dev_info->rx_queue == NULL) + return -ENOMEM; + } + + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + event_eth_rx_adapter_queue_add(rx_adapter, + dev_info, i, + queue_conf); + } else { + event_eth_rx_adapter_queue_add(rx_adapter, dev_info, + (uint16_t)rx_queue_id, + queue_conf); + } + + ret = eth_poll_wrr_calc(rx_adapter); + if (ret) { + event_eth_rx_adapter_queue_del(rx_adapter, + dev_info, rx_queue_id); + return ret; + } + + return ret; +} + +static int +rx_adapter_ctrl(uint8_t id, int start) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + struct rte_eventdev *dev; + struct eth_device_info *dev_info; + uint32_t i; + int use_service = 0; + int stop = !start; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL) + return -EINVAL; + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + + for (i = 0; i < rte_eth_dev_count(); i++) { + dev_info = &rx_adapter->eth_devices[i]; + /* if start check for num dev queues */ + if (start && !dev_info->nb_dev_queues) + continue; + /* if stop check if dev has been started */ + if (stop && !dev_info->dev_rx_started) + continue; + use_service |= !dev_info->internal_event_port; + dev_info->dev_rx_started = start; + if (dev_info->internal_event_port == 0) + continue; + start ? (*dev->dev_ops->eth_rx_adapter_start)(dev, + &rte_eth_devices[i]) : + (*dev->dev_ops->eth_rx_adapter_stop)(dev, + &rte_eth_devices[i]); + } + + if (use_service) + rte_service_runstate_set(rx_adapter->service_id, start); + + return 0; +} + +int +rte_event_eth_rx_adapter_create_ext(uint8_t id, uint8_t dev_id, + rte_event_eth_rx_adapter_conf_cb conf_cb, + void *conf_arg) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + int ret; + int socket_id; + uint8_t i; + char mem_name[ETH_RX_ADAPTER_SERVICE_NAME_LEN]; + const uint8_t default_rss_key[] = { + 0x6d, 0x5a, 0x56, 0xda, 0x25, 0x5b, 0x0e, 0xc2, + 0x41, 0x67, 0x25, 0x3d, 0x43, 0xa3, 0x8f, 0xb0, + 0xd0, 0xca, 0x2b, 0xcb, 0xae, 0x7b, 0x30, 0xb4, + 0x77, 0xcb, 0x2d, 0xa3, 0x80, 0x30, 0xf2, 0x0c, + 0x6a, 0x42, 0xb7, 0x3b, 0xbe, 0xac, 0x01, 0xfa, + }; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + RTE_EVENTDEV_VALID_DEVID_OR_ERR_RET(dev_id, -EINVAL); + if (conf_cb == NULL) + return -EINVAL; + + if (event_eth_rx_adapter == NULL) { + ret = rte_event_eth_rx_adapter_init(); + if (ret) + return ret; + } + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter != NULL) { + RTE_EDEV_LOG_ERR("Eth Rx adapter exists id = %" PRIu8, id); + return -EEXIST; + } + + socket_id = rte_event_dev_socket_id(dev_id); + snprintf(mem_name, ETH_RX_ADAPTER_MEM_NAME_LEN, + "rte_event_eth_rx_adapter_%d", + id); + + rx_adapter = rte_zmalloc_socket(mem_name, sizeof(*rx_adapter), + RTE_CACHE_LINE_SIZE, socket_id); + if (rx_adapter == NULL) { + RTE_EDEV_LOG_ERR("failed to get mem for rx adapter"); + return -ENOMEM; + } + + rx_adapter->eventdev_id = dev_id; + rx_adapter->socket_id = socket_id; + rx_adapter->conf_cb = conf_cb; + rx_adapter->conf_arg = conf_arg; + strcpy(rx_adapter->mem_name, mem_name); + rx_adapter->eth_devices = rte_zmalloc_socket(rx_adapter->mem_name, + rte_eth_dev_count() * + sizeof(struct eth_device_info), 0, + socket_id); + rte_convert_rss_key((const uint32_t *)default_rss_key, + (uint32_t *)rx_adapter->rss_key_be, + RTE_DIM(default_rss_key)); + + if (rx_adapter->eth_devices == NULL) { + RTE_EDEV_LOG_ERR("failed to get mem for eth devices\n"); + rte_free(rx_adapter); + return -ENOMEM; + } + rte_spinlock_init(&rx_adapter->rx_lock); + for (i = 0; i < rte_eth_dev_count(); i++) + rx_adapter->eth_devices[i].dev = &rte_eth_devices[i]; + + event_eth_rx_adapter[id] = rx_adapter; + if (conf_cb == default_conf_cb) + rx_adapter->default_cb_arg = 1; + return 0; +} + +int +rte_event_eth_rx_adapter_create(uint8_t id, uint8_t dev_id, + struct rte_event_port_conf *port_config) +{ + struct rte_event_port_conf *pc; + int ret; + + if (port_config == NULL) + return -EINVAL; + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + + pc = rte_malloc(NULL, sizeof(*pc), 0); + if (pc == NULL) + return -ENOMEM; + *pc = *port_config; + ret = rte_event_eth_rx_adapter_create_ext(id, dev_id, + default_conf_cb, + pc); + if (ret) + rte_free(pc); + return ret; +} + +int +rte_event_eth_rx_adapter_free(uint8_t id) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL) + return -EINVAL; + + if (rx_adapter->nb_queues) { + RTE_EDEV_LOG_ERR("%" PRIu16 " Rx queues not deleted", + rx_adapter->nb_queues); + return -EBUSY; + } + + if (rx_adapter->default_cb_arg) + rte_free(rx_adapter->conf_arg); + rte_free(rx_adapter->eth_devices); + rte_free(rx_adapter); + event_eth_rx_adapter[id] = NULL; + + return 0; +} + +int +rte_event_eth_rx_adapter_queue_add(uint8_t id, + uint8_t eth_dev_id, + int32_t rx_queue_id, + const struct rte_event_eth_rx_adapter_queue_conf *queue_conf) +{ + int ret; + uint32_t cap; + struct rte_event_eth_rx_adapter *rx_adapter; + struct rte_eventdev *dev; + struct eth_device_info *dev_info; + int start_service; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if ((rx_adapter == NULL) || (queue_conf == NULL)) + return -EINVAL; + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id, + eth_dev_id, + &cap); + if (ret) { + RTE_EDEV_LOG_ERR("Failed to get adapter caps edev %" PRIu8 + "eth port %" PRIu8, id, eth_dev_id); + return ret; + } + + if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_OVERRIDE_FLOW_ID) == 0 + && (queue_conf->rx_queue_flags & + RTE_EVENT_ETH_RX_ADAPTER_QUEUE_FLOW_ID_VALID)) { + RTE_EDEV_LOG_ERR("Flow ID override is not supported," + " eth port: %" PRIu8 " adapter id: %" PRIu8, + eth_dev_id, id); + return -EINVAL; + } + + if ((cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_MULTI_EVENTQ) == 0 && + (rx_queue_id != -1)) { + RTE_EDEV_LOG_ERR("Rx queues can only be connected to single " + "event queue id %u eth port %u", id, eth_dev_id); + return -EINVAL; + } + + if (rx_queue_id != -1 && (uint16_t)rx_queue_id >= + rte_eth_devices[eth_dev_id].data->nb_rx_queues) { + RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, + (uint16_t)rx_queue_id); + return -EINVAL; + } + + start_service = 0; + dev_info = &rx_adapter->eth_devices[eth_dev_id]; + + if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) { + RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_add, + -ENOTSUP); + if (dev_info->rx_queue == NULL) { + dev_info->rx_queue = + rte_zmalloc_socket(rx_adapter->mem_name, + dev_info->dev->data->nb_rx_queues * + sizeof(struct eth_rx_queue_info), 0, + rx_adapter->socket_id); + if (dev_info->rx_queue == NULL) + return -ENOMEM; + } + + ret = (*dev->dev_ops->eth_rx_adapter_queue_add)(dev, + &rte_eth_devices[eth_dev_id], + rx_queue_id, queue_conf); + if (ret == 0) { + update_queue_info(rx_adapter, + &rx_adapter->eth_devices[eth_dev_id], + rx_queue_id, + 1); + } + } else { + rte_spinlock_lock(&rx_adapter->rx_lock); + ret = init_service(rx_adapter, id); + if (ret == 0) + ret = add_rx_queue(rx_adapter, eth_dev_id, rx_queue_id, + queue_conf); + rte_spinlock_unlock(&rx_adapter->rx_lock); + if (ret == 0) + start_service = !!sw_rx_adapter_queue_count(rx_adapter); + } + + if (ret) + return ret; + + if (start_service) + rte_service_component_runstate_set(rx_adapter->service_id, 1); + + return 0; +} + +int +rte_event_eth_rx_adapter_queue_del(uint8_t id, uint8_t eth_dev_id, + int32_t rx_queue_id) +{ + int ret = 0; + struct rte_eventdev *dev; + struct rte_event_eth_rx_adapter *rx_adapter; + struct eth_device_info *dev_info; + uint32_t cap; + uint16_t i; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + RTE_ETH_VALID_PORTID_OR_ERR_RET(eth_dev_id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL) + return -EINVAL; + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + ret = rte_event_eth_rx_adapter_caps_get(rx_adapter->eventdev_id, + eth_dev_id, + &cap); + if (ret) + return ret; + + if (rx_queue_id != -1 && (uint16_t)rx_queue_id >= + rte_eth_devices[eth_dev_id].data->nb_rx_queues) { + RTE_EDEV_LOG_ERR("Invalid rx queue_id %" PRIu16, + (uint16_t)rx_queue_id); + return -EINVAL; + } + + dev_info = &rx_adapter->eth_devices[eth_dev_id]; + + if (cap & RTE_EVENT_ETH_RX_ADAPTER_CAP_INTERNAL_PORT) { + RTE_FUNC_PTR_OR_ERR_RET(*dev->dev_ops->eth_rx_adapter_queue_del, + -ENOTSUP); + ret = (*dev->dev_ops->eth_rx_adapter_queue_del)(dev, + &rte_eth_devices[eth_dev_id], + rx_queue_id); + if (ret == 0) { + update_queue_info(rx_adapter, + &rx_adapter->eth_devices[eth_dev_id], + rx_queue_id, + 0); + if (dev_info->nb_dev_queues == 0) { + rte_free(dev_info->rx_queue); + dev_info->rx_queue = NULL; + } + } + } else { + int rc; + rte_spinlock_lock(&rx_adapter->rx_lock); + if (rx_queue_id == -1) { + for (i = 0; i < dev_info->dev->data->nb_rx_queues; i++) + event_eth_rx_adapter_queue_del(rx_adapter, + dev_info, + i); + } else { + event_eth_rx_adapter_queue_del(rx_adapter, + dev_info, + (uint16_t)rx_queue_id); + } + + rc = eth_poll_wrr_calc(rx_adapter); + if (rc) + RTE_EDEV_LOG_ERR("WRR recalculation failed %" PRId32, + rc); + + if (dev_info->nb_dev_queues == 0) { + rte_free(dev_info->rx_queue); + dev_info->rx_queue = NULL; + } + + rte_spinlock_unlock(&rx_adapter->rx_lock); + rte_service_component_runstate_set(rx_adapter->service_id, + sw_rx_adapter_queue_count(rx_adapter)); + } + + return ret; +} + + +int +rte_event_eth_rx_adapter_start(uint8_t id) +{ + return rx_adapter_ctrl(id, 1); +} + +int +rte_event_eth_rx_adapter_stop(uint8_t id) +{ + return rx_adapter_ctrl(id, 0); +} + +int +rte_event_eth_rx_adapter_stats_get(uint8_t id, + struct rte_event_eth_rx_adapter_stats *stats) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + struct rte_event_eth_rx_adapter_stats dev_stats_sum = { 0 }; + struct rte_event_eth_rx_adapter_stats dev_stats; + struct rte_eventdev *dev; + struct eth_device_info *dev_info; + uint32_t i; + int ret; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL || stats == NULL) + return -EINVAL; + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + memset(stats, 0, sizeof(*stats)); + for (i = 0; i < rte_eth_dev_count(); i++) { + dev_info = &rx_adapter->eth_devices[i]; + if (dev_info->internal_event_port == 0 || + dev->dev_ops->eth_rx_adapter_stats_get == NULL) + continue; + ret = (*dev->dev_ops->eth_rx_adapter_stats_get)(dev, + &rte_eth_devices[i], + &dev_stats); + if (ret) + continue; + dev_stats_sum.rx_packets += dev_stats.rx_packets; + dev_stats_sum.rx_enq_count += dev_stats.rx_enq_count; + } + + if (rx_adapter->service_inited) + *stats = rx_adapter->stats; + + stats->rx_packets += dev_stats_sum.rx_packets; + stats->rx_enq_count += dev_stats_sum.rx_enq_count; + return 0; +} + +int +rte_event_eth_rx_adapter_stats_reset(uint8_t id) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + struct rte_eventdev *dev; + struct eth_device_info *dev_info; + uint32_t i; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL) + return -EINVAL; + + dev = &rte_eventdevs[rx_adapter->eventdev_id]; + for (i = 0; i < rte_eth_dev_count(); i++) { + dev_info = &rx_adapter->eth_devices[i]; + if (dev_info->internal_event_port == 0 || + dev->dev_ops->eth_rx_adapter_stats_reset == NULL) + continue; + (*dev->dev_ops->eth_rx_adapter_stats_reset)(dev, + &rte_eth_devices[i]); + } + + memset(&rx_adapter->stats, 0, sizeof(rx_adapter->stats)); + return 0; +} + +int +rte_event_eth_rx_adapter_service_id_get(uint8_t id, uint32_t *service_id) +{ + struct rte_event_eth_rx_adapter *rx_adapter; + + RTE_EVENT_ETH_RX_ADAPTER_ID_VALID_OR_ERR_RET(id, -EINVAL); + + rx_adapter = id_to_rx_adapter(id); + if (rx_adapter == NULL || service_id == NULL) + return -EINVAL; + + if (rx_adapter->service_inited) + *service_id = rx_adapter->service_id; + + return rx_adapter->service_inited ? 0 : -ESRCH; +} diff --git a/lib/librte_eventdev/rte_eventdev_version.map b/lib/librte_eventdev/rte_eventdev_version.map index 23ec7267f8..828b9a3403 100644 --- a/lib/librte_eventdev/rte_eventdev_version.map +++ b/lib/librte_eventdev/rte_eventdev_version.map @@ -55,5 +55,14 @@ DPDK_17.11 { rte_event_queue_attr_get; rte_event_eth_rx_adapter_caps_get; + rte_event_eth_rx_adapter_create; + rte_event_eth_rx_adapter_create_ext; + rte_event_eth_rx_adapter_free; + rte_event_eth_rx_adapter_queue_add; + rte_event_eth_rx_adapter_queue_del; + rte_event_eth_rx_adapter_start; + rte_event_eth_rx_adapter_stop; + rte_event_eth_rx_adapter_stats_get; + rte_event_eth_rx_adapter_stats_reset; } DPDK_17.08;