vhost: fix batch enqueue only handle few packets
[dpdk.git] / lib / librte_vhost / virtio_net.c
index 1c63262..21c3117 100644 (file)
@@ -31,6 +31,12 @@ rxvq_is_mergeable(struct virtio_net *dev)
        return dev->features & (1ULL << VIRTIO_NET_F_MRG_RXBUF);
 }
 
+static  __rte_always_inline bool
+virtio_net_is_inorder(struct virtio_net *dev)
+{
+       return dev->features & (1ULL << VIRTIO_F_IN_ORDER);
+}
+
 static bool
 is_valid_virt_queue_idx(uint32_t idx, int is_tx, uint32_t nr_vring)
 {
@@ -202,65 +208,21 @@ vhost_flush_enqueue_batch_packed(struct virtio_net *dev,
 }
 
 static __rte_always_inline void
-flush_shadow_used_ring_packed(struct virtio_net *dev,
-                       struct vhost_virtqueue *vq)
+vhost_shadow_dequeue_batch_packed_inorder(struct vhost_virtqueue *vq,
+                                         uint16_t id)
 {
-       int i;
-       uint16_t used_idx = vq->last_used_idx;
-       uint16_t head_idx = vq->last_used_idx;
-       uint16_t head_flags = 0;
-
-       /* Split loop in two to save memory barriers */
-       for (i = 0; i < vq->shadow_used_idx; i++) {
-               vq->desc_packed[used_idx].id = vq->shadow_used_packed[i].id;
-               vq->desc_packed[used_idx].len = vq->shadow_used_packed[i].len;
-
-               used_idx += vq->shadow_used_packed[i].count;
-               if (used_idx >= vq->size)
-                       used_idx -= vq->size;
-       }
-
-       for (i = 0; i < vq->shadow_used_idx; i++) {
-               uint16_t flags;
-
-               if (vq->shadow_used_packed[i].len)
-                       flags = VRING_DESC_F_WRITE;
-               else
-                       flags = 0;
-
-               if (vq->used_wrap_counter) {
-                       flags |= VRING_DESC_F_USED;
-                       flags |= VRING_DESC_F_AVAIL;
-               } else {
-                       flags &= ~VRING_DESC_F_USED;
-                       flags &= ~VRING_DESC_F_AVAIL;
-               }
-
-               if (i > 0) {
-                       vq->desc_packed[vq->last_used_idx].flags = flags;
-
-                       vhost_log_cache_used_vring(dev, vq,
-                                       vq->last_used_idx *
-                                       sizeof(struct vring_packed_desc),
-                                       sizeof(struct vring_packed_desc));
-               } else {
-                       head_idx = vq->last_used_idx;
-                       head_flags = flags;
-               }
+       vq->shadow_used_packed[0].id = id;
 
-               vq_inc_last_used_packed(vq, vq->shadow_used_packed[i].count);
+       if (!vq->shadow_used_idx) {
+               vq->shadow_last_used_idx = vq->last_used_idx;
+               vq->shadow_used_packed[0].flags =
+                       PACKED_DESC_DEQUEUE_USED_FLAG(vq->used_wrap_counter);
+               vq->shadow_used_packed[0].len = 0;
+               vq->shadow_used_packed[0].count = 1;
+               vq->shadow_used_idx++;
        }
 
-       __atomic_store_n(&vq->desc_packed[head_idx].flags, head_flags,
-                        __ATOMIC_RELEASE);
-
-       vhost_log_cache_used_vring(dev, vq,
-                               head_idx *
-                               sizeof(struct vring_packed_desc),
-                               sizeof(struct vring_packed_desc));
-
-       vq->shadow_used_idx = 0;
-       vhost_log_cache_sync(dev, vq);
+       vq_inc_last_used_packed(vq, PACKED_BATCH_SIZE);
 }
 
 static __rte_always_inline void
@@ -336,14 +298,31 @@ vhost_shadow_dequeue_single_packed(struct vhost_virtqueue *vq,
 }
 
 static __rte_always_inline void
-update_shadow_used_ring_packed(struct vhost_virtqueue *vq,
-                        uint16_t desc_idx, uint32_t len, uint16_t count)
+vhost_shadow_dequeue_single_packed_inorder(struct vhost_virtqueue *vq,
+                                          uint16_t buf_id,
+                                          uint16_t count)
 {
-       uint16_t i = vq->shadow_used_idx++;
+       uint16_t flags;
+
+       vq->shadow_used_packed[0].id = buf_id;
 
-       vq->shadow_used_packed[i].id  = desc_idx;
-       vq->shadow_used_packed[i].len = len;
-       vq->shadow_used_packed[i].count = count;
+       flags = vq->desc_packed[vq->last_used_idx].flags;
+       if (vq->used_wrap_counter) {
+               flags |= VRING_DESC_F_USED;
+               flags |= VRING_DESC_F_AVAIL;
+       } else {
+               flags &= ~VRING_DESC_F_USED;
+               flags &= ~VRING_DESC_F_AVAIL;
+       }
+
+       if (!vq->shadow_used_idx) {
+               vq->shadow_last_used_idx = vq->last_used_idx;
+               vq->shadow_used_packed[0].len = 0;
+               vq->shadow_used_packed[0].flags = flags;
+               vq->shadow_used_idx++;
+       }
+
+       vq_inc_last_used_packed(vq, count);
 }
 
 static inline void
@@ -403,7 +382,7 @@ vhost_shadow_enqueue_single_packed(struct virtio_net *dev,
        }
 }
 
-static __rte_unused void
+static __rte_always_inline void
 vhost_flush_dequeue_packed(struct virtio_net *dev,
                           struct vhost_virtqueue *vq)
 {
@@ -466,6 +445,7 @@ virtio_enqueue_offload(struct rte_mbuf *m_buf, struct virtio_net_hdr *net_hdr)
 
                ipv4_hdr = rte_pktmbuf_mtod_offset(m_buf, struct rte_ipv4_hdr *,
                                                   m_buf->l2_len);
+               ipv4_hdr->hdr_checksum = 0;
                ipv4_hdr->hdr_checksum = rte_ipv4_cksum(ipv4_hdr);
        }
 
@@ -1128,6 +1108,10 @@ virtio_dev_rx_batch_packed(struct virtio_net *dev,
                           pkts[i]->pkt_len);
        }
 
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               vhost_log_cache_write_iova(dev, vq, descs[avail_idx + i].addr,
+                                          lens[i]);
+
        vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
                ids[i] = descs[avail_idx + i].id;
 
