/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_
/**
 * @file
 * RTE Quiescent State Based Reclamation (QSBR)
 *
 * Quiescent State (QS) is any point in the thread execution
 * where the thread does not hold a reference to a data structure
 * in shared memory. While using lock-less data structures, the writer
 * can safely free memory once all the reader threads have entered
 * quiescent state.
 *
 * This library provides the ability for the readers to report quiescent
 * state and for the writers to identify when all the readers have
 * entered quiescent state.
 */
#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif
#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif
/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread ID needs to be converted to an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
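
/* Worked example (editorial illustration, not part of the API): for
 * thread_id = 70, the bitmap element index is
 * 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 1, and the bit position within
 * that element is 70 & __RTE_QSBR_THRID_MASK = 6.
 */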
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64b counter is used to avoid adding more code to address
	 *   counter overflow. Changing this to 32b would require additional
	 *   changes to various APIs.
	 */
	uint32_t lock_cnt;
	/**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
} __rte_cache_aligned;
#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow for multiple concurrent quiescent state queries */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements */

	/**< Registered thread IDs are stored in a bitmap array,
	 *   after the quiescent state counter array.
	 */
} __rte_cache_aligned;
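
/* Illustrative memory layout (editorial sketch; see
 * rte_rcu_qsbr_get_memsize for the authoritative size calculation):
 *
 *	+------------------------------+ <- struct rte_rcu_qsbr (fixed size)
 *	| token, num_elems, ...        |
 *	+------------------------------+ <- qsbr_cnt[0 .. max_threads - 1]
 *	| per-thread QS counters       |
 *	+------------------------------+ <- registered thread ID bitmap
 *	| one bit per thread ID        |
 *	+------------------------------+
 */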
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Return the size of the memory occupied by a Quiescent State variable.
 *
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 * @return
 *   On success - size of memory in bytes required for this QS variable.
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0
 */
size_t __rte_experimental
rte_rcu_qsbr_get_memsize(uint32_t max_threads);
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Initialize a Quiescent State (QS) variable.
 *
 * @param v
 *   QS variable
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0 or 'v' is NULL.
 */
int __rte_experimental
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
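
/* Example (illustrative sketch): allocating and initializing a QS variable
 * sized for RTE_MAX_LCORE reader threads. rte_zmalloc is declared in
 * rte_malloc.h; error handling is omitted for brevity.
 *
 *	size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
 *	struct rte_rcu_qsbr *v =
 *		rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	rte_rcu_qsbr_init(v, RTE_MAX_LCORE);
 */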
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Register a reader thread to report its quiescent state
 * on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 * Any reader thread that wants to report its quiescent state must
 * call this API. This can be called during initialization or as part
 * of the packet processing loop.
 *
 * Note that rte_rcu_qsbr_thread_online must be called before the
 * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
 *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
 */
int __rte_experimental
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
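
/* Example (illustrative sketch): typical reader thread lifecycle. The
 * application must ensure 'thread_id' is unique per reader and below
 * 'max_threads'.
 *
 *	rte_rcu_qsbr_thread_register(v, thread_id);
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *	... read-side work, reporting quiescent state periodically ...
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	rte_rcu_qsbr_thread_unregister(v, thread_id);
 */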
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be called from the reader threads during shutdown.
 * Ongoing quiescent state queries will stop waiting for the status from this
 * unregistered reader thread.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will stop reporting its quiescent
 *   state on the QS variable.
 */
int __rte_experimental
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * Any registered reader thread that wants to report its quiescent state must
 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
 * during initialization or as part of the packet processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * The reader thread must call rte_rcu_qsbr_thread_online API, after the
 * blocking function call returns, to ensure that rte_rcu_qsbr_check API
 * waits for the reader thread to update its quiescent state.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);
	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This can be called during initialization or as part of the packet
 * processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   rte_rcu_qsbr_check API will not wait for the reader thread with
 *   this thread ID to report its quiescent state on the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);
	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure can not move after this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}
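
/* Example (illustrative sketch): going offline around a blocking call so
 * that writers do not wait on this reader. 'wait_for_work' is a
 * hypothetical application function.
 *
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	wait_for_work();
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 */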
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Acquire a lock for accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called before
 * accessing a shared data structure.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
 * Similarly, rte_rcu_qsbr_unlock will decrement the counter. The
 * rte_rcu_qsbr_check API will verify that this counter is 0.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Release a lock after accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called after
 * accessing a shared data structure.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
 * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
 * counter is 0.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
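
/* Example (illustrative sketch): bracketing a read-side access with the
 * debug lock/unlock APIs. 'lookup_entry' is a hypothetical application
 * function; when CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled these calls
 * compile to nothing.
 *
 *	rte_rcu_qsbr_lock(v, thread_id);
 *	entry = lookup_entry(ds, key);
 *	... use entry ...
 *	rte_rcu_qsbr_unlock(v, thread_id);
 */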
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Ask the reader threads to report the quiescent state
 * status.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from worker threads.
 *
 * @param v
 *   QS variable
 * @return
 *   - This is the token for this call of the API. This should be
 *     passed to rte_rcu_qsbr_check API.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Update quiescent state for a reader thread.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * All the reader threads registered to report their quiescent state
 * on the QS variable must call this API.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Update the quiescent state for the reader with this thread ID.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
			 t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
		__func__, t, thread_id);
}
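
/* Example (illustrative sketch): a reader reporting its quiescent state
 * once per iteration of its processing loop. 'process_packets' is a
 * hypothetical application function; no references to shared data
 * structures may be held when rte_rcu_qsbr_quiescent is called.
 *
 *	while (!done) {
 *		process_packets();
 *		rte_rcu_qsbr_quiescent(v, thread_id);
 *	}
 */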
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
				&v->qsbr_cnt[id + j].cnt,
				__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, id + j);
			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c != __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			bmap &= ~(1UL << j);
		}
	}

	return 1;
}
/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, i);
			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}
	}

	return 1;
}
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Checks if all the reader threads have entered the quiescent state
 * referenced by token.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from the worker threads as well.
 *
 * If this API is called with 'wait' set to true, the following
 * factors must be considered:
 *
 * 1) If the calling thread is also reporting the status on the
 * same QS variable, it must update the quiescent state status, before
 * calling this API.
 *
 * 2) In addition, while calling from multiple threads, only
 * one of those threads can be reporting the quiescent state status
 * on a given QS variable.
 *
 * @param v
 *   QS variable
 * @param t
 *   Token returned by rte_rcu_qsbr_start API
 * @param wait
 *   If true, block till all the reader threads have completed entering
 *   the quiescent state referenced by token 't'.
 * @return
 *   - 0 if all reader threads have NOT passed through specified number
 *     of quiescent states.
 *   - 1 if all reader threads have passed through specified number
 *     of quiescent states.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);

	return __rte_rcu_qsbr_check_selective(v, t, wait);
}
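
/* Example (illustrative sketch): a writer unlinking an entry from a
 * lock-free data structure and reclaiming its memory. 'remove_entry' is
 * a hypothetical application function; rte_free is from rte_malloc.h.
 *
 *	remove_entry(ds, entry);
 *	uint64_t t = rte_rcu_qsbr_start(v);
 *	rte_rcu_qsbr_check(v, t, true);
 *	rte_free(entry);
 */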
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Wait till the reader threads have entered quiescent state.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check APIs.
 *
 * If this API is called from multiple threads, only one of
 * those threads can be reporting the quiescent state status on a
 * given QS variable.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Thread ID of the caller if it is registered to report quiescent state
 *   on this QS variable (i.e. the calling thread is also part of the
 *   read-side critical section). If not, pass RTE_QSBR_THRID_INVALID.
 */
void __rte_experimental
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
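
/* Example (illustrative sketch): a writer that is not registered as a
 * reader can combine start and check via synchronize before freeing.
 * 'remove_entry' is a hypothetical application function.
 *
 *	remove_entry(ds, entry);
 *	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *	rte_free(entry);
 */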
/**
 * @warning
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Dump the details of a single QS variable to a file.
 *
 * It is NOT multi-thread safe.
 *
 * @param f
 *   A pointer to a file for output
 * @param v
 *   QS variable
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 */
int __rte_experimental
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */