vhost: enhance async enqueue for small packets
author Jiayu Hu <jiayu.hu@intel.com>
Mon, 11 Jan 2021 12:16:27 +0000 (07:16 -0500)
committer Ferruh Yigit <ferruh.yigit@intel.com>
Wed, 13 Jan 2021 17:51:58 +0000 (18:51 +0100)
Async enqueue offloads large copies to DMA devices, while small copies
are still performed by the CPU. However, it requires users to retrieve
completed packets via rte_vhost_poll_enqueue_completed(), even for
packets that the CPU has already finished copying by the time
rte_vhost_submit_enqueue_burst() returns. This design incurs the extra
overhead of tracking completed pktmbufs and of additional function
calls, which degrades performance for small packets.

This patch enhances async enqueue for small packets by enabling
rte_vhost_submit_enqueue_burst() to return completed packets.
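
As a rough usage sketch only (not part of the patch; MAX_PKT_BURST, vid and
nr_rx stand for values the application already has, and rte_pktmbuf_free_bulk()
is just one way to drop the finished mbufs), a caller of the new signature can
free the packets reported in comp_pkts right away and poll only for the rest:

    /* hypothetical helper built on the new API */
    static uint16_t
    submit_and_free_completed(int vid, struct rte_mbuf **pkts, uint16_t nr_rx)
    {
            struct rte_mbuf *comp_pkts[MAX_PKT_BURST]; /* >= burst size */
            uint32_t nr_comp = 0;
            uint16_t nr_enq;

            nr_enq = rte_vhost_submit_enqueue_burst(vid, VIRTIO_RXQ, pkts,
                                                    nr_rx, comp_pkts, &nr_comp);
            /* CPU-copied packets are already done and can be freed now */
            rte_pktmbuf_free_bulk(comp_pkts, nr_comp);

            /* the remaining packets stay with the DMA engine until
             * rte_vhost_poll_enqueue_completed() hands them back */
            return nr_enq - nr_comp;
    }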

Signed-off-by: Jiayu Hu <jiayu.hu@intel.com>
Tested-by: Yinan Wang <yinan.wang@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
doc/guides/prog_guide/vhost_lib.rst
examples/vhost/main.c
lib/librte_vhost/rte_vhost_async.h
lib/librte_vhost/vhost.c
lib/librte_vhost/vhost.h
lib/librte_vhost/vhost_user.c
lib/librte_vhost/virtio_net.c

index ba4c62a..dc29229 100644 (file)
@@ -245,11 +245,13 @@ The following is an overview of some key Vhost API functions:
 
   Unregister the async copy device channel from a vhost queue.
 
-* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count)``
+* ``rte_vhost_submit_enqueue_burst(vid, queue_id, pkts, count, comp_pkts, comp_count)``
 
   Submit an enqueue request to transmit ``count`` packets from host to guest
-  by async data path. Enqueue is not guaranteed to finish upon the return of
-  this API call.
+  by async data path. Successfully enqueued packets can be transfer completed
+  or being occupied by DMA engines; transfer completed packets are returned in
+  ``comp_pkts``, but others are not guaranteed to finish, when this API
+  call returns.
 
   Applications must not free the packets submitted for enqueue until the
   packets are completed.
 
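The polling side might look like the sketch below (illustrative only; the
burst size, queue id and free helper are assumptions, and when to poll is up
to the application):

    struct rte_mbuf *done[MAX_PKT_BURST];
    uint16_t n;

    /* returns packets whose async copies have finished; safe to free now */
    n = rte_vhost_poll_enqueue_completed(vid, VIRTIO_RXQ,
                                         done, MAX_PKT_BURST);
    rte_pktmbuf_free_bulk(done, n);
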
index 8d8c303..2230997 100644 (file)
@@ -809,13 +809,16 @@ virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
            struct rte_mbuf *m)
 {
        uint16_t ret;
-       struct rte_mbuf *m_cpl[1];
+       struct rte_mbuf *m_cpl[1], *comp_pkt;
+       uint32_t nr_comp = 0;
 
        if (builtin_net_driver) {
                ret = vs_enqueue_pkts(dst_vdev, VIRTIO_RXQ, &m, 1);
        } else if (async_vhost_driver) {
                ret = rte_vhost_submit_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ,
-                                               &m, 1);
+                                               &m, 1, &comp_pkt, &nr_comp);
+               if (nr_comp == 1)
+                       goto done;
 
                if (likely(ret))
                        dst_vdev->nr_async_pkts++;
@@ -829,6 +832,7 @@ virtio_xmit(struct vhost_dev *dst_vdev, struct vhost_dev *src_vdev,
                ret = rte_vhost_enqueue_burst(dst_vdev->vid, VIRTIO_RXQ, &m, 1);
        }
 
+done:
        if (enable_stats) {
                rte_atomic64_inc(&dst_vdev->stats.rx_total_atomic);
                rte_atomic64_add(&dst_vdev->stats.rx_atomic, ret);
@@ -1090,7 +1094,8 @@ static __rte_always_inline void
 drain_eth_rx(struct vhost_dev *vdev)
 {
        uint16_t rx_count, enqueue_count;
-       struct rte_mbuf *pkts[MAX_PKT_BURST];
+       struct rte_mbuf *pkts[MAX_PKT_BURST], *comp_pkts[MAX_PKT_BURST];
+       uint32_t nr_comp = 0;
 
        rx_count = rte_eth_rx_burst(ports[0], vdev->vmdq_rx_q,
                                    pkts, MAX_PKT_BURST);
@@ -1124,7 +1129,12 @@ drain_eth_rx(struct vhost_dev *vdev)
                                                pkts, rx_count);
        } else if (async_vhost_driver) {
                enqueue_count = rte_vhost_submit_enqueue_burst(vdev->vid,
-                                       VIRTIO_RXQ, pkts, rx_count);
+                                       VIRTIO_RXQ, pkts, rx_count, comp_pkts,
+                                       &nr_comp);
+               if (nr_comp > 0) {
+                       free_pkts(comp_pkts, nr_comp);
+                       enqueue_count -= nr_comp;
+               }
                vdev->nr_async_pkts += enqueue_count;
        } else {
                enqueue_count = rte_vhost_enqueue_burst(vdev->vid, VIRTIO_RXQ,
index 03bd558..c855ff8 100644 (file)
@@ -87,13 +87,8 @@ struct rte_vhost_async_channel_ops {
  * inflight async packet information
  */
 struct async_inflight_info {
-       union {
-               uint32_t info;
-               struct {
-                       uint16_t descs; /* num of descs inflight */
-                       uint16_t segs; /* iov segs inflight */
-               };
-       };
+       struct rte_mbuf *mbuf;
+       uint16_t descs; /* num of descs inflight */
 };
 
 /**
@@ -147,9 +142,13 @@ __rte_experimental
 int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
 
 /**
- * This function submits enqueue data to async engine. This function has
- * no guarantee to the transfer completion upon return. Applications
- * should poll transfer status by rte_vhost_poll_enqueue_completed()
+ * This function submits enqueue data to async engine. Successfully
+ * enqueued packets can be transfer completed or being occupied by DMA
+ * engines, when this API returns. Transfer completed packets are returned
+ * in comp_pkts, so users need to guarantee its size is greater than or
+ * equal to the size of pkts; for packets that are successfully enqueued
+ * but not transfer completed, users should poll transfer status by
+ * rte_vhost_poll_enqueue_completed().
  *
  * @param vid
  *  id of vhost device to enqueue data
@@ -159,12 +158,19 @@ int rte_vhost_async_channel_unregister(int vid, uint16_t queue_id);
  *  array of packets to be enqueued
  * @param count
  *  packets num to be enqueued
+ * @param comp_pkts
+ *  empty array to get transfer completed packets. Users need to
+ *  guarantee its size is greater than or equal to that of pkts
+ * @param comp_count
+ *  num of packets that are transfer completed, when this API returns.
+ *  If no packets are transfer completed, its value is set to 0.
  * @return
- *  num of packets enqueued
+ *  num of packets enqueued, including in-flight and transfer completed
  */
 __rte_experimental
 uint16_t rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
-               struct rte_mbuf **pkts, uint16_t count);
+               struct rte_mbuf **pkts, uint16_t count,
+               struct rte_mbuf **comp_pkts, uint32_t *comp_count);
 
 /**
  * This function checks async completion status for a specific vhost
index c69b105..efb136e 100644 (file)
@@ -327,17 +327,17 @@ cleanup_device(struct virtio_net *dev, int destroy)
 static void
 vhost_free_async_mem(struct vhost_virtqueue *vq)
 {
-       if (vq->async_pkts_pending)
-               rte_free(vq->async_pkts_pending);
        if (vq->async_pkts_info)
                rte_free(vq->async_pkts_info);
+       if (vq->async_descs_split)
+               rte_free(vq->async_descs_split);
        if (vq->it_pool)
                rte_free(vq->it_pool);
        if (vq->vec_pool)
                rte_free(vq->vec_pool);
 
-       vq->async_pkts_pending = NULL;
        vq->async_pkts_info = NULL;
+       vq->async_descs_split = NULL;
        vq->it_pool = NULL;
        vq->vec_pool = NULL;
 }
@@ -1628,9 +1628,6 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
        node = SOCKET_ID_ANY;
 #endif
 
-       vq->async_pkts_pending = rte_malloc_socket(NULL,
-                       vq->size * sizeof(uintptr_t),
-                       RTE_CACHE_LINE_SIZE, node);
        vq->async_pkts_info = rte_malloc_socket(NULL,
                        vq->size * sizeof(struct async_inflight_info),
                        RTE_CACHE_LINE_SIZE, node);
@@ -1640,7 +1637,10 @@ int rte_vhost_async_channel_register(int vid, uint16_t queue_id,
        vq->vec_pool = rte_malloc_socket(NULL,
                        VHOST_MAX_ASYNC_VEC * sizeof(struct iovec),
                        RTE_CACHE_LINE_SIZE, node);
-       if (!vq->async_pkts_pending || !vq->async_pkts_info ||
+       vq->async_descs_split = rte_malloc_socket(NULL,
+                       vq->size * sizeof(struct vring_used_elem),
+                       RTE_CACHE_LINE_SIZE, node);
+       if (!vq->async_descs_split || !vq->async_pkts_info ||
                !vq->it_pool || !vq->vec_pool) {
                vhost_free_async_mem(vq);
                VHOST_LOG_CONFIG(ERR,
index 23e11ff..658f6fc 100644 (file)
@@ -202,11 +202,13 @@ struct vhost_virtqueue {
        struct iovec *vec_pool;
 
        /* async data transfer status */
-       uintptr_t       **async_pkts_pending;
        struct async_inflight_info *async_pkts_info;
        uint16_t        async_pkts_idx;
        uint16_t        async_pkts_inflight_n;
        uint16_t        async_last_pkts_n;
+       struct vring_used_elem  *async_descs_split;
+       uint16_t async_desc_idx;
+       uint16_t last_async_desc_idx;
 
        /* vq async features */
        bool            async_inorder;
@@ -733,8 +735,7 @@ vhost_vring_call_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
        /* Don't kick guest if we don't reach index specified by guest. */
        if (dev->features & (1ULL << VIRTIO_RING_F_EVENT_IDX)) {
                uint16_t old = vq->signalled_used;
-               uint16_t new = vq->async_pkts_inflight_n ?
-                                       vq->used->idx:vq->last_used_idx;
+               uint16_t new = vq->last_used_idx;
                bool signalled_used_valid = vq->signalled_used_valid;
 
                vq->signalled_used = new;
index 4fb1924..a60bb94 100644 (file)
@@ -2010,12 +2010,13 @@ vhost_user_get_vring_base(struct virtio_net **pdev,
        } else {
                rte_free(vq->shadow_used_split);
                vq->shadow_used_split = NULL;
-               if (vq->async_pkts_pending)
-                       rte_free(vq->async_pkts_pending);
+
                if (vq->async_pkts_info)
                        rte_free(vq->async_pkts_info);
-               vq->async_pkts_pending = NULL;
+               if (vq->async_descs_split)
+                       rte_free(vq->async_descs_split);
                vq->async_pkts_info = NULL;
+               vq->async_descs_split = NULL;
        }
 
        rte_free(vq->batch_copy_elems);
index 5ee63fc..6580983 100644 (file)
@@ -117,31 +117,6 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq)
                sizeof(vq->used->idx));
 }
 
-static __rte_always_inline void
-async_flush_shadow_used_ring_split(struct virtio_net *dev,
-       struct vhost_virtqueue *vq)
-{
-       uint16_t used_idx = vq->last_used_idx & (vq->size - 1);
-
-       if (used_idx + vq->shadow_used_idx <= vq->size) {
-               do_flush_shadow_used_ring_split(dev, vq, used_idx, 0,
-                                         vq->shadow_used_idx);
-       } else {
-               uint16_t size;
-
-               /* update used ring interval [used_idx, vq->size] */
-               size = vq->size - used_idx;
-               do_flush_shadow_used_ring_split(dev, vq, used_idx, 0, size);
-
-               /* update the left half used ring interval [0, left_size] */
-               do_flush_shadow_used_ring_split(dev, vq, 0, size,
-                                         vq->shadow_used_idx - size);
-       }
-
-       vq->last_used_idx += vq->shadow_used_idx;
-       vq->shadow_used_idx = 0;
-}
-
 static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
                         uint16_t desc_idx, uint32_t len)
 static __rte_always_inline void
 update_shadow_used_ring_split(struct vhost_virtqueue *vq,
                         uint16_t desc_idx, uint32_t len)
@@ -1480,7 +1455,8 @@ virtio_dev_rx_async_get_info_idx(uint16_t pkts_idx,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
        struct vhost_virtqueue *vq, uint16_t queue_id,
 static __rte_noinline uint32_t
 virtio_dev_rx_async_submit_split(struct virtio_net *dev,
        struct vhost_virtqueue *vq, uint16_t queue_id,
-       struct rte_mbuf **pkts, uint32_t count)
+       struct rte_mbuf **pkts, uint32_t count,
+       struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
        uint32_t pkt_idx = 0, pkt_burst_idx = 0;
        uint16_t num_buffers;
 {
        uint32_t pkt_idx = 0, pkt_burst_idx = 0;
        uint16_t num_buffers;
@@ -1494,10 +1470,15 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
        struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
        struct rte_vhost_iov_iter *src_it = it_pool;
        struct rte_vhost_iov_iter *dst_it = it_pool + 1;
        struct iovec *dst_iovec = vec_pool + (VHOST_MAX_ASYNC_VEC >> 1);
        struct rte_vhost_iov_iter *src_it = it_pool;
        struct rte_vhost_iov_iter *dst_it = it_pool + 1;
-       uint16_t n_free_slot, slot_idx = 0;
+       uint16_t slot_idx = 0;
        uint16_t segs_await = 0;
        struct async_inflight_info *pkts_info = vq->async_pkts_info;
        uint32_t n_pkts = 0, pkt_err = 0;
        uint16_t segs_await = 0;
        struct async_inflight_info *pkts_info = vq->async_pkts_info;
        uint32_t n_pkts = 0, pkt_err = 0;
+       uint32_t num_async_pkts = 0, num_done_pkts = 0;
+       struct {
+               uint16_t pkt_idx;
+               uint16_t last_avail_idx;
+       } async_pkts_log[MAX_PKT_BURST];
 
        /*
         * The ordering between avail index and desc reads need to be enforced.
 
                        break;
                }
 
                        break;
                }
 
-               slot_idx = (vq->async_pkts_idx + pkt_idx) & (vq->size - 1);
+               slot_idx = (vq->async_pkts_idx + num_async_pkts) &
+                       (vq->size - 1);
                if (src_it->count) {
                if (src_it->count) {
-                       async_fill_desc(&tdes[pkt_burst_idx], src_it, dst_it);
-                       pkt_burst_idx++;
+                       uint16_t from, to;
+
+                       async_fill_desc(&tdes[pkt_burst_idx++], src_it, dst_it);
                        pkts_info[slot_idx].descs = num_buffers;
                        pkts_info[slot_idx].descs = num_buffers;
-                       pkts_info[slot_idx].segs = src_it->nr_segs;
+                       pkts_info[slot_idx].mbuf = pkts[pkt_idx];
+                       async_pkts_log[num_async_pkts].pkt_idx = pkt_idx;
+                       async_pkts_log[num_async_pkts++].last_avail_idx =
+                               vq->last_avail_idx;
                        src_iovec += src_it->nr_segs;
                        dst_iovec += dst_it->nr_segs;
                        src_it += 2;
                        dst_it += 2;
                        segs_await += src_it->nr_segs;
                        src_iovec += src_it->nr_segs;
                        dst_iovec += dst_it->nr_segs;
                        src_it += 2;
                        dst_it += 2;
                        segs_await += src_it->nr_segs;
-               } else {
-                       pkts_info[slot_idx].info = num_buffers;
-                       vq->async_pkts_inflight_n++;
-               }
+
+                       /**
+                        * recover shadow used ring and keep DMA-occupied
+                        * descriptors.
+                        */
+                       from = vq->shadow_used_idx - num_buffers;
+                       to = vq->async_desc_idx & (vq->size - 1);
+                       if (num_buffers + to <= vq->size) {
+                               rte_memcpy(&vq->async_descs_split[to],
+                                               &vq->shadow_used_split[from],
+                                               num_buffers *
+                                               sizeof(struct vring_used_elem));
+                       } else {
+                               int size = vq->size - to;
+
+                               rte_memcpy(&vq->async_descs_split[to],
+                                               &vq->shadow_used_split[from],
+                                               size *
+                                               sizeof(struct vring_used_elem));
+                               rte_memcpy(vq->async_descs_split,
+                                               &vq->shadow_used_split[from +
+                                               size], (num_buffers - size) *
+                                          sizeof(struct vring_used_elem));
+                       }
+                       vq->async_desc_idx += num_buffers;
+                       vq->shadow_used_idx -= num_buffers;
+               } else
+                       comp_pkts[num_done_pkts++] = pkts[pkt_idx];
 
                vq->last_avail_idx += num_buffers;
 
 
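The paired rte_memcpy calls above implement a wrap-around copy into a
power-of-two ring; the same pattern reappears in the completion path. A
minimal generic sketch (helper name and signature are illustrative, not part
of the patch):

    static inline void
    ring_copy_used_elems(struct vring_used_elem *ring, uint16_t ring_size,
                    uint16_t to, const struct vring_used_elem *src, uint16_t n)
    {
            /* 'to' is already masked to [0, ring_size) by the caller */
            if (to + n <= ring_size) {
                    rte_memcpy(&ring[to], src, n * sizeof(*src));
            } else {
                    uint16_t first = ring_size - to;

                    rte_memcpy(&ring[to], src, first * sizeof(*src));
                    rte_memcpy(ring, src + first,
                               (n - first) * sizeof(*src));
            }
    }
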
                 * - buffered packet number reaches transfer threshold
                 * - unused async iov number is less than max vhost vector
                 */
                 * - buffered packet number reaches transfer threshold
                 * - unused async iov number is less than max vhost vector
                 */
-               if (pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
-                       (VHOST_MAX_ASYNC_VEC / 2 - segs_await <
-                       BUF_VECTOR_MAX)) {
+               if (unlikely(pkt_burst_idx >= VHOST_ASYNC_BATCH_THRESHOLD ||
+                       ((VHOST_MAX_ASYNC_VEC >> 1) - segs_await <
+                       BUF_VECTOR_MAX))) {
                        n_pkts = vq->async_ops.transfer_data(dev->vid,
                                        queue_id, tdes, 0, pkt_burst_idx);
                        src_iovec = vec_pool;
                        n_pkts = vq->async_ops.transfer_data(dev->vid,
                                        queue_id, tdes, 0, pkt_burst_idx);
                        src_iovec = vec_pool;
@@ -1564,7 +1574,7 @@ virtio_dev_rx_async_submit_split(struct virtio_net *dev,
                        src_it = it_pool;
                        dst_it = it_pool + 1;
                        segs_await = 0;
                        src_it = it_pool;
                        dst_it = it_pool + 1;
                        segs_await = 0;
-                       vq->async_pkts_inflight_n += pkt_burst_idx;
+                       vq->async_pkts_inflight_n += n_pkts;
 
                        if (unlikely(n_pkts < pkt_burst_idx)) {
                                /*
 
        if (pkt_burst_idx) {
                n_pkts = vq->async_ops.transfer_data(dev->vid,
                                queue_id, tdes, 0, pkt_burst_idx);
        if (pkt_burst_idx) {
                n_pkts = vq->async_ops.transfer_data(dev->vid,
                                queue_id, tdes, 0, pkt_burst_idx);
-               vq->async_pkts_inflight_n += pkt_burst_idx;
+               vq->async_pkts_inflight_n += n_pkts;
 
                if (unlikely(n_pkts < pkt_burst_idx))
                        pkt_err = pkt_burst_idx - n_pkts;
 
 
        do_data_copy_enqueue(dev, vq);
 
 
-               if (pkts_info[slot_idx].segs)
-                       pkt_err--;
-               vq->last_avail_idx -= pkts_info[slot_idx].descs;
-               vq->shadow_used_idx -= pkts_info[slot_idx].descs;
-               vq->async_pkts_inflight_n--;
-               slot_idx = (slot_idx - 1) & (vq->size - 1);
-               pkt_idx--;
-       }
-
-       n_free_slot = vq->size - vq->async_pkts_idx;
-       if (n_free_slot > pkt_idx) {
-               rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-                       pkts, pkt_idx * sizeof(uintptr_t));
-               vq->async_pkts_idx += pkt_idx;
-       } else {
-               rte_memcpy(&vq->async_pkts_pending[vq->async_pkts_idx],
-                       pkts, n_free_slot * sizeof(uintptr_t));
-               rte_memcpy(&vq->async_pkts_pending[0],
-                       &pkts[n_free_slot],
-                       (pkt_idx - n_free_slot) * sizeof(uintptr_t));
-               vq->async_pkts_idx = pkt_idx - n_free_slot;
+       if (unlikely(pkt_err)) {
+               uint16_t num_descs = 0;
+
+               num_async_pkts -= pkt_err;
+               /* calculate the sum of descriptors of DMA-error packets. */
+               while (pkt_err-- > 0) {
+                       num_descs += pkts_info[slot_idx & (vq->size - 1)].descs;
+                       slot_idx--;
+               }
+               vq->async_desc_idx -= num_descs;
+               /* recover shadow used ring and available ring */
+               vq->shadow_used_idx -= (vq->last_avail_idx -
+                               async_pkts_log[num_async_pkts].last_avail_idx -
+                               num_descs);
+               vq->last_avail_idx =
+                       async_pkts_log[num_async_pkts].last_avail_idx;
+               pkt_idx = async_pkts_log[num_async_pkts].pkt_idx;
+               num_done_pkts = pkt_idx - num_async_pkts;
        }
 
-       if (likely(vq->shadow_used_idx))
-               async_flush_shadow_used_ring_split(dev, vq);
+       vq->async_pkts_idx += num_async_pkts;
+       *comp_count = num_done_pkts;
+
+       if (likely(vq->shadow_used_idx)) {
+               flush_shadow_used_ring_split(dev, vq);
+               vhost_vring_call_split(dev, vq);
+       }
 
        return pkt_idx;
 }
@@ -1629,8 +1640,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
        struct vhost_virtqueue *vq;
        uint16_t n_pkts_cpl = 0, n_pkts_put = 0, n_descs = 0;
        uint16_t start_idx, pkts_idx, vq_size;
-       uint16_t n_inflight;
        struct async_inflight_info *pkts_info;
+       uint16_t from, i;
 
        if (!dev)
                return 0;
@@ -1652,8 +1663,7 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
        rte_spinlock_lock(&vq->access_lock);
 
-       n_inflight = vq->async_pkts_inflight_n;
-       pkts_idx = vq->async_pkts_idx;
+       pkts_idx = vq->async_pkts_idx & (vq->size - 1);
        pkts_info = vq->async_pkts_info;
        vq_size = vq->size;
        start_idx = virtio_dev_rx_async_get_info_idx(pkts_idx,
@@ -1664,42 +1674,61 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
                        queue_id, 0, count - vq->async_last_pkts_n);
        n_pkts_cpl += vq->async_last_pkts_n;
 
-       rte_atomic_thread_fence(__ATOMIC_RELEASE);
-
-       while (likely((n_pkts_put < count) && n_inflight)) {
-               uint16_t info_idx = (start_idx + n_pkts_put) & (vq_size - 1);
-               if (n_pkts_cpl && pkts_info[info_idx].segs)
-                       n_pkts_cpl--;
-               else if (!n_pkts_cpl && pkts_info[info_idx].segs)
-                       break;
-               n_pkts_put++;
-               n_inflight--;
-               n_descs += pkts_info[info_idx].descs;
-       }
-
-       vq->async_last_pkts_n = n_pkts_cpl;
+       n_pkts_put = RTE_MIN(count, n_pkts_cpl);
+       if (unlikely(n_pkts_put == 0)) {
+               vq->async_last_pkts_n = n_pkts_cpl;
+               goto done;
+       }
+
+       for (i = 0; i < n_pkts_put; i++) {
+               from = (start_idx + i) & (vq_size - 1);
+               n_descs += pkts_info[from].descs;
+               pkts[i] = pkts_info[from].mbuf;
+       }
+       vq->async_last_pkts_n = n_pkts_cpl - n_pkts_put;
+       vq->async_pkts_inflight_n -= n_pkts_put;
+
+       if (likely(vq->enabled && vq->access_ok)) {
+               uint16_t nr_left = n_descs;
+               uint16_t nr_copy;
+               uint16_t to;
+
+               /* write back completed descriptors to used ring */
+               do {
+                       from = vq->last_async_desc_idx & (vq->size - 1);
+                       nr_copy = nr_left + from <= vq->size ? nr_left :
+                               vq->size - from;
+                       to = vq->last_used_idx & (vq->size - 1);
+
+                       if (to + nr_copy <= vq->size) {
+                               rte_memcpy(&vq->used->ring[to],
+                                               &vq->async_descs_split[from],
+                                               nr_copy *
+                                               sizeof(struct vring_used_elem));
+                       } else {
+                               uint16_t size = vq->size - to;
+
+                               rte_memcpy(&vq->used->ring[to],
+                                               &vq->async_descs_split[from],
+                                               size *
+                                               sizeof(struct vring_used_elem));
+                               rte_memcpy(vq->used->ring,
+                                               &vq->async_descs_split[from +
+                                               size], (nr_copy - size) *
+                                               sizeof(struct vring_used_elem));
+                       }
 
 
-               vq->async_pkts_inflight_n = n_inflight;
-               if (likely(vq->enabled && vq->access_ok)) {
-                       __atomic_add_fetch(&vq->used->idx,
-                                       n_descs, __ATOMIC_RELEASE);
-                       vhost_vring_call_split(dev, vq);
-               }
+                       vq->last_async_desc_idx += nr_copy;
+                       vq->last_used_idx += nr_copy;
+                       nr_left -= nr_copy;
+               } while (nr_left > 0);
 
 
-                       rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-                               n_pkts_put * sizeof(uintptr_t));
-               } else {
-                       rte_memcpy(pkts, &vq->async_pkts_pending[start_idx],
-                               (vq_size - start_idx) * sizeof(uintptr_t));
-                       rte_memcpy(&pkts[vq_size - start_idx],
-                               vq->async_pkts_pending,
-                               (n_pkts_put + start_idx - vq_size) *
-                               sizeof(uintptr_t));
-               }
-       }
+               __atomic_add_fetch(&vq->used->idx, n_descs, __ATOMIC_RELEASE);
+               vhost_vring_call_split(dev, vq);
+       } else
+               vq->last_async_desc_idx += n_descs;
 
 
        rte_spinlock_unlock(&vq->access_lock);
 
        return n_pkts_put;
        rte_spinlock_unlock(&vq->access_lock);
 
        return n_pkts_put;
@@ -1707,7 +1736,8 @@ uint16_t rte_vhost_poll_enqueue_completed(int vid, uint16_t queue_id,
 
 static __rte_always_inline uint32_t
 virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
 
+       struct rte_mbuf **pkts, uint32_t count,
+       struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
        struct vhost_virtqueue *vq;
        uint32_t nb_tx = 0;
 {
        struct vhost_virtqueue *vq;
        uint32_t nb_tx = 0;
@@ -1742,7 +1772,8 @@ virtio_dev_rx_async_submit(struct virtio_net *dev, uint16_t queue_id,
                nb_tx = 0;
        else
                nb_tx = virtio_dev_rx_async_submit_split(dev,
                nb_tx = 0;
        else
                nb_tx = virtio_dev_rx_async_submit_split(dev,
-                               vq, queue_id, pkts, count);
+                               vq, queue_id, pkts, count, comp_pkts,
+                               comp_count);
 
 out:
        if (dev->features & (1ULL << VIRTIO_F_IOMMU_PLATFORM))
 
 
 uint16_t
 rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
 
+               struct rte_mbuf **pkts, uint16_t count,
+               struct rte_mbuf **comp_pkts, uint32_t *comp_count)
 {
        struct virtio_net *dev = get_device(vid);
 
 {
        struct virtio_net *dev = get_device(vid);
 
+       *comp_count = 0;
        if (!dev)
                return 0;
 
        if (!dev)
                return 0;
 
@@ -1770,7 +1803,8 @@ rte_vhost_submit_enqueue_burst(int vid, uint16_t queue_id,
                return 0;
        }
 
                return 0;
        }
 
-       return virtio_dev_rx_async_submit(dev, queue_id, pkts, count);
+       return virtio_dev_rx_async_submit(dev, queue_id, pkts, count, comp_pkts,
+                       comp_count);
 }
 
 static inline bool
 }
 
 static inline bool