@@ -1175,7 +1159,8 @@ virtio_dev_rx_packed(struct virtio_net *dev,
                rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
 
                if (remained >= PACKED_BATCH_SIZE) {
-                       if (!virtio_dev_rx_batch_packed(dev, vq, pkts)) {
+                       if (!virtio_dev_rx_batch_packed(dev, vq,
+                                                       &pkts[pkt_idx])) {
                                pkt_idx += PACKED_BATCH_SIZE;
                                remained -= PACKED_BATCH_SIZE;
                                continue;
@@ -1672,8 +1657,11 @@ virtio_dev_pktmbuf_alloc(struct virtio_net *dev, struct rte_mempool *mp,
 {
        struct rte_mbuf *pkt = rte_pktmbuf_alloc(mp);
 
-       if (unlikely(pkt == NULL))
+       if (unlikely(pkt == NULL)) {
+               RTE_LOG(ERR, VHOST_DATA,
+                       "Failed to allocate memory for mbuf.\n");
                return NULL;
+       }
 
        if (rte_pktmbuf_tailroom(pkt) >= data_len)
                return pkt;
@@ -1893,7 +1881,7 @@ free_buf:
        return -1;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_batch_packed(struct virtio_net *dev,
                           struct vhost_virtqueue *vq,
                           struct rte_mempool *mbuf_pool,
@@ -1917,7 +1905,11 @@ virtio_dev_tx_batch_packed(struct virtio_net *dev,
                           (void *)(uintptr_t)(desc_addrs[i] + buf_offset),
                           pkts[i]->pkt_len);
 
-       vhost_shadow_dequeue_batch_packed(dev, vq, ids);
+       if (virtio_net_is_inorder(dev))
+               vhost_shadow_dequeue_batch_packed_inorder(vq,
+                       ids[PACKED_BATCH_SIZE - 1]);
+       else
+               vhost_shadow_dequeue_batch_packed(dev, vq, ids);
 
        vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
 
@@ -1961,7 +1953,7 @@ vhost_dequeue_single_packed(struct virtio_net *dev,
        return 0;
 }
 
-static __rte_unused int
+static __rte_always_inline int
 virtio_dev_tx_single_packed(struct virtio_net *dev,
                            struct vhost_virtqueue *vq,
                            struct rte_mempool *mbuf_pool,
@@ -1974,119 +1966,209 @@ virtio_dev_tx_single_packed(struct virtio_net *dev,
                                        &desc_count))
                return -1;
 
-       vhost_shadow_dequeue_single_packed(vq, buf_id, desc_count);
+       if (virtio_net_is_inorder(dev))
+               vhost_shadow_dequeue_single_packed_inorder(vq, buf_id,
+                                                          desc_count);
+       else
+               vhost_shadow_dequeue_single_packed(vq, buf_id, desc_count);
 
        vq_inc_last_avail_packed(vq, desc_count);
 
        return 0;
 }
 
-static __rte_noinline uint16_t
-virtio_dev_tx_packed(struct virtio_net *dev, struct vhost_virtqueue *vq,
-       struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
+static __rte_always_inline int
+virtio_dev_tx_batch_packed_zmbuf(struct virtio_net *dev,
+                                struct vhost_virtqueue *vq,
+                                struct rte_mempool *mbuf_pool,
+                                struct rte_mbuf **pkts)
 {
+       struct zcopy_mbuf *zmbufs[PACKED_BATCH_SIZE];
+       uintptr_t desc_addrs[PACKED_BATCH_SIZE];
+       uint16_t ids[PACKED_BATCH_SIZE];
        uint16_t i;
 
-       if (unlikely(dev->dequeue_zero_copy)) {
-               struct zcopy_mbuf *zmbuf, *next;
+       uint16_t avail_idx = vq->last_avail_idx;
 
-               for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
-                    zmbuf != NULL; zmbuf = next) {
-                       next = TAILQ_NEXT(zmbuf, next);
+       if (vhost_reserve_avail_batch_packed(dev, vq, mbuf_pool, pkts,
+                                            avail_idx, desc_addrs, ids))
+               return -1;
 
-                       if (mbuf_is_consumed(zmbuf->mbuf)) {
-                               update_shadow_used_ring_packed(vq,
-                                               zmbuf->desc_idx,
-                                               0,
-                                               zmbuf->desc_count);
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               zmbufs[i] = get_zmbuf(vq);
 
-                               TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
-                               restore_mbuf(zmbuf->mbuf);
-                               rte_pktmbuf_free(zmbuf->mbuf);
-                               put_zmbuf(zmbuf);
-                               vq->nr_zmbuf -= 1;
-                       }
-               }
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+               if (!zmbufs[i])
+                       goto free_pkt;
+       }
 
-               if (likely(vq->shadow_used_idx)) {
-                       flush_shadow_used_ring_packed(dev, vq);
-                       vhost_vring_call_packed(dev, vq);
-               }
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE) {
+               zmbufs[i]->mbuf = pkts[i];
+               zmbufs[i]->desc_idx = avail_idx + i;
+               zmbufs[i]->desc_count = 1;
        }
 
-       VHOST_LOG_DEBUG(VHOST_DATA, "(%d) %s\n", dev->vid, __func__);
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               rte_mbuf_refcnt_update(pkts[i], 1);
 
-       count = RTE_MIN(count, MAX_PKT_BURST);
-       VHOST_LOG_DEBUG(VHOST_DATA, "(%d) about to dequeue %u buffers\n",
-                       dev->vid, count);
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbufs[i], next);
 
-       for (i = 0; i < count; i++) {
-               struct buf_vector buf_vec[BUF_VECTOR_MAX];
-               uint16_t buf_id;
-               uint32_t buf_len;
-               uint16_t desc_count, nr_vec = 0;
-               int err;
+       vq->nr_zmbuf += PACKED_BATCH_SIZE;
+       vq_inc_last_avail_packed(vq, PACKED_BATCH_SIZE);
 
-               if (unlikely(fill_vec_buf_packed(dev, vq,
-                                               vq->last_avail_idx, &desc_count,
-                                               buf_vec, &nr_vec,
-                                               &buf_id, &buf_len,
-                                               VHOST_ACCESS_RO) < 0))
-                       break;
+       return 0;
 
-               if (likely(dev->dequeue_zero_copy == 0))
-                       update_shadow_used_ring_packed(vq, buf_id, 0,
-                                       desc_count);
+free_pkt:
+       vhost_for_each_try_unroll(i, 0, PACKED_BATCH_SIZE)
+               rte_pktmbuf_free(pkts[i]);
 
-               pkts[i] = virtio_dev_pktmbuf_alloc(dev, mbuf_pool, buf_len);
-               if (unlikely(pkts[i] == NULL))
-                       break;
+       return -1;
+}
 
-               err = copy_desc_to_mbuf(dev, vq, buf_vec, nr_vec, pkts[i],
-                               mbuf_pool);
-               if (unlikely(err)) {
-                       rte_pktmbuf_free(pkts[i]);
-                       break;
+static __rte_always_inline int
+virtio_dev_tx_single_packed_zmbuf(struct virtio_net *dev,
+                                 struct vhost_virtqueue *vq,
+                                 struct rte_mempool *mbuf_pool,
+                                 struct rte_mbuf **pkts)
+{
+       uint16_t buf_id, desc_count;
+       struct zcopy_mbuf *zmbuf;
+
+       if (vhost_dequeue_single_packed(dev, vq, mbuf_pool, pkts, &buf_id,
+                                       &desc_count))
+               return -1;
+
+       zmbuf = get_zmbuf(vq);
+       if (!zmbuf) {
+               rte_pktmbuf_free(*pkts);
+               return -1;
+       }
+       zmbuf->mbuf = *pkts;
+       zmbuf->desc_idx = vq->last_avail_idx;
+       zmbuf->desc_count = desc_count;
+
+       rte_mbuf_refcnt_update(*pkts, 1);
+
+       vq->nr_zmbuf += 1;
+       TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
+
+       vq_inc_last_avail_packed(vq, desc_count);
+       return 0;
+}
+
+static __rte_always_inline void
+free_zmbuf(struct vhost_virtqueue *vq)
+{
+       struct zcopy_mbuf *next = NULL;
+       struct zcopy_mbuf *zmbuf;
+
+       for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
+            zmbuf != NULL; zmbuf = next) {
+               next = TAILQ_NEXT(zmbuf, next);
+
+               uint16_t last_used_idx = vq->last_used_idx;
+
+               if (mbuf_is_consumed(zmbuf->mbuf)) {
+                       uint16_t flags;
+                       flags = vq->desc_packed[last_used_idx].flags;
+                       if (vq->used_wrap_counter) {
+                               flags |= VRING_DESC_F_USED;
+                               flags |= VRING_DESC_F_AVAIL;
+                       } else {
+                               flags &= ~VRING_DESC_F_USED;
+                               flags &= ~VRING_DESC_F_AVAIL;
+                       }
+
+                       vq->desc_packed[last_used_idx].id = zmbuf->desc_idx;
+                       vq->desc_packed[last_used_idx].len = 0;
+
+                       rte_smp_wmb();
+                       vq->desc_packed[last_used_idx].flags = flags;
+
+                       vq_inc_last_used_packed(vq, zmbuf->desc_count);
+
+                       TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
+                       restore_mbuf(zmbuf->mbuf);
+                       rte_pktmbuf_free(zmbuf->mbuf);
+                       put_zmbuf(zmbuf);
+                       vq->nr_zmbuf -= 1;
                }
+       }
+}
 
-               if (unlikely(dev->dequeue_zero_copy)) {
-                       struct zcopy_mbuf *zmbuf;
+static __rte_noinline uint16_t
+virtio_dev_tx_packed_zmbuf(struct virtio_net *dev,
+                          struct vhost_virtqueue *vq,
+                          struct rte_mempool *mbuf_pool,
+                          struct rte_mbuf **pkts,
+                          uint32_t count)
+{
+       uint32_t pkt_idx = 0;
+       uint32_t remained = count;
 
-                       zmbuf = get_zmbuf(vq);
-                       if (!zmbuf) {
-                               rte_pktmbuf_free(pkts[i]);
-                               break;
+       free_zmbuf(vq);
+
+       do {
+               if (remained >= PACKED_BATCH_SIZE) {
+                       if (!virtio_dev_tx_batch_packed_zmbuf(dev, vq,
+                               mbuf_pool, &pkts[pkt_idx])) {
+                               pkt_idx += PACKED_BATCH_SIZE;
+                               remained -= PACKED_BATCH_SIZE;
+                               continue;
                        }
-                       zmbuf->mbuf = pkts[i];
-                       zmbuf->desc_idx = buf_id;
-                       zmbuf->desc_count = desc_count;
+               }
 
-                       /*
-                        * Pin lock the mbuf; we will check later to see
-                        * whether the mbuf is freed (when we are the last
-                        * user) or not. If that's the case, we then could
-                        * update the used ring safely.
-                        */
-                       rte_mbuf_refcnt_update(pkts[i], 1);
+               if (virtio_dev_tx_single_packed_zmbuf(dev, vq, mbuf_pool,
+                                                     &pkts[pkt_idx]))
+                       break;
+               pkt_idx++;
+               remained--;
 
-                       vq->nr_zmbuf += 1;
-                       TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
+       } while (remained);
+
+       if (pkt_idx)
+               vhost_vring_call_packed(dev, vq);
+
+       return pkt_idx;
+}
+
+static __rte_noinline uint16_t
+virtio_dev_tx_packed(struct virtio_net *dev,
+                    struct vhost_virtqueue *vq,
+                    struct rte_mempool *mbuf_pool,
+                    struct rte_mbuf **pkts,
+                    uint32_t count)
+{
+       uint32_t pkt_idx = 0;
+       uint32_t remained = count;
+
+       do {
+               rte_prefetch0(&vq->desc_packed[vq->last_avail_idx]);
+
+               if (remained >= PACKED_BATCH_SIZE) {
+                       if (!virtio_dev_tx_batch_packed(dev, vq, mbuf_pool,
+                                                       &pkts[pkt_idx])) {
+                               vhost_flush_dequeue_packed(dev, vq);
+                               pkt_idx += PACKED_BATCH_SIZE;
+                               remained -= PACKED_BATCH_SIZE;
+                               continue;
+                       }
                }
 
-               vq_inc_last_avail_packed(vq, desc_count);
-       }
+               if (virtio_dev_tx_single_packed(dev, vq, mbuf_pool,
+                                               &pkts[pkt_idx]))
+                       break;
+               vhost_flush_dequeue_packed(dev, vq);
+               pkt_idx++;
+               remained--;
 
-       if (likely(dev->dequeue_zero_copy == 0)) {
+       } while (remained);
+
+       if (vq->shadow_used_idx)
                do_data_copy_dequeue(vq);
-               if (unlikely(i < count))
-                       vq->shadow_used_idx = i;
-               if (likely(vq->shadow_used_idx)) {
-                       flush_shadow_used_ring_packed(dev, vq);
-                       vhost_vring_call_packed(dev, vq);
-               }
-       }
 
-       return i;
+       return pkt_idx;
 }
 
 uint16_t
@@ -2162,9 +2244,14 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
                count -= 1;
        }
 
-       if (vq_is_packed(dev))
-               count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts, count);
-       else
+       if (vq_is_packed(dev)) {
+               if (unlikely(dev->dequeue_zero_copy))
+                       count = virtio_dev_tx_packed_zmbuf(dev, vq, mbuf_pool,
+                                                          pkts, count);
+               else
+                       count = virtio_dev_tx_packed(dev, vq, mbuf_pool, pkts,
+                                                    count);
+       } else
                count = virtio_dev_tx_split(dev, vq, mbuf_pool, pkts, count);
 
 out: