net/memif: relax barrier for zero copy path
diff --git a/drivers/net/memif/rte_eth_memif.c b/drivers/net/memif/rte_eth_memif.c
index c1c7e9f..a19c0f3 100644
--- a/drivers/net/memif/rte_eth_memif.c
+++ b/drivers/net/memif/rte_eth_memif.c
@@ -253,7 +253,12 @@ memif_free_stored_mbufs(struct pmd_process_private *proc_private, struct memif_q
        memif_ring_t *ring = memif_get_ring_from_queue(proc_private, mq);
 
        /* FIXME: improve performance */
-       while (mq->last_tail != ring->tail) {
+       /* The ring->tail acts as a guard variable between Tx and Rx
+        * threads, so a load-acquire here pairs with a store-release
+        * to synchronize it between threads.
+        */
+       while (mq->last_tail != __atomic_load_n(&ring->tail,
+                                               __ATOMIC_ACQUIRE)) {
                RTE_MBUF_PREFETCH_TO_FREE(mq->buffers[(mq->last_tail + 1) & mask]);
                /* Decrement refcnt and free mbuf. (current segment) */
                rte_mbuf_refcnt_update(mq->buffers[mq->last_tail & mask], -1);
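Note: the hunk above is the consumer half of a guard-variable handshake. The peer publishes completed slots with a store-release on ring->tail, and the load-acquire here ensures no read of those slots is reordered before the index check. A minimal reader-side sketch using the same GCC __atomic builtins; the demo_ring type and field names are illustrative, not the memif structures:

    #include <stdint.h>

    struct demo_ring {
        uint16_t tail;          /* written by the peer with store-release */
        uint16_t mask;          /* ring_size - 1, ring_size a power of 2  */
        void *buf[256];
    };

    /* Pop one published slot, or NULL if the peer has published nothing
     * new since our cached position (*last_tail).
     */
    static inline void *
    demo_consume(struct demo_ring *ring, uint16_t *last_tail)
    {
        /* Load-acquire: later reads of buf[] cannot be hoisted above
         * this load, so only peer-published slots are ever observed.
         */
        uint16_t tail = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);

        if (*last_tail == tail)
            return NULL;
        return ring->buf[(*last_tail)++ & ring->mask];
    }

On x86 an acquire load costs the same as a plain load; on weakly ordered CPUs it is still much cheaper than the full barrier (rte_mb) this patch removes in the later hunks.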
@@ -455,7 +460,11 @@ eth_memif_rx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        mask = ring_size - 1;
 
        cur_slot = mq->last_tail;
-       last_slot = ring->tail;
+       /* The ring->tail acts as a guard variable between Tx and Rx
+        * threads, so a load-acquire here pairs with a store-release
+        * to synchronize it between threads.
+        */
+       last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
        if (cur_slot == last_slot)
                goto refill;
        n_slots = last_slot - cur_slot;
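The arithmetic above works because the ring indices are free-running counters: n_slots = last_slot - cur_slot stays correct across numeric wraparound thanks to unsigned arithmetic, and the mask is applied only when a slot is actually dereferenced. A tiny illustration, assuming the 16-bit indices of the memif ring layout:

    #include <stdint.h>
    #include <stdio.h>

    int main(void)
    {
        uint16_t cur_slot  = 65534;  /* consumer position, about to wrap    */
        uint16_t last_slot = 2;      /* producer already wrapped past 65535 */
        uint16_t n_slots   = last_slot - cur_slot;

        printf("%u\n", (unsigned)n_slots);   /* prints 4 */
        return 0;
    }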
@@ -501,7 +510,11 @@ next_slot:
 
 /* Supply master with new buffers */
 refill:
-       head = ring->head;
+       /* The ring->head acts as a guard variable between Tx and Rx
+        * threads, so a load-acquire here pairs with a store-release
+        * to synchronize it between threads.
+        */
+       head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
        n_slots = ring_size - head + mq->last_tail;
 
        if (n_slots < 32)
@@ -526,8 +539,7 @@ refill:
                        (uint8_t *)proc_private->regions[d0->region]->addr;
        }
 no_free_mbufs:
-       rte_mb();
-       ring->head = head;
+       __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
 
        mq->n_pkts += n_rx_pkts;
 
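The store-release above is the producer half of the handshake: the old sequence "rte_mb(); ring->head = head;" used a full two-way barrier to keep the slot writes ahead of the index update, while the release store expresses exactly that one-way ordering. A writer-side sketch with illustrative names, not the memif structures:

    #include <stdint.h>

    struct demo_ring {
        uint16_t head;              /* producer-owned index */
        uint16_t mask;
        void *buf[256];
    };

    static inline void demo_publish(struct demo_ring *ring, void *pkt)
    {
        uint16_t head = ring->head;           /* we are the only writer */

        ring->buf[head & ring->mask] = pkt;   /* 1. fill the slot */
        /* 2. publish: the release store keeps the slot write above it
         * ordered first, pairing with the consumer's load-acquire of
         * head; rte_mb() enforced the same thing with a full barrier.
         */
        __atomic_store_n(&ring->head, head + 1, __ATOMIC_RELEASE);
    }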
@@ -723,8 +735,12 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        memif_free_stored_mbufs(proc_private, mq);
 
        /* ring type always MEMIF_RING_S2M */
-       slot = ring->head;
-       n_free = ring_size - ring->head + mq->last_tail;
+       /* The ring->head acts as a guard variable between Tx and Rx
+        * threads, so a load-acquire here pairs with a store-release
+        * to synchronize it between threads.
+        */
+       slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
+       n_free = ring_size - slot + mq->last_tail;
 
        int used_slots;
 
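Besides relaxing the barrier, this hunk stops re-reading the shared index: n_free is now derived from the slot value already loaded. On this path the Tx thread is the only writer of ring->head for S2M rings, so the second read was redundant rather than racy; reusing it saves an access to shared state and ties the arithmetic to a single observed value. A sketch of the resulting shape, with a hypothetical helper that is not part of the patch:

    #include <stdint.h>

    /* Free slots computed from one observed head value (illustrative). */
    static inline uint16_t demo_free_slots(const uint16_t *head_p,
                                           uint16_t ring_size,
                                           uint16_t last_tail)
    {
        uint16_t slot = __atomic_load_n(head_p, __ATOMIC_ACQUIRE);

        /* One shared-index read; all arithmetic uses that same value. */
        return ring_size - slot + last_tail;
    }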
@@ -778,12 +794,11 @@ eth_memif_tx_zc(void *queue, struct rte_mbuf **bufs, uint16_t nb_pkts)
        }
 
 no_free_slots:
-       rte_mb();
        /* update ring pointers */
        if (type == MEMIF_RING_S2M)
-               ring->head = slot;
+               __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
        else
-               ring->tail = slot;
+               __atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
 
        /* Send interrupt, if enabled. */
        if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
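Taken together, the patch converts the memif zero-copy rings to the standard single-producer/single-consumer acquire/release idiom. The following self-contained sketch (a generic SPSC ring, not memif code) shows both halves of the pairing; it assumes GCC/Clang __atomic builtins and POSIX threads:

    /* spsc_demo.c - illustrative SPSC ring using the acquire/release
     * pairing from the patch. Build: gcc -O2 -pthread spsc_demo.c
     */
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    #define RING_SIZE 256            /* power of two, as memif requires */
    #define MASK      (RING_SIZE - 1)
    #define N_ITEMS   50000

    static uint16_t head;            /* producer-owned, consumer acquires */
    static uint16_t tail;            /* consumer-owned, producer acquires */
    static int slots[RING_SIZE];

    static void *producer(void *arg)
    {
        (void)arg;
        for (int i = 0; i < N_ITEMS; ) {
            uint16_t t = __atomic_load_n(&tail, __ATOMIC_ACQUIRE);

            if ((uint16_t)(head - t) == RING_SIZE)
                continue;                     /* ring full, retry     */
            slots[head & MASK] = i++;         /* 1. write the payload */
            /* 2. publish the slot: pairs with consumer's load-acquire */
            __atomic_store_n(&head, head + 1, __ATOMIC_RELEASE);
        }
        return NULL;
    }

    static void *consumer(void *arg)
    {
        long sum = 0;

        for (int i = 0; i < N_ITEMS; i++) {
            uint16_t h;

            do {                              /* wait for a slot */
                h = __atomic_load_n(&head, __ATOMIC_ACQUIRE);
            } while (h == tail);
            sum += slots[tail & MASK];        /* safe: slot was published */
            /* hand the slot back: pairs with producer's load-acquire */
            __atomic_store_n(&tail, tail + 1, __ATOMIC_RELEASE);
        }
        *(long *)arg = sum;
        return NULL;
    }

    int main(void)
    {
        pthread_t p, c;
        long sum = 0;

        pthread_create(&p, NULL, producer, NULL);
        pthread_create(&c, NULL, consumer, &sum);
        pthread_join(p, NULL);
        pthread_join(c, NULL);
        printf("sum = %ld (expected %ld)\n",
               sum, (long)N_ITEMS * (N_ITEMS - 1) / 2);
        return 0;
    }

The release store on each index is what makes the peer's acquire load sufficient: every write the owner performed before publishing is guaranteed visible once the new index value is observed.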