From: Joyce Kong Date: Thu, 30 Apr 2020 09:14:36 +0000 (+0800) Subject: virtio: use one way barrier for split vring used index X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=ea5207c158edb00d7e5da17369ea45a462a40dcc;p=dpdk.git virtio: use one way barrier for split vring used index In case VIRTIO_F_ORDER_PLATFORM(36) is not negotiated, then the frontend and backend are assumed to be implemented in software, that is they can run on identical CPUs in an SMP configuration. Thus a weak form of memory barriers like rte_smp_r/wmb, other than rte_cio_r/wmb, is sufficient for this case(vq->hw->weak_barriers == 1) and yields better performance. For the above case, this patch helps yielding even better performance by replacing the two-way barriers with C11 one-way barriers for used index in split ring. Signed-off-by: Joyce Kong Reviewed-by: Gavin Hu Reviewed-by: Maxime Coquelin --- diff --git a/drivers/net/virtio/virtio_ethdev.c b/drivers/net/virtio/virtio_ethdev.c index e86d4e08fd..312871cb48 100644 --- a/drivers/net/virtio/virtio_ethdev.c +++ b/drivers/net/virtio/virtio_ethdev.c @@ -290,13 +290,10 @@ virtio_send_command_split(struct virtnet_ctl *cvq, virtqueue_notify(vq); - rte_rmb(); - while (VIRTQUEUE_NUSED(vq) == 0) { - rte_rmb(); + while (virtqueue_nused(vq) == 0) usleep(100); - } - while (VIRTQUEUE_NUSED(vq)) { + while (virtqueue_nused(vq)) { uint32_t idx, desc_idx, used_idx; struct vring_used_elem *uep; diff --git a/drivers/net/virtio/virtio_ring.h b/drivers/net/virtio/virtio_ring.h index 7ba34662e5..0f6574f684 100644 --- a/drivers/net/virtio/virtio_ring.h +++ b/drivers/net/virtio/virtio_ring.h @@ -59,7 +59,7 @@ struct vring_used_elem { struct vring_used { uint16_t flags; - volatile uint16_t idx; + uint16_t idx; struct vring_used_elem ring[0]; }; diff --git a/drivers/net/virtio/virtio_rxtx.c b/drivers/net/virtio/virtio_rxtx.c index 050541a105..f915b8a2c9 100644 --- a/drivers/net/virtio/virtio_rxtx.c +++ b/drivers/net/virtio/virtio_rxtx.c @@ -45,7 +45,7 @@ virtio_dev_rx_queue_done(void *rxq, uint16_t offset) struct virtnet_rx *rxvq = rxq; struct virtqueue *vq = rxvq->vq; - return VIRTQUEUE_NUSED(vq) >= offset; + return virtqueue_nused(vq) >= offset; } void @@ -968,9 +968,7 @@ virtio_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts, uint16_t nb_pkts) if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); - - virtio_rmb(hw->weak_barriers); + nb_used = virtqueue_nused(vq); num = likely(nb_used <= nb_pkts) ? nb_used : nb_pkts; if (unlikely(num > VIRTIO_MBUF_BURST_SZ)) @@ -1183,12 +1181,10 @@ virtio_recv_pkts_inorder(void *rx_queue, if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); + nb_used = virtqueue_nused(vq); nb_used = RTE_MIN(nb_used, nb_pkts); nb_used = RTE_MIN(nb_used, VIRTIO_MBUF_BURST_SZ); - virtio_rmb(hw->weak_barriers); - PMD_RX_LOG(DEBUG, "used:%d", nb_used); nb_enqueued = 0; @@ -1277,8 +1273,7 @@ virtio_recv_pkts_inorder(void *rx_queue, uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res, VIRTIO_MBUF_BURST_SZ); - if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) { - virtio_rmb(hw->weak_barriers); + if (likely(virtqueue_nused(vq) >= rcv_cnt)) { num = virtqueue_dequeue_rx_inorder(vq, rcv_pkts, len, rcv_cnt); uint16_t extra_idx = 0; @@ -1369,9 +1364,7 @@ virtio_recv_mergeable_pkts(void *rx_queue, if (unlikely(hw->started == 0)) return nb_rx; - nb_used = VIRTQUEUE_NUSED(vq); - - virtio_rmb(hw->weak_barriers); + nb_used = virtqueue_nused(vq); PMD_RX_LOG(DEBUG, "used:%d", nb_used); @@ -1459,8 +1452,7 @@ virtio_recv_mergeable_pkts(void *rx_queue, uint16_t rcv_cnt = RTE_MIN((uint16_t)seg_res, VIRTIO_MBUF_BURST_SZ); - if (likely(VIRTQUEUE_NUSED(vq) >= rcv_cnt)) { - virtio_rmb(hw->weak_barriers); + if (likely(virtqueue_nused(vq) >= rcv_cnt)) { num = virtqueue_dequeue_burst_rx(vq, rcv_pkts, len, rcv_cnt); uint16_t extra_idx = 0; @@ -1833,9 +1825,9 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts) return nb_pkts; PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts); - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + nb_used = virtqueue_nused(vq); + if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh)) virtio_xmit_cleanup(vq, nb_used); @@ -1867,8 +1859,8 @@ virtio_xmit_pkts(void *tx_queue, struct rte_mbuf **tx_pkts, uint16_t nb_pkts) /* Positive value indicates it need free vring descriptors */ if (unlikely(need > 0)) { - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + nb_used = virtqueue_nused(vq); + need = RTE_MIN(need, (int)nb_used); virtio_xmit_cleanup(vq, need); @@ -1905,11 +1897,9 @@ static __rte_always_inline int virtio_xmit_try_cleanup_inorder(struct virtqueue *vq, uint16_t need) { uint16_t nb_used, nb_clean, nb_descs; - struct virtio_hw *hw = vq->hw; nb_descs = vq->vq_free_cnt + need; - nb_used = VIRTQUEUE_NUSED(vq); - virtio_rmb(hw->weak_barriers); + nb_used = virtqueue_nused(vq); nb_clean = RTE_MIN(need, (int)nb_used); virtio_xmit_cleanup_inorder(vq, nb_clean); @@ -1938,9 +1928,8 @@ virtio_xmit_pkts_inorder(void *tx_queue, VIRTQUEUE_DUMP(vq); PMD_TX_LOG(DEBUG, "%d packets to xmit", nb_pkts); - nb_used = VIRTQUEUE_NUSED(vq); + nb_used = virtqueue_nused(vq); - virtio_rmb(hw->weak_barriers); if (likely(nb_used > vq->vq_nentries - vq->vq_free_thresh)) virtio_xmit_cleanup_inorder(vq, nb_used); diff --git a/drivers/net/virtio/virtio_rxtx_simple_altivec.c b/drivers/net/virtio/virtio_rxtx_simple_altivec.c index 003b6ec3f6..a260ebdf57 100644 --- a/drivers/net/virtio/virtio_rxtx_simple_altivec.c +++ b/drivers/net/virtio/virtio_rxtx_simple_altivec.c @@ -85,7 +85,7 @@ virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, if (unlikely(nb_pkts < RTE_VIRTIO_DESC_PER_LOOP)) return 0; - nb_used = VIRTQUEUE_NUSED(vq); + nb_used = virtqueue_nused(vq); rte_compiler_barrier(); diff --git a/drivers/net/virtio/virtio_rxtx_simple_neon.c b/drivers/net/virtio/virtio_rxtx_simple_neon.c index 992e71f010..363e2b330b 100644 --- a/drivers/net/virtio/virtio_rxtx_simple_neon.c +++ b/drivers/net/virtio/virtio_rxtx_simple_neon.c @@ -83,9 +83,8 @@ virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, if (unlikely(nb_pkts < RTE_VIRTIO_DESC_PER_LOOP)) return 0; - nb_used = VIRTQUEUE_NUSED(vq); - - rte_rmb(); + /* virtqueue_nused has a load-acquire or rte_cio_rmb inside */ + nb_used = virtqueue_nused(vq); if (unlikely(nb_used == 0)) return 0; diff --git a/drivers/net/virtio/virtio_rxtx_simple_sse.c b/drivers/net/virtio/virtio_rxtx_simple_sse.c index f9ec4ae699..1056e9c20b 100644 --- a/drivers/net/virtio/virtio_rxtx_simple_sse.c +++ b/drivers/net/virtio/virtio_rxtx_simple_sse.c @@ -85,9 +85,7 @@ virtio_recv_pkts_vec(void *rx_queue, struct rte_mbuf **rx_pkts, if (unlikely(nb_pkts < RTE_VIRTIO_DESC_PER_LOOP)) return 0; - nb_used = VIRTQUEUE_NUSED(vq); - - rte_compiler_barrier(); + nb_used = virtqueue_nused(vq); if (unlikely(nb_used == 0)) return 0; diff --git a/drivers/net/virtio/virtio_user/virtio_user_dev.c b/drivers/net/virtio/virtio_user/virtio_user_dev.c index 1c6b26f8d3..7fb135f49a 100644 --- a/drivers/net/virtio/virtio_user/virtio_user_dev.c +++ b/drivers/net/virtio/virtio_user/virtio_user_dev.c @@ -730,8 +730,10 @@ virtio_user_handle_cq(struct virtio_user_dev *dev, uint16_t queue_idx) struct vring *vring = &dev->vrings[queue_idx]; /* Consume avail ring, using used ring idx as first one */ - while (vring->used->idx != vring->avail->idx) { - avail_idx = (vring->used->idx) & (vring->num - 1); + while (__atomic_load_n(&vring->used->idx, __ATOMIC_RELAXED) + != vring->avail->idx) { + avail_idx = __atomic_load_n(&vring->used->idx, __ATOMIC_RELAXED) + & (vring->num - 1); desc_idx = vring->avail->ring[avail_idx]; n_descs = virtio_user_handle_ctrl_msg(dev, vring, desc_idx); @@ -741,6 +743,6 @@ virtio_user_handle_cq(struct virtio_user_dev *dev, uint16_t queue_idx) uep->id = desc_idx; uep->len = n_descs; - vring->used->idx++; + __atomic_add_fetch(&vring->used->idx, 1, __ATOMIC_RELAXED); } } diff --git a/drivers/net/virtio/virtqueue.c b/drivers/net/virtio/virtqueue.c index ca23180de9..408bba236a 100644 --- a/drivers/net/virtio/virtqueue.c +++ b/drivers/net/virtio/virtqueue.c @@ -93,7 +93,7 @@ virtqueue_rxvq_flush_split(struct virtqueue *vq) uint16_t used_idx, desc_idx; uint16_t nb_used, i; - nb_used = VIRTQUEUE_NUSED(vq); + nb_used = virtqueue_nused(vq); for (i = 0; i < nb_used; i++) { used_idx = vq->vq_used_cons_idx & (vq->vq_nentries - 1); diff --git a/drivers/net/virtio/virtqueue.h b/drivers/net/virtio/virtqueue.h index d422fac8c2..4f6956d845 100644 --- a/drivers/net/virtio/virtqueue.h +++ b/drivers/net/virtio/virtqueue.h @@ -469,8 +469,33 @@ virtio_get_queue_type(struct virtio_hw *hw, uint16_t vtpci_queue_idx) return VTNET_TQ; } -#define VIRTQUEUE_NUSED(vq) ((uint16_t)((vq)->vq_split.ring.used->idx - \ - (vq)->vq_used_cons_idx)) +/* virtqueue_nused has load-acquire or rte_cio_rmb insed */ +static inline uint16_t +virtqueue_nused(const struct virtqueue *vq) +{ + uint16_t idx; + + if (vq->hw->weak_barriers) { + /** + * x86 prefers to using rte_smp_rmb over __atomic_load_n as it + * reports a slightly better perf, which comes from the saved + * branch by the compiler. + * The if and else branches are identical with the smp and cio + * barriers both defined as compiler barriers on x86. + */ +#ifdef RTE_ARCH_X86_64 + idx = vq->vq_split.ring.used->idx; + rte_smp_rmb(); +#else + idx = __atomic_load_n(&(vq)->vq_split.ring.used->idx, + __ATOMIC_ACQUIRE); +#endif + } else { + idx = vq->vq_split.ring.used->idx; + rte_cio_rmb(); + } + return idx - vq->vq_used_cons_idx; +} void vq_ring_free_chain(struct virtqueue *vq, uint16_t desc_idx); void vq_ring_free_chain_packed(struct virtqueue *vq, uint16_t used_idx); @@ -539,7 +564,8 @@ virtqueue_notify(struct virtqueue *vq) #ifdef RTE_LIBRTE_VIRTIO_DEBUG_DUMP #define VIRTQUEUE_DUMP(vq) do { \ uint16_t used_idx, nused; \ - used_idx = (vq)->vq_split.ring.used->idx; \ + used_idx = __atomic_load_n(&(vq)->vq_split.ring.used->idx, \ + __ATOMIC_RELAXED); \ nused = (uint16_t)(used_idx - (vq)->vq_used_cons_idx); \ if (vtpci_packed_queue((vq)->hw)) { \ PMD_INIT_LOG(DEBUG, \ @@ -554,9 +580,9 @@ virtqueue_notify(struct virtqueue *vq) "VQ: - size=%d; free=%d; used=%d; desc_head_idx=%d;" \ " avail.idx=%d; used_cons_idx=%d; used.idx=%d;" \ " avail.flags=0x%x; used.flags=0x%x", \ - (vq)->vq_nentries, (vq)->vq_free_cnt, nused, \ - (vq)->vq_desc_head_idx, (vq)->vq_split.ring.avail->idx, \ - (vq)->vq_used_cons_idx, (vq)->vq_split.ring.used->idx, \ + (vq)->vq_nentries, (vq)->vq_free_cnt, nused, (vq)->vq_desc_head_idx, \ + (vq)->vq_split.ring.avail->idx, (vq)->vq_used_cons_idx, \ + __atomic_load_n(&(vq)->vq_split.ring.used->idx, __ATOMIC_RELAXED), \ (vq)->vq_split.ring.avail->flags, (vq)->vq_split.ring.used->flags); \ } while (0) #else diff --git a/lib/librte_vhost/virtio_net.c b/lib/librte_vhost/virtio_net.c index 62f37da073..e38293df96 100644 --- a/lib/librte_vhost/virtio_net.c +++ b/lib/librte_vhost/virtio_net.c @@ -107,11 +107,10 @@ flush_shadow_used_ring_split(struct virtio_net *dev, struct vhost_virtqueue *vq) } vq->last_used_idx += vq->shadow_used_idx; - rte_smp_wmb(); - vhost_log_cache_sync(dev, vq); - *(volatile uint16_t *)&vq->used->idx += vq->shadow_used_idx; + __atomic_add_fetch(&vq->used->idx, vq->shadow_used_idx, + __ATOMIC_RELEASE); vq->shadow_used_idx = 0; vhost_log_used_vring(dev, vq, offsetof(struct vring_used, idx), sizeof(vq->used->idx));