vhost: add dequeue zero copy
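
Implement the dequeue zero copy path: instead of copying each desc buf
into a newly allocated mbuf, attach the guest buffer to the mbuf
directly and return the descriptor to the guest only after the
application has freed that mbuf.

A minimal sketch of how an application might enable this, assuming the
RTE_VHOST_USER_DEQUEUE_ZERO_COPY flag added elsewhere in this series:

    /* Hypothetical usage; the flag comes from a companion patch. */
    if (rte_vhost_driver_register("/tmp/vhost.sock",
                RTE_VHOST_USER_DEQUEUE_ZERO_COPY) < 0)
        rte_exit(EXIT_FAILURE, "failed to register vhost driver\n");

---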
diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c
index 70301a5..74263a3 100644
--- a/lib/librte_vhost/virtio_net.c
+++ b/lib/librte_vhost/virtio_net.c
@@ -678,6 +678,12 @@ make_rarp_packet(struct rte_mbuf *rarp_mbuf, const struct ether_addr *mac)
        return 0;
 }
 
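+/* Mark a zmbuf slot as free so that get_zmbuf() can hand it out again. */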
+static inline void __attribute__((always_inline))
+put_zmbuf(struct zcopy_mbuf *zmbuf)
+{
+       zmbuf->in_use = 0;
+}
+
 static inline int __attribute__((always_inline))
 copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
                  uint16_t max_desc, struct rte_mbuf *m, uint16_t desc_idx,
@@ -735,10 +741,33 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
        mbuf_offset = 0;
        mbuf_avail  = m->buf_len - RTE_PKTMBUF_HEADROOM;
        while (1) {
+               uint64_t hpa;
+
                cpy_len = RTE_MIN(desc_avail, mbuf_avail);
-               rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *, mbuf_offset),
-                       (void *)((uintptr_t)(desc_addr + desc_offset)),
-                       cpy_len);
+
+               /*
+                * A desc buf might span two host physical pages that are
+                * not contiguous. In such a case (gpa_to_hpa returns 0),
+                * the data will be copied even though zero copy is
+                * enabled.
+                */
+               if (unlikely(dev->dequeue_zero_copy && (hpa = gpa_to_hpa(dev,
+                                       desc->addr + desc_offset, cpy_len)))) {
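+                       /*
+                        * Attach the guest buffer to the mbuf directly:
+                        * no data copy, the mbuf points into guest memory.
+                        */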
+                       cur->data_len = cpy_len;
+                       cur->data_off = 0;
+                       cur->buf_addr = (void *)(uintptr_t)desc_addr;
+                       cur->buf_physaddr = hpa;
+
+                       /*
+                        * In zero copy mode, one mbuf can only reference
+                        * data from a single desc buf, or part of one.
+                        */
+                       mbuf_avail = cpy_len;
+               } else {
+                       rte_memcpy(rte_pktmbuf_mtod_offset(cur, void *,
+                                                          mbuf_offset),
+                               (void *)((uintptr_t)(desc_addr + desc_offset)),
+                               cpy_len);
+               }
 
                mbuf_avail  -= cpy_len;
                mbuf_offset += cpy_len;
@@ -801,6 +830,80 @@ copy_desc_to_mbuf(struct virtio_net *dev, struct vring_desc *descs,
        return 0;
 }
 
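+/*
+ * Return one descriptor to the guest via the used ring; len is 0 as the
+ * dequeue path does not write into the desc buf.
+ */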
+static inline void __attribute__((always_inline))
+update_used_ring(struct virtio_net *dev, struct vhost_virtqueue *vq,
+                uint32_t used_idx, uint32_t desc_idx)
+{
+       vq->used->ring[used_idx].id  = desc_idx;
+       vq->used->ring[used_idx].len = 0;
+       vhost_log_used_vring(dev, vq,
+                       offsetof(struct vring_used, ring[used_idx]),
+                       sizeof(vq->used->ring[used_idx]));
+}
+
+static inline void __attribute__((always_inline))
+update_used_idx(struct virtio_net *dev, struct vhost_virtqueue *vq,
+               uint32_t count)
+{
+       if (unlikely(count == 0))
+               return;
+
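+       /* Make the used ring entries visible before updating the index. */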
+       rte_smp_wmb();
+       rte_smp_rmb();
+
+       vq->used->idx += count;
+       vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
+                       sizeof(vq->used->idx));
+
+       /* Kick guest if required. */
+       if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
+                       && (vq->callfd >= 0))
+               eventfd_write(vq->callfd, (eventfd_t)1);
+}
+
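+/*
+ * Find a free zmbuf slot, scanning circularly (in at most two passes)
+ * from where the previous search stopped.
+ */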
+static inline struct zcopy_mbuf *__attribute__((always_inline))
+get_zmbuf(struct vhost_virtqueue *vq)
+{
+       uint16_t i;
+       uint16_t last;
+       int tries = 0;
+
+       /* search [last_zmbuf_idx, zmbuf_size) */
+       i = vq->last_zmbuf_idx;
+       last = vq->zmbuf_size;
+
+again:
+       for (; i < last; i++) {
+               if (vq->zmbufs[i].in_use == 0) {
+                       vq->last_zmbuf_idx = i + 1;
+                       vq->zmbufs[i].in_use = 1;
+                       return &vq->zmbufs[i];
+               }
+       }
+
+       tries++;
+       if (tries == 1) {
+               /* search [0, last_zmbuf_idx) */
+               i = 0;
+               last = vq->last_zmbuf_idx;
+               goto again;
+       }
+
+       return NULL;
+}
+
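+/*
+ * A dequeued mbuf is consumed when no segment is referenced by anyone
+ * but us, i.e. the refcnt of every segment is back to 1.
+ */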
+static inline bool __attribute__((always_inline))
+mbuf_is_consumed(struct rte_mbuf *m)
+{
+       while (m) {
+               if (rte_mbuf_refcnt_read(m) > 1)
+                       return false;
+               m = m->next;
+       }
+
+       return true;
+}
+
 uint16_t
 rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
        struct rte_mempool *mbuf_pool, struct rte_mbuf **pkts, uint16_t count)
@@ -828,6 +931,30 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
        if (unlikely(vq->enabled == 0))
                return 0;
 
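+       /*
+        * In zero copy mode, descriptors may be returned to the guest
+        * only after the application has freed the attached mbufs.
+        * Reclaim all such finished mbufs before dequeuing more packets.
+        */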
+       if (unlikely(dev->dequeue_zero_copy)) {
+               struct zcopy_mbuf *zmbuf, *next;
+               int nr_updated = 0;
+
+               for (zmbuf = TAILQ_FIRST(&vq->zmbuf_list);
+                    zmbuf != NULL; zmbuf = next) {
+                       next = TAILQ_NEXT(zmbuf, next);
+
+                       if (mbuf_is_consumed(zmbuf->mbuf)) {
+                               used_idx = vq->last_used_idx++ & (vq->size - 1);
+                               update_used_ring(dev, vq, used_idx,
+                                                zmbuf->desc_idx);
+                               nr_updated += 1;
+
+                               TAILQ_REMOVE(&vq->zmbuf_list, zmbuf, next);
+                               rte_pktmbuf_free(zmbuf->mbuf);
+                               put_zmbuf(zmbuf);
+                               vq->nr_zmbuf -= 1;
+                       }
+               }
+
+               update_used_idx(dev, vq, nr_updated);
+       }
+
        /*
         * Construct a RARP broadcast packet and inject it into the "pkts"
         * array, so that it looks like the guest actually sent it.
@@ -875,11 +1002,8 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
                used_idx  = (vq->last_used_idx  + i) & (vq->size - 1);
                desc_indexes[i] = vq->avail->ring[avail_idx];
 
-               vq->used->ring[used_idx].id  = desc_indexes[i];
-               vq->used->ring[used_idx].len = 0;
-               vhost_log_used_vring(dev, vq,
-                               offsetof(struct vring_used, ring[used_idx]),
-                               sizeof(vq->used->ring[used_idx]));
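+               /*
+                * In zero copy mode, the used ring entry is filled only
+                * after the attached mbuf has been consumed (see the
+                * reclaim loop above).
+                */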
+               if (likely(dev->dequeue_zero_copy == 0))
+                       update_used_ring(dev, vq, used_idx, desc_indexes[i]);
        }
 
        /* Prefetch descriptor index. */
@@ -913,25 +1037,42 @@ rte_vhost_dequeue_burst(int vid, uint16_t queue_id,
                                "Failed to allocate memory for mbuf.\n");
                        break;
                }
+
                err = copy_desc_to_mbuf(dev, desc, sz, pkts[i], idx, mbuf_pool);
                if (unlikely(err)) {
                        rte_pktmbuf_free(pkts[i]);
                        break;
                }
-       }
 
-       rte_smp_wmb();
-       rte_smp_rmb();
-       vq->used->idx += i;
+               if (unlikely(dev->dequeue_zero_copy)) {
+                       struct zcopy_mbuf *zmbuf;
+
+                       zmbuf = get_zmbuf(vq);
+                       if (!zmbuf) {
+                               rte_pktmbuf_free(pkts[i]);
+                               break;
+                       }
+                       zmbuf->mbuf = pkts[i];
+                       zmbuf->desc_idx = desc_indexes[i];
+
+                       /*
+                       /*
+                        * Pin the mbuf with an extra reference; we check
+                        * later whether it has been freed elsewhere (i.e.
+                        * we are the last user), in which case the used
+                        * ring can be updated safely.
+                        */
+                       rte_mbuf_refcnt_update(pkts[i], 1);
+
+                       vq->nr_zmbuf += 1;
+                       TAILQ_INSERT_TAIL(&vq->zmbuf_list, zmbuf, next);
+               }
+       }
        vq->last_avail_idx += i;
-       vq->last_used_idx  += i;
-       vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx),
-                       sizeof(vq->used->idx));
 
-       /* Kick guest if required. */
-       if (!(vq->avail->flags & VRING_AVAIL_F_NO_INTERRUPT)
-                       && (vq->callfd >= 0))
-               eventfd_write(vq->callfd, (eventfd_t)1);
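+       /*
+        * In zero copy mode, last_used_idx and used->idx are advanced
+        * in the reclaim loop at the top of this function instead.
+        */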
+       if (likely(dev->dequeue_zero_copy == 0)) {
+               vq->last_used_idx += i;
+               update_used_idx(dev, vq, i);
+       }
 
 out:
        if (unlikely(rarp_mbuf != NULL)) {