net/af_xdp: support unaligned umem chunks
authorCiara Loftus <ciara.loftus@intel.com>
Mon, 30 Sep 2019 16:42:05 +0000 (16:42 +0000)
committerFerruh Yigit <ferruh.yigit@intel.com>
Wed, 23 Oct 2019 14:43:10 +0000 (16:43 +0200)
This patch enables the unaligned chunks feature for AF_XDP which allows
chunks to be placed at arbitrary places in the umem, as opposed to them
being required to be aligned to 2k. This allows DPDK application
mempools to be mapped directly into the umem, which in turn enables zero
copy transfer between the umem and the PMD.

This patch replaces the zero copy via external mbuf mechanism introduced
in commit e9ff8bb71943 ("net/af_xdp: enable zero copy by external mbuf").
The pmd_zero_copy vdev argument is also removed, as the PMD now
auto-detects the presence of the unaligned chunks feature, enabling zero
copy when it is available and falling back to copy mode otherwise.

Signed-off-by: Ciara Loftus <ciara.loftus@intel.com>
Signed-off-by: Kevin Laatz <kevin.laatz@intel.com>
Reviewed-by: Xiaolong Ye <xiaolong.ye@intel.com>
doc/guides/nics/af_xdp.rst
doc/guides/rel_notes/release_19_11.rst
drivers/net/af_xdp/rte_eth_af_xdp.c

index ec46f08..b434b25 100644 (file)
@@ -35,7 +35,6 @@ The following options can be provided to set up an af_xdp port in DPDK.
 *   ``iface`` - name of the Kernel interface to attach to (required);
 *   ``start_queue`` - starting netdev queue id (optional, default 0);
 *   ``queue_count`` - total netdev queue number (optional, default 1);
-*   ``pmd_zero_copy`` - enable zero copy or not (optional, default 0);
 
 Prerequisites
 -------------
@@ -48,6 +47,7 @@ This is a Linux-specific PMD, thus the following prerequisites apply:
    <kernel src tree>/tools/lib/bpf;
 *  A Kernel bound interface to attach to;
 *  For need_wakeup feature, it requires kernel version later than v5.3-rc1;
+*  For PMD zero copy, it requires kernel version later than v5.4-rc1;
 
 Set up an af_xdp interface
 -----------------------------
index 9df7e6b..34a413d 100644 (file)
@@ -149,6 +149,13 @@ New Features
   * Added support for VLAN set VID offload command.
   * Added support for matching on packets withe Geneve tunnel header.
 
+* **Updated the AF_XDP PMD.**
+
+  Updated the AF_XDP PMD. The new features include:
+
+  * Enabled zero copy between application mempools and UMEM by enabling the
+    XDP_UMEM_UNALIGNED_CHUNKS UMEM flag.
+
 * **Added Marvell NITROX symmetric crypto PMD.**
 
   Added a symmetric crypto PMD for Marvell NITROX V security processor.
@@ -241,6 +248,8 @@ Removed Items
                                      ipv4_cksum|udp_cksum|tcp_cksum|timestamp|
                                      vlan_strip|vlan_filter|vlan_extend on|off
 
+* Removed the AF_XDP ``pmd_zero_copy`` vdev argument. Support is now auto-detected.
+
 
 API Changes
 -----------
index f9686c2..2b1245e 100644 (file)
@@ -58,7 +58,13 @@ static int af_xdp_logtype;
 
 #define ETH_AF_XDP_FRAME_SIZE          2048
 #define ETH_AF_XDP_NUM_BUFFERS         4096
+#ifdef XDP_UMEM_UNALIGNED_CHUNK_FLAG
+#define ETH_AF_XDP_MBUF_OVERHEAD       128 /* sizeof(struct rte_mbuf) */
+#define ETH_AF_XDP_DATA_HEADROOM \
+       (ETH_AF_XDP_MBUF_OVERHEAD + RTE_PKTMBUF_HEADROOM)
+#else
 #define ETH_AF_XDP_DATA_HEADROOM       0
