vhost: optimize packed ring dequeue
authorMarvin Liu <yong.liu@intel.com>
Thu, 24 Oct 2019 16:08:31 +0000 (00:08 +0800)
committerFerruh Yigit <ferruh.yigit@intel.com>
Fri, 25 Oct 2019 17:20:47 +0000 (19:20 +0200)
Optimize vhost device packed ring dequeue function by splitting batch
and single functions. No-chained and direct descriptors will be handled
by batch and other will be handled by single as before.

Signed-off-by: Marvin Liu <yong.liu@intel.com>
Reviewed-by: Maxime Coquelin <maxime.coquelin@redhat.com>
lib/librte_vhost/virtio_net.c

index 0243573..ab67269 100644 (file)
@@ -201,68 +201,6 @@ vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
        vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
 }
 
-static __rte_always_inline void
-flush_shadow_used_ring_packed(struct virtio_net *dev,
-                       struct vhost_virtqueue *vq)
-{
-       int i;
-       uint16_t used_idx = vq->last_used_idx;
-       uint16_t head_idx = vq->last_used_idx;
-       uint16_t head_flags = 0;
-
-       /* Split loop in two to save memory barriers */
-       for (i = 0; i < vq->shadow_used_idx; i++) {
-               vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
-               vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
-
-               used_idx += vq->shadow_used_packed[i].count;
-               if (used_idx >= vq->size)
-                       used_idx -= vq->size;
-       }
-
-       for (i = 0; i < vq->shadow_used_idx; i++) {
-               uint16_t flags;
-
-               if (vq->shadow_used_packed[i].len)
-                       flags = VRING_DESC_F_WRITE;
-               else
-                       flags = 0;
-
-               if (vq->used_wrap_counter) {
-                       flags |= VRING_DESC_F_USED;
-                       flags |= VRING_DESC_F_AVAIL;
-               } else {
-                       flags &= ~VRING_DESC_F_USED;
-                       flags &= ~VRING_DESC_F_AVAIL;
-               }
-
-               if (i > 0) {
-                       vq->desc_packed[vq->last_used_idx].flags = flags;
-
-                       vhost_log_cache_used_vring(dev, vq,
-                                       vq->last_used_idx *
-                                       sizeof(struct vring_packed_desc),
-                                       sizeof(struct vring_packed_desc));
-               } else {
-                       head_idx = vq->last_used_idx;
-                       head_flags = flags;
-               }
-
-               vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
-       }
-
-       __atomic_store_n(&vq->desc_packed[head_idx].flags, head_flags,
-                        __ATOMIC_RELEASE);
-
-       vhost_log_cache_used_vring(dev, vq,
-                               head_idx *
-                               sizeof(struct vring_packed_desc),
-                               sizeof(struct vring_packed_desc));
-
-       vq->shadow_used_idx = 0;
-       vhost_log_cache_sync(dev, vq);
-}
-
 static __rte_always_inline void
 vhost_shadow_dequeue_batch_packed(struct virtio_net *dev,
                                  struct vhost_virtqueue *vq,
@@ -335,17 +273,6 @@ vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
        vq_inc_last_used_packed(vq, count);
 }
 
-static __rte_always_inline void
-update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
-                        uint16_t desc_idx, uint32_t len, uint16_t count)
-{
-       uint16_t i = vq->shadow_used_idx++;
-
-       vq->shadow_used_packed[i].id  = desc_idx;
-       vq->shadow_used_packed[i].len = len;
-       vq->shadow_used_packed[i].count = count;
-}
-
 static inline void
 do_data_copy_enqueue(struct virtio_net *dev, struct vhost_virtqueue *vq)
 {
@@ -403,7 +330,7 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
        }
 }
 
