rcu: add least acknowledged token optimization
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2018 Arm Limited
4  */
5
6 #include <stdio.h>
7 #include <string.h>
8 #include <stdint.h>
9 #include <inttypes.h>
10 #include <errno.h>
11
12 #include <rte_common.h>
13 #include <rte_log.h>
14 #include <rte_memory.h>
15 #include <rte_malloc.h>
16 #include <rte_eal.h>
17 #include <rte_atomic.h>
18 #include <rte_per_lcore.h>
19 #include <rte_lcore.h>
20 #include <rte_errno.h>
21
22 #include "rte_rcu_qsbr.h"
23
24 /* Get the memory size of QSBR variable */
25 size_t
26 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
27 {
28         size_t sz;
29
30         if (max_threads == 0) {
31                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
32                         "%s(): Invalid max_threads %u\n",
33                         __func__, max_threads);
34                 rte_errno = EINVAL;
35
36                 return 1;
37         }
38
39         sz = sizeof(struct rte_rcu_qsbr);
40
41         /* Add the size of quiescent state counter array */
42         sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
43
44         /* Add the size of the registered thread ID bitmap array */
45         sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
46
47         return sz;
48 }
49
50 /* Initialize a quiescent state variable */
51 int
52 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
53 {
54         size_t sz;
55
56         if (v == NULL) {
57                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
58                         "%s(): Invalid input parameter\n", __func__);
59                 rte_errno = EINVAL;
60
61                 return 1;
62         }
63
64         sz = rte_rcu_qsbr_get_memsize(max_threads);
65         if (sz == 1)
66                 return 1;
67
68         /* Set all the threads to offline */
69         memset(v, 0, sz);
70         v->max_threads = max_threads;
71         v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
72                         __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
73                         __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
74         v->token = __RTE_QSBR_CNT_INIT;
75         v->acked_token = __RTE_QSBR_CNT_INIT - 1;
76
77         return 0;
78 }
79
80 /* Register a reader thread to report its quiescent state
81  * on a QS variable.
82  */
83 int
84 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
85 {
86         unsigned int i, id, success;
87         uint64_t old_bmap, new_bmap;
88
89         if (v == NULL || thread_id >= v->max_threads) {
90                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
91                         "%s(): Invalid input parameter\n", __func__);
92                 rte_errno = EINVAL;
93
94                 return 1;
95         }
96
97         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
98                                 v->qsbr_cnt[thread_id].lock_cnt);
99
100         id = thread_id & __RTE_QSBR_THRID_MASK;
101         i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
102
103         /* Make sure that the counter for registered threads does not
104          * go out of sync. Hence, additional checks are required.
105          */
106         /* Check if the thread is already registered */
107         old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
108                                         __ATOMIC_RELAXED);
109         if (old_bmap & 1UL << id)
110                 return 0;
111
112         do {
113                 new_bmap = old_bmap | (1UL << id);
114                 success = __atomic_compare_exchange(
115                                         __RTE_QSBR_THRID_ARRAY_ELM(v, i),
116                                         &old_bmap, &new_bmap, 0,
117                                         __ATOMIC_RELEASE, __ATOMIC_RELAXED);
118
119                 if (success)
120                         __atomic_fetch_add(&v->num_threads,
121                                                 1, __ATOMIC_RELAXED);
122                 else if (old_bmap & (1UL << id))
123                         /* Someone else registered this thread.
124                          * Counter should not be incremented.
125                          */
126                         return 0;
127         } while (success == 0);
128
129         return 0;
130 }
131
132 /* Remove a reader thread, from the list of threads reporting their
133  * quiescent state on a QS variable.
134  */
135 int
136 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
137 {
138         unsigned int i, id, success;
139         uint64_t old_bmap, new_bmap;
140
141         if (v == NULL || thread_id >= v->max_threads) {
142                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
143                         "%s(): Invalid input parameter\n", __func__);
144                 rte_errno = EINVAL;
145
146                 return 1;
147         }
148
149         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
150                                 v->qsbr_cnt[thread_id].lock_cnt);
151
152         id = thread_id & __RTE_QSBR_THRID_MASK;
153         i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
154
155         /* Make sure that the counter for registered threads does not
156          * go out of sync. Hence, additional checks are required.
157          */
158         /* Check if the thread is already unregistered */
159         old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
160                                         __ATOMIC_RELAXED);
161         if (!(old_bmap & (1UL << id)))
162                 return 0;
163
164         do {
165                 new_bmap = old_bmap & ~(1UL << id);
166                 /* Make sure any loads of the shared data structure are
167                  * completed before removal of the thread from the list of
168                  * reporting threads.
169                  */
170                 success = __atomic_compare_exchange(
171                                         __RTE_QSBR_THRID_ARRAY_ELM(v, i),
172                                         &old_bmap, &new_bmap, 0,
173                                         __ATOMIC_RELEASE, __ATOMIC_RELAXED);
174
175                 if (success)
176                         __atomic_fetch_sub(&v->num_threads,
177                                                 1, __ATOMIC_RELAXED);
178                 else if (!(old_bmap & (1UL << id)))
179                         /* Someone else unregistered this thread.
180                          * Counter should not be incremented.
181                          */
182                         return 0;
183         } while (success == 0);
184
185         return 0;
186 }
187
188 /* Wait till the reader threads have entered quiescent state. */
189 void
190 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
191 {
192         uint64_t t;
193
194         RTE_ASSERT(v != NULL);
195
196         t = rte_rcu_qsbr_start(v);
197
198         /* If the current thread has readside critical section,
199          * update its quiescent state status.
200          */
201         if (thread_id != RTE_QSBR_THRID_INVALID)
202                 rte_rcu_qsbr_quiescent(v, thread_id);
203
204         /* Wait for other readers to enter quiescent state */
205         rte_rcu_qsbr_check(v, t, true);
206 }
207
208 /* Dump the details of a single quiescent state variable to a file. */
209 int
210 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
211 {
212         uint64_t bmap;
213         uint32_t i, t, id;
214
215         if (v == NULL || f == NULL) {
216                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
217                         "%s(): Invalid input parameter\n", __func__);
218                 rte_errno = EINVAL;
219
220                 return 1;
221         }
222
223         fprintf(f, "\nQuiescent State Variable @%p\n", v);
224
225         fprintf(f, "  QS variable memory size = %zu\n",
226                                 rte_rcu_qsbr_get_memsize(v->max_threads));
227         fprintf(f, "  Given # max threads = %u\n", v->max_threads);
228         fprintf(f, "  Current # threads = %u\n", v->num_threads);
229
230         fprintf(f, "  Registered thread IDs = ");
231         for (i = 0; i < v->num_elems; i++) {
232                 bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
233                                         __ATOMIC_ACQUIRE);
234                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
235                 while (bmap) {
236                         t = __builtin_ctzl(bmap);
237                         fprintf(f, "%u ", id + t);
238
239                         bmap &= ~(1UL << t);
240                 }
241         }
242
243         fprintf(f, "\n");
244
245         fprintf(f, "  Token = %"PRIu64"\n",
246                         __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
247
248         fprintf(f, "  Least Acknowledged Token = %"PRIu64"\n",
249                         __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
250
251         fprintf(f, "Quiescent State Counts for readers:\n");
252         for (i = 0; i < v->num_elems; i++) {
253                 bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
254                                         __ATOMIC_ACQUIRE);
255                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
256                 while (bmap) {
257                         t = __builtin_ctzl(bmap);
258                         fprintf(f, "thread ID = %u, count = %"PRIu64", lock count = %u\n",
259                                 id + t,
260                                 __atomic_load_n(
261                                         &v->qsbr_cnt[id + t].cnt,
262                                         __ATOMIC_RELAXED),
263                                 __atomic_load_n(
264                                         &v->qsbr_cnt[id + t].lock_cnt,
265                                         __ATOMIC_RELAXED));
266                         bmap &= ~(1UL << t);
267                 }
268         }
269
270         return 0;
271 }
272
273 int rte_rcu_log_type;
274
275 RTE_INIT(rte_rcu_register)
276 {
277         rte_rcu_log_type = rte_log_register("lib.rcu");
278         if (rte_rcu_log_type >= 0)
279                 rte_log_set_level(rte_rcu_log_type, RTE_LOG_ERR);
280 }