net/af_xdp: allow bigger batch sizes
authorCiara Loftus <ciara.loftus@intel.com>
Wed, 10 Mar 2021 07:48:14 +0000 (07:48 +0000)
committerFerruh Yigit <ferruh.yigit@intel.com>
Wed, 10 Mar 2021 17:49:32 +0000 (18:49 +0100)
Prior to this commit, the maximum batch sizes for zero-copy and
copy-mode rx and copy-mode tx were set to 32. Apart from zero-copy tx,
the user could never rx/tx any more than 32 packets at a time and
without inspecting the code the user wouldn't be aware of this.

This commit removes these upper limits placed on the user and instead
sets an internal batch size equal to the default ring size (2048).
Batches larger than this are still processed, however they are split
into smaller batches similar to how it's done in other drivers. This is
necessary because some arrays used during rx/tx need to be sized at
compile-time.

Allowing a larger batch size allows for fewer batches and thus larger
bulk operations, fewer ring accesses and fewer syscalls which should
yield improved performance.

Signed-off-by: Ciara Loftus <ciara.loftus@intel.com>
Reviewed-by: Ferruh Yigit <ferruh.yigit@intel.com>
drivers/net/af_xdp/rte_eth_af_xdp.c

index 3957227..be524e4 100644 (file)
@@ -66,8 +66,8 @@ RTE_LOG_REGISTER(af_xdp_logtype, pmd.net.af_xdp, NOTICE);
 #define ETH_AF_XDP_DFLT_START_QUEUE_IDX        0
 #define ETH_AF_XDP_DFLT_QUEUE_COUNT    1
 
-#define ETH_AF_XDP_RX_BATCH_SIZE       32
-#define ETH_AF_XDP_TX_BATCH_SIZE       32
+#define ETH_AF_XDP_RX_BATCH_SIZE       XSK_RING_CONS__DEFAULT_NUM_DESCS
+#define ETH_AF_XDP_TX_BATCH_SIZE       XSK_RING_CONS__DEFAULT_NUM_DESCS
 
 
 struct xsk_umem_info {
@@ -329,8 +329,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
        if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
-               (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE,
-                                        NULL, fq);
+               (void)reserve_fill_queue(umem, nb_pkts, NULL, fq);
 
        nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
        if (nb_pkts == 0) {
@@ -379,10 +378,8 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 
 static uint16_t
-eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
-       nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
-
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
        return af_xdp_rx_zc(queue, bufs, nb_pkts);
 #else
@@ -390,6 +387,32 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #endif
 }
 
+static uint16_t
+eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       uint16_t nb_rx;
+
+       if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE))
+               return af_xdp_rx(queue, bufs, nb_pkts);
+
+       /* Split larger batch into smaller batches of size
+        * ETH_AF_XDP_RX_BATCH_SIZE or less.
+        */
+       nb_rx = 0;
+       while (nb_pkts) {
+               uint16_t ret, n;
+
+               n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
+               ret = af_xdp_rx(queue, &bufs[nb_rx], n);
+               nb_rx = (uint16_t)(nb_rx + ret);
+               nb_pkts = (uint16_t)(nb_pkts - ret);
+               if (ret < n)
+                       break;
+       }
+
+       return nb_rx;
+}
+
 static void
 pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq)
 {
@@ -535,8 +558,6 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        uint32_t idx_tx;
        struct xsk_ring_cons *cq = &txq->pair->cq;
 
-       nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
-
        pull_umem_cq(umem, nb_pkts, cq);
 
        nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs,
@@ -575,6 +596,32 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 
        return nb_pkts;
 }
+
+static uint16_t
+af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       uint16_t nb_tx;
+
+       if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE))
+               return af_xdp_tx_cp(queue, bufs, nb_pkts);
+
+       nb_tx = 0;
+       while (nb_pkts) {
+               uint16_t ret, n;
+
+               /* Split larger batch into smaller batches of size
+                * ETH_AF_XDP_TX_BATCH_SIZE or less.
+                */
+               n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE);
+               ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n);
+               nb_tx = (uint16_t)(nb_tx + ret);
+               nb_pkts = (uint16_t)(nb_pkts - ret);
+               if (ret < n)
+                       break;
+       }
+
+       return nb_tx;
+}
 #endif
 
 static uint16_t
@@ -583,7 +630,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
        return af_xdp_tx_zc(queue, bufs, nb_pkts);
 #else
-       return af_xdp_tx_cp(queue, bufs, nb_pkts);
+       return af_xdp_tx_cp_batch(queue, bufs, nb_pkts);
 #endif
 }