-static __rte_unused void
+static __rte_always_inline void
 vhost_flush_dequeue_packed(struct virtio_net *dev,
                           struct vhost_virtqueue *vq)
 {
@@ -1893,7 +1820,7 @@ free_buf:
        return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed(struct virtio_net *dev,
                           struct vhost_virtqueue *vq,
                           struct rte_mempool *mbuf_pool,
@@ -1961,7 +1888,7 @@ vhost_dequeue_single_packed(struct virtio_net *dev,
        return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed(struct virtio_net *dev,
                            struct vhost_virtqueue *vq,
                            struct rte_mempool *mbuf_pool,
@@ -1981,7 +1908,7 @@ virtio_dev_tx_single_packed(struct virtio_net *dev,
        return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
                                 struct vhost_virtqueue *vq,
                                 struct rte_mempool *mbuf_pool,
@@ -2030,7 +1957,7 @@ free_pkt:
        return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
                                  struct vhost_virtqueue *vq,
                                  struct rte_mempool *mbuf_pool,
@@ -2061,7 +1988,7 @@ virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
        return 0;
 }
 
-static __rte_unused void
+static __rte_always_inline void
 free_zmbuf(struct vhost_virtqueue *vq)
 {
        struct zcopy_mbuf *next = NULL;
@@ -2102,111 +2029,77 @@ free_zmbuf(struct vhost_virtqueue *vq)
 }
 
 static __rte_noinline uint16_t
-virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-       struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+virtio_dev_tx_packed_zmbuf(struct virtio_net *dev,
+                          struct vhost_virtqueue *vq,
+                          struct rte_mempool *mbuf_pool,
+                          struct rte_mbuf **pkts,
+                          uint32_t count)
 {
-       uint16_t i;
-
-       if (unlikely(dev->dequeue_zero_copy)) {
-               struct zcopy_mbuf *zmbuf, *next;
-
-               for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-                    zmbuf != NULL; zmbuf = next) {
-                       next = TAILQ_NEXT(zmbuf, next);
+       uint32_t pkt_idx = 0;
+       uint32_t remained = count;
 
-                       if (mbuf_is_consumed(zmbuf->mbuf)) {
-                               update_shadow_used_ring_packed(vq,
-                                               zmbuf->desc_idx,
-                                               0,
-                                               zmbuf->desc_count);
+       free_zmbuf(vq);
 
-                               TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-                               restore_mbuf(zmbuf->mbuf);
-                               rte_pktmbuf_free(zmbuf->mbuf);
-                               put_zmbuf(zmbuf);
-                               vq->nr_zmbuf -= 1;
+       do {
+               if (remained >= PACKED_BATCH_SIZE) {
+                       if (!virtio_dev_tx_batch_packed_zmbuf(dev, vq,
+                               mbuf_pool, &pkts[pkt_idx])) {
+                               pkt_idx += PACKED_BATCH_SIZE;
+                               remained -= PACKED_BATCH_SIZE;
+                               continue;
                        }
                }
 
-               if (likely(vq->shadow_used_idx)) {
-                       flush_shadow_used_ring_packed(dev, vq);
-                       vhost_vring_call_packed(dev, vq);
-               }
-       }
-
-       VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
-
-       count = RTE_MIN(count, MAX_PKT_BURST);
-       VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
-                       dev->vid, count);
-
-       for (i = 0; i < count; i++) {
-               struct buf_vector buf_vec[BUF_VECTOR_MAX];
-               uint16_t buf_id;
-               uint32_t buf_len;
-               uint16_t desc_count, nr_vec = 0;
-               int err;
-
-               if (unlikely(fill_vec_buf_packed(dev, vq,
-                                               vq->last_avail_idx, &desc_count,
-                                               buf_vec, &nr_vec,
-                                               &buf_id, &buf_len,
-                                               VHOST_ACCESS_RO) < 0))
+               if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
+                                                     &pkts[pkt_idx]))
                        break;
+               pkt_idx++;
+               remained--;
 
-               if (likely(dev->dequeue_zero_copy == 0))
-                       update_shadow_used_ring_packed(vq, buf_id, 0,
-                                       desc_count);
+       } while (remained);
 
-               pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
-               if (unlikely(pkts[i] == NULL))
-                       break;
-
-               err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-                               mbuf_pool);
-               if (unlikely(err)) {
-                       rte_pktmbuf_free(pkts[i]);
-                       break;
-               }
+       if (pkt_idx)
+               vhost_vring_call_packed(dev, vq);
 
-               if (unlikely(dev->dequeue_zero_copy)) {
-                       struct zcopy_mbuf *zmbuf;
+       return pkt_idx;
+}
 
-                       zmbuf = get_zmbuf(vq);
-                       if (!zmbuf) {
-                               rte_pktmbuf_free(pkts[i]);
-                               break;
-                       }
-                       zmbuf->mbuf = pkts[i];
-                       zmbuf->desc_idx = buf_id;
-                       zmbuf->desc_count = desc_count;
+static __rte_noinline uint16_t
+virtio_dev_tx_packed(struct virtio_net *dev,
+                    struct vhost_virtqueue *vq,
+                    struct rte_mempool *mbuf_pool,
+                    struct rte_mbuf **pkts,
+                    uint32_t count)
+{
+       uint32_t pkt_idx = 0;
+       uint32_t remained = count;
 
-                       /*
-                        * Pin lock the mbuf; we will check later to see
-                        * whether the mbuf is freed (when we are the last
-                        * user) or not. If that's the case, we then could
-                        * update the used ring safely.
-                        */
-                       rte_mbuf_refcnt_update(pkts[i], 1);
+       do {
+               rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
 
-                       vq->nr_zmbuf += 1;
-                       TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
+               if (remained >= PACKED_BATCH_SIZE) {
+                       if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
+                                                       &pkts[pkt_idx])) {
+                               vhost_flush_dequeue_packed(dev, vq);
+                               pkt_idx += PACKED_BATCH_SIZE;
+                               remained -= PACKED_BATCH_SIZE;
+                               continue;
+                       }
                }
 
-               vq_inc_last_avail_packed(vq, desc_count);
-       }
+               if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
+                                               &pkts[pkt_idx]))
+                       break;
+               vhost_flush_dequeue_packed(dev, vq);
+               pkt_idx++;
+               remained--;
 
-       if (likely(dev->dequeue_zero_copy == 0)) {
+       } while (remained);
+
+       if (vq->shadow_used_idx)
                do_data_copy_dequeue(vq);
-               if (unlikely(i < count))
-                       vq->shadow_used_idx = i;
-               if (likely(vq->shadow_used_idx)) {
-                       flush_shadow_used_ring_packed(dev, vq);
-                       vhost_vring_call_packed(dev, vq);
-               }
-       }
 
-       return i;
+       return pkt_idx;
 }
 
 uint16_t
@@ -2282,9 +2175,14 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
                count -= 1;
        }
 
-       if (vq_is_packed(dev))
-               count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
-       else
+       if (vq_is_packed(dev)) {
+               if (unlikely(dev->dequeue_zero_copy))
+                       count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
+                                                          pkts, count);
+               else
+                       count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
+                                                    count);
+       } else
                count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
 
 out: