eventdev/eth_rx: make enqueue buffer circular
author     Ganapati Kundapura <ganapati.kundapura@intel.com>
Mon, 30 Aug 2021 13:06:25 +0000 (08:06 -0500)
committer  Jerin Jacob <jerinj@marvell.com>
Thu, 21 Oct 2021 08:14:49 +0000 (10:14 +0200)
The Rx adapter uses memmove() to move unprocessed events to the beginning of
the packet enqueue buffer. The use of memmove() was found to consume a good
amount of CPU cycles (about 20%).

This patch removes the use of memmove() by implementing a circular
buffer, so that unprocessed events no longer need to be copied. With this
change the Rx adapter is able to fill the buffer of 16384 events.
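
For context, a minimal standalone sketch of the head/tail/last indexing scheme
this patch introduces is shown below: new events are appended at tail, flushes
drain from head, and last records the wrap point, so a partial flush only
advances head instead of calling memmove(). This is not the adapter code
itself; all names here (circ_buf, buf_batch_fits, buf_append, buf_flush) are
illustrative, and plain uint32_t values stand in for struct rte_event.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define BATCH_SIZE 32
#define BUF_SIZE   (6 * BATCH_SIZE)

struct circ_buf {
	uint32_t events[BUF_SIZE]; /* stand-in for struct rte_event */
	uint16_t head;  /* next event to flush (enqueue to the eventdev) */
	uint16_t tail;  /* next free slot for events from the Rx burst   */
	uint16_t last;  /* wrap point: data is [head, last) plus [0, tail) */
};

/* Check (and, if needed, wrap) so that one more BATCH_SIZE burst fits. */
static bool
buf_batch_fits(struct circ_buf *b)
{
	uint32_t nb_req = (uint32_t)b->tail + BATCH_SIZE;

	if (!b->last) {
		if (nb_req <= BUF_SIZE)
			return true;            /* room at the end */
		if (b->head >= BATCH_SIZE) {    /* room at the front: wrap */
			b->last = b->tail;
			b->tail = 0;
			return true;
		}
		return false;
	}
	return nb_req <= b->head;               /* already wrapped */
}

/* Append a burst of events at tail. */
static void
buf_append(struct circ_buf *b, const uint32_t *ev, uint16_t n)
{
	memcpy(&b->events[b->tail], ev, n * sizeof(*ev));
	b->tail += n;
}

/* Flush up to "budget" events starting at head. A partial flush only
 * advances head; the remaining events are never moved with memmove(). */
static uint16_t
buf_flush(struct circ_buf *b, uint16_t budget)
{
	uint16_t avail = b->last ? b->last - b->head : b->tail - b->head;
	uint16_t n = avail < budget ? avail : budget;

	b->head += n;
	if (b->last && b->head == b->last) {
		/* The rear segment is drained; continue from index 0. */
		b->head = 0;
		b->last = 0;
	}
	return n;
}

int
main(void)
{
	struct circ_buf b = { .head = 0, .tail = 0, .last = 0 };
	uint32_t burst[BATCH_SIZE] = { 0 };
	uint16_t filled = 0, flushed = 0;

	/* Fill, partially flush, then keep filling: the wrap inside
	 * buf_batch_fits() is what replaces the old memmove(). */
	while (buf_batch_fits(&b)) {
		buf_append(&b, burst, BATCH_SIZE);
		filled += BATCH_SIZE;
	}
	flushed += buf_flush(&b, 3 * BATCH_SIZE);     /* partial flush */
	while (buf_batch_fits(&b)) {                  /* wraps to the front */
		buf_append(&b, burst, BATCH_SIZE);
		filled += BATCH_SIZE;
	}
	printf("filled %u, flushed %u, head %u, tail %u, last %u\n",
	       filled, flushed, b.head, b.tail, b.last);
	return 0;
}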

Signed-off-by: Ganapati Kundapura <ganapati.kundapura@intel.com>
Acked-by: Jerin Jacob <jerinj@marvell.com>
lib/eventdev/rte_event_eth_rx_adapter.c

index 89c4ca5..b3d9416 100644
@@ -25,7 +25,7 @@
 
 #define BATCH_SIZE             32
 #define BLOCK_CNT_THRESHOLD    10
-#define ETH_EVENT_BUFFER_SIZE  (4*BATCH_SIZE)
+#define ETH_EVENT_BUFFER_SIZE  (6*BATCH_SIZE)
 #define MAX_VECTOR_SIZE                1024
 #define MIN_VECTOR_SIZE                4
 #define MAX_VECTOR_NS          1E9
@@ -83,6 +83,13 @@ struct rte_eth_event_enqueue_buffer {
        uint16_t count;
        /* Array of events in this buffer */
        struct rte_event events[ETH_EVENT_BUFFER_SIZE];
+       /* Event enqueue happens from head */
+       uint16_t head;
+       /* New packets from rte_eth_rx_burst are enqueued from tail */
+       uint16_t tail;
+       /* Last element in the buffer before rollover */
+       uint16_t last;
+       uint16_t last_mask;
 };
 
 struct rte_event_eth_rx_adapter {
@@ -749,19 +756,35 @@ rxa_flush_event_buffer(struct rte_event_eth_rx_adapter *rx_adapter)
        struct rte_eth_event_enqueue_buffer *buf =
            &rx_adapter->event_enqueue_buffer;
        struct rte_event_eth_rx_adapter_stats *stats = &rx_adapter->stats;
+       uint16_t count = buf->last ? buf->last - buf->head : buf->count;
 
-       if (!buf->count)
+       if (!count)
                return 0;
 
        uint16_t n = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
                                        rx_adapter->event_port_id,
-                                       buf->events,
-                                       buf->count);
-       if (n != buf->count) {
-               memmove(buf->events,
-                       &buf->events[n],
-                       (buf->count - n) * sizeof(struct rte_event));
+                                       &buf->events[buf->head],
+                                       count);
+       if (n != count)
                stats->rx_enq_retry++;
+
+       buf->head += n;
+
+       if (buf->last && n == count) {
+               uint16_t n1;
+
+               n1 = rte_event_enqueue_new_burst(rx_adapter->eventdev_id,
+                                       rx_adapter->event_port_id,
+                                       &buf->events[0],
+                                       buf->tail);
+
+               if (n1 != buf->tail)
+                       stats->rx_enq_retry++;
+
+               buf->last = 0;
+               buf->head = n1;
+               buf->last_mask = 0;
+               n += n1;
        }
 
        n ? rxa_enq_block_end_ts(rx_adapter, stats) :
@@ -858,7 +881,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                                        &dev_info->rx_queue[rx_queue_id];
        struct rte_eth_event_enqueue_buffer *buf =
                                        &rx_adapter->event_enqueue_buffer;
-       struct rte_event *ev = &buf->events[buf->count];
+       uint16_t new_tail = buf->tail;
        uint64_t event = eth_rx_queue_info->event;
        uint32_t flow_id_mask = eth_rx_queue_info->flow_id_mask;
        struct rte_mbuf *m = mbufs[0];
@@ -873,7 +896,10 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                rss_mask = ~(((m->ol_flags & PKT_RX_RSS_HASH) != 0) - 1);
                do_rss = !rss_mask && !eth_rx_queue_info->flow_id_mask;
                for (i = 0; i < num; i++) {
+                       struct rte_event *ev;
+
                        m = mbufs[i];
+                       ev = &buf->events[new_tail];
 
                        rss = do_rss ? rxa_do_softrss(m, rx_adapter->rss_key_be)
                                     : m->hash.rss;
@@ -881,7 +907,7 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
                        ev->flow_id = (rss & ~flow_id_mask) |
                                      (ev->flow_id & flow_id_mask);
                        ev->mbuf = m;
-                       ev++;
+                       new_tail++;
                }
        } else {
                num = rxa_create_event_vector(rx_adapter, eth_rx_queue_info,
@@ -892,9 +918,14 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
 
                dropped = 0;
                nb_cb = dev_info->cb_fn(eth_dev_id, rx_queue_id,
-                                       ETH_EVENT_BUFFER_SIZE, buf->count,
-                                       &buf->events[buf->count], num,
-                                       dev_info->cb_arg, &dropped);
+                                      buf->last |
+                                      (RTE_DIM(buf->events) & ~buf->last_mask),
+                                      buf->count >= BATCH_SIZE ?
+                                               buf->count - BATCH_SIZE : 0,
+                                      &buf->events[buf->tail],
+                                      num,
+                                      dev_info->cb_arg,
+                                      &dropped);
                if (unlikely(nb_cb > num))
                        RTE_EDEV_LOG_ERR("Rx CB returned %d (> %d) events",
                                nb_cb, num);
@@ -905,6 +936,27 @@ rxa_buffer_mbufs(struct rte_event_eth_rx_adapter *rx_adapter,
        }
 
        buf->count += num;
+       buf->tail += num;
+}
+
+static inline bool
+rxa_pkt_buf_available(struct rte_eth_event_enqueue_buffer *buf)
+{
+       uint32_t nb_req = buf->tail + BATCH_SIZE;
+
+       if (!buf->last) {
+               if (nb_req <= RTE_DIM(buf->events))
+                       return true;
+
+               if (buf->head >= BATCH_SIZE) {
+                       buf->last_mask = ~0;
+                       buf->last = buf->tail;
+                       buf->tail = 0;
+                       return true;
+               }
+       }
+
+       return nb_req <= buf->head;
 }
 
 /* Enqueue packets from  <port, q>  to event buffer */
@@ -929,7 +981,7 @@ rxa_eth_rx(struct rte_event_eth_rx_adapter *rx_adapter,
        /* Don't do a batch dequeue from the rx queue if there isn't
         * enough space in the enqueue buffer.
         */
-       while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+       while (rxa_pkt_buf_available(buf)) {
                if (buf->count >= BATCH_SIZE)
                        rxa_flush_event_buffer(rx_adapter);
 
@@ -1090,7 +1142,7 @@ rxa_intr_ring_dequeue(struct rte_event_eth_rx_adapter *rx_adapter)
        if (buf->count >= BATCH_SIZE)
                rxa_flush_event_buffer(rx_adapter);
 
-       while (BATCH_SIZE <= (RTE_DIM(buf->events) - buf->count)) {
+       while (rxa_pkt_buf_available(buf)) {
                struct eth_device_info *dev_info;
                uint16_t port;
                uint16_t queue;
@@ -1211,7 +1263,7 @@ rxa_poll(struct rte_event_eth_rx_adapter *rx_adapter)
                 */
                if (buf->count >= BATCH_SIZE)
                        rxa_flush_event_buffer(rx_adapter);
-               if (BATCH_SIZE > (ETH_EVENT_BUFFER_SIZE - buf->count)) {
+               if (!rxa_pkt_buf_available(buf)) {
                        rx_adapter->wrr_pos = wrr_pos;
                        return nb_rx;
                }