+#endif
 #define ETH_AF_XDP_DFLT_NUM_DESCS      XSK_RING_CONS__DEFAULT_NUM_DESCS
 #define ETH_AF_XDP_DFLT_START_QUEUE_IDX        0
 #define ETH_AF_XDP_DFLT_QUEUE_COUNT    1
@@ -73,7 +79,8 @@ struct xsk_umem_info {
        struct xsk_umem *umem;
        struct rte_ring *buf_ring;
        const struct rte_memzone *mz;
-       int pmd_zc;
+       struct rte_mempool *mb_pool;
+       void *buffer;
 };
 
 struct rx_stats {
@@ -98,10 +105,12 @@ struct pkt_rx_queue {
 struct tx_stats {
        uint64_t tx_pkts;
        uint64_t tx_bytes;
+       uint64_t tx_dropped;
 };
 
 struct pkt_tx_queue {
        struct xsk_ring_prod tx;
+       struct xsk_umem_info *umem;
 
        struct tx_stats stats;
 
@@ -117,7 +126,6 @@ struct pmd_internals {
        int max_queue_cnt;
        int combined_queue_cnt;
 
-       int pmd_zc;
        struct rte_ether_addr eth_addr;
 
        struct pkt_rx_queue *rx_queues;
@@ -127,13 +135,11 @@ struct pmd_internals {
 #define ETH_AF_XDP_IFACE_ARG                   "iface"
 #define ETH_AF_XDP_START_QUEUE_ARG             "start_queue"
 #define ETH_AF_XDP_QUEUE_COUNT_ARG             "queue_count"
-#define ETH_AF_XDP_PMD_ZC_ARG                  "pmd_zero_copy"
 
 static const char * const valid_arguments[] = {
        ETH_AF_XDP_IFACE_ARG,
        ETH_AF_XDP_START_QUEUE_ARG,
        ETH_AF_XDP_QUEUE_COUNT_ARG,
-       ETH_AF_XDP_PMD_ZC_ARG,
        NULL
 };
 
@@ -144,8 +150,39 @@ static const struct rte_eth_link pmd_link = {
        .link_autoneg = ETH_LINK_AUTONEG
 };
 
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 static inline int
-reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size)
+reserve_fill_queue_zc(struct xsk_umem_info *umem, uint16_t reserve_size,
+                     struct rte_mbuf **bufs)
+{
+       struct xsk_ring_prod *fq = &umem->fq;
+       uint32_t idx;
+       uint16_t i;
+
+       if (unlikely(!xsk_ring_prod__reserve(fq, reserve_size, &idx))) {
+               for (i = 0; i < reserve_size; i++)
+                       rte_pktmbuf_free(bufs[i]);
+               AF_XDP_LOG(DEBUG, "Failed to reserve enough fq descs.\n");
+               return -1;
+       }
+
+       for (i = 0; i < reserve_size; i++) {
+               __u64 *fq_addr;
+               uint64_t addr;
+
+               fq_addr = xsk_ring_prod__fill_addr(fq, idx++);
+               addr = (uint64_t)bufs[i] - (uint64_t)umem->buffer;
+               *fq_addr = addr;
+       }
+
+       xsk_ring_prod__submit(fq, reserve_size);
+
+       return 0;
+}
+#else
+static inline int
+reserve_fill_queue_cp(struct xsk_umem_info *umem, uint16_t reserve_size,
+                     struct rte_mbuf **bufs __rte_unused)
 {
        struct xsk_ring_prod *fq = &umem->fq;
        void *addrs[reserve_size];
@@ -176,32 +213,99 @@ reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size)
 
        return 0;
 }
+#endif
 
-static void
-umem_buf_release_to_fq(void *addr, void *opaque)
+static inline int
+reserve_fill_queue(struct xsk_umem_info *umem, uint16_t reserve_size,
+                  struct rte_mbuf **bufs)
 {
-       struct xsk_umem_info *umem = (struct xsk_umem_info *)opaque;
-       uint64_t umem_addr = (uint64_t)addr - umem->mz->addr_64;
-
-       rte_ring_enqueue(umem->buf_ring, (void *)umem_addr);
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       return reserve_fill_queue_zc(umem, reserve_size, bufs);
+#else
+       return reserve_fill_queue_cp(umem, reserve_size, bufs);
+#endif
 }
 
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
 static uint16_t
-eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+af_xdp_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
        struct pkt_rx_queue *rxq = queue;
        struct xsk_ring_cons *rx = &rxq->rx;
        struct xsk_umem_info *umem = rxq->umem;
-       struct xsk_ring_prod *fq = &umem->fq;
        uint32_t idx_rx = 0;
-       uint32_t free_thresh = fq->size >> 1;
-       int pmd_zc = umem->pmd_zc;
-       struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
-       unsigned long dropped = 0;
        unsigned long rx_bytes = 0;
        int rcvd, i;
+       struct rte_mbuf *fq_bufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
-       nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
+       /* allocate bufs for fill queue replenishment after rx */
+       if (rte_pktmbuf_alloc_bulk(umem->mb_pool, fq_bufs, nb_pkts)) {
+               AF_XDP_LOG(DEBUG,
+                       "Failed to get enough buffers for fq.\n");
+               return -1;
+       }
+
+       rcvd = xsk_ring_cons__peek(rx, nb_pkts, &idx_rx);
+
+       if (rcvd == 0) {
+#if defined(XDP_USE_NEED_WAKEUP)
+               if (xsk_ring_prod__needs_wakeup(&umem->fq))
+                       (void)poll(rxq->fds, 1, 1000);
+#endif
+
+               goto out;
+       }
+
+       for (i = 0; i < rcvd; i++) {
+               const struct xdp_desc *desc;
+               uint64_t addr;
+               uint32_t len;
+               uint64_t offset;
+
+               desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
+               addr = desc->addr;
+               len = desc->len;
+
+               offset = xsk_umem__extract_offset(addr);
+               addr = xsk_umem__extract_addr(addr);
+
+               bufs[i] = (struct rte_mbuf *)
+                               xsk_umem__get_data(umem->buffer, addr);
+               bufs[i]->data_off = offset - sizeof(struct rte_mbuf);
+
+               rte_pktmbuf_pkt_len(bufs[i]) = len;
+               rte_pktmbuf_data_len(bufs[i]) = len;
+               rx_bytes += len;
+       }
+
+       xsk_ring_cons__release(rx, rcvd);
+
+       (void)reserve_fill_queue(umem, rcvd, fq_bufs);
+
+       /* statistics */
+       rxq->stats.rx_pkts += rcvd;
+       rxq->stats.rx_bytes += rx_bytes;
+
+out:
+       if (rcvd != nb_pkts)
+               rte_mempool_put_bulk(umem->mb_pool, (void **)&fq_bufs[rcvd],
+                                    nb_pkts - rcvd);
+
+       return rcvd;
+}
+#else
+static uint16_t
+af_xdp_rx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       struct pkt_rx_queue *rxq = queue;
+       struct xsk_ring_cons *rx = &rxq->rx;
+       struct xsk_umem_info *umem = rxq->umem;
+       struct xsk_ring_prod *fq = &umem->fq;
+       uint32_t idx_rx = 0;
+       unsigned long rx_bytes = 0;
+       int rcvd, i;
+       uint32_t free_thresh = fq->size >> 1;
+       struct rte_mbuf *mbufs[ETH_AF_XDP_RX_BATCH_SIZE];
 
        if (unlikely(rte_pktmbuf_alloc_bulk(rxq->mb_pool, mbufs, nb_pkts) != 0))
                return 0;
@@ -217,32 +321,21 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        }
 
        if (xsk_prod_nb_free(fq, free_thresh) >= free_thresh)
-               (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE);
+               (void)reserve_fill_queue(umem, ETH_AF_XDP_RX_BATCH_SIZE, NULL);
 
        for (i = 0; i < rcvd; i++) {
                const struct xdp_desc *desc;
                uint64_t addr;
                uint32_t len;
                void *pkt;
-               uint16_t buf_len = ETH_AF_XDP_FRAME_SIZE;
-               struct rte_mbuf_ext_shared_info *shinfo;
 
                desc = xsk_ring_cons__rx_desc(rx, idx_rx++);
                addr = desc->addr;
                len = desc->len;
                pkt = xsk_umem__get_data(rxq->umem->mz->addr, addr);
 
-               if (pmd_zc) {
-                       shinfo = rte_pktmbuf_ext_shinfo_init_helper(pkt,
-                                       &buf_len, umem_buf_release_to_fq, umem);
-
-                       rte_pktmbuf_attach_extbuf(mbufs[i], pkt, 0, buf_len,
-                                                 shinfo);
-               } else {
-                       rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *),
-                                                       pkt, len);
-                       rte_ring_enqueue(umem->buf_ring, (void *)addr);
-               }
+               rte_memcpy(rte_pktmbuf_mtod(mbufs[i], void *), pkt, len);
+               rte_ring_enqueue(umem->buf_ring, (void *)addr);
                rte_pktmbuf_pkt_len(mbufs[i]) = len;
                rte_pktmbuf_data_len(mbufs[i]) = len;
                rx_bytes += len;
