net/af_xdp: improve packet loss
authorCiara Loftus <ciara.loftus@intel.com>
Tue, 23 Jun 2020 14:29:25 +0000 (14:29 +0000)
committerFerruh Yigit <ferruh.yigit@intel.com>
Tue, 30 Jun 2020 14:24:37 +0000 (16:24 +0200)
This commit makes some changes to the AF_XDP PMD in an effort to improve
its packet loss characteristics.

1. In the case of failed transmission due to inability to reserve a tx
   descriptor, the PMD now pulls from the completion ring, issues a
   syscall in which the kernel attempts to complete outstanding tx
   operations, then tries to reserve the tx descriptor again. Prior to
   this we dropped the packet after the syscall and didn't try to
   re-reserve.

2. During completion ring cleanup, always pull as many entries as
   possible from the ring as opposed to the batch size or just how many
   packets we're going to attempt to send. Keeping the completion ring
   emptier should reduce failed transmissions in the kernel, as the
   kernel requires space in the completion ring to successfully tx.

3. Size the fill ring as twice the receive ring size which may help
   reduce allocation failures in the driver.

4. Emulate a tx_free_thresh - when the number of available entries in
   the completion ring rises above this, we pull from it. The threshold
   is set to 1k entries.

With these changes, a benchmark which measured the packet rate at which
0.01% packet loss could be reached improved from ~0.1G to ~3Gbps.

Signed-off-by: Ciara Loftus <ciara.loftus@intel.com>
Acked-by: Xiaolong Ye <xiaolong.ye@intel.com>
drivers/net/af_xdp/rte_eth_af_xdp.c

index 06124ba..2d69221 100644 (file)
@@ -396,6 +396,8 @@ kick_tx(struct pkt_tx_queue *txq)
 {
        struct xsk_umem_info *umem = txq->umem;
 
+       pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS);
+
 #if defined(XDP_USE_NEED_WAKEUP)
        if (xsk_ring_prod__needs_wakeup(&txq->tx))
 #endif
@@ -407,11 +409,9 @@ kick_tx(struct pkt_tx_queue *txq)
 
                        /* pull from completion queue to leave more space */
                        if (errno == EAGAIN)
-                               pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
+                               pull_umem_cq(umem,
+                                            XSK_RING_CONS__DEFAULT_NUM_DESCS);
                }
-#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
-       pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
-#endif
 }
 
 #if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
@@ -427,8 +427,10 @@ af_xdp_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        uint16_t count = 0;
        struct xdp_desc *desc;
        uint64_t addr, offset;
+       uint32_t free_thresh = umem->cq.size >> 1;
 
-       pull_umem_cq(umem, nb_pkts);
+       if (xsk_cons_nb_avail(&umem->cq, free_thresh) >= free_thresh)
+               pull_umem_cq(umem, XSK_RING_CONS__DEFAULT_NUM_DESCS);
 
        for (i = 0; i < nb_pkts; i++) {
                mbuf = bufs[i];
@@ -436,7 +438,9 @@ af_xdp_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                if (mbuf->pool == umem->mb_pool) {
                        if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
                                kick_tx(txq);
-                               goto out;
+                               if (!xsk_ring_prod__reserve(&txq->tx, 1,
+                                                           &idx_tx))
+                                       goto out;
                        }
                        desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
                        desc->len = mbuf->pkt_len;
@@ -758,7 +762,7 @@ xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals __rte_unused,
        struct xsk_umem_info *umem;
        int ret;
        struct xsk_umem_config usr_config = {
-               .fill_size = ETH_AF_XDP_DFLT_NUM_DESCS,
+               .fill_size = ETH_AF_XDP_DFLT_NUM_DESCS * 2,
                .comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
                .flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG};
        void *base_addr = NULL;
@@ -867,7 +871,7 @@ xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
        struct xsk_socket_config cfg;
        struct pkt_tx_queue *txq = rxq->pair;
        int ret = 0;
-       int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS / 2;
+       int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS;
        struct rte_mbuf *fq_bufs[reserve_size];
 
        rxq->umem = xdp_umem_configure(internals, rxq);