devtools: test pkg-config file
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018 Arm Limited
3  */
4
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7
8 /**
9  * @file
10  * RTE Quiescent State Based Reclamation (QSBR)
11  *
12  * Quiescent State (QS) is any point in the thread execution
13  * where the thread does not hold a reference to a data structure
14  * in shared memory. While using lock-less data structures, the writer
15  * can safely free memory once all the reader threads have entered
16  * quiescent state.
17  *
18  * This library provides the ability for the readers to report quiescent
19  * state and for the writers to identify when all the readers have
20  * entered quiescent state.
21  */
22
23 #ifdef __cplusplus
24 extern "C" {
25 #endif
26
27 #include <stdio.h>
28 #include <stdint.h>
29 #include <inttypes.h>
30 #include <errno.h>
31 #include <rte_common.h>
32 #include <rte_memory.h>
33 #include <rte_lcore.h>
34 #include <rte_debug.h>
35 #include <rte_atomic.h>
36
37 extern int rte_rcu_log_type;
38
39 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
40 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
41         rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
42                 "%s(): " fmt "\n", __func__, ## args)
43 #else
44 #define __RTE_RCU_DP_LOG(level, fmt, args...)
45 #endif
46
47 #if defined(RTE_LIBRTE_RCU_DEBUG)
48 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
49         if (v->qsbr_cnt[thread_id].lock_cnt) \
50                 rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
51                         "%s(): " fmt "\n", __func__, ## args); \
52 } while (0)
53 #else
54 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
55 #endif
56
57 /* Registered thread IDs are stored as a bitmap of 64b element array.
58  * Given thread id needs to be converted to index into the array and
59  * the id within the array element.
60  */
61 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
62 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
63         RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
64                 __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
65 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
66         ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
67 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
68 #define __RTE_QSBR_THRID_MASK 0x3f
69 #define RTE_QSBR_THRID_INVALID 0xffffffff
70
71 /* Worker thread counter */
72 struct rte_rcu_qsbr_cnt {
73         uint64_t cnt;
74         /**< Quiescent state counter. Value 0 indicates the thread is offline
75          *   64b counter is used to avoid adding more code to address
76          *   counter overflow. Changing this to 32b would require additional
77          *   changes to various APIs.
78          */
79         uint32_t lock_cnt;
80         /**< Lock counter. Used when CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled */
81 } __rte_cache_aligned;
82
83 #define __RTE_QSBR_CNT_THR_OFFLINE 0
84 #define __RTE_QSBR_CNT_INIT 1
85
86 /* RTE Quiescent State variable structure.
87  * This structure has two elements that vary in size based on the
88  * 'max_threads' parameter.
89  * 1) Quiescent state counter array
90  * 2) Register thread ID array
91  */
92 struct rte_rcu_qsbr {
93         uint64_t token __rte_cache_aligned;
94         /**< Counter to allow for multiple concurrent quiescent state queries */
95
96         uint32_t num_elems __rte_cache_aligned;
97         /**< Number of elements in the thread ID array */
98         uint32_t num_threads;
99         /**< Number of threads currently using this QS variable */
100         uint32_t max_threads;
101         /**< Maximum number of threads using this QS variable */
102
103         struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
104         /**< Quiescent state counter array of 'max_threads' elements */
105
106         /**< Registered thread IDs are stored in a bitmap array,
107          *   after the quiescent state counter array.
108          */
109 } __rte_cache_aligned;
110
111 /**
112  * @warning
113  * @b EXPERIMENTAL: this API may change without prior notice
114  *
115  * Return the size of the memory occupied by a Quiescent State variable.
116  *
117  * @param max_threads
118  *   Maximum number of threads reporting quiescent state on this variable.
119  * @return
120  *   On success - size of memory in bytes required for this QS variable.
121  *   On error - 1 with error code set in rte_errno.
122  *   Possible rte_errno codes are:
123  *   - EINVAL - max_threads is 0
124  */
125 __rte_experimental
126 size_t
127 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
128
129 /**
130  * @warning
131  * @b EXPERIMENTAL: this API may change without prior notice
132  *
133  * Initialize a Quiescent State (QS) variable.
134  *
135  * @param v
136  *   QS variable
137  * @param max_threads
138  *   Maximum number of threads reporting quiescent state on this variable.
139  *   This should be the same value as passed to rte_rcu_qsbr_get_memsize.
140  * @return
141  *   On success - 0
142  *   On error - 1 with error code set in rte_errno.
143  *   Possible rte_errno codes are:
144  *   - EINVAL - max_threads is 0 or 'v' is NULL.
145  *
146  */
147 __rte_experimental
148 int
149 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
150
151 /**
152  * @warning
153  * @b EXPERIMENTAL: this API may change without prior notice
154  *
155  * Register a reader thread to report its quiescent state
156  * on a QS variable.
157  *
158  * This is implemented as a lock-free function. It is multi-thread
159  * safe.
160  * Any reader thread that wants to report its quiescent state must
161  * call this API. This can be called during initialization or as part
162  * of the packet processing loop.
163  *
164  * Note that rte_rcu_qsbr_thread_online must be called before the
165  * thread updates its quiescent state using rte_rcu_qsbr_quiescent.
166  *
167  * @param v
168  *   QS variable
169  * @param thread_id
170  *   Reader thread with this thread ID will report its quiescent state on
171  *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
172  *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
173  */
174 __rte_experimental
175 int
176 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
177
178 /**
179  * @warning
180  * @b EXPERIMENTAL: this API may change without prior notice
181  *
182  * Remove a reader thread, from the list of threads reporting their
183  * quiescent state on a QS variable.
184  *
185  * This is implemented as a lock-free function. It is multi-thread safe.
186  * This API can be called from the reader threads during shutdown.
187  * Ongoing quiescent state queries will stop waiting for the status from this
188  * unregistered reader thread.
189  *
190  * @param v
191  *   QS variable
192  * @param thread_id
193  *   Reader thread with this thread ID will stop reporting its quiescent
194  *   state on the QS variable.
195  */
196 __rte_experimental
197 int
198 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
199
200 /**
201  * @warning
202  * @b EXPERIMENTAL: this API may change without prior notice
203  *
204  * Add a registered reader thread, to the list of threads reporting their
205  * quiescent state on a QS variable.
206  *
207  * This is implemented as a lock-free function. It is multi-thread
208  * safe.
209  *
210  * Any registered reader thread that wants to report its quiescent state must
211  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
212  * during initialization or as part of the packet processing loop.
213  *
214  * The reader thread must call rte_rcu_thread_offline API, before
215  * calling any functions that block, to ensure that rte_rcu_qsbr_check
216  * API does not wait indefinitely for the reader thread to update its QS.
217  *
218  * The reader thread must call rte_rcu_thread_online API, after the blocking
219  * function call returns, to ensure that rte_rcu_qsbr_check API
220  * waits for the reader thread to update its quiescent state.
221  *
222  * @param v
223  *   QS variable
224  * @param thread_id
225  *   Reader thread with this thread ID will report its quiescent state on
226  *   the QS variable.
227  */
228 __rte_experimental
229 static __rte_always_inline void
230 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
231 {
232         uint64_t t;
233
234         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
235
236         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
237                                 v->qsbr_cnt[thread_id].lock_cnt);
238
239         /* Copy the current value of token.
240          * The fence at the end of the function will ensure that
241          * the following will not move down after the load of any shared
242          * data structure.
243          */
244         t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
245
246         /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
247          * 'cnt' (64b) is accessed atomically.
248          */
249         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
250                 t, __ATOMIC_RELAXED);
251
252         /* The subsequent load of the data structure should not
253          * move above the store. Hence a store-load barrier
254          * is required.
255          * If the load of the data structure moves above the store,
256          * writer might not see that the reader is online, even though
257          * the reader is referencing the shared data structure.
258          */
259 #ifdef RTE_ARCH_X86_64
260         /* rte_smp_mb() for x86 is lighter */
261         rte_smp_mb();
262 #else
263         __atomic_thread_fence(__ATOMIC_SEQ_CST);
264 #endif
265 }
266
267 /**
268  * @warning
269  * @b EXPERIMENTAL: this API may change without prior notice
270  *
271  * Remove a registered reader thread from the list of threads reporting their
272  * quiescent state on a QS variable.
273  *
274  * This is implemented as a lock-free function. It is multi-thread
275  * safe.
276  *
277  * This can be called during initialization or as part of the packet
278  * processing loop.
279  *
280  * The reader thread must call rte_rcu_thread_offline API, before
281  * calling any functions that block, to ensure that rte_rcu_qsbr_check
282  * API does not wait indefinitely for the reader thread to update its QS.
283  *
284  * @param v
285  *   QS variable
286  * @param thread_id
287  *   rte_rcu_qsbr_check API will not wait for the reader thread with
288  *   this thread ID to report its quiescent state on the QS variable.
289  */
290 __rte_experimental
291 static __rte_always_inline void
292 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
293 {
294         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
295
296         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
297                                 v->qsbr_cnt[thread_id].lock_cnt);
298
299         /* The reader can go offline only after the load of the
300          * data structure is completed. i.e. any load of the
301          * data strcture can not move after this store.
302          */
303
304         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
305                 __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
306 }
307
308 /**
309  * @warning
310  * @b EXPERIMENTAL: this API may change without prior notice
311  *
312  * Acquire a lock for accessing a shared data structure.
313  *
314  * This is implemented as a lock-free function. It is multi-thread
315  * safe.
316  *
317  * This API is provided to aid debugging. This should be called before
318  * accessing a shared data structure.
319  *
320  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled a lock counter is incremented.
321  * Similarly rte_rcu_qsbr_unlock will decrement the counter. When the
322  * rte_rcu_qsbr_check API will verify that this counter is 0.
323  *
324  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
325  *
326  * @param v
327  *   QS variable
328  * @param thread_id
329  *   Reader thread id
330  */
331 __rte_experimental
332 static __rte_always_inline void
333 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
334                         __rte_unused unsigned int thread_id)
335 {
336         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
337
338 #if defined(RTE_LIBRTE_RCU_DEBUG)
339         /* Increment the lock counter */
340         __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
341                                 1, __ATOMIC_ACQUIRE);
342 #endif
343 }
344
345 /**
346  * @warning
347  * @b EXPERIMENTAL: this API may change without prior notice
348  *
349  * Release a lock after accessing a shared data structure.
350  *
351  * This is implemented as a lock-free function. It is multi-thread
352  * safe.
353  *
354  * This API is provided to aid debugging. This should be called after
355  * accessing a shared data structure.
356  *
357  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is enabled, rte_rcu_qsbr_unlock will
358  * decrement a lock counter. rte_rcu_qsbr_check API will verify that this
359  * counter is 0.
360  *
361  * When CONFIG_RTE_LIBRTE_RCU_DEBUG is disabled, this API will do nothing.
362  *
363  * @param v
364  *   QS variable
365  * @param thread_id
366  *   Reader thread id
367  */
368 __rte_experimental
369 static __rte_always_inline void
370 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
371                         __rte_unused unsigned int thread_id)
372 {
373         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
374
375 #if defined(RTE_LIBRTE_RCU_DEBUG)
376         /* Decrement the lock counter */
377         __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
378                                 1, __ATOMIC_RELEASE);
379
380         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
381                                 "Lock counter %u. Nested locks?\n",
382                                 v->qsbr_cnt[thread_id].lock_cnt);
383 #endif
384 }
385
386 /**
387  * @warning
388  * @b EXPERIMENTAL: this API may change without prior notice
389  *
390  * Ask the reader threads to report the quiescent state
391  * status.
392  *
393  * This is implemented as a lock-free function. It is multi-thread
394  * safe and can be called from worker threads.
395  *
396  * @param v
397  *   QS variable
398  * @return
399  *   - This is the token for this call of the API. This should be
400  *     passed to rte_rcu_qsbr_check API.
401  */
402 __rte_experimental
403 static __rte_always_inline uint64_t
404 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
405 {
406         uint64_t t;
407
408         RTE_ASSERT(v != NULL);
409
410         /* Release the changes to the shared data structure.
411          * This store release will ensure that changes to any data
412          * structure are visible to the workers before the token
413          * update is visible.
414          */
415         t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
416
417         return t;
418 }
419
420 /**
421  * @warning
422  * @b EXPERIMENTAL: this API may change without prior notice
423  *
424  * Update quiescent state for a reader thread.
425  *
426  * This is implemented as a lock-free function. It is multi-thread safe.
427  * All the reader threads registered to report their quiescent state
428  * on the QS variable must call this API.
429  *
430  * @param v
431  *   QS variable
432  * @param thread_id
433  *   Update the quiescent state for the reader with this thread ID.
434  */
435 __rte_experimental
436 static __rte_always_inline void
437 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
438 {
439         uint64_t t;
440
441         RTE_ASSERT(v != NULL && thread_id < v->max_threads);
442
443         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
444                                 v->qsbr_cnt[thread_id].lock_cnt);
445
446         /* Acquire the changes to the shared data structure released
447          * by rte_rcu_qsbr_start.
448          * Later loads of the shared data structure should not move
449          * above this load. Hence, use load-acquire.
450          */
451         t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
452
453         /* Inform the writer that updates are visible to this reader.
454          * Prior loads of the shared data structure should not move
455          * beyond this store. Hence use store-release.
456          */
457         __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
458                          t, __ATOMIC_RELEASE);
459
460         __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
461                 __func__, t, thread_id);
462 }
463
464 /* Check the quiescent state counter for registered threads only, assuming
465  * that not all threads have registered.
466  */
467 static __rte_always_inline int
468 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
469 {
470         uint32_t i, j, id;
471         uint64_t bmap;
472         uint64_t c;
473         uint64_t *reg_thread_id;
474
475         for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
476                 i < v->num_elems;
477                 i++, reg_thread_id++) {
478                 /* Load the current registered thread bit map before
479                  * loading the reader thread quiescent state counters.
480                  */
481                 bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
482                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
483
484                 while (bmap) {
485                         j = __builtin_ctzl(bmap);
486                         __RTE_RCU_DP_LOG(DEBUG,
487                                 "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
488                                 __func__, t, wait, bmap, id + j);
489                         c = __atomic_load_n(
490                                         &v->qsbr_cnt[id + j].cnt,
491                                         __ATOMIC_ACQUIRE);
492                         __RTE_RCU_DP_LOG(DEBUG,
493                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
494                                 __func__, t, wait, c, id+j);
495                         /* Counter is not checked for wrap-around condition
496                          * as it is a 64b counter.
497                          */
498                         if (unlikely(c !=
499                                 __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
500                                 /* This thread is not in quiescent state */
501                                 if (!wait)
502                                         return 0;
503
504                                 rte_pause();
505                                 /* This thread might have unregistered.
506                                  * Re-read the bitmap.
507                                  */
508                                 bmap = __atomic_load_n(reg_thread_id,
509                                                 __ATOMIC_ACQUIRE);
510
511                                 continue;
512                         }
513
514                         bmap &= ~(1UL << j);
515                 }
516         }
517
518         return 1;
519 }
520
521 /* Check the quiescent state counter for all threads, assuming that
522  * all the threads have registered.
523  */
524 static __rte_always_inline int
525 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
526 {
527         uint32_t i;
528         struct rte_rcu_qsbr_cnt *cnt;
529         uint64_t c;
530
531         for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
532                 __RTE_RCU_DP_LOG(DEBUG,
533                         "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
534                         __func__, t, wait, i);
535                 while (1) {
536                         c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
537                         __RTE_RCU_DP_LOG(DEBUG,
538                                 "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
539                                 __func__, t, wait, c, i);
540                         /* Counter is not checked for wrap-around condition
541                          * as it is a 64b counter.
542                          */
543                         if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
544                                 break;
545
546                         /* This thread is not in quiescent state */
547                         if (!wait)
548                                 return 0;
549
550                         rte_pause();
551                 }
552         }
553
554         return 1;
555 }
556
557 /**
558  * @warning
559  * @b EXPERIMENTAL: this API may change without prior notice
560  *
561  * Checks if all the reader threads have entered the quiescent state
562  * referenced by token.
563  *
564  * This is implemented as a lock-free function. It is multi-thread
565  * safe and can be called from the worker threads as well.
566  *
567  * If this API is called with 'wait' set to true, the following
568  * factors must be considered:
569  *
570  * 1) If the calling thread is also reporting the status on the
571  * same QS variable, it must update the quiescent state status, before
572  * calling this API.
573  *
574  * 2) In addition, while calling from multiple threads, only
575  * one of those threads can be reporting the quiescent state status
576  * on a given QS variable.
577  *
578  * @param v
579  *   QS variable
580  * @param t
581  *   Token returned by rte_rcu_qsbr_start API
582  * @param wait
583  *   If true, block till all the reader threads have completed entering
584  *   the quiescent state referenced by token 't'.
585  * @return
586  *   - 0 if all reader threads have NOT passed through specified number
587  *     of quiescent states.
588  *   - 1 if all reader threads have passed through specified number
589  *     of quiescent states.
590  */
591 __rte_experimental
592 static __rte_always_inline int
593 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
594 {
595         RTE_ASSERT(v != NULL);
596
597         if (likely(v->num_threads == v->max_threads))
598                 return __rte_rcu_qsbr_check_all(v, t, wait);
599         else
600                 return __rte_rcu_qsbr_check_selective(v, t, wait);
601 }
602
603 /**
604  * @warning
605  * @b EXPERIMENTAL: this API may change without prior notice
606  *
607  * Wait till the reader threads have entered quiescent state.
608  *
609  * This is implemented as a lock-free function. It is multi-thread safe.
610  * This API can be thought of as a wrapper around rte_rcu_qsbr_start and
611  * rte_rcu_qsbr_check APIs.
612  *
613  * If this API is called from multiple threads, only one of
614  * those threads can be reporting the quiescent state status on a
615  * given QS variable.
616  *
617  * @param v
618  *   QS variable
619  * @param thread_id
620  *   Thread ID of the caller if it is registered to report quiescent state
621  *   on this QS variable (i.e. the calling thread is also part of the
622  *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
623  */
624 __rte_experimental
625 void
626 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
627
628 /**
629  * @warning
630  * @b EXPERIMENTAL: this API may change without prior notice
631  *
632  * Dump the details of a single QS variables to a file.
633  *
634  * It is NOT multi-thread safe.
635  *
636  * @param f
637  *   A pointer to a file for output
638  * @param v
639  *   QS variable
640  * @return
641  *   On success - 0
642  *   On error - 1 with error code set in rte_errno.
643  *   Possible rte_errno codes are:
644  *   - EINVAL - NULL parameters are passed
645  */
646 __rte_experimental
647 int
648 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
649
650 #ifdef __cplusplus
651 }
652 #endif
653
654 #endif /* _RTE_RCU_QSBR_H_ */