virtio: various improvements
[dpdk.git] / lib / librte_pmd_virtio / virtqueue.h
index c1cf146..f3c7776 100644 (file)
 #define wmb() rte_wmb()
 #define rmb() rte_rmb()
 
+#ifdef RTE_PMD_PACKET_PREFETCH
+#define rte_packet_prefetch(p)  rte_prefetch1(p)
+#else
+#define rte_packet_prefetch(p)  do {} while(0)
+#endif
+
 #define VIRTQUEUE_MAX_NAME_SZ 32
 
 #define RTE_MBUF_DATA_DMA_ADDR(mb) \
@@ -121,11 +127,13 @@ struct virtqueue {
         * VQ_RING_DESC_CHAIN_END.
         */
        uint16_t  vq_desc_head_idx;
+       uint16_t  vq_desc_tail_idx;
        /**
         * Last consumed descriptor in the used table,
         * trails vq_ring.used->idx.
         */
        uint16_t vq_used_cons_idx;
+       uint16_t vq_avail_idx;
        void     *virtio_net_hdr_mem; /**< hdr for each xmit packet */
 
        struct vq_desc_extra {
@@ -184,20 +192,25 @@ virtqueue_full(const struct virtqueue *vq)
 #define VIRTQUEUE_NUSED(vq) ((uint16_t)((vq)->vq_ring.used->idx - (vq)->vq_used_cons_idx))
 
 static inline void
-vq_ring_update_avail(struct virtqueue *vq, uint16_t desc_idx)
+vq_update_avail_idx(struct virtqueue *vq)
+{
+       rte_compiler_barrier();
+       vq->vq_ring.avail->idx = vq->vq_avail_idx;
+}
+
+static inline void
+vq_update_avail_ring(struct virtqueue *vq, uint16_t desc_idx)
 {
        uint16_t avail_idx;
        /*
         * Place the head of the descriptor chain into the next slot and make
-        * it usable to the host. The chain is made available now rather than
-        * deferring to virtqueue_notify() in the hopes that if the host is
-        * currently running on another CPU, we can keep it processing the new
-        * descriptor.
+        * it usable to the host. We wait to inform the host until after the burst 
+        * is complete to avoid cache alignment issues with descriptors. This 
+        * also helps to avoid any contention on the available index.
         */
-       avail_idx = (uint16_t)(vq->vq_ring.avail->idx & (vq->vq_nentries - 1));
+       avail_idx = (uint16_t)(vq->vq_avail_idx & (vq->vq_nentries - 1));
        vq->vq_ring.avail->ring[avail_idx] = desc_idx;
-       mb();
-       vq->vq_ring.avail->idx++;
+       vq->vq_avail_idx++;
 }
 
 static inline int virtqueue_kick_prepare(struct virtqueue * vq)
@@ -219,14 +232,16 @@ virtqueue_notify(struct virtqueue *vq)
 static inline void
 vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
 {
-       struct vring_desc *dp;
+       struct vring_desc *dp, *dp_tail;
        struct vq_desc_extra *dxp;
+       uint16_t desc_idx_last = desc_idx;
 
        dp  = &vq->vq_ring.desc[desc_idx];
        dxp = &vq->vq_descx[desc_idx];
        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt + dxp->ndescs);
        if ((dp->flags & VRING_DESC_F_INDIRECT) == 0) {
                while (dp->flags & VRING_DESC_F_NEXT) {
+                       desc_idx_last = dp->next; 
                        dp = &vq->vq_ring.desc[dp->next];
                }
        }
@@ -237,8 +252,14 @@ vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx)
         * newly freed chain. If the virtqueue was completely used, then
         * head would be VQ_RING_DESC_CHAIN_END (ASSERTed above).
         */
-       dp->next = vq->vq_desc_head_idx;
-       vq->vq_desc_head_idx = desc_idx;
+       if (vq->vq_desc_tail_idx == VQ_RING_DESC_CHAIN_END) {
+               vq->vq_desc_head_idx = desc_idx;
+       } else {
+               dp_tail = &vq->vq_ring.desc[vq->vq_desc_tail_idx];
+               dp_tail->next = desc_idx;
+       }
+       vq->vq_desc_tail_idx = desc_idx_last;
+       dp->next = VQ_RING_DESC_CHAIN_END;
 }
 
 static inline int
