From: Xiaolong Ye Date: Wed, 17 Apr 2019 13:49:45 +0000 (+0800) Subject: net/af_xdp: make reserve/submit peek/release consistent X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=10edf857fde4;p=dpdk.git net/af_xdp: make reserve/submit peek/release consistent As David pointed out, if we reserve N slots for Tx, but only submit n slots, we would end up with an incorrect opinion of the number of available slots later, we also would get wrong idx when we call xsk_ring_prod__reserve next time. It also applies to xsk_ring_cons__peek()/xsk_ring_cons__release(). This patch ensures that both reserve/submit and peek/release are consistent. Fixes: f1debd77efaf ("net/af_xdp: introduce AF_XDP PMD") Suggested-by: David Marchand Signed-off-by: Xiaolong Ye Reviewed-by: David Marchand --- diff --git a/drivers/net/af_xdp/rte_eth_af_xdp.c b/drivers/net/af_xdp/rte_eth_af_xdp.c index c46916bbee..6a00965231 100644 --- a/drivers/net/af_xdp/rte_eth_af_xdp.c +++ b/drivers/net/af_xdp/rte_eth_af_xdp.c @@ -134,30 +134,34 @@ static const struct rte_eth_link pmd_link = { }; static inline int -reserve_fill_queue(struct xsk_umem_info *umem, int reserve_size) +reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size) { struct xsk_ring_prod *fq = &umem->fq; + void *addrs[reserve_size]; uint32_t idx; - int i, ret; + uint16_t i; + + if (rte_ring_dequeue_bulk(umem->buf_ring, addrs, reserve_size, NULL) + != reserve_size) { + AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n"); + return -1; + } - ret = xsk_ring_prod__reserve(fq, reserve_size, &idx); - if (unlikely(!ret)) { - AF_XDP_LOG(ERR, "Failed to reserve enough fq descs.\n"); - return ret; + if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) { + AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n"); + rte_ring_enqueue_bulk(umem->buf_ring, addrs, + reserve_size, NULL); + return -1; } for (i = 0; i < reserve_size; i++) { __u64 *fq_addr; - void *addr = NULL; - if (rte_ring_dequeue(umem->buf_ring, &addr)) { - i--; - break; - } + fq_addr = xsk_ring_prod__fill_addr(fq, idx++); - *fq_addr = (uint64_t)addr; + *fq_addr = (uint64_t)addrs[i]; } - xsk_ring_prod__submit(fq, i); + xsk_ring_prod__submit(fq, reserve_size); return 0; } @@ -174,21 +178,20 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) struct rte_mbuf *mbufs[ETH_AF_XDP_TX_BATCH_SIZE]; unsigned long dropped = 0; unsigned long rx_bytes = 0; - uint16_t count = 0; int rcvd, i; nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); + if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0)) + return 0; + rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx); if (rcvd == 0) - return 0; + goto out; if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh) (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE); - if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, rcvd) != 0)) - return 0; - for (i = 0; i < rcvd; i++) { const struct xdp_desc *desc; uint64_t addr; @@ -204,7 +207,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) rte_pktmbuf_pkt_len(mbufs[i]) = len; rte_pktmbuf_data_len(mbufs[i]) = len; rx_bytes += len; - bufs[count++] = mbufs[i]; + bufs[i] = mbufs[i]; rte_ring_enqueue(umem->buf_ring, (void *)addr); } @@ -215,7 +218,12 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) rxq->stats.rx_pkts += (rcvd - dropped); rxq->stats.rx_bytes += rx_bytes; - return count; +out: + if (rcvd != nb_pkts) + rte_mempool_put_bulk(rxq->mb_pool, (void **)&mbufs[rcvd], + nb_pkts - rcvd); + + return rcvd; } static void @@ -262,7 +270,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) struct rte_mbuf *mbuf; void *addrs[ETH_AF_XDP_TX_BATCH_SIZE]; unsigned long tx_bytes = 0; - int i, valid = 0; + int i; uint32_t idx_tx; nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_TX_BATCH_SIZE); @@ -283,20 +291,18 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) for (i = 0; i < nb_pkts; i++) { struct xdp_desc *desc; void *pkt; - uint32_t buf_len = ETH_AF_XDP_FRAME_SIZE - - ETH_AF_XDP_DATA_HEADROOM; + desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx + i); mbuf = bufs[i]; - if (mbuf->pkt_len <= buf_len) { - desc->addr = (uint64_t)addrs[valid]; - desc->len = mbuf->pkt_len; - pkt = xsk_umem__get_data(umem->mz->addr, - desc->addr); - rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), - desc->len); - valid++; - tx_bytes += mbuf->pkt_len; - } + + desc->addr = (uint64_t)addrs[i]; + desc->len = mbuf->pkt_len; + pkt = xsk_umem__get_data(umem->mz->addr, + desc->addr); + rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), + desc->len); + tx_bytes += mbuf->pkt_len; + rte_pktmbuf_free(mbuf); } @@ -304,12 +310,7 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts) kick_tx(txq); - if (valid < nb_pkts) - rte_ring_enqueue_bulk(umem->buf_ring, &addrs[valid], - nb_pkts - valid, NULL); - - txq->stats.err_pkts += nb_pkts - valid; - txq->stats.tx_pkts += valid; + txq->stats.tx_pkts += nb_pkts; txq->stats.tx_bytes += tx_bytes; return nb_pkts;