From 9ed877062898dd1eb7150bca56ebbd5329e96ee4 Mon Sep 17 00:00:00 2001 From: Gavin Hu Date: Fri, 2 Nov 2018 19:21:27 +0800 Subject: [PATCH] ring/c11: synchronize load and store of the tail Synchronize the load-acquire of the tail and the store-release within update_tail, the store release ensures all the ring operations, enqueue or dequeue, are seen by the observers on the other side as soon as they see the updated tail. The load-acquire is needed here as the data dependency is not a reliable way for ordering as the compiler might break it by saving to temporary values to boost performance. When computing the free_entries and avail_entries, use atomic semantics to load the heads and tails instead. The patch was benchmarked with test/ring_perf_autotest and it decreases the enqueue/dequeue latency by 5% ~ 27.6% with two lcores, the real gains are dependent on the number of lcores, depth of the ring, SPSC or MPMC. For 1 lcore, it also improves a little, about 3 ~ 4%. It is a big improvement, in case of MPMC, with two lcores and ring size of 32, it saves latency up to (3.26-2.36)/3.26 = 27.6%. This patch is a bug fix, while the improvement is a bonus. In our analysis the improvement comes from the cacheline pre-filling after hoisting load- acquire from _atomic_compare_exchange_n up above. The test command: $sudo ./test/test/test -l 16-19,44-47,72-75,100-103 -n 4 --socket-mem=\ 1024 -- -i Test result with this patch(two cores): SP/SC bulk enq/dequeue (size: 8): 5.86 MP/MC bulk enq/dequeue (size: 8): 10.15 SP/SC bulk enq/dequeue (size: 32): 1.94 MP/MC bulk enq/dequeue (size: 32): 2.36 In comparison of the test result without this patch: SP/SC bulk enq/dequeue (size: 8): 6.67 MP/MC bulk enq/dequeue (size: 8): 13.12 SP/SC bulk enq/dequeue (size: 32): 2.04 MP/MC bulk enq/dequeue (size: 32): 3.26 Fixes: 39368ebfc6 ("ring: introduce C11 memory model barrier option") Cc: stable@dpdk.org Signed-off-by: Gavin Hu Reviewed-by: Honnappa Nagarahalli Reviewed-by: Steve Capper Reviewed-by: Ola Liljedahl Reviewed-by: Jia He Acked-by: Jerin Jacob Tested-by: Jerin Jacob Acked-by: Olivier Matz --- lib/librte_ring/rte_ring_c11_mem.h | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/lib/librte_ring/rte_ring_c11_mem.h b/lib/librte_ring/rte_ring_c11_mem.h index 94df3c4a69..52da95a21c 100644 --- a/lib/librte_ring/rte_ring_c11_mem.h +++ b/lib/librte_ring/rte_ring_c11_mem.h @@ -57,6 +57,7 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, uint32_t *free_entries) { const uint32_t capacity = r->capacity; + uint32_t cons_tail; unsigned int max = n; int success; @@ -67,13 +68,18 @@ __rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp, *old_head = __atomic_load_n(&r->prod.head, __ATOMIC_ACQUIRE); - /* - * The subtraction is done between two unsigned 32bits value + /* load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + cons_tail = __atomic_load_n(&r->cons.tail, + __ATOMIC_ACQUIRE); + + /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * *old_head > cons_tail). So 'free_entries' is always between 0 * and capacity (which is < size). */ - *free_entries = (capacity + r->cons.tail - *old_head); + *free_entries = (capacity + cons_tail - *old_head); /* check that we have enough room in ring */ if (unlikely(n > *free_entries)) @@ -125,21 +131,29 @@ __rte_ring_move_cons_head(struct rte_ring *r, int is_sc, uint32_t *entries) { unsigned int max = n; + uint32_t prod_tail; int success; /* move cons.head atomically */ do { /* Restore n as it may change every loop */ n = max; + *old_head = __atomic_load_n(&r->cons.head, __ATOMIC_ACQUIRE); + /* this load-acquire synchronize with store-release of ht->tail + * in update_tail. + */ + prod_tail = __atomic_load_n(&r->prod.tail, + __ATOMIC_ACQUIRE); + /* The subtraction is done between two unsigned 32bits value * (the result is always modulo 32 bits even if we have * cons_head > prod_tail). So 'entries' is always between 0 * and size(ring)-1. */ - *entries = (r->prod.tail - *old_head); + *entries = (prod_tail - *old_head); /* Set the actual entries for dequeue */ if (n > *entries) -- 2.20.1