@@ -271,8 +292,10 @@ virtqueue_enqueue_recv_refill(struct virtqueue *vq, struct rte_mbuf *cookie)
        start_dp[idx].flags =  VRING_DESC_F_WRITE;
        idx = start_dp[idx].next;
        vq->vq_desc_head_idx = idx;
+       if (vq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+               vq->vq_desc_tail_idx = idx; 
        vq->vq_free_cnt = (uint16_t)(vq->vq_free_cnt - needed);
-       vq_ring_update_avail(vq, head_idx);
+       vq_update_avail_ring(vq, head_idx);
 
        return (0);
 }
@@ -295,6 +318,8 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
 
        idx = head_idx;
        dxp = &txvq->vq_descx[idx];
+       if (dxp->cookie != NULL)
+               rte_pktmbuf_free_seg(dxp->cookie);
        dxp->cookie = (void *)cookie;
        dxp->ndescs = needed;
 
@@ -308,31 +333,36 @@ virtqueue_enqueue_xmit(struct virtqueue *txvq, struct rte_mbuf *cookie)
        start_dp[idx].flags = 0;
        idx = start_dp[idx].next;
        txvq->vq_desc_head_idx = idx;
+       if (txvq->vq_desc_head_idx == VQ_RING_DESC_CHAIN_END)
+               txvq->vq_desc_tail_idx = idx; 
        txvq->vq_free_cnt = (uint16_t)(txvq->vq_free_cnt - needed);
-       vq_ring_update_avail(txvq, head_idx);
+       vq_update_avail_ring(txvq, head_idx);
 
        return (0);
 }
 
 static inline uint16_t
-virtqueue_dequeue_burst(struct virtqueue *vq, struct rte_mbuf **rx_pkts, uint32_t *len, uint16_t num)
+virtqueue_dequeue_burst_rx(struct virtqueue *vq, struct rte_mbuf **rx_pkts, uint32_t *len, uint16_t num)
 {
        struct vring_used_elem *uep;
        struct rte_mbuf *cookie;
        uint16_t used_idx, desc_idx;
        uint16_t i;
+
        /*  Caller does the check */
        for (i = 0; i < num ; i ++) {
                used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
                uep = &vq->vq_ring.used->ring[used_idx];
                desc_idx = (uint16_t) uep->id;
+               len[i] = uep->len;
                cookie = (struct rte_mbuf *)vq->vq_descx[desc_idx].cookie;
                if (unlikely(cookie == NULL)) {
                        PMD_DRV_LOG(ERR, "vring descriptor with no mbuf cookie at %u\n", 
                                vq->vq_used_cons_idx);
                        break;
                }
-               len[i] = uep->len;
+               rte_prefetch0(cookie);
+               rte_packet_prefetch(cookie->pkt.data);
                rx_pkts[i]  = cookie;
                vq->vq_used_cons_idx++;
                vq_ring_free_chain(vq, desc_idx);
@@ -341,6 +371,21 @@ virtqueue_dequeue_burst(struct virtqueue *vq, struct rte_mbuf **rx_pkts, uint32_
        return (i);
 }
 
+static inline uint16_t
+virtqueue_dequeue_pkt_tx(struct virtqueue *vq)
+{
+        struct vring_used_elem *uep;
+        uint16_t used_idx, desc_idx;
+
+        used_idx = (uint16_t)(vq->vq_used_cons_idx & (vq->vq_nentries - 1));
+        uep = &vq->vq_ring.used->ring[used_idx];
+        desc_idx = (uint16_t) uep->id;
+               vq->vq_used_cons_idx++;
+        vq_ring_free_chain(vq, desc_idx);
+
+        return 0;
+}
+
 #ifdef  RTE_LIBRTE_VIRTIO_DEBUG_DUMP
 #define VIRTQUEUE_DUMP(vq) do { \
        uint16_t used_idx, nused; \