Using 'rte_mb' to synchronize the shared ring head/tail between
producer and consumer stalls the pipeline and hurts performance on
weak memory model platforms, such as aarch64. On IA, by contrast,
updates to the shared ring head and tail are already observable and
ordered between CPUs.
Replacing the full barrier with one-way barriers improves throughput.
On an aarch64 N1SDP server, this patch boosts testpmd throughput by
2.1%; on an Intel E5-2640, testpmd gains 3.98%.
Signed-off-by: Phil Yang <phil.yang@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Jakub Grajciar <jgrajcia@cisco.com>
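For readers new to the idiom, here is a hedged sketch (not the memif
code; all names here are hypothetical) of the one-way barrier pattern
the patch applies: the producer publishes work with a store-release on
head, the consumer observes it with a load-acquire, and the matching
release/acquire pair on tail protects slot reuse, so no full barrier
is required on either side:

#include <stdint.h>
#include <stddef.h>

#define RING_SIZE 256	/* power of two, so masking wraps the index */

struct spsc_ring {
	uint16_t head;		/* written by producer, read by consumer */
	uint16_t tail;		/* written by consumer, read by producer */
	void *slots[RING_SIZE];
};

/* Producer: write the slot first, then publish it. The release store
 * orders the slot write before the head update becomes visible. */
static inline int
ring_enqueue(struct spsc_ring *r, void *pkt)
{
	uint16_t head = __atomic_load_n(&r->head, __ATOMIC_RELAXED);
	uint16_t tail = __atomic_load_n(&r->tail, __ATOMIC_ACQUIRE);

	if ((uint16_t)(head - tail) == RING_SIZE)
		return -1;	/* ring full */
	r->slots[head & (RING_SIZE - 1)] = pkt;
	__atomic_store_n(&r->head, head + 1, __ATOMIC_RELEASE);
	return 0;
}

/* Consumer: the load-acquire on head pairs with the producer's
 * store-release, so the slot contents are guaranteed visible. */
static inline void *
ring_dequeue(struct spsc_ring *r)
{
	uint16_t tail = __atomic_load_n(&r->tail, __ATOMIC_RELAXED);
	uint16_t head = __atomic_load_n(&r->head, __ATOMIC_ACQUIRE);
	void *pkt;

	if (tail == head)
		return NULL;	/* ring empty */
	pkt = r->slots[tail & (RING_SIZE - 1)];
	__atomic_store_n(&r->tail, tail + 1, __ATOMIC_RELEASE);
	return pkt;
}

On aarch64 these builtins compile down to one-way ldar/stlr
instructions rather than a full barrier around every update, which is
where the throughput gain comes from; on IA they cost nothing extra,
since ordinary loads and stores already carry acquire/release
semantics there.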
uint32_t cookie; /**< MEMIF_COOKIE */
uint16_t flags; /**< flags */
#define MEMIF_RING_FLAG_MASK_INT 1 /**< disable interrupt mode */
- volatile uint16_t head; /**< pointer to ring buffer head */
+ uint16_t head; /**< pointer to ring buffer head */
MEMIF_CACHELINE_ALIGN_MARK(cacheline1);
- volatile uint16_t tail; /**< pointer to ring buffer tail */
+ uint16_t tail; /**< pointer to ring buffer tail */
MEMIF_CACHELINE_ALIGN_MARK(cacheline2);
memif_desc_t desc[0]; /**< buffer descriptors */
} memif_ring_t;
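A side note on dropping volatile above: once every cross-thread access
goes through the __atomic builtins, the qualifier is redundant. The
builtins already forbid the compiler from caching the field and emit
whatever CPU ordering the memory-order argument requires, while
volatile alone guarantees neither atomicity nor inter-CPU ordering. A
minimal, hypothetical illustration:

#include <stdint.h>

struct idx {
	uint16_t head;	/* plain field, no volatile needed */
};

/* One acquire load does both jobs: it forces a fresh read of head
 * from memory and orders subsequent accesses after that read. */
static inline uint16_t
read_head(struct idx *s)
{
	return __atomic_load_n(&s->head, __ATOMIC_ACQUIRE);
}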
ring_size = 1 << mq->log2_ring_size;
mask = ring_size - 1;
- cur_slot = (type == MEMIF_RING_S2M) ? mq->last_head : mq->last_tail;
- last_slot = (type == MEMIF_RING_S2M) ? ring->head : ring->tail;
+ if (type == MEMIF_RING_S2M) {
+ cur_slot = mq->last_head;
+ last_slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
+ } else {
+ cur_slot = mq->last_tail;
+ last_slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
+ }
+
if (cur_slot == last_slot)
goto refill;
n_slots = last_slot - cur_slot;
no_free_bufs:
if (type == MEMIF_RING_S2M) {
- rte_mb();
- ring->tail = cur_slot;
+ __atomic_store_n(&ring->tail, cur_slot, __ATOMIC_RELEASE);
mq->last_head = cur_slot;
} else {
mq->last_tail = cur_slot;
refill:
if (type == MEMIF_RING_M2S) {
+ head = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
n_slots = ring_size - head + mq->last_tail;
while (n_slots--) {
d0 = &ring->desc[s0];
d0->length = pmd->run.pkt_buffer_size;
}
- rte_mb();
- ring->head = head;
+ __atomic_store_n(&ring->head, head, __ATOMIC_RELEASE);
}
mq->n_pkts += n_rx_pkts;
ring_size = 1 << mq->log2_ring_size;
mask = ring_size - 1;
- n_free = ring->tail - mq->last_tail;
+ n_free = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE) - mq->last_tail;
- slot = (type == MEMIF_RING_S2M) ? ring->head : ring->tail;
- if (type == MEMIF_RING_S2M)
- n_free = ring_size - ring->head + mq->last_tail;
- else
- n_free = ring->head - ring->tail;
+ if (type == MEMIF_RING_S2M) {
+ slot = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE);
+ n_free = ring_size - slot + mq->last_tail;
+ } else {
+ slot = __atomic_load_n(&ring->tail, __ATOMIC_ACQUIRE);
+ n_free = __atomic_load_n(&ring->head, __ATOMIC_ACQUIRE) - slot;
+ }
while (n_tx_pkts < nb_pkts && n_free) {
mbuf_head = *bufs++;
if (type == MEMIF_RING_S2M)
- ring->head = slot;
+ __atomic_store_n(&ring->head, slot, __ATOMIC_RELEASE);
else
- ring->tail = slot;
+ __atomic_store_n(&ring->tail, slot, __ATOMIC_RELEASE);
if ((ring->flags & MEMIF_RING_FLAG_MASK_INT) == 0) {
a = 1;
for (i = 0; i < pmd->run.num_s2m_rings; i++) {
ring = memif_get_ring(pmd, proc_private, MEMIF_RING_S2M, i);
- ring->head = 0;
- ring->tail = 0;
+ __atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
+ __atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
ring->cookie = MEMIF_COOKIE;
ring->flags = 0;
for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
for (i = 0; i < pmd->run.num_m2s_rings; i++) {
ring = memif_get_ring(pmd, proc_private, MEMIF_RING_M2S, i);
- ring->head = 0;
- ring->tail = 0;
+ __atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
+ __atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
ring->cookie = MEMIF_COOKIE;
ring->flags = 0;
for (j = 0; j < (1 << pmd->run.log2_ring_size); j++) {
MIF_LOG(ERR, "Wrong ring");
return -1;
}
- ring->head = 0;
- ring->tail = 0;
+ __atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
+ __atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
mq->last_head = 0;
mq->last_tail = 0;
/* enable polling mode */
MIF_LOG(ERR, "Wrong ring");
return -1;
}
- ring->head = 0;
- ring->tail = 0;
+ __atomic_store_n(&ring->head, 0, __ATOMIC_RELAXED);
+ __atomic_store_n(&ring->tail, 0, __ATOMIC_RELAXED);
mq->last_head = 0;
mq->last_tail = 0;
/* enable polling mode */
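Finally, the relaxed stores on the init and connect paths above are
deliberate: when head and tail are reset to zero, the peer is not yet
polling the ring, so there is nothing to order against; publication of
the ring state is presumably handled later by the connect handshake
over the control channel. Reusing the hypothetical spsc_ring from the
earlier sketch:

/* Reset is only called before the peer starts polling, so with no
 * concurrent reader or writer, relaxed atomics are sufficient. */
static void
ring_reset(struct spsc_ring *r)
{
	__atomic_store_n(&r->head, 0, __ATOMIC_RELAXED);
	__atomic_store_n(&r->tail, 0, __ATOMIC_RELAXED);
}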