From 1f90d32ce1756bfd26da913940e3ac38153c6e9a Mon Sep 17 00:00:00 2001 From: Honnappa Nagarahalli Date: Tue, 8 Oct 2019 16:12:19 -0500 Subject: [PATCH] rcu: add least acknowledged token optimization When the rte_rcu_qsbr_check API is called, it is possible to calculate the least valued token acknowledged by all the readers. When the API is called next time, the readers' token counters do not need to be scanned if the value of the token being queried is less than or equal to the last least token acknowledged. This avoids the cache line bounces between readers and writer. Fixes: 64994b56cfd7 ("rcu: add RCU library supporting QSBR mechanism") Cc: stable@dpdk.org Signed-off-by: Honnappa Nagarahalli Reviewed-by: Gavin Hu --- lib/librte_rcu/rte_rcu_qsbr.c | 4 ++++ lib/librte_rcu/rte_rcu_qsbr.h | 42 +++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c index 474675d510..2f3fad776e 100644 --- a/lib/librte_rcu/rte_rcu_qsbr.c +++ b/lib/librte_rcu/rte_rcu_qsbr.c @@ -72,6 +72,7 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) __RTE_QSBR_THRID_ARRAY_ELM_SIZE) / __RTE_QSBR_THRID_ARRAY_ELM_SIZE; v->token = __RTE_QSBR_CNT_INIT; + v->acked_token = __RTE_QSBR_CNT_INIT - 1; return 0; } @@ -244,6 +245,9 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) fprintf(f, " Token = %"PRIu64"\n", __atomic_load_n(&v->token, __ATOMIC_ACQUIRE)); + fprintf(f, " Least Acknowledged Token = %"PRIu64"\n", + __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE)); + fprintf(f, "Quiescent State Counts for readers:\n"); for (i = 0; i < v->num_elems; i++) { bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i), diff --git a/lib/librte_rcu/rte_rcu_qsbr.h b/lib/librte_rcu/rte_rcu_qsbr.h index c80f15c009..3f445ba6ca 100644 --- a/lib/librte_rcu/rte_rcu_qsbr.h +++ b/lib/librte_rcu/rte_rcu_qsbr.h @@ -83,6 +83,7 @@ struct rte_rcu_qsbr_cnt { #define __RTE_QSBR_CNT_THR_OFFLINE 0 #define __RTE_QSBR_CNT_INIT 1 +#define 
__RTE_QSBR_CNT_MAX ((uint64_t)~0) /* RTE Quiescent State variable structure. * This structure has two elements that vary in size based on the @@ -93,6 +94,10 @@ struct rte_rcu_qsbr_cnt { struct rte_rcu_qsbr { uint64_t token __rte_cache_aligned; /**< Counter to allow for multiple concurrent quiescent state queries */ + uint64_t acked_token; + /**< Least token acked by all the threads in the last call to + * rte_rcu_qsbr_check API. + */ uint32_t num_elems __rte_cache_aligned; /**< Number of elements in the thread ID array */ @@ -472,6 +477,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) uint64_t bmap; uint64_t c; uint64_t *reg_thread_id; + uint64_t acked_token = __RTE_QSBR_CNT_MAX; for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0); i < v->num_elems; @@ -493,6 +499,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) __RTE_RCU_DP_LOG(DEBUG, "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d", __func__, t, wait, c, id+j); + /* Counter is not checked for wrap-around condition * as it is a 64b counter. */ @@ -512,10 +519,25 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait) continue; } + /* This thread is in quiescent state. Use the counter + * to find the least acknowledged token among all the + * readers. + */ + if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c) + acked_token = c; + bmap &= ~(1UL << j); } } + /* All readers are checked, update least acknowledged token. + * There might be multiple writers trying to update this. There is + * no need to update this very accurately using compare-and-swap. 
+ */ + if (acked_token != __RTE_QSBR_CNT_MAX) + __atomic_store_n(&v->acked_token, acked_token, + __ATOMIC_RELAXED); + return 1; } @@ -528,6 +550,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait) uint32_t i; struct rte_rcu_qsbr_cnt *cnt; uint64_t c; + uint64_t acked_token = __RTE_QSBR_CNT_MAX; for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) { __RTE_RCU_DP_LOG(DEBUG, @@ -538,6 +561,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait) __RTE_RCU_DP_LOG(DEBUG, "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d", __func__, t, wait, c, i); + /* Counter is not checked for wrap-around condition * as it is a 64b counter. */ @@ -550,8 +574,22 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait) rte_pause(); } + + /* This thread is in quiescent state. Use the counter to find + * the least acknowledged token among all the readers. + */ + if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)) + acked_token = c; } + /* All readers are checked, update least acknowledged token. + * There might be multiple writers trying to update this. There is + * no need to update this very accurately using compare-and-swap. + */ + if (acked_token != __RTE_QSBR_CNT_MAX) + __atomic_store_n(&v->acked_token, acked_token, + __ATOMIC_RELAXED); + return 1; } @@ -595,6 +633,10 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait) { RTE_ASSERT(v != NULL); + /* Check if all the readers have already acknowledged this token */ + if (likely(t <= v->acked_token)) + return 1; + if (likely(v->num_threads == v->max_threads)) return __rte_rcu_qsbr_check_all(v, t, wait); else -- 2.20.1