@@ -252,7 +345,7 @@ eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        xsk_ring_cons__release(rx, rcvd);
 
        /* statistics */
-       rxq->stats.rx_pkts += (rcvd - dropped);
+       rxq->stats.rx_pkts += rcvd;
        rxq->stats.rx_bytes += rx_bytes;
 
 out:
@@ -262,6 +355,19 @@ out:
 
        return rcvd;
 }
+#endif
+
+static uint16_t
+eth_af_xdp_rx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+       nb_pkts = RTE_MIN(nb_pkts, ETH_AF_XDP_RX_BATCH_SIZE);
+
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       return af_xdp_rx_zc(queue, bufs, nb_pkts);
+#else
+       return af_xdp_rx_cp(queue, bufs, nb_pkts);
+#endif
+}
 
 static void
 pull_umem_cq(struct xsk_umem_info *umem, int size)
@@ -275,7 +381,13 @@ pull_umem_cq(struct xsk_umem_info *umem, int size)
        for (i = 0; i < n; i++) {
                uint64_t addr;
                addr = *xsk_ring_cons__comp_addr(cq, idx_cq++);
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+               addr = xsk_umem__extract_addr(addr);
+               rte_pktmbuf_free((struct rte_mbuf *)
+                                       xsk_umem__get_data(umem->buffer, addr));
+#else
                rte_ring_enqueue(umem->buf_ring, (void *)addr);
+#endif
        }
 
        xsk_ring_cons__release(cq, n);
