/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/**
 * @file
 * RTE Quiescent State Based Reclamation (QSBR)
 *
 * Quiescent State (QS) is any point in the thread execution
 * where the thread does not hold a reference to a data structure
 * in shared memory. While using lock-less data structures, the writer
 * can safely free memory once all the reader threads have entered
 * quiescent state.
 *
 * This library provides the ability for the readers to report quiescent
 * state and for the writers to identify when all the readers have
 * entered quiescent state.
 */

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread id needs to be converted to an index into the array and
 * to a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
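
/* Illustrative sketch (not part of this header): how a thread_id maps onto
 * the bitmap described by the macros above - the array element index comes
 * from the high bits of the id, the bit position from the low 6 bits.
 * Guarded by '#if 0' so it is never compiled.
 */
#if 0
static void
example_thread_id_mapping(unsigned int thread_id)
{
	unsigned int elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
	unsigned int bit = thread_id & __RTE_QSBR_THRID_MASK;

	printf("thread_id %u -> bitmap element %u, bit %u\n",
		thread_id, elem, bit);
}
#endif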

/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64b counter is used to avoid adding more code to address
	 *   counter overflow. Changing this to 32b would require additional
	 *   changes to various APIs.
	 */
	uint32_t lock_cnt;
	/**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow for multiple concurrent quiescent state queries */
	uint64_t acked_token;
	/**< Least token acked by all the threads in the last call to
	 *   rte_rcu_qsbr_check API.
	 */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements */

	/**< Registered thread IDs are stored in a bitmap array,
	 *   after the quiescent state counter array.
	 */
} __rte_cache_aligned;

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Return the size of the memory occupied by a Quiescent State variable.
 *
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 * @return
 *   On success - size of memory in bytes required for this QS variable.
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0
 */
__rte_experimental
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Initialize a Quiescent State (QS) variable.
 *
 * @param v
 *   QS variable
 * @param max_threads
 *   Maximum number of threads reporting quiescent state on this variable.
 *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - max_threads is 0 or 'v' is NULL.
 */
__rte_experimental
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
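
/* Illustrative sketch (not part of this header): allocating and initializing
 * a QS variable for up to 'max_threads' readers. Assumes <rte_malloc.h> for
 * rte_zmalloc(); error reporting is trimmed. Guarded by '#if 0' so it is
 * never compiled.
 */
#if 0
static struct rte_rcu_qsbr *
example_qsbr_alloc(uint32_t max_threads)
{
	size_t sz;
	struct rte_rcu_qsbr *v;

	sz = rte_rcu_qsbr_get_memsize(max_threads);
	v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (v == NULL)
		return NULL;

	rte_rcu_qsbr_init(v, max_threads);
	return v;
}
#endif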

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Register a reader thread to report its quiescent state
 * on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 * Any reader thread that wants to report its quiescent state must
 * call this API. This can be called during initialization or as part
 * of the packet processing loop.
 *
 * Note that rte_rcu_qsbr_thread_online must be called before the
 * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
 *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
 */
__rte_experimental
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be called from the reader threads during shutdown.
 * Ongoing quiescent state queries will stop waiting for the status from this
 * unregistered reader thread.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will stop reporting its quiescent
 *   state on the QS variable.
 */
__rte_experimental
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
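
/* Illustrative sketch (not part of this header): lifetime of a reader on a
 * worker lcore - register once, report quiescent states from the loop,
 * unregister at shutdown. Here the lcore id doubles as the RCU thread id;
 * that is an application choice, not a requirement. Not compiled ('#if 0').
 */
#if 0
static volatile bool example_quit;	/* set by the main thread at exit */

static int
example_reader_main(void *arg)
{
	struct rte_rcu_qsbr *v = arg;
	unsigned int thread_id = rte_lcore_id();

	rte_rcu_qsbr_thread_register(v, thread_id);
	rte_rcu_qsbr_thread_online(v, thread_id);

	while (!example_quit) {
		/* ... receive and process a burst, reading the lock-free
		 * structures published by the writer ...
		 */

		/* No references are held past this point - report it */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}

	rte_rcu_qsbr_thread_offline(v, thread_id);
	rte_rcu_qsbr_thread_unregister(v, thread_id);
	return 0;
}
#endif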

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * Any registered reader thread that wants to report its quiescent state must
 * call this API before calling rte_rcu_qsbr_quiescent. This can be called
 * during initialization or as part of the packet processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * The reader thread must call rte_rcu_qsbr_thread_online API, after the
 * blocking function call returns, to ensure that rte_rcu_qsbr_check API
 * waits for the reader thread to update its quiescent state.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread with this thread ID will report its quiescent state on
 *   the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This can be called during initialization or as part of the packet
 * processing loop.
 *
 * The reader thread must call rte_rcu_qsbr_thread_offline API, before
 * calling any functions that block, to ensure that rte_rcu_qsbr_check
 * API does not wait indefinitely for the reader thread to update its QS.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   rte_rcu_qsbr_check API will not wait for the reader thread with
 *   this thread ID to report its quiescent state on the QS variable.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed. i.e. any load of the
	 * data structure can not move after this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}
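
/* Illustrative sketch (not part of this header): a registered reader goes
 * offline around a blocking call so that writers do not wait on it. Assumes
 * <unistd.h> for sleep(), used only as a stand-in for any blocking call.
 * Not compiled ('#if 0').
 */
#if 0
static void
example_wait_for_config(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	/* Stop being counted by rte_rcu_qsbr_check before blocking */
	rte_rcu_qsbr_thread_offline(v, thread_id);

	sleep(1);	/* placeholder for epoll_wait(), recv(), ... */

	/* Become visible to writers again before touching shared data */
	rte_rcu_qsbr_thread_online(v, thread_id);
}
#endif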

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Acquire a lock for accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called before
 * accessing a shared data structure.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, a lock counter is incremented.
 * Similarly, rte_rcu_qsbr_unlock will decrement the counter. The
 * rte_rcu_qsbr_check API will verify that this counter is 0.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Release a lock after accessing a shared data structure.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe.
 *
 * This API is provided to aid debugging. This should be called after
 * accessing a shared data structure.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
 * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
 * counter is 0.
 *
 * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Reader thread id
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
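
/* Illustrative sketch (not part of this header): bracketing a read-side
 * access with the debug lock/unlock helpers. The shared object here is just
 * an array published by the writer; any lock-free structure works the same
 * way. Not compiled ('#if 0').
 */
#if 0
static uint32_t
example_read_entry(struct rte_rcu_qsbr *v, unsigned int thread_id,
		const uint32_t *shared_arr, uint32_t idx)
{
	uint32_t val;

	rte_rcu_qsbr_lock(v, thread_id);	/* no-op unless RCU debug is on */
	val = shared_arr[idx];			/* read the shared structure */
	rte_rcu_qsbr_unlock(v, thread_id);

	return val;
}
#endif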

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Ask the reader threads to report the quiescent state
 * status.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from worker threads.
 *
 * @param v
 *   QS variable
 * @return
 *   - This is the token for this call of the API. This should be
 *     passed to rte_rcu_qsbr_check API.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}
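
/* Illustrative sketch (not part of this header): the writer side - unlink an
 * element, then wait for all readers before freeing it. Assumes <stdlib.h>
 * for free(); 'struct example_entry' is a placeholder for application data.
 * Not compiled ('#if 0').
 */
#if 0
struct example_entry {
	uint32_t data;			/* application payload */
};

static void
example_writer_delete(struct rte_rcu_qsbr *v, struct example_entry *entry)
{
	uint64_t token;

	/* ... unlink 'entry' from the lock-free structure so that new
	 * readers can no longer find it ...
	 */

	token = rte_rcu_qsbr_start(v);

	/* Block until every registered, online reader has reported a
	 * quiescent state after 'token' was issued.
	 */
	rte_rcu_qsbr_check(v, token, true);

	free(entry);			/* safe to reclaim now */
}
#endif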

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Update quiescent state for a reader thread.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * All the reader threads registered to report their quiescent state
 * on the QS variable must call this API.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Update the quiescent state for the reader with this thread ID.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
					 t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
		__func__, t, thread_id);
}

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
				&v->qsbr_cnt[id + j].cnt,
				__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Checks if all the reader threads have entered the quiescent state
 * referenced by token.
 *
 * This is implemented as a lock-free function. It is multi-thread
 * safe and can be called from the worker threads as well.
 *
 * If this API is called with 'wait' set to true, the following
 * factors must be considered:
 *
 * 1) If the calling thread is also reporting the status on the
 * same QS variable, it must update the quiescent state status, before
 * calling this API.
 *
 * 2) In addition, while calling from multiple threads, only
 * one of those threads can be reporting the quiescent state status
 * on a given QS variable.
 *
 * @param v
 *   QS variable
 * @param t
 *   Token returned by rte_rcu_qsbr_start API
 * @param wait
 *   If true, block till all the reader threads have completed entering
 *   the quiescent state referenced by token 't'.
 * @return
 *   - 0 if all reader threads have NOT passed through specified number
 *     of quiescent states.
 *   - 1 if all reader threads have passed through specified number
 *     of quiescent states.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token))
		return 1;

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
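
/* Illustrative sketch (not part of this header): non-blocking reclamation.
 * The writer polls with wait == false and frees the element only once the
 * grace period has elapsed. 'pending_entry'/'pending_token' stand in for
 * whatever defer queue the application keeps; assumes <stdlib.h> for free().
 * Not compiled ('#if 0').
 */
#if 0
static void *pending_entry;	/* element waiting to be reclaimed */
static uint64_t pending_token;	/* token from rte_rcu_qsbr_start() */

static void
example_try_reclaim(struct rte_rcu_qsbr *v)
{
	if (pending_entry == NULL)
		return;

	/* Returns immediately when wait == false; 1 means every registered,
	 * online reader has acknowledged 'pending_token'.
	 */
	if (rte_rcu_qsbr_check(v, pending_token, false)) {
		free(pending_entry);
		pending_entry = NULL;
	}
}
#endif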

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Wait till the reader threads have entered quiescent state.
 *
 * This is implemented as a lock-free function. It is multi-thread safe.
 * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check APIs.
 *
 * If this API is called from multiple threads, only one of
 * those threads can be reporting the quiescent state status on a
 * given QS variable.
 *
 * @param v
 *   QS variable
 * @param thread_id
 *   Thread ID of the caller if it is registered to report quiescent state
 *   on this QS variable (i.e. the calling thread is also part of the
 *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
 */
__rte_experimental
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
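
/* Illustrative sketch (not part of this header): using the blocking wrapper
 * from a control-plane thread that is not registered as a reader, hence
 * RTE_QSBR_THRID_INVALID. Assumes <stdlib.h> for free(). Not compiled
 * ('#if 0').
 */
#if 0
static void
example_delete_blocking(struct rte_rcu_qsbr *v, void *entry)
{
	/* ... unlink 'entry' from the lock-free structure ... */

	/* Equivalent to rte_rcu_qsbr_start() followed by a blocking
	 * rte_rcu_qsbr_check().
	 */
	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);

	free(entry);
}
#endif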

/**
 * @b EXPERIMENTAL: this API may change without prior notice
 *
 * Dump the details of a single QS variable to a file.
 *
 * It is NOT multi-thread safe.
 *
 * @param f
 *   A pointer to a file for output
 * @param v
 *   QS variable
 * @return
 *   On success - 0
 *   On error - 1 with error code set in rte_errno.
 *   Possible rte_errno codes are:
 *   - EINVAL - NULL parameters are passed
 */
__rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
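
/* Illustrative sketch (not part of this header): dumping the QS variable,
 * e.g. when a grace period appears stuck during debugging. Not compiled
 * ('#if 0').
 */
#if 0
static void
example_dump_qsbr(struct rte_rcu_qsbr *v)
{
	/* Prints the registered thread IDs, the current token and the
	 * per-thread quiescent state counters.
	 */
	rte_rcu_qsbr_dump(stdout, v);
}
#endif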

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */