From: Ciara Loftus Date: Wed, 10 Mar 2021 07:48:14 +0000 (+0000) Subject: net/af_xdp: allow bigger batch sizes X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=d96394ea263cd2657610561d85c37a793ded573b;p=dpdk.git net/af_xdp: allow bigger batch sizes Prior to this commit, the maximum batch sizes for zero-copy and copy-mode rx and copy-mode tx were set to 32. Apart from zero-copy tx, the user could never rx/tx any more than 32 packets at a time and without inspecting the code the user wouldn't be aware of this. This commit removes these upper limits placed on the user and instead sets an internal batch size equal to the default ring size (2048). Batches larger than this are still processed, however they are split into smaller batches similar to how it's done in other drivers. This is necessary because some arrays used during rx/tx need to be sized at compile-time. Allowing a larger batch size allows for fewer batches and thus larger bulk operations, fewer ring accesses and fewer syscalls which should yield improved performance. Signed-off-by: Ciara Loftus Reviewed-by: Ferruh Yigit --- diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c index 3957227bf0..be524e4784 100644 --- a/drivers/net/af_xdp/rte_eth_af_xdp.c +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c @@ -66,8 +66,8 @@ RTE_LOG_REGISTER(af_xdp_logtype, pmd.net.af_xdp, NOTICE); #define ETH_AF_XDP_DFLT_START_QUEUE_IDX 0 #define ETH_AF_XDP_DFLT_QUEUE_COUNT 1 -#define ETH_AF_XDP_RX_BATCH_SIZE 32 -#define ETH_AF_XDP_TX_BATCH_SIZE 32 +#define ETH_AF_XDP_RX_BATCH_SIZE XSK_RING_CONS__DEFAULT_NUM_DESCS +#define ETH_AF_XDP_TX_BATCH_SIZE XSK_RING_CONS__DEFAULT_NUM_DESCS struct xsk_umem_info { @@ -329,8 +329,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE]; if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh) - (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, - NULL, fq); + (void)reserve_fill_queue(umem, nb_pkts, NULL, fq); nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx); if (nb_pkts == 0) { @@ -379,10 +378,8 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) #endif static uint16_t -eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) { - nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE); - #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG) return af_xdp_rx_zc(queue, bufs, nb_pkts); #else @@ -390,6 +387,32 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) #endif } +static uint16_t +eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +{ + uint16_t nb_rx; + + if (likely(nb_pkts <= ETH_AF_XDP_RX_BATCH_SIZE)) + return af_xdp_rx(queue, bufs, nb_pkts); + + /* Split larger batch into smaller batches of size + * ETH_AF_XDP_RX_BATCH_SIZE or less. + */ + nb_rx = 0; + while (nb_pkts) { + uint16_t ret, n; + + n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE); + ret = af_xdp_rx(queue, &bufs[nb_rx], n); + nb_rx = (uint16_t)(nb_rx + ret); + nb_pkts = (uint16_t)(nb_pkts - ret); + if (ret < n) + break; + } + + return nb_rx; +} + static void pull_umem_cq(struct xsk_umem_info *umem, int size, struct xsk_ring_cons *cq) { @@ -535,8 +558,6 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) uint32_t idx_tx; struct xsk_ring_cons *cq = &txq->pair->cq; - nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); - pull_umem_cq(umem, nb_pkts, cq); nb_pkts = rte_ring_dequeue_bulk(umem->buf_ring, addrs, @@ -575,6 +596,32 @@ af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) return nb_pkts; } + +static uint16_t +af_xdp_tx_cp_batch(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) +{ + uint16_t nb_tx; + + if (likely(nb_pkts <= ETH_AF_XDP_TX_BATCH_SIZE)) + return af_xdp_tx_cp(queue, bufs, nb_pkts); + + nb_tx = 0; + while (nb_pkts) { + uint16_t ret, n; + + /* Split larger batch into smaller batches of size + * ETH_AF_XDP_TX_BATCH_SIZE or less. + */ + n = (uint16_t)RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); + ret = af_xdp_tx_cp(queue, &bufs[nb_tx], n); + nb_tx = (uint16_t)(nb_tx + ret); + nb_pkts = (uint16_t)(nb_pkts - ret); + if (ret < n) + break; + } + + return nb_tx; +} #endif static uint16_t @@ -583,7 +630,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG) return af_xdp_tx_zc(queue, bufs, nb_pkts); #else - return af_xdp_tx_cp(queue, bufs, nb_pkts); + return af_xdp_tx_cp_batch(queue, bufs, nb_pkts); #endif }