rcu: add RCU library supporting QSBR mechanism
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018 Arm Limited
3  */
4
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7
8 /**
9  * @file
10  * RTE Quiescent State Based Reclamation (QSBR)
11  *
12  * Quiescent State (QS) is any point in the thread execution
13  * where the thread does not hold a reference to a data structure
14  * in shared memory. While using lock-less data structures, the writer
15  * can safely free memory once all the reader threads have entered
16  * quiescent state.
17  *
18  * This library provides the ability for the readers to report quiescent
19  * state and for the writers to identify when all the readers have
20  * entered quiescent state.
21  */
22
23 #ifdef __cplusplus
24 extern "C" {
25 #endif
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <inttypes.h>
30 #include <errno.h>
31 #include <rte_common.h>
32 #include <rte_memory.h>
33 #include <rte_lcore.h>
34 #include <rte_debug.h>
35 #include <rte_atomic.h>
36
37 extern int rte_rcu_log_type;
38
39 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
40 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
41         rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
42                 "%s(): " fmt "\n", __func__, ## args)
43 #else
44 #define __RTE_RCU_DP_LOG(level, fmt, args...)
45 #endif
46
47 #if defined(RTE_LIBRTE_RCU_DEBUG)
48 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
49         if (v->qsbr_cnt[thread_id].lock_cnt) \
50                 rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
51                         "%s(): " fmt "\n", __func__, ## args); \
52 } while (0)
53 #else
54 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
55 #endif
56
57 /* Registered thread IDs are stored as a bitmap of 64b element array.
58  * Given thread id needs to be converted to index into the array and
59  * the id within the array element.
60  */
61 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
62 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
63         RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
64                 __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
65 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
66         ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
67 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
68 #define __RTE_QSBR_THRID_MASK 0x3f
69 #define RTE_QSBR_THRID_INVALID 0xffffffff
70
71 /* Worker thread counter */
72 struct rte_rcu_qsbr_cnt {
73         uint64_t cnt;
74         /**< Quiescent state counter. Value 0 indicates the thread is offline
75          *   64b counter is used to avoid adding more code to address
76          *   counter overflow. Changing this to 32b would require additional
77          *   changes to various APIs.
78          */
79         uint32_t lock_cnt;
80         /**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
81 } __rte_cache_aligned;
82
83 #define __RTE_QSBR_CNT_THR_OFFLINE 0
84 #define __RTE_QSBR_CNT_INIT 1
85
86 /* RTE Quiescent State variable structure.
87  * This structure has two elements that vary in size based on the
88  * 'max_threads' parameter.
89  * 1) Quiescent state counter array
90  * 2) Register thread ID array
91  */
92 struct rte_rcu_qsbr {
93         uint64_t token __rte_cache_aligned;
94         /**< Counter to allow for multiple concurrent quiescent state queries */
95
96         uint32_t num_elems __rte_cache_aligned;
97         /**< Number of elements in the thread ID array */
98         uint32_t num_threads;
99         /**< Number of threads currently using this QS variable */
100         uint32_t max_threads;
101         /**< Maximum number of threads using this QS variable */
102
103         struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
104         /**< Quiescent state counter array of 'max_threads' elements */
105
106         /**< Registered thread IDs are stored in a bitmap array,
107          *   after the quiescent state counter array.
108          */
109 } __rte_cache_aligned;
110
111 /**
112  * @warning
113  * @b EXPERIMENTAL: this API may change without prior notice
114  *
115  * Return the size of the memory occupied by a Quiescent State variable.
116  *
117  * @param max_threads
118  *   Maximum number of threads reporting quiescent state on this variable.
119  * @return
120  *   On success - size of memory in bytes required for this QS variable.
121  *   On error - 1 with error code set in rte_errno.
122  *   Possible rte_errno codes are:
123  *   - EINVAL - max_threads is 0
124  */
125 size_t __rte_experimental
126 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
127
128 /**
129  * @warning
130  * @b EXPERIMENTAL: this API may change without prior notice
131  *
132  * Initialize a Quiescent State (QS) variable.
133  *
134  * @param v
135  *   QS variable
136  * @param max_threads
137  *   Maximum number of threads reporting quiescent state on this variable.
138  *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
139  * @return
140  *   On success - 0
141  *   On error - 1 with error code set in rte_errno.
142  *   Possible rte_errno codes are:
143  *   - EINVAL - max_threads is 0 or 'v' is NULL.
144  *
145  */
146 int __rte_experimental
147 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
148
149 /**
150  * @warning
151  * @b EXPERIMENTAL: this API may change without prior notice
152  *
153  * Register a reader thread to report its quiescent state
154  * on a QS variable.
155  *
156  * This is implemented as a lock-free function. It is multi-thread
157  * safe.
158  * Any reader thread that wants to report its quiescent state must
159  * call this API. This can be called during initialization or as part
160  * of the packet processing loop.
161  *
162  * Note that rte_rcu_qsbr_thread_online must be called before the
163  * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
164  *
165  * @param v
166  *   QS variable
167  * @param thread_id
168  *   Reader thread with this thread ID will report its quiescent state on
169  *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
170  *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
171  */
172 int __rte_experimental
173 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
174
175 /**
176  * @warning
177  * @b EXPERIMENTAL: this API may change without prior notice
178  *
179  * Remove a reader thread, from the list of threads reporting their
180  * quiescent state on a QS variable.
181  *
182  * This is implemented as a lock-free function. It is multi-thread safe.
183  * This API can be called from the reader threads during shutdown.
184  * Ongoing quiescent state queries will stop waiting for the status from this
185  * unregistered reader thread.
186  *
187  * @param v
188  *   QS variable
189  * @param thread_id
190  *   Reader thread with this thread ID will stop reporting its quiescent
191  *   state on the QS variable.
192  */
193 int __rte_experimental
194 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
195
196 /**
197  * @warning
198  * @b EXPERIMENTAL: this API may change without prior notice
199  *
200  * Add a registered reader thread, to the list of threads reporting their
201  * quiescent state on a QS variable.
202  *
203  * This is implemented as a lock-free function. It is multi-thread
204  * safe.
205  *
206  * Any registered reader thread that wants to report its quiescent state must
207  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
208  * during initialization or as part of the packet processing loop.
209  *
210  * The reader thread must call rte_rcu_thread_offline API, before
211  * calling any functions that block, to ensure that rte_rcu_qsbr_check
212  * API does not wait indefinitely for the reader thread to update its QS.
213  *
214  * The reader thread must call rte_rcu_thread_online API, after the blocking
215  * function call returns, to ensure that rte_rcu_qsbr_check API
216  * waits for the reader thread to update its quiescent state.
217  *
218  * @param v
219  *   QS variable
220  * @param thread_id
221  *   Reader thread with this thread ID will report its quiescent state on
222  *   the QS variable.
223  */
224 static __rte_always_inline void __rte_experimental
225 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
226 {
227         uint64_t t;
228
229         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
230
231         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
232                                 v->qsbr_cnt[thread_id].lock_cnt);
233
234         /* Copy the current value of token.
235          * The fence at the end of the function will ensure that
236          * the following will not move down after the load of any shared
237          * data structure.
238          */
239         t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
240
241         /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
242          * 'cnt' (64b) is accessed atomically.
243          */
244         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
245                 t, __ATOMIC_RELAXED);
246
247         /* The subsequent load of the data structure should not
248          * move above the store. Hence a store-load barrier
249          * is required.
250          * If the load of the data structure moves above the store,
251          * writer might not see that the reader is online, even though
252          * the reader is referencing the shared data structure.
253          */
254 #ifdef RTE_ARCH_X86_64
255         /* rte_smp_mb() for x86 is lighter */
256         rte_smp_mb();
257 #else
258         __atomic_thread_fence(__ATOMIC_SEQ_CST);
259 #endif
260 }
261
262 /**
263  * @warning
264  * @b EXPERIMENTAL: this API may change without prior notice
265  *
266  * Remove a registered reader thread from the list of threads reporting their
267  * quiescent state on a QS variable.
268  *
269  * This is implemented as a lock-free function. It is multi-thread
270  * safe.
271  *
272  * This can be called during initialization or as part of the packet
273  * processing loop.
274  *
275  * The reader thread must call rte_rcu_thread_offline API, before
276  * calling any functions that block, to ensure that rte_rcu_qsbr_check
277  * API does not wait indefinitely for the reader thread to update its QS.
278  *
279  * @param v
280  *   QS variable
281  * @param thread_id
282  *   rte_rcu_qsbr_check API will not wait for the reader thread with
283  *   this thread ID to report its quiescent state on the QS variable.
284  */
285 static __rte_always_inline void __rte_experimental
286 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
287 {
288         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
289
290         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
291                                 v->qsbr_cnt[thread_id].lock_cnt);
292
293         /* The reader can go offline only after the load of the
294          * data structure is completed. i.e. any load of the
295          * data strcture can not move after this store.
296          */
297
298         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
299                 __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
300 }
301
302 /**
303  * @warning
304  * @b EXPERIMENTAL: this API may change without prior notice
305  *
306  * Acquire a lock for accessing a shared data structure.
307  *
308  * This is implemented as a lock-free function. It is multi-thread
309  * safe.
310  *
311  * This API is provided to aid debugging. This should be called before
312  * accessing a shared data structure.
313  *
314  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
315  * Similarly rte_rcu_qsbr_unlock will decrement the counter. When the
316  * rte_rcu_qsbr_check API will verify that this counter is 0.
317  *
318  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
319  *
320  * @param v
321  *   QS variable
322  * @param thread_id
323  *   Reader thread id
324  */
325 static __rte_always_inline void __rte_experimental
326 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
327                         __rte_unused unsigned int thread_id)
328 {
329         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
330
331 #if defined(RTE_LIBRTE_RCU_DEBUG)
332         /* Increment the lock counter */
333         __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
334                                 1, __ATOMIC_ACQUIRE);
335 #endif
336 }
337
338 /**
339  * @warning
340  * @b EXPERIMENTAL: this API may change without prior notice
341  *
342  * Release a lock after accessing a shared data structure.
343  *
344  * This is implemented as a lock-free function. It is multi-thread
345  * safe.
346  *
347  * This API is provided to aid debugging. This should be called after
348  * accessing a shared data structure.
349  *
350  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
351  * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
352  * counter is 0.
353  *
354  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
355  *
356  * @param v
357  *   QS variable
358  * @param thread_id
359  *   Reader thread id
360  */
361 static __rte_always_inline void __rte_experimental
362 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
363                         __rte_unused unsigned int thread_id)
364 {
365         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
366
367 #if defined(RTE_LIBRTE_RCU_DEBUG)
368         /* Decrement the lock counter */
369         __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
370                                 1, __ATOMIC_RELEASE);
371
372         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
373                                 "Lock counter %u. Nested locks?\n",
374                                 v->qsbr_cnt[thread_id].lock_cnt);
375 #endif
376 }
377
378 /**
379  * @warning
380  * @b EXPERIMENTAL: this API may change without prior notice
381  *
382  * Ask the reader threads to report the quiescent state
383  * status.
384  *
385  * This is implemented as a lock-free function. It is multi-thread
386  * safe and can be called from worker threads.
387  *
388  * @param v
389  *   QS variable
390  * @return
391  *   - This is the token for this call of the API. This should be
392  *     passed to rte_rcu_qsbr_check API.
393  */
394 static __rte_always_inline uint64_t __rte_experimental
395 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
396 {
397         uint64_t t;
398
399         RTE_ASSERT(v != NULL);
400
401         /* Release the changes to the shared data structure.
402          * This store release will ensure that changes to any data
403          * structure are visible to the workers before the token
404          * update is visible.
405          */
406         t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
407
408         return t;
409 }
410
411 /**
412  * @warning
413  * @b EXPERIMENTAL: this API may change without prior notice
414  *
415  * Update quiescent state for a reader thread.
416  *
417  * This is implemented as a lock-free function. It is multi-thread safe.
418  * All the reader threads registered to report their quiescent state
419  * on the QS variable must call this API.
420  *
421  * @param v
422  *   QS variable
423  * @param thread_id
424  *   Update the quiescent state for the reader with this thread ID.
425  */
426 static __rte_always_inline void __rte_experimental
427 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
428 {
429         uint64_t t;
430
431         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
432
433         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
434                                 v->qsbr_cnt[thread_id].lock_cnt);
435
436         /* Acquire the changes to the shared data structure released
437          * by rte_rcu_qsbr_start.
438          * Later loads of the shared data structure should not move
439          * above this load. Hence, use load-acquire.
440          */
441         t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
442
443         /* Inform the writer that updates are visible to this reader.
444          * Prior loads of the shared data structure should not move
445          * beyond this store. Hence use store-release.
446          */
447         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
448                          t, __ATOMIC_RELEASE);
449
450         __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
451                 __func__, t, thread_id);
452 }
453
454 /* Check the quiescent state counter for registered threads only, assuming
455  * that not all threads have registered.
456  */
457 static __rte_always_inline int
458 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
459 {
460         uint32_t i, j, id;
461         uint64_t bmap;
462         uint64_t c;
463         uint64_t *reg_thread_id;
464
465         for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
466                 i < v->num_elems;
467                 i++, reg_thread_id++) {
468                 /* Load the current registered thread bit map before
469                  * loading the reader thread quiescent state counters.
470                  */
471                 bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
472                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
473
474                 while (bmap) {
475                         j = __builtin_ctzl(bmap);
476                         __RTE_RCU_DP_LOG(DEBUG,
477                                 "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
478                                 __func__, t, wait, bmap, id + j);
479                         c = __atomic_load_n(
480                                         &v->qsbr_cnt[id + j].cnt,
481                                         __ATOMIC_ACQUIRE);
482                         __RTE_RCU_DP_LOG(DEBUG,
483                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
484                                 __func__, t, wait, c, id+j);
485                         /* Counter is not checked for wrap-around condition
486                          * as it is a 64b counter.
487                          */
488                         if (unlikely(c !=
489                                 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
490                                 /* This thread is not in quiescent state */
491                                 if (!wait)
492                                         return 0;
493
494                                 rte_pause();
495                                 /* This thread might have unregistered.
496                                  * Re-read the bitmap.
497                                  */
498                                 bmap = __atomic_load_n(reg_thread_id,
499                                                 __ATOMIC_ACQUIRE);
500
501                                 continue;
502                         }
503
504                         bmap &= ~(1UL << j);
505                 }
506         }
507
508         return 1;
509 }
510
511 /* Check the quiescent state counter for all threads, assuming that
512  * all the threads have registered.
513  */
514 static __rte_always_inline int
515 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
516 {
517         uint32_t i;
518         struct rte_rcu_qsbr_cnt *cnt;
519         uint64_t c;
520
521         for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
522                 __RTE_RCU_DP_LOG(DEBUG,
523                         "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
524                         __func__, t, wait, i);
525                 while (1) {
526                         c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
527                         __RTE_RCU_DP_LOG(DEBUG,
528                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
529                                 __func__, t, wait, c, i);
530                         /* Counter is not checked for wrap-around condition
531                          * as it is a 64b counter.
532                          */
533                         if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
534                                 break;
535
536                         /* This thread is not in quiescent state */
537                         if (!wait)
538                                 return 0;
539
540                         rte_pause();
541                 }
542         }
543
544         return 1;
545 }
546
547 /**
548  * @warning
549  * @b EXPERIMENTAL: this API may change without prior notice
550  *
551  * Checks if all the reader threads have entered the quiescent state
552  * referenced by token.
553  *
554  * This is implemented as a lock-free function. It is multi-thread
555  * safe and can be called from the worker threads as well.
556  *
557  * If this API is called with 'wait' set to true, the following
558  * factors must be considered:
559  *
560  * 1) If the calling thread is also reporting the status on the
561  * same QS variable, it must update the quiescent state status, before
562  * calling this API.
563  *
564  * 2) In addition, while calling from multiple threads, only
565  * one of those threads can be reporting the quiescent state status
566  * on a given QS variable.
567  *
568  * @param v
569  *   QS variable
570  * @param t
571  *   Token returned by rte_rcu_qsbr_start API
572  * @param wait
573  *   If true, block till all the reader threads have completed entering
574  *   the quiescent state referenced by token 't'.
575  * @return
576  *   - 0 if all reader threads have NOT passed through specified number
577  *     of quiescent states.
578  *   - 1 if all reader threads have passed through specified number
579  *     of quiescent states.
580  */
581 static __rte_always_inline int __rte_experimental
582 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
583 {
584         RTE_ASSERT(v != NULL);
585
586         if (likely(v->num_threads == v->max_threads))
587                 return __rte_rcu_qsbr_check_all(v, t, wait);
588         else
589                 return __rte_rcu_qsbr_check_selective(v, t, wait);
590 }
591
592 /**
593  * @warning
594  * @b EXPERIMENTAL: this API may change without prior notice
595  *
596  * Wait till the reader threads have entered quiescent state.
597  *
598  * This is implemented as a lock-free function. It is multi-thread safe.
599  * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
600  * rte_rcu_qsbr_check APIs.
601  *
602  * If this API is called from multiple threads, only one of
603  * those threads can be reporting the quiescent state status on a
604  * given QS variable.
605  *
606  * @param v
607  *   QS variable
608  * @param thread_id
609  *   Thread ID of the caller if it is registered to report quiescent state
610  *   on this QS variable (i.e. the calling thread is also part of the
611  *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
612  */
613 void __rte_experimental
614 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
615
616 /**
617  * @warning
618  * @b EXPERIMENTAL: this API may change without prior notice
619  *
620  * Dump the details of a single QS variables to a file.
621  *
622  * It is NOT multi-thread safe.
623  *
624  * @param f
625  *   A pointer to a file for output
626  * @param v
627  *   QS variable
628  * @return
629  *   On success - 0
630  *   On error - 1 with error code set in rte_errno.
631  *   Possible rte_errno codes are:
632  *   - EINVAL - NULL parameters are passed
633  */
634 int __rte_experimental
635 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
636
637 #ifdef __cplusplus
638 }
639 #endif
640
641 #endif /* _RTE_RCU_QSBR_H_ */