net/memif: relax barrier for zero copy path
author Phil Yang <phil.yang@arm.com>
Fri, 11 Sep 2020 05:38:19 +0000 (13:38 +0800)
committer Ferruh Yigit <ferruh.yigit@intel.com>
Mon, 21 Sep 2020 16:05:38 +0000 (18:05 +0200)
Using 'rte_mb' to synchronize the shared ring head/tail between producer
and consumer stalls the pipeline and hurts performance on weak memory
model platforms such as aarch64.

Relaxing the expensive full barrier to C11 atomics with explicit memory
ordering improves throughput by 3.6%.

Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Ruifeng Wang <ruifeng.wang@arm.com>
Reviewed-by: Jakub Grajciar <jgrajcia@cisco.com>
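
For illustration only, below is a minimal standalone sketch of the
load-acquire/store-release pairing the patch relies on: the writer fills
the ring slot first and then store-releases the ring index, while the
reader load-acquires that index before touching the slot. All names here
(demo_ring, demo_produce, demo_consume, DEMO_RING_SIZE) are hypothetical
and not part of the memif driver; only the __atomic_* builtins and the
memory orderings mirror the change in the diff.

/*
 * Hypothetical single-producer/single-consumer ring, illustration only.
 */
#include <stdint.h>
#include <stddef.h>

#define DEMO_RING_SIZE 256
#define DEMO_MASK (DEMO_RING_SIZE - 1)

struct demo_ring {
	uint16_t head;                  /* written by producer, read by consumer */
	uint16_t tail;                  /* written by consumer, read by producer */
	void *slots[DEMO_RING_SIZE];
};

/* Producer: fill the slot first, then store-release head so a consumer
 * doing a load-acquire on head is guaranteed to see the slot contents. */
static inline int
demo_produce(struct demo_ring *r, void *obj)
{
	uint16_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
	uint16_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);

	if ((uint16_t)(head - tail) >= DEMO_RING_SIZE)
		return -1;                          /* ring full */

	r->slots[head & DEMO_MASK] = obj;           /* plain store, ordered by the release below */
	__atomic_store_n(&r->head, head + 1, __ATOMIC_RELEASE);
	return 0;
}

/* Consumer: load-acquire head to observe the producer's slot writes,
 * then store-release tail so the producer may safely reuse the slot. */
static inline void *
demo_consume(struct demo_ring *r)
{
	uint16_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
	uint16_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
	void *obj;

	if (tail == head)
		return NULL;                        /* ring empty */

	obj = r->slots[tail & DEMO_MASK];
	__atomic_store_n(&r->tail, tail + 1, __ATOMIC_RELEASE);
	return obj;
}

On strongly ordered platforms the acquire/release pair costs little, while
on weakly ordered ones it avoids the full memory barrier the patch removes.
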
drivers/net/memif/rte_eth_memif.c

index c1c7e9f..a19c0f3 100644
@@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_q
        memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
 
        /* FIXME: improve performance */
-       while (mq->last_tail != ring->tail) {
+       /* The ring->tail acts as a guard variable between Tx and Rx
+        * threads, so using load-acquire pairs with store-release
+        * to synchronize it between threads.
+        */
+       while (mq->last_tail != __atomic_load_n(&ring->tail,
+                                               __ATOMIC_ACQUIRE)) {
                RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
                /* Decrement refcnt and free mbuf. (current segment) */
                rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
@@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        mask = ring_size - 1;
 
        cur_slot = mq->last_tail;
-       last_slot = ring->tail;
+       /* The ring->tail acts as a guard variable between Tx and Rx
+        * threads, so using load-acquire pairs with store-release
+        * to synchronize it between threads.
+        */
+       last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
        if (cur_slot == last_slot)
                goto refill;
        n_slots = last_slot - cur_slot;
@@ -501,7 +510,11 @@ next_slot:
 
 /* Supply master with new buffers */
 refill:
-       head = ring->head;
+       /* The ring->head acts as a guard variable between Tx and Rx
+        * threads, so using load-acquire pairs with store-release
+        * to synchronize it between threads.
+        */
+       head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
        n_slots = ring_size - head + mq->last_tail;
 
        if (n_slots < 32)
@@ -526,8 +539,7 @@ refill:
                        (uint8_t *)proc_private->regions[d0->region]->addr;
        }
 no_free_mbufs:
-       rte_mb();
-       ring->head = head;
+       __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
 
        mq->n_pkts += n_rx_pkts;
 
@@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        memif_free_stored_mbufs(proc_private, mq);
 
        /* ring type always MEMIF_RING_S2M */
-       slot = ring->head;
-       n_free = ring_size - ring->head + mq->last_tail;
+       /* The ring->head acts as a guard variable between Tx and Rx
+        * threads, so using load-acquire pairs with store-release
+        * to synchronize it between threads.
+        */
+       slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
+       n_free = ring_size - slot + mq->last_tail;
 
        int used_slots;
 
@@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        }
 
 no_free_slots:
-       rte_mb();
        /* update ring pointers */
        if (type == MEMIF_RING_S2M)
-               ring->head = slot;
+               __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
        else
-               ring->tail = slot;
+               __atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
 
        /* Send interrupt, if enabled. */
        if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {