net/af_xdp: optimize Rx mbuf allocation
authorRongQing Li <lirongqing@baidu.com>
Wed, 25 Nov 2020 11:01:32 +0000 (19:01 +0800)
committerFerruh Yigit <ferruh.yigit@intel.com>
Fri, 8 Jan 2021 15:03:04 +0000 (16:03 +0100)
While receiving packets, the max bunch number of mbufs are allocated
and if hardware does not receive the max bunch number packets, it
will free redundancy mbufs, this is low performance.

So optimize Rx performance, by allocating number of mbuf based on
result of xsk_ring_cons__peek, to avoid to redundancy allocation,
and free mbuf when receive packets.

And Rx cached_cons must be roll backed if fails to allocate mbuf.

Signed-off-by: RongQing Li <lirongqing@baidu.com>
Signed-off-by: Dongsheng Rong <rongdongsheng@baidu.com>
Acked-by: Ciara Loftus <ciara.loftus@intel.com>
drivers/net/af_xdp/rte_eth_af_xdp.c

index 7fc70df..703b741 100644 (file)
@@ -255,28 +255,32 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        struct xsk_umem_info *umem = rxq->umem;
        uint32_t idx_rx = 0;
        unsigned long rx_bytes = 0;
-       int rcvd, i;
+       int i;
        struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
-       /* allocate bufs for fill queue replenishment after rx */
-       if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
-               AF_XDP_LOG(DEBUG,
-                       "Failed to get enough buffers for fq.\n");
-               return 0;
-       }
+       nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
 
-       rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
-
-       if (rcvd == 0) {
+       if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
                if (xsk_ring_prod__needs_wakeup(fq))
                        (void)poll(rxq->fds, 1, 1000);
 #endif
 
-               goto out;
+               return 0;
+       }
+
+       /* allocate bufs for fill queue replenishment after rx */
+       if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
+               AF_XDP_LOG(DEBUG,
+                       "Failed to get enough buffers for fq.\n");
+               /* rollback cached_cons which is added by
+                * xsk_ring_cons__peek
+                */
+               rx->cached_cons -= nb_pkts;
+               return 0;
        }
 
-       for (i = 0; i < rcvd; i++) {
+       for (i = 0; i < nb_pkts; i++) {
                const struct xdp_desc *desc;
                uint64_t addr;
                uint32_t len;
@@ -301,20 +305,14 @@ af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                rx_bytes += len;
        }
 
-       xsk_ring_cons__release(rx, rcvd);
-
-       (void)reserve_fill_queue(umem, rcvd, fq_bufs, fq);
+       xsk_ring_cons__release(rx, nb_pkts);
+       (void)reserve_fill_queue(umem, nb_pkts, fq_bufs, fq);
 
        /* statistics */
-       rxq->stats.rx_pkts += rcvd;
+       rxq->stats.rx_pkts += nb_pkts;
        rxq->stats.rx_bytes += rx_bytes;
 
-out:
-       if (rcvd != nb_pkts)
-               rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
-                                    nb_pkts - rcvd);
-
-       return rcvd;
+       return nb_pkts;
 }
 #else
 static uint16_t
@@ -326,7 +324,7 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        struct xsk_ring_prod *fq = &rxq->fq;
        uint32_t idx_rx = 0;
        unsigned long rx_bytes = 0;
-       int rcvd, i;
+       int i;
        uint32_t free_thresh = fq->size >> 1;
        struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
@@ -334,20 +332,24 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE,
                                         NULL, fq);
 
-       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
-               return 0;
-
-       rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
-       if (rcvd == 0) {
+       nb_pkts = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+       if (nb_pkts == 0) {
 #if defined(XDP_USE_NEED_WAKEUP)
                if (xsk_ring_prod__needs_wakeup(fq))
                        (void)poll(rxq->fds, 1, 1000);
 #endif
+               return 0;
+       }
 
-               goto out;
+       if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts))) {
+               /* rollback cached_cons which is added by
+                * xsk_ring_cons__peek
+                */
+               rx->cached_cons -= nb_pkts;
+               return 0;
        }
 
-       for (i = 0; i < rcvd; i++) {
+       for (i = 0; i < nb_pkts; i++) {
                const struct xdp_desc *desc;
                uint64_t addr;
                uint32_t len;
@@ -366,18 +368,13 @@ af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                bufs[i] = mbufs[i];
        }
 
-       xsk_ring_cons__release(rx, rcvd);
+       xsk_ring_cons__release(rx, nb_pkts);
 
        /* statistics */
-       rxq->stats.rx_pkts += rcvd;
+       rxq->stats.rx_pkts += nb_pkts;
        rxq->stats.rx_bytes += rx_bytes;
 
-out:
-       if (rcvd != nb_pkts)
-               rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd],
-                                    nb_pkts - rcvd);
-
-       return rcvd;
+       return nb_pkts;
 }
 #endif