lib: remove unneeded header includes
lib/rcu/rte_rcu_qsbr.c (dpdk.git)
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2018-2020 Arm Limited
 */

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>

#include <rte_common.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ring_elem.h>

#include "rte_rcu_qsbr.h"
#include "rcu_qsbr_pvt.h"

/* Get the memory size of QSBR variable */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)
{
        size_t sz;

        if (max_threads == 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid max_threads %u\n",
                        __func__, max_threads);
                rte_errno = EINVAL;

                return 1;
        }

        sz = sizeof(struct rte_rcu_qsbr);

        /* Add the size of quiescent state counter array */
        sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

        /* Add the size of the registered thread ID bitmap array */
        sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

        return sz;
}

/* Initialize a quiescent state variable */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
{
        size_t sz;

        if (v == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        sz = rte_rcu_qsbr_get_memsize(max_threads);
        if (sz == 1)
                return 1;

        /* Set all the threads to offline */
        memset(v, 0, sz);
        v->max_threads = max_threads;
        v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
        v->token = __RTE_QSBR_CNT_INIT;
        v->acked_token = __RTE_QSBR_CNT_INIT - 1;

        return 0;
}

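/* A minimal usage sketch (illustrative only, not part of the library
 * build): a writer sizes, allocates and initializes a QSBR variable.
 * The function name and error handling below are hypothetical.
 */
#if 0 /* usage sketch, not compiled */
static struct rte_rcu_qsbr *
example_qsbr_alloc(uint32_t max_threads)
{
        struct rte_rcu_qsbr *v;
        size_t sz;

        /* rte_rcu_qsbr_get_memsize() returns 1 on invalid input */
        sz = rte_rcu_qsbr_get_memsize(max_threads);
        if (sz == 1)
                return NULL;

        /* The variable must be cache-line aligned */
        v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
        if (v == NULL)
                return NULL;

        if (rte_rcu_qsbr_init(v, max_threads) != 0) {
                rte_free(v);
                return NULL;
        }

        return v;
}
#endif
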
/* Register a reader thread to report its quiescent state
 * on a QS variable.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        unsigned int i, id, success;
        uint64_t old_bmap, new_bmap;

        if (v == NULL || thread_id >= v->max_threads) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        id = thread_id & __RTE_QSBR_THRID_MASK;
        i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;

        /* Make sure that the counter for registered threads does not
         * go out of sync. Hence, additional checks are required.
         */
        /* Check if the thread is already registered */
        old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_RELAXED);
        if (old_bmap & (1UL << id))
                return 0;

        do {
                new_bmap = old_bmap | (1UL << id);
                success = __atomic_compare_exchange(
                                        __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        &old_bmap, &new_bmap, 0,
                                        __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                if (success)
                        __atomic_fetch_add(&v->num_threads,
                                                1, __ATOMIC_RELAXED);
                else if (old_bmap & (1UL << id))
                        /* Someone else registered this thread.
                         * Counter should not be incremented.
                         */
                        return 0;
        } while (success == 0);

        return 0;
}

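/* Reader-side sketch (illustrative only): a registered reader marks
 * itself online while it references shared data and periodically
 * reports its quiescent state. 'example_reader_loop' is a
 * hypothetical name.
 */
#if 0 /* usage sketch, not compiled */
static void
example_reader_loop(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        rte_rcu_qsbr_thread_register(v, thread_id);
        rte_rcu_qsbr_thread_online(v, thread_id);

        for (;;) {
                /* ... dereference shared pointers here ... */

                /* Report a quiescent state once this iteration holds
                 * no references to shared data.
                 */
                rte_rcu_qsbr_quiescent(v, thread_id);
        }
}
#endif
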
/* Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        unsigned int i, id, success;
        uint64_t old_bmap, new_bmap;

        if (v == NULL || thread_id >= v->max_threads) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                                v->qsbr_cnt[thread_id].lock_cnt);

        id = thread_id & __RTE_QSBR_THRID_MASK;
        i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;

        /* Make sure that the counter for registered threads does not
         * go out of sync. Hence, additional checks are required.
         */
        /* Check if the thread is already unregistered */
        old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_RELAXED);
        if (!(old_bmap & (1UL << id)))
                return 0;

        do {
                new_bmap = old_bmap & ~(1UL << id);
                /* Make sure any loads of the shared data structure are
                 * completed before removal of the thread from the list of
                 * reporting threads.
                 */
                success = __atomic_compare_exchange(
                                        __RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        &old_bmap, &new_bmap, 0,
                                        __ATOMIC_RELEASE, __ATOMIC_RELAXED);

                if (success)
                        __atomic_fetch_sub(&v->num_threads,
                                                1, __ATOMIC_RELAXED);
                else if (!(old_bmap & (1UL << id)))
                        /* Someone else unregistered this thread.
                         * Counter should not be decremented.
                         */
                        return 0;
        } while (success == 0);

        return 0;
}

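/* Teardown sketch (illustrative only): the release ordering in the
 * compare-exchange above ensures a reader's loads complete before it
 * disappears from the bitmap, so unregistering directly is safe.
 * Going offline first keeps the pattern symmetric with registration.
 */
#if 0 /* usage sketch, not compiled */
static void
example_reader_exit(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        rte_rcu_qsbr_thread_offline(v, thread_id);
        rte_rcu_qsbr_thread_unregister(v, thread_id);
}
#endif
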
/* Wait till the reader threads have entered quiescent state. */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        t = rte_rcu_qsbr_start(v);

        /* If the current thread is also a registered reader, report
         * its own quiescent state so that the wait below does not
         * block on this thread.
         */
        if (thread_id != RTE_QSBR_THRID_INVALID)
                rte_rcu_qsbr_quiescent(v, thread_id);

        /* Wait for other readers to enter quiescent state */
        rte_rcu_qsbr_check(v, t, true);
}

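/* Writer-side sketch (illustrative only): rte_rcu_qsbr_synchronize()
 * blocks until the grace period ends. A writer that cannot afford to
 * block can split the wait using rte_rcu_qsbr_start() and a polling
 * rte_rcu_qsbr_check(). 'example_writer_delete' and 'old' are
 * hypothetical names.
 */
#if 0 /* usage sketch, not compiled */
static void
example_writer_delete(struct rte_rcu_qsbr *v, void *old)
{
        uint64_t token;

        /* 'old' must already be unlinked from the shared structure */
        token = rte_rcu_qsbr_start(v);

        /* Poll without blocking; do other work between attempts */
        while (rte_rcu_qsbr_check(v, token, false) != 1)
                ;

        /* All registered readers have reported; freeing is safe */
        rte_free(old);
}
#endif
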
/* Dump the details of a single quiescent state variable to a file. */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
{
        uint64_t bmap;
        uint32_t i, t, id;

        if (v == NULL || f == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        fprintf(f, "\nQuiescent State Variable @%p\n", v);

        fprintf(f, "  QS variable memory size = %zu\n",
                                rte_rcu_qsbr_get_memsize(v->max_threads));
        fprintf(f, "  Given # max threads = %u\n", v->max_threads);
        fprintf(f, "  Current # threads = %u\n", v->num_threads);

        fprintf(f, "  Registered thread IDs = ");
        for (i = 0; i < v->num_elems; i++) {
                bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
                while (bmap) {
                        t = __builtin_ctzl(bmap);
                        fprintf(f, "%u ", id + t);

                        bmap &= ~(1UL << t);
                }
        }

        fprintf(f, "\n");

        fprintf(f, "  Token = %" PRIu64 "\n",
                        __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

        fprintf(f, "  Least Acknowledged Token = %" PRIu64 "\n",
                        __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));

        fprintf(f, "Quiescent State Counts for readers:\n");
        for (i = 0; i < v->num_elems; i++) {
                bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
                while (bmap) {
                        t = __builtin_ctzl(bmap);
                        fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
                                id + t,
                                __atomic_load_n(
                                        &v->qsbr_cnt[id + t].cnt,
                                        __ATOMIC_RELAXED),
                                __atomic_load_n(
                                        &v->qsbr_cnt[id + t].lock_cnt,
                                        __ATOMIC_RELAXED));
                        bmap &= ~(1UL << t);
                }
        }

        return 0;
}

/* Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
{
        struct rte_rcu_qsbr_dq *dq;
        uint32_t qs_fifo_size;
        unsigned int flags;

        if (params == NULL || params->free_fn == NULL ||
                params->v == NULL || params->name == NULL ||
                params->size == 0 || params->esize == 0 ||
                (params->esize % 4 != 0)) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return NULL;
        }
        /* If auto reclamation is configured, reclaim limit
         * should be a valid value.
         */
        if ((params->trigger_reclaim_limit <= params->size) &&
            (params->max_reclaim_size == 0)) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
                        __func__, params->size, params->trigger_reclaim_limit,
                        params->max_reclaim_size);
                rte_errno = EINVAL;

                return NULL;
        }

        dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
                         RTE_CACHE_LINE_SIZE);
        if (dq == NULL) {
                rte_errno = ENOMEM;

                return NULL;
        }

        /* Decide the flags for the ring.
         * If MT safety is requested, use RTS for ring enqueue as most
         * use cases involve dq-enqueue happening on the control plane.
         * Ring dequeue is always HTS due to the possibility of revert.
         */
        flags = RING_F_MP_RTS_ENQ;
        if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
                flags = RING_F_SP_ENQ;
        flags |= RING_F_MC_HTS_DEQ;
        /* Round the ring size up to the next power of two not less
         * than params->size + 1. One extra slot is needed because the
         * usable capacity of a default ring is one less than its size.
         */
        qs_fifo_size = rte_align32pow2(params->size + 1);
        /* Add token size to ring element size */
        dq->r = rte_ring_create_elem(params->name,
                        __RTE_QSBR_TOKEN_SIZE + params->esize,
                        qs_fifo_size, SOCKET_ID_ANY, flags);
        if (dq->r == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): defer queue create failed\n", __func__);
                rte_free(dq);
                return NULL;
        }

        dq->v = params->v;
        dq->size = params->size;
        dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
        dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
        dq->max_reclaim_size = params->max_reclaim_size;
        dq->free_fn = params->free_fn;
        dq->p = params->p;

        return dq;
}

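/* Defer-queue creation sketch (illustrative only): the callback,
 * names and sizes below are arbitrary examples. It assumes each
 * element stores a single pointer, so esize is sizeof(void *), which
 * satisfies the multiple-of-4 requirement checked above.
 */
#if 0 /* usage sketch, not compiled */
static void
example_free_fn(void *p, void *e, unsigned int n)
{
        unsigned int i;

        RTE_SET_USED(p);
        /* 'e' points to 'n' packed elements, each holding a pointer */
        for (i = 0; i < n; i++)
                rte_free(((void **)e)[i]);
}

static struct rte_rcu_qsbr_dq *
example_dq_create(struct rte_rcu_qsbr *v)
{
        struct rte_rcu_qsbr_dq_parameters params;

        memset(&params, 0, sizeof(params));
        params.name = "example_dq";
        params.size = 1024;                /* max outstanding resources */
        params.esize = sizeof(void *);     /* one pointer per element */
        params.trigger_reclaim_limit = 256;
        params.max_reclaim_size = 32;      /* required for auto reclaim */
        params.free_fn = example_free_fn;
        params.p = NULL;
        params.v = v;

        return rte_rcu_qsbr_dq_create(&params);
}
#endif
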
/* Enqueue one resource to the defer queue to free after the grace
 * period is over.
 */
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
{
        __rte_rcu_qsbr_dq_elem_t *dq_elem;
        uint32_t cur_size;

        if (dq == NULL || e == NULL) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        char data[dq->esize];
        dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
        /* Start the grace period */
        dq_elem->token = rte_rcu_qsbr_start(dq->v);

        /* Reclaim resources if the queue size has hit the reclaim
         * limit. This keeps the queue from growing too large and
         * allows time for reader threads to report their quiescent state.
         */
        cur_size = rte_ring_count(dq->r);
        if (cur_size > dq->trigger_reclaim_limit) {
                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Triggering reclamation\n", __func__);
                rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
                                                NULL, NULL, NULL);
        }

        /* Enqueue the token and resource. Generating the token and
         * enqueuing (token + resource) on the queue is not an
         * atomic operation. When the defer queue is shared by multiple
         * writers, this might result in tokens enqueued out of order
         * on the queue. As a result, some tokens might wait longer
         * than necessary before they are reclaimed.
         */
        memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
        /* Check the status, as the enqueue might fail when other
         * threads have used up the freed space. Enqueue uses the
         * flags configured when the DQ was created.
         */
        if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Enqueue failed\n", __func__);
                /* Note that the token generated above is not used.
                 * Other than wasting tokens, it should not cause any
                 * other issues.
                 */
                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Skipped enqueuing token = %" PRIu64 "\n",
                        __func__, dq_elem->token);

                rte_errno = ENOSPC;
                return 1;
        }

        rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                "%s(): Enqueued token = %" PRIu64 "\n",
                __func__, dq_elem->token);

        return 0;
}

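/* Enqueue sketch (illustrative only): a writer retires a node it has
 * already unlinked from the shared structure. The element copied into
 * the queue is the pointer itself, matching the esize chosen in the
 * creation sketch above. 'example_writer_retire' is a hypothetical
 * name.
 */
#if 0 /* usage sketch, not compiled */
static int
example_writer_retire(struct rte_rcu_qsbr_dq *dq, void *unlinked)
{
        /* 'unlinked' must no longer be reachable by new readers */
        if (rte_rcu_qsbr_dq_enqueue(dq, &unlinked) != 0) {
                /* Queue full and no tokens expired; the caller could
                 * fall back to rte_rcu_qsbr_synchronize() and free
                 * the node directly.
                 */
                return 1;
        }

        return 0;
}
#endif
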
/* Reclaim resources from the defer queue. */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
                        unsigned int *freed, unsigned int *pending,
                        unsigned int *available)
{
        uint32_t cnt;
        __rte_rcu_qsbr_dq_elem_t *dq_elem;

        if (dq == NULL || n == 0) {
                rte_log(RTE_LOG_ERR, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);
                rte_errno = EINVAL;

                return 1;
        }

        cnt = 0;

        char data[dq->esize];
        /* Check reader threads quiescent state and reclaim resources */
        while (cnt < n &&
                rte_ring_dequeue_bulk_elem_start(dq->r, &data,
                                        dq->esize, 1, available) != 0) {
                dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;

                /* Check the grace period; revert the dequeue and stop
                 * at the first token that has not expired.
                 */
                if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
                        rte_ring_dequeue_elem_finish(dq->r, 0);
                        break;
                }
                rte_ring_dequeue_elem_finish(dq->r, 1);

                rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                        "%s(): Reclaimed token = %" PRIu64 "\n",
                        __func__, dq_elem->token);

                dq->free_fn(dq->p, dq_elem->elem, 1);

                cnt++;
        }

        rte_log(RTE_LOG_INFO, rte_rcu_log_type,
                "%s(): Reclaimed %u resources\n", __func__, cnt);

        if (freed != NULL)
                *freed = cnt;
        if (pending != NULL)
                *pending = rte_ring_count(dq->r);

        return 0;
}

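/* Manual reclaim sketch (illustrative only): an application can also
 * drain expired entries from a slow-path thread instead of relying
 * only on the trigger limit. The batch size of 64 is arbitrary.
 */
#if 0 /* usage sketch, not compiled */
static void
example_background_reclaim(struct rte_rcu_qsbr_dq *dq)
{
        unsigned int freed, pending;

        /* Free up to 64 entries; entries whose grace period has not
         * ended remain on the queue and are reported in 'pending'.
         */
        if (rte_rcu_qsbr_dq_reclaim(dq, 64, &freed, &pending, NULL) == 0 &&
                        pending != 0) {
                /* Retry later, e.g. on the next slow-path iteration */
        }
}
#endif
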
/* Delete a defer queue. */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
{
        unsigned int pending;

        if (dq == NULL) {
                rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
                        "%s(): Invalid input parameter\n", __func__);

                return 0;
        }

        /* Reclaim all the resources */
        rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
        if (pending != 0) {
                rte_errno = EAGAIN;

                return 1;
        }

        rte_ring_free(dq->r);
        rte_free(dq);

        return 0;
}

RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);