rcu: fix spurious thread unregister
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018 Arm Limited
3  */
4
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7
8 /**
9  * @file
10  * RTE Quiescent State Based Reclamation (QSBR)
11  *
12  * Quiescent State (QS) is any point in the thread execution
13  * where the thread does not hold a reference to a data structure
14  * in shared memory. While using lock-less data structures, the writer
15  * can safely free memory once all the reader threads have entered
16  * quiescent state.
17  *
18  * This library provides the ability for the readers to report quiescent
19  * state and for the writers to identify when all the readers have
20  * entered quiescent state.
21  */
22
23 #ifdef __cplusplus
24 extern "C" {
25 #endif
26
27 #include <stdbool.h>
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <inttypes.h>
31 #include <errno.h>
32 #include <rte_common.h>
33 #include <rte_memory.h>
34 #include <rte_lcore.h>
35 #include <rte_debug.h>
36 #include <rte_atomic.h>
37
38 extern int rte_rcu_log_type;
39
40 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
41 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
42         rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
43                 "%s(): " fmt "\n", __func__, ## args)
44 #else
45 #define __RTE_RCU_DP_LOG(level, fmt, args...)
46 #endif
47
48 #if defined(RTE_LIBRTE_RCU_DEBUG)
49 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
50         if (v->qsbr_cnt[thread_id].lock_cnt) \
51                 rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
52                         "%s(): " fmt "\n", __func__, ## args); \
53 } while (0)
54 #else
55 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
56 #endif
57
58 /* Registered thread IDs are stored as a bitmap of 64b element array.
59  * Given thread id needs to be converted to index into the array and
60  * the id within the array element.
61  */
62 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
63 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
64         RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
65                 __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
66 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
67         ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
68 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
69 #define __RTE_QSBR_THRID_MASK 0x3f
70 #define RTE_QSBR_THRID_INVALID 0xffffffff
71
72 /* Worker thread counter */
73 struct rte_rcu_qsbr_cnt {
74         uint64_t cnt;
75         /**< Quiescent state counter. Value 0 indicates the thread is offline
76          *   64b counter is used to avoid adding more code to address
77          *   counter overflow. Changing this to 32b would require additional
78          *   changes to various APIs.
79          */
80         uint32_t lock_cnt;
81         /**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
82 } __rte_cache_aligned;
83
84 #define __RTE_QSBR_CNT_THR_OFFLINE 0
85 #define __RTE_QSBR_CNT_INIT 1
86
87 /* RTE Quiescent State variable structure.
88  * This structure has two elements that vary in size based on the
89  * 'max_threads' parameter.
90  * 1) Quiescent state counter array
91  * 2) Register thread ID array
92  */
93 struct rte_rcu_qsbr {
94         uint64_t token __rte_cache_aligned;
95         /**< Counter to allow for multiple concurrent quiescent state queries */
96
97         uint32_t num_elems __rte_cache_aligned;
98         /**< Number of elements in the thread ID array */
99         uint32_t num_threads;
100         /**< Number of threads currently using this QS variable */
101         uint32_t max_threads;
102         /**< Maximum number of threads using this QS variable */
103
104         struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
105         /**< Quiescent state counter array of 'max_threads' elements */
106
107         /**< Registered thread IDs are stored in a bitmap array,
108          *   after the quiescent state counter array.
109          */
110 } __rte_cache_aligned;
111
112 /**
113  * @warning
114  * @b EXPERIMENTAL: this API may change without prior notice
115  *
116  * Return the size of the memory occupied by a Quiescent State variable.
117  *
118  * @param max_threads
119  *   Maximum number of threads reporting quiescent state on this variable.
120  * @return
121  *   On success - size of memory in bytes required for this QS variable.
122  *   On error - 1 with error code set in rte_errno.
123  *   Possible rte_errno codes are:
124  *   - EINVAL - max_threads is 0
125  */
126 __rte_experimental
127 size_t
128 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
129
130 /**
131  * @warning
132  * @b EXPERIMENTAL: this API may change without prior notice
133  *
134  * Initialize a Quiescent State (QS) variable.
135  *
136  * @param v
137  *   QS variable
138  * @param max_threads
139  *   Maximum number of threads reporting quiescent state on this variable.
140  *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
141  * @return
142  *   On success - 0
143  *   On error - 1 with error code set in rte_errno.
144  *   Possible rte_errno codes are:
145  *   - EINVAL - max_threads is 0 or 'v' is NULL.
146  *
147  */
148 __rte_experimental
149 int
150 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
151
152 /**
153  * @warning
154  * @b EXPERIMENTAL: this API may change without prior notice
155  *
156  * Register a reader thread to report its quiescent state
157  * on a QS variable.
158  *
159  * This is implemented as a lock-free function. It is multi-thread
160  * safe.
161  * Any reader thread that wants to report its quiescent state must
162  * call this API. This can be called during initialization or as part
163  * of the packet processing loop.
164  *
165  * Note that rte_rcu_qsbr_thread_online must be called before the
166  * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
167  *
168  * @param v
169  *   QS variable
170  * @param thread_id
171  *   Reader thread with this thread ID will report its quiescent state on
172  *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
173  *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
174  */
175 __rte_experimental
176 int
177 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
178
179 /**
180  * @warning
181  * @b EXPERIMENTAL: this API may change without prior notice
182  *
183  * Remove a reader thread, from the list of threads reporting their
184  * quiescent state on a QS variable.
185  *
186  * This is implemented as a lock-free function. It is multi-thread safe.
187  * This API can be called from the reader threads during shutdown.
188  * Ongoing quiescent state queries will stop waiting for the status from this
189  * unregistered reader thread.
190  *
191  * @param v
192  *   QS variable
193  * @param thread_id
194  *   Reader thread with this thread ID will stop reporting its quiescent
195  *   state on the QS variable.
196  */
197 __rte_experimental
198 int
199 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
200
201 /**
202  * @warning
203  * @b EXPERIMENTAL: this API may change without prior notice
204  *
205  * Add a registered reader thread, to the list of threads reporting their
206  * quiescent state on a QS variable.
207  *
208  * This is implemented as a lock-free function. It is multi-thread
209  * safe.
210  *
211  * Any registered reader thread that wants to report its quiescent state must
212  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
213  * during initialization or as part of the packet processing loop.
214  *
215  * The reader thread must call rte_rcu_thread_offline API, before
216  * calling any functions that block, to ensure that rte_rcu_qsbr_check
217  * API does not wait indefinitely for the reader thread to update its QS.
218  *
219  * The reader thread must call rte_rcu_thread_online API, after the blocking
220  * function call returns, to ensure that rte_rcu_qsbr_check API
221  * waits for the reader thread to update its quiescent state.
222  *
223  * @param v
224  *   QS variable
225  * @param thread_id
226  *   Reader thread with this thread ID will report its quiescent state on
227  *   the QS variable.
228  */
229 __rte_experimental
230 static __rte_always_inline void
231 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
232 {
233         uint64_t t;
234
235         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
236
237         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
238                                 v->qsbr_cnt[thread_id].lock_cnt);
239
240         /* Copy the current value of token.
241          * The fence at the end of the function will ensure that
242          * the following will not move down after the load of any shared
243          * data structure.
244          */
245         t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
246
247         /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
248          * 'cnt' (64b) is accessed atomically.
249          */
250         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
251                 t, __ATOMIC_RELAXED);
252
253         /* The subsequent load of the data structure should not
254          * move above the store. Hence a store-load barrier
255          * is required.
256          * If the load of the data structure moves above the store,
257          * writer might not see that the reader is online, even though
258          * the reader is referencing the shared data structure.
259          */
260 #ifdef RTE_ARCH_X86_64
261         /* rte_smp_mb() for x86 is lighter */
262         rte_smp_mb();
263 #else
264         __atomic_thread_fence(__ATOMIC_SEQ_CST);
265 #endif
266 }
267
268 /**
269  * @warning
270  * @b EXPERIMENTAL: this API may change without prior notice
271  *
272  * Remove a registered reader thread from the list of threads reporting their
273  * quiescent state on a QS variable.
274  *
275  * This is implemented as a lock-free function. It is multi-thread
276  * safe.
277  *
278  * This can be called during initialization or as part of the packet
279  * processing loop.
280  *
281  * The reader thread must call rte_rcu_thread_offline API, before
282  * calling any functions that block, to ensure that rte_rcu_qsbr_check
283  * API does not wait indefinitely for the reader thread to update its QS.
284  *
285  * @param v
286  *   QS variable
287  * @param thread_id
288  *   rte_rcu_qsbr_check API will not wait for the reader thread with
289  *   this thread ID to report its quiescent state on the QS variable.
290  */
291 __rte_experimental
292 static __rte_always_inline void
293 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
294 {
295         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
296
297         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
298                                 v->qsbr_cnt[thread_id].lock_cnt);
299
300         /* The reader can go offline only after the load of the
301          * data structure is completed. i.e. any load of the
302          * data strcture can not move after this store.
303          */
304
305         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
306                 __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
307 }
308
309 /**
310  * @warning
311  * @b EXPERIMENTAL: this API may change without prior notice
312  *
313  * Acquire a lock for accessing a shared data structure.
314  *
315  * This is implemented as a lock-free function. It is multi-thread
316  * safe.
317  *
318  * This API is provided to aid debugging. This should be called before
319  * accessing a shared data structure.
320  *
321  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
322  * Similarly rte_rcu_qsbr_unlock will decrement the counter. When the
323  * rte_rcu_qsbr_check API will verify that this counter is 0.
324  *
325  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
326  *
327  * @param v
328  *   QS variable
329  * @param thread_id
330  *   Reader thread id
331  */
332 __rte_experimental
333 static __rte_always_inline void
334 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
335                         __rte_unused unsigned int thread_id)
336 {
337         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
338
339 #if defined(RTE_LIBRTE_RCU_DEBUG)
340         /* Increment the lock counter */
341         __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
342                                 1, __ATOMIC_ACQUIRE);
343 #endif
344 }
345
346 /**
347  * @warning
348  * @b EXPERIMENTAL: this API may change without prior notice
349  *
350  * Release a lock after accessing a shared data structure.
351  *
352  * This is implemented as a lock-free function. It is multi-thread
353  * safe.
354  *
355  * This API is provided to aid debugging. This should be called after
356  * accessing a shared data structure.
357  *
358  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
359  * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
360  * counter is 0.
361  *
362  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
363  *
364  * @param v
365  *   QS variable
366  * @param thread_id
367  *   Reader thread id
368  */
369 __rte_experimental
370 static __rte_always_inline void
371 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
372                         __rte_unused unsigned int thread_id)
373 {
374         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
375
376 #if defined(RTE_LIBRTE_RCU_DEBUG)
377         /* Decrement the lock counter */
378         __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
379                                 1, __ATOMIC_RELEASE);
380
381         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
382                                 "Lock counter %u. Nested locks?\n",
383                                 v->qsbr_cnt[thread_id].lock_cnt);
384 #endif
385 }
386
387 /**
388  * @warning
389  * @b EXPERIMENTAL: this API may change without prior notice
390  *
391  * Ask the reader threads to report the quiescent state
392  * status.
393  *
394  * This is implemented as a lock-free function. It is multi-thread
395  * safe and can be called from worker threads.
396  *
397  * @param v
398  *   QS variable
399  * @return
400  *   - This is the token for this call of the API. This should be
401  *     passed to rte_rcu_qsbr_check API.
402  */
403 __rte_experimental
404 static __rte_always_inline uint64_t
405 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
406 {
407         uint64_t t;
408
409         RTE_ASSERT(v != NULL);
410
411         /* Release the changes to the shared data structure.
412          * This store release will ensure that changes to any data
413          * structure are visible to the workers before the token
414          * update is visible.
415          */
416         t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
417
418         return t;
419 }
420
421 /**
422  * @warning
423  * @b EXPERIMENTAL: this API may change without prior notice
424  *
425  * Update quiescent state for a reader thread.
426  *
427  * This is implemented as a lock-free function. It is multi-thread safe.
428  * All the reader threads registered to report their quiescent state
429  * on the QS variable must call this API.
430  *
431  * @param v
432  *   QS variable
433  * @param thread_id
434  *   Update the quiescent state for the reader with this thread ID.
435  */
436 __rte_experimental
437 static __rte_always_inline void
438 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
439 {
440         uint64_t t;
441
442         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
443
444         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
445                                 v->qsbr_cnt[thread_id].lock_cnt);
446
447         /* Acquire the changes to the shared data structure released
448          * by rte_rcu_qsbr_start.
449          * Later loads of the shared data structure should not move
450          * above this load. Hence, use load-acquire.
451          */
452         t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
453
454         /* Inform the writer that updates are visible to this reader.
455          * Prior loads of the shared data structure should not move
456          * beyond this store. Hence use store-release.
457          */
458         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
459                          t, __ATOMIC_RELEASE);
460
461         __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
462                 __func__, t, thread_id);
463 }
464
465 /* Check the quiescent state counter for registered threads only, assuming
466  * that not all threads have registered.
467  */
468 static __rte_always_inline int
469 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
470 {
471         uint32_t i, j, id;
472         uint64_t bmap;
473         uint64_t c;
474         uint64_t *reg_thread_id;
475
476         for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
477                 i < v->num_elems;
478                 i++, reg_thread_id++) {
479                 /* Load the current registered thread bit map before
480                  * loading the reader thread quiescent state counters.
481                  */
482                 bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
483                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
484
485                 while (bmap) {
486                         j = __builtin_ctzl(bmap);
487                         __RTE_RCU_DP_LOG(DEBUG,
488                                 "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
489                                 __func__, t, wait, bmap, id + j);
490                         c = __atomic_load_n(
491                                         &v->qsbr_cnt[id + j].cnt,
492                                         __ATOMIC_ACQUIRE);
493                         __RTE_RCU_DP_LOG(DEBUG,
494                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
495                                 __func__, t, wait, c, id+j);
496                         /* Counter is not checked for wrap-around condition
497                          * as it is a 64b counter.
498                          */
499                         if (unlikely(c !=
500                                 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
501                                 /* This thread is not in quiescent state */
502                                 if (!wait)
503                                         return 0;
504
505                                 rte_pause();
506                                 /* This thread might have unregistered.
507                                  * Re-read the bitmap.
508                                  */
509                                 bmap = __atomic_load_n(reg_thread_id,
510                                                 __ATOMIC_ACQUIRE);
511
512                                 continue;
513                         }
514
515                         bmap &= ~(1UL << j);
516                 }
517         }
518
519         return 1;
520 }
521
522 /* Check the quiescent state counter for all threads, assuming that
523  * all the threads have registered.
524  */
525 static __rte_always_inline int
526 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
527 {
528         uint32_t i;
529         struct rte_rcu_qsbr_cnt *cnt;
530         uint64_t c;
531
532         for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
533                 __RTE_RCU_DP_LOG(DEBUG,
534                         "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
535                         __func__, t, wait, i);
536                 while (1) {
537                         c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
538                         __RTE_RCU_DP_LOG(DEBUG,
539                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
540                                 __func__, t, wait, c, i);
541                         /* Counter is not checked for wrap-around condition
542                          * as it is a 64b counter.
543                          */
544                         if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
545                                 break;
546
547                         /* This thread is not in quiescent state */
548                         if (!wait)
549                                 return 0;
550
551                         rte_pause();
552                 }
553         }
554
555         return 1;
556 }
557
558 /**
559  * @warning
560  * @b EXPERIMENTAL: this API may change without prior notice
561  *
562  * Checks if all the reader threads have entered the quiescent state
563  * referenced by token.
564  *
565  * This is implemented as a lock-free function. It is multi-thread
566  * safe and can be called from the worker threads as well.
567  *
568  * If this API is called with 'wait' set to true, the following
569  * factors must be considered:
570  *
571  * 1) If the calling thread is also reporting the status on the
572  * same QS variable, it must update the quiescent state status, before
573  * calling this API.
574  *
575  * 2) In addition, while calling from multiple threads, only
576  * one of those threads can be reporting the quiescent state status
577  * on a given QS variable.
578  *
579  * @param v
580  *   QS variable
581  * @param t
582  *   Token returned by rte_rcu_qsbr_start API
583  * @param wait
584  *   If true, block till all the reader threads have completed entering
585  *   the quiescent state referenced by token 't'.
586  * @return
587  *   - 0 if all reader threads have NOT passed through specified number
588  *     of quiescent states.
589  *   - 1 if all reader threads have passed through specified number
590  *     of quiescent states.
591  */
592 __rte_experimental
593 static __rte_always_inline int
594 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
595 {
596         RTE_ASSERT(v != NULL);
597
598         if (likely(v->num_threads == v->max_threads))
599                 return __rte_rcu_qsbr_check_all(v, t, wait);
600         else
601                 return __rte_rcu_qsbr_check_selective(v, t, wait);
602 }
603
604 /**
605  * @warning
606  * @b EXPERIMENTAL: this API may change without prior notice
607  *
608  * Wait till the reader threads have entered quiescent state.
609  *
610  * This is implemented as a lock-free function. It is multi-thread safe.
611  * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
612  * rte_rcu_qsbr_check APIs.
613  *
614  * If this API is called from multiple threads, only one of
615  * those threads can be reporting the quiescent state status on a
616  * given QS variable.
617  *
618  * @param v
619  *   QS variable
620  * @param thread_id
621  *   Thread ID of the caller if it is registered to report quiescent state
622  *   on this QS variable (i.e. the calling thread is also part of the
623  *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
624  */
625 __rte_experimental
626 void
627 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
628
629 /**
630  * @warning
631  * @b EXPERIMENTAL: this API may change without prior notice
632  *
633  * Dump the details of a single QS variables to a file.
634  *
635  * It is NOT multi-thread safe.
636  *
637  * @param f
638  *   A pointer to a file for output
639  * @param v
640  *   QS variable
641  * @return
642  *   On success - 0
643  *   On error - 1 with error code set in rte_errno.
644  *   Possible rte_errno codes are:
645  *   - EINVAL - NULL parameters are passed
646  */
647 __rte_experimental
648 int
649 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
650
651 #ifdef __cplusplus
652 }
653 #endif
654
655 #endif /* _RTE_RCU_QSBR_H_ */