@@ -284,7 +396,7 @@ pull_umem_cq(struct xsk_umem_info *umem, int size)
 static void
 kick_tx(struct pkt_tx_queue *txq)
 {
-       struct xsk_umem_info *umem = txq->pair->umem;
+       struct xsk_umem_info *umem = txq->umem;
 
 #if defined(XDP_USE_NEED_WAKEUP)
        if (xsk_ring_prod__needs_wakeup(&txq->tx))
@@ -299,24 +411,96 @@ kick_tx(struct pkt_tx_queue *txq)
                        if (errno == EAGAIN)
                                pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
                }
+#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
        pull_umem_cq(umem, ETH_AF_XDP_TX_BATCH_SIZE);
+#endif
 }
 
-static inline bool
-in_umem_range(struct xsk_umem_info *umem, uint64_t addr)
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+static uint16_t
+af_xdp_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
-       uint64_t mz_base_addr = umem->mz->addr_64;
+       struct pkt_tx_queue *txq = queue;
+       struct xsk_umem_info *umem = txq->umem;
+       struct rte_mbuf *mbuf;
+       unsigned long tx_bytes = 0;
+       int i;
+       uint32_t idx_tx;
+       uint16_t count = 0;
+       struct xdp_desc *desc;
+       uint64_t addr, offset;
 
