From 8113fd15e229364af766c3ab08f836d735a3e2cc Mon Sep 17 00:00:00 2001
From: Ganapati Kundapura
Date: Mon, 30 Aug 2021 08:06:25 -0500
Subject: [PATCH] eventdev/eth_rx: make enqueue buffer circular

The Rx adapter uses memmove() to move unprocessed events to the
beginning of the packet enqueue buffer. The use of memmove() was found
to consume a significant amount of CPU cycles (about 20%).

This patch removes the use of memmove() by implementing a circular
buffer so that no data needs to be copied. With this change the Rx
adapter is able to fill a buffer of 16384 events.

Signed-off-by: Ganapati Kundapura
Acked-by: Jerin Jacob
---
 lib/eventdev/rte_event_eth_rx_adapter.c | 84 ++++++++++++++++++++-----
 1 file changed, 68 insertions(+), 16 deletions(-)

diff --git a/lib/eventdev/rte_event_eth_rx_adapter.c b/lib/eventdev/rte_event_eth_rx_adapter.c
index 89c4ca5d40..b3d94162bb 100644
--- a/lib/eventdev/rte_event_eth_rx_adapter.c
+++ b/lib/eventdev/rte_event_eth_rx_adapter.c
@@ -25,7 +25,7 @@
 
 #define BATCH_SIZE		32
 #define BLOCK_CNT_THRESHOLD	10
-#define ETH_EVENT_BUFFER_SIZE	(4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE	(6*BATCH_SIZE)
 #define MAX_VECTOR_SIZE		1024
 #define MIN_VECTOR_SIZE		4
 #define MAX_VECTOR_NS		1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
 	uint16_t count;
 	/* Array of events in this buffer */
 	struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+	/* Event enqueue happens from head */
+	uint16_t head;
+	/* New packets from rte_eth_rx_burst are enqueued from tail */
+	uint16_t tail;
+	/* Last element in the buffer before rollover */
+	uint16_t last;
+	uint16_t last_mask;
 };
 
 struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
 	struct rte_eth_event_enqueue_buffer *buf =
 	    &rx_adapter->event_enqueue_buffer;
 	struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+	uint16_t count = buf->last ? buf->last - buf->head : buf->count;
 
-	if (!buf->count)
+	if (!count)
 		return 0;
 
 	uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
 					rx_adapter->event_port_id,
-					buf->events,
-					buf->count);
-	if (n != buf->count) {
-		memmove(buf->events,
-			&buf->events[n],
-			(buf->count - n) * sizeof(struct rte_event));
+					&buf->events[buf->head],
+					count);
+	if (n != count)
 		stats->rx_enq_retry++;
+
+	buf->head += n;
+
+	if (buf->last && n == count) {
+		uint16_t n1;
+
+		n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+					rx_adapter->event_port_id,
+					&buf->events[0],
+					buf->tail);
+
+		if (n1 != buf->tail)
+			stats->rx_enq_retry++;
+
+		buf->last = 0;
+		buf->head = n1;
+		buf->last_mask = 0;
+		n += n1;
 	}
 
 	n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		&dev_info->rx_queue[rx_queue_id];
 	struct rte_eth_event_enqueue_buffer *buf =
 		&rx_adapter->event_enqueue_buffer;
-	struct rte_event *ev = &buf->events[buf->count];
+	uint16_t new_tail = buf->tail;
 	uint64_t event = eth_rx_queue_info->event;
 	uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
 	struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 		rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
 		do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
 		for (i = 0; i < num; i++) {
+			struct rte_event *ev;
+
 			m = mbufs[i];
+			ev = &buf->events[new_tail];
 			rss = do_rss ?
 			      rxa_do_softrss(m, rx_adapter->rss_key_be) :
 			      m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 			ev->flow_id = (rss & ~flow_id_mask) |
 				      (ev->flow_id & flow_id_mask);
 			ev->mbuf = m;
-			ev++;
+			new_tail++;
 		}
 	} else {
 		num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
 		dropped = 0;
 		nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-					ETH_EVENT_BUFFER_SIZE, buf->count,
-					&buf->events[buf->count], num,
-					dev_info->cb_arg, &dropped);
+					buf->last |
+					(RTE_DIM(buf->events) & ~buf->last_mask),
+					buf->count >= BATCH_SIZE ?
+						buf->count - BATCH_SIZE : 0,
+					&buf->events[buf->tail],
+					num,
+					dev_info->cb_arg,
+					&dropped);
 		if (unlikely(nb_cb > num))
 			RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
 				nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 	}
 
 	buf->count += num;
+	buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+	uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+	if (!buf->last) {
+		if (nb_req <= RTE_DIM(buf->events))
+			return true;
+
+		if (buf->head >= BATCH_SIZE) {
+			buf->last_mask = ~0;
+			buf->last = buf->tail;
+			buf->tail = 0;
+			return true;
+		}
+	}
+
+	return nb_req <= buf->head;
 }
 
 /* Enqueue packets from <port, q> to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
 	/* Don't do a batch dequeue from the rx queue if there isn't
 	 * enough space in the enqueue buffer.
 	 */
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
 
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
 	if (buf->count >= BATCH_SIZE)
 		rxa_flush_event_buffer(rx_adapter);
 
-	while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+	while (rxa_pkt_buf_available(buf)) {
 		struct eth_device_info *dev_info;
 		uint16_t port;
 		uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
 		 */
 		if (buf->count >= BATCH_SIZE)
 			rxa_flush_event_buffer(rx_adapter);
-		if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+		if (!rxa_pkt_buf_available(buf)) {
 			rx_adapter->wrr_pos = wrr_pos;
 			return nb_rx;
 		}
-- 
2.20.1
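
For readers unfamiliar with the scheme above, here is a standalone sketch of
the head/tail/last bookkeeping this patch introduces. It is illustrative only,
not DPDK code: demo_buf, demo_space_available() and demo_flush() are made-up
names, DEMO_BATCH and DEMO_BUF_SIZE stand in for BATCH_SIZE and
ETH_EVENT_BUFFER_SIZE, the flush assumes every enqueue to the event device
succeeds, and last_mask (which only feeds the user callback) is left out.

/*
 * Illustrative sketch only (not DPDK code): demo_* names are invented,
 * DEMO_BATCH and DEMO_BUF_SIZE stand in for BATCH_SIZE and
 * ETH_EVENT_BUFFER_SIZE, and the flush assumes every enqueue succeeds.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_BATCH    32
#define DEMO_BUF_SIZE (6 * DEMO_BATCH)

struct demo_buf {
	uint16_t count; /* events currently held in the buffer */
	uint16_t head;  /* next event to hand to the event device */
	uint16_t tail;  /* next free slot for received packets */
	uint16_t last;  /* wrap point; 0 while the buffer is linear */
};

/* Same decision logic as rxa_pkt_buf_available(): room for one batch? */
static bool demo_space_available(struct demo_buf *b)
{
	uint32_t nb_req = (uint32_t)b->tail + DEMO_BATCH;

	if (!b->last) {
		if (nb_req <= DEMO_BUF_SIZE)
			return true;            /* fits before the end */
		if (b->head >= DEMO_BATCH) {    /* wrap to the front */
			b->last = b->tail;
			b->tail = 0;
			return true;
		}
	}
	/* already wrapped: new events must stay below the head */
	return nb_req <= b->head;
}

/* Simplified flush: drain head..last (or head..tail), then the front part. */
static void demo_flush(struct demo_buf *b)
{
	uint16_t n = b->last ? b->last - b->head : b->tail - b->head;

	b->head += n;
	if (b->last) {
		/* wrapped and fully drained: release the front region too */
		n += b->tail;
		b->head = b->tail;
		b->last = 0;
	}
	b->count -= n;
	printf("flushed %2u events: head=%3u tail=%3u last=%3u count=%u\n",
	       (unsigned)n, (unsigned)b->head, (unsigned)b->tail,
	       (unsigned)b->last, (unsigned)b->count);
}

int main(void)
{
	struct demo_buf b = {0, 0, 0, 0};
	int i;

	for (i = 0; i < 8; i++) {
		if (!demo_space_available(&b))
			continue;       /* the adapter would stop polling here */
		b.tail += DEMO_BATCH;   /* "receive" one batch at the tail */
		b.count += DEMO_BATCH;
		demo_flush(&b);
	}
	return 0;
}

The rule enforced in demo_space_available() mirrors the patch: the producer
only wraps to index 0 once at least one full batch has drained from the head,
and after wrapping it must stay below head, so the two indices never collide
and no memmove() is ever needed.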