-       return addr >= mz_base_addr && addr < mz_base_addr + umem->mz->len;
-}
+       pull_umem_cq(umem, nb_pkts);
+
+       for (i = 0; i < nb_pkts; i++) {
+               mbuf = bufs[i];
+
+               if (mbuf->pool == umem->mb_pool) {
+                       if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
+                               kick_tx(txq);
+                               goto out;
+                       }
+                       desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
+                       desc->len = mbuf->pkt_len;
+                       addr = (uint64_t)mbuf - (uint64_t)umem->buffer;
+                       offset = rte_pktmbuf_mtod(mbuf, uint64_t) -
+                                       (uint64_t)mbuf;
+                       offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
+                       desc->addr = addr | offset;
+                       count++;
+               } else {
+                       struct rte_mbuf *local_mbuf =
+                                       rte_pktmbuf_alloc(umem->mb_pool);
+                       void *pkt;
+
+                       if (local_mbuf == NULL)
+                               goto out;
+
+                       if (!xsk_ring_prod__reserve(&txq->tx, 1, &idx_tx)) {
+                               rte_pktmbuf_free(local_mbuf);
+                               kick_tx(txq);
+                               goto out;
+                       }
+
+                       desc = xsk_ring_prod__tx_desc(&txq->tx, idx_tx);
+                       desc->len = mbuf->pkt_len;
+
+                       addr = (uint64_t)local_mbuf - (uint64_t)umem->buffer;
+                       offset = rte_pktmbuf_mtod(local_mbuf, uint64_t) -
+                                       (uint64_t)local_mbuf;
+                       pkt = xsk_umem__get_data(umem->buffer, addr + offset);
+                       offset = offset << XSK_UNALIGNED_BUF_OFFSET_SHIFT;
+                       desc->addr = addr | offset;
+                       rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
+                                       desc->len);
+                       rte_pktmbuf_free(mbuf);
+                       count++;
+               }
 
+               tx_bytes += mbuf->pkt_len;
+       }
+
+#if defined(XDP_USE_NEED_WAKEUP)
+       if (xsk_ring_prod__needs_wakeup(&txq->tx))
+#endif
+               kick_tx(txq);
+
+out:
+       xsk_ring_prod__submit(&txq->tx, count);
+
+       txq->stats.tx_pkts += count;
+       txq->stats.tx_bytes += tx_bytes;
+       txq->stats.tx_dropped += nb_pkts - count;
+
+       return count;
+}
+#else
 static uint16_t
-eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+af_xdp_tx_cp(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
 {
        struct pkt_tx_queue *txq = queue;
-       struct xsk_umem_info *umem = txq->pair->umem;
+       struct xsk_umem_info *umem = txq->umem;
        struct rte_mbuf *mbuf;
-       int pmd_zc = umem->pmd_zc;
        void *addrs[ETH_AF_XDP_TX_BATCH_SIZE];
        unsigned long tx_bytes = 0;
        int i;
@@ -345,24 +529,12 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
                mbuf = bufs[i];
                desc->len = mbuf->pkt_len;
 
-               /*
-                * We need to make sure the external mbuf address is within
-                * current port's umem memzone range
-                */
-               if (pmd_zc && RTE_MBUF_HAS_EXTBUF(mbuf) &&
-                               in_umem_range(umem, (uint64_t)mbuf->buf_addr)) {
-                       desc->addr = (uint64_t)mbuf->buf_addr -
-                               umem->mz->addr_64;
-                       mbuf->buf_addr = xsk_umem__get_data(umem->mz->addr,
-                                       (uint64_t)addrs[i]);
-               } else {
-                       desc->addr = (uint64_t)addrs[i];
-                       pkt = xsk_umem__get_data(umem->mz->addr,
-                                       desc->addr);
-                       rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *),
-                                       desc->len);
-               }
+               desc->addr = (uint64_t)addrs[i];
+               pkt = xsk_umem__get_data(umem->mz->addr,
+                                        desc->addr);
+               rte_memcpy(pkt, rte_pktmbuf_mtod(mbuf, void *), desc->len);
                tx_bytes += mbuf->pkt_len;
+               rte_pktmbuf_free(mbuf);
        }
 
        xsk_ring_prod__submit(&txq->tx, nb_pkts);
@@ -372,11 +544,19 @@ eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        txq->stats.tx_pkts += nb_pkts;
        txq->stats.tx_bytes += tx_bytes;
 
-       for (i = 0; i < nb_pkts; i++)
-               rte_pktmbuf_free(bufs[i]);
-
        return nb_pkts;
 }
+#endif
+
+static uint16_t
+eth_af_xdp_tx(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
+{
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       return af_xdp_tx_zc(queue, bufs, nb_pkts);
+#else
+       return af_xdp_tx_cp(queue, bufs, nb_pkts);
+#endif
+}
 
 static int
 eth_dev_start(struct rte_eth_dev *dev)
@@ -448,6 +628,7 @@ eth_stats_get(struct rte_eth_dev *dev, struct rte_eth_stats *stats)
                stats->ipackets += stats->q_ipackets[i];
                stats->ibytes += stats->q_ibytes[i];
                stats->imissed += rxq->stats.rx_dropped;
+               stats->oerrors += txq->stats.tx_dropped;
                ret = getsockopt(xsk_socket__fd(rxq->xsk), SOL_XDP,
                                XDP_STATISTICS, &xdp_stats, &optlen);
                if (ret != 0) {
@@ -496,11 +677,16 @@ remove_xdp_program(struct pmd_internals *internals)
 static void
 xdp_umem_destroy(struct xsk_umem_info *umem)
 {
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       rte_mempool_free(umem->mb_pool);
+       umem->mb_pool = NULL;
+#else
        rte_memzone_free(umem->mz);
        umem->mz = NULL;
 
        rte_ring_free(umem->buf_ring);
        umem->buf_ring = NULL;
+#endif
 
        rte_free(umem);
        umem = NULL;
@@ -550,6 +736,55 @@ eth_link_update(struct rte_eth_dev *dev __rte_unused,
        return 0;
 }
 
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+static inline uint64_t get_base_addr(struct rte_mempool *mp)
+{
+       struct rte_mempool_memhdr *memhdr;
+
+       memhdr = STAILQ_FIRST(&mp->mem_list);
+       return (uint64_t)memhdr->addr & ~(getpagesize() - 1);
+}
+
+static struct
+xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals __rte_unused,
+                                 struct pkt_rx_queue *rxq)
+{
+       struct xsk_umem_info *umem;
+       int ret;
+       struct xsk_umem_config usr_config = {
+               .fill_size = ETH_AF_XDP_DFLT_NUM_DESCS,
+               .comp_size = ETH_AF_XDP_DFLT_NUM_DESCS,
+               .flags = XDP_UMEM_UNALIGNED_CHUNK_FLAG};
+       void *base_addr = NULL;
+       struct rte_mempool *mb_pool = rxq->mb_pool;
+
+       usr_config.frame_size = rte_pktmbuf_data_room_size(mb_pool) +
+                                       ETH_AF_XDP_MBUF_OVERHEAD +
+                                       mb_pool->private_data_size;
+       usr_config.frame_headroom = ETH_AF_XDP_DATA_HEADROOM +
+                                       mb_pool->private_data_size;
+
+       umem = rte_zmalloc_socket("umem", sizeof(*umem), 0, rte_socket_id());
+       if (umem == NULL) {
+               AF_XDP_LOG(ERR, "Failed to allocate umem info");
+               return NULL;
+       }
+
+       umem->mb_pool = mb_pool;
+       base_addr = (void *)get_base_addr(mb_pool);
+
+       ret = xsk_umem__create(&umem->umem, base_addr,
+                              mb_pool->populated_size * usr_config.frame_size,
+                              &umem->fq, &umem->cq,
+                              &usr_config);
+
+       if (ret) {
+               AF_XDP_LOG(ERR, "Failed to create umem");
+               goto err;
+       }
+       umem->buffer = base_addr;
+
+#else
 static struct
 xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
                                  struct pkt_rx_queue *rxq)
@@ -610,6 +845,7 @@ xsk_umem_info *xdp_umem_configure(struct pmd_internals *internals,
        }
        umem->mz = mz;
 
+#endif
        return umem;
 
 err:
@@ -624,11 +860,13 @@ xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
        struct xsk_socket_config cfg;
        struct pkt_tx_queue *txq = rxq->pair;
        int ret = 0;
-       int reserve_size;
+       int reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS / 2;
+       struct rte_mbuf *fq_bufs[reserve_size];
 
        rxq->umem = xdp_umem_configure(internals, rxq);
        if (rxq->umem == NULL)
                return -ENOMEM;
+       txq->umem = rxq->umem;
 
        cfg.rx_size = ring_size;
        cfg.tx_size = ring_size;
@@ -648,8 +886,13 @@ xsk_configure(struct pmd_internals *internals, struct pkt_rx_queue *rxq,
                goto err;
        }
 
-       reserve_size = ETH_AF_XDP_DFLT_NUM_DESCS / 2;
-       ret = reserve_fill_queue(rxq->umem, reserve_size);
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       if (rte_pktmbuf_alloc_bulk(rxq->umem->mb_pool, fq_bufs, reserve_size)) {
+               AF_XDP_LOG(DEBUG, "Failed to get enough buffers for fq.\n");
+               goto err;
+       }
+#endif
+       ret = reserve_fill_queue(rxq->umem, reserve_size, fq_bufs);
        if (ret) {
                xsk_socket__delete(rxq->xsk);
                AF_XDP_LOG(ERR, "Failed to reserve fill queue.\n");
@@ -673,7 +916,6 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
                   struct rte_mempool *mb_pool)
 {
        struct pmd_internals *internals = dev->data->dev_private;
-       uint32_t buf_size, data_size;
        struct pkt_rx_queue *rxq;
        int ret;
 
@@ -681,6 +923,10 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
 
        AF_XDP_LOG(INFO, "Set up rx queue, rx queue id: %d, xsk queue id: %d\n",
                   rx_queue_id, rxq->xsk_queue_idx);
+
+#ifndef XDP_UMEM_UNALIGNED_CHUNK_FLAG
+       uint32_t buf_size, data_size;
+
        /* Now get the space available for data in the mbuf */
        buf_size = rte_pktmbuf_data_room_size(mb_pool) -
                RTE_PKTMBUF_HEADROOM;
@@ -692,6 +938,7 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
                ret = -ENOMEM;
                goto err;
        }
+#endif
 
        rxq->mb_pool = mb_pool;
 
@@ -704,8 +951,6 @@ eth_rx_queue_setup(struct rte_eth_dev *dev,
        rxq->fds[0].fd = xsk_socket__fd(rxq->xsk);
        rxq->fds[0].events = POLLIN;
 
-       rxq->umem->pmd_zc = internals->pmd_zc;
-
        dev->data->rx_queues[rx_queue_id] = rxq;
        return 0;
 
@@ -887,7 +1132,7 @@ xdp_get_channels_info(const char *if_name, int *max_queues,
 
 static int
 parse_parameters(struct rte_kvargs *kvlist, char *if_name, int *start_queue,
-                       int *queue_cnt, int *pmd_zc)
+                       int *queue_cnt)
 {
        int ret;
 
@@ -908,11 +1153,6 @@ parse_parameters(struct rte_kvargs *kvlist, char *if_name, int *start_queue,
                goto free_kvlist;
        }
 
-       ret = rte_kvargs_process(kvlist, ETH_AF_XDP_PMD_ZC_ARG,
-                                &parse_integer_arg, pmd_zc);
-       if (ret < 0)
-               goto free_kvlist;
-
 free_kvlist:
        rte_kvargs_free(kvlist);
        return ret;
@@ -950,7 +1190,7 @@ error:
 
 static struct rte_eth_dev *
 init_internals(struct rte_vdev_device *dev, const char *if_name,
-                       int start_queue_idx, int queue_cnt, int pmd_zc)
+                       int start_queue_idx, int queue_cnt)
 {
        const char *name = rte_vdev_device_name(dev);
        const unsigned int numa_node = dev->device.numa_node;
@@ -965,7 +1205,6 @@ init_internals(struct rte_vdev_device *dev, const char *if_name,
 
        internals->start_queue_idx = start_queue_idx;
        internals->queue_cnt = queue_cnt;
-       internals->pmd_zc = pmd_zc;
        strlcpy(internals->if_name, if_name, IFNAMSIZ);
 
        if (xdp_get_channels_info(if_name, &internals->max_queue_cnt,
@@ -1021,8 +1260,9 @@ init_internals(struct rte_vdev_device *dev, const char *if_name,
        /* Let rte_eth_dev_close() release the port resources. */
        eth_dev->data->dev_flags |= RTE_ETH_DEV_CLOSE_REMOVE;
 
-       if (internals->pmd_zc)
-               AF_XDP_LOG(INFO, "Zero copy between umem and mbuf enabled.\n");
+#if defined(XDP_UMEM_UNALIGNED_CHUNK_FLAG)
+       AF_XDP_LOG(INFO, "Zero copy between umem and mbuf enabled.\n");
+#endif
 
        return eth_dev;
 
@@ -1044,7 +1284,6 @@ rte_pmd_af_xdp_probe(struct rte_vdev_device *dev)
        int xsk_queue_cnt = ETH_AF_XDP_DFLT_QUEUE_COUNT;
        struct rte_eth_dev *eth_dev = NULL;
        const char *name;
-       int pmd_zc = 0;
 
        AF_XDP_LOG(INFO, "Initializing pmd_af_xdp for %s\n",
                rte_vdev_device_name(dev));
@@ -1072,7 +1311,7 @@ rte_pmd_af_xdp_probe(struct rte_vdev_device *dev)
                dev->device.numa_node = rte_socket_id();
 
        if (parse_parameters(kvlist, if_name, &xsk_start_queue_idx,
-                            &xsk_queue_cnt, &pmd_zc) < 0) {
+                            &xsk_queue_cnt) < 0) {
                AF_XDP_LOG(ERR, "Invalid kvargs value\n");
                return -EINVAL;
        }
@@ -1083,7 +1322,7 @@ rte_pmd_af_xdp_probe(struct rte_vdev_device *dev)
        }
 
        eth_dev = init_internals(dev, if_name, xsk_start_queue_idx,
-                                       xsk_queue_cnt, pmd_zc);
+                                       xsk_queue_cnt);
        if (eth_dev == NULL) {
                AF_XDP_LOG(ERR, "Failed to init internals\n");
                return -1;
@@ -1126,8 +1365,7 @@ RTE_PMD_REGISTER_VDEV(net_af_xdp, pmd_af_xdp_drv);
 RTE_PMD_REGISTER_PARAM_STRING(net_af_xdp,
                              "iface=<string> "
                              "start_queue=<int> "
-                             "queue_count=<int> "
-                             "pmd_zero_copy=<0|1>");
+                             "queue_count=<int> ");
 
 RTE_INIT(af_xdp_init_log)
 {