test mbuf attach
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2018-2020 Arm Limited
4  */
5
6 #include <stdio.h>
7 #include <string.h>
8 #include <stdint.h>
9 #include <inttypes.h>
10 #include <errno.h>
11
12 #include <rte_common.h>
13 #include <rte_log.h>
14 #include <rte_memory.h>
15 #include <rte_malloc.h>
16 #include <rte_eal.h>
17 #include <rte_atomic.h>
18 #include <rte_per_lcore.h>
19 #include <rte_lcore.h>
20 #include <rte_errno.h>
21 #include <rte_ring_elem.h>
22
23 #include "rte_rcu_qsbr.h"
24 #include "rcu_qsbr_pvt.h"
25
26 /* Get the memory size of QSBR variable */
27 size_t
28 rte_rcu_qsbr_get_memsize(uint32_t max_threads)
29 {
30         size_t sz;
31
32         if (max_threads == 0) {
33                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
34                         "%s(): Invalid max_threads %u\n",
35                         __func__, max_threads);
36                 rte_errno = EINVAL;
37
38                 return 1;
39         }
40
41         sz = sizeof(struct rte_rcu_qsbr);
42
43         /* Add the size of quiescent state counter array */
44         sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;
45
46         /* Add the size of the registered thread ID bitmap array */
47         sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
48
49         return sz;
50 }
51
52 /* Initialize a quiescent state variable */
53 int
54 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
55 {
56         size_t sz;
57
58         if (v == NULL) {
59                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
60                         "%s(): Invalid input parameter\n", __func__);
61                 rte_errno = EINVAL;
62
63                 return 1;
64         }
65
66         sz = rte_rcu_qsbr_get_memsize(max_threads);
67         if (sz == 1)
68                 return 1;
69
70         /* Set all the threads to offline */
71         memset(v, 0, sz);
72         v->max_threads = max_threads;
73         v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
74                         __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
75                         __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
76         v->token = __RTE_QSBR_CNT_INIT;
77         v->acked_token = __RTE_QSBR_CNT_INIT - 1;
78
79         return 0;
80 }
81
82 /* Register a reader thread to report its quiescent state
83  * on a QS variable.
84  */
85 int
86 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
87 {
88         unsigned int i, id, success;
89         uint64_t old_bmap, new_bmap;
90
91         if (v == NULL || thread_id >= v->max_threads) {
92                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
93                         "%s(): Invalid input parameter\n", __func__);
94                 rte_errno = EINVAL;
95
96                 return 1;
97         }
98
99         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
100                                 v->qsbr_cnt[thread_id].lock_cnt);
101
102         id = thread_id & __RTE_QSBR_THRID_MASK;
103         i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
104
105         /* Make sure that the counter for registered threads does not
106          * go out of sync. Hence, additional checks are required.
107          */
108         /* Check if the thread is already registered */
109         old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
110                                         __ATOMIC_RELAXED);
111         if (old_bmap & 1UL << id)
112                 return 0;
113
114         do {
115                 new_bmap = old_bmap | (1UL << id);
116                 success = __atomic_compare_exchange(
117                                         __RTE_QSBR_THRID_ARRAY_ELM(v, i),
118                                         &old_bmap, &new_bmap, 0,
119                                         __ATOMIC_RELEASE, __ATOMIC_RELAXED);
120
121                 if (success)
122                         __atomic_fetch_add(&v->num_threads,
123                                                 1, __ATOMIC_RELAXED);
124                 else if (old_bmap & (1UL << id))
125                         /* Someone else registered this thread.
126                          * Counter should not be incremented.
127                          */
128                         return 0;
129         } while (success == 0);
130
131         return 0;
132 }
133
134 /* Remove a reader thread, from the list of threads reporting their
135  * quiescent state on a QS variable.
136  */
137 int
138 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
139 {
140         unsigned int i, id, success;
141         uint64_t old_bmap, new_bmap;
142
143         if (v == NULL || thread_id >= v->max_threads) {
144                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
145                         "%s(): Invalid input parameter\n", __func__);
146                 rte_errno = EINVAL;
147
148                 return 1;
149         }
150
151         __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
152                                 v->qsbr_cnt[thread_id].lock_cnt);
153
154         id = thread_id & __RTE_QSBR_THRID_MASK;
155         i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
156
157         /* Make sure that the counter for registered threads does not
158          * go out of sync. Hence, additional checks are required.
159          */
160         /* Check if the thread is already unregistered */
161         old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
162                                         __ATOMIC_RELAXED);
163         if (!(old_bmap & (1UL << id)))
164                 return 0;
165
166         do {
167                 new_bmap = old_bmap & ~(1UL << id);
168                 /* Make sure any loads of the shared data structure are
169                  * completed before removal of the thread from the list of
170                  * reporting threads.
171                  */
172                 success = __atomic_compare_exchange(
173                                         __RTE_QSBR_THRID_ARRAY_ELM(v, i),
174                                         &old_bmap, &new_bmap, 0,
175                                         __ATOMIC_RELEASE, __ATOMIC_RELAXED);
176
177                 if (success)
178                         __atomic_fetch_sub(&v->num_threads,
179                                                 1, __ATOMIC_RELAXED);
180                 else if (!(old_bmap & (1UL << id)))
181                         /* Someone else unregistered this thread.
182                          * Counter should not be incremented.
183                          */
184                         return 0;
185         } while (success == 0);
186
187         return 0;
188 }
189
190 /* Wait till the reader threads have entered quiescent state. */
191 void
192 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
193 {
194         uint64_t t;
195
196         RTE_ASSERT(v != NULL);
197
198         t = rte_rcu_qsbr_start(v);
199
200         /* If the current thread has readside critical section,
201          * update its quiescent state status.
202          */
203         if (thread_id != RTE_QSBR_THRID_INVALID)
204                 rte_rcu_qsbr_quiescent(v, thread_id);
205
206         /* Wait for other readers to enter quiescent state */
207         rte_rcu_qsbr_check(v, t, true);
208 }
209
210 /* Dump the details of a single quiescent state variable to a file. */
211 int
212 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
213 {
214         uint64_t bmap;
215         uint32_t i, t, id;
216
217         if (v == NULL || f == NULL) {
218                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
219                         "%s(): Invalid input parameter\n", __func__);
220                 rte_errno = EINVAL;
221
222                 return 1;
223         }
224
225         fprintf(f, "\nQuiescent State Variable @%p\n", v);
226
227         fprintf(f, "  QS variable memory size = %zu\n",
228                                 rte_rcu_qsbr_get_memsize(v->max_threads));
229         fprintf(f, "  Given # max threads = %u\n", v->max_threads);
230         fprintf(f, "  Current # threads = %u\n", v->num_threads);
231
232         fprintf(f, "  Registered thread IDs = ");
233         for (i = 0; i < v->num_elems; i++) {
234                 bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
235                                         __ATOMIC_ACQUIRE);
236                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
237                 while (bmap) {
238                         t = __builtin_ctzl(bmap);
239                         fprintf(f, "%u ", id + t);
240
241                         bmap &= ~(1UL << t);
242                 }
243         }
244
245         fprintf(f, "\n");
246
247         fprintf(f, "  Token = %"PRIu64"\n",
248                         __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
249
250         fprintf(f, "  Least Acknowledged Token = %"PRIu64"\n",
251                         __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
252
253         fprintf(f, "Quiescent State Counts for readers:\n");
254         for (i = 0; i < v->num_elems; i++) {
255                 bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
256                                         __ATOMIC_ACQUIRE);
257                 id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
258                 while (bmap) {
259                         t = __builtin_ctzl(bmap);
260                         fprintf(f, "thread ID = %u, count = %"PRIu64", lock count = %u\n",
261                                 id + t,
262                                 __atomic_load_n(
263                                         &v->qsbr_cnt[id + t].cnt,
264                                         __ATOMIC_RELAXED),
265                                 __atomic_load_n(
266                                         &v->qsbr_cnt[id + t].lock_cnt,
267                                         __ATOMIC_RELAXED));
268                         bmap &= ~(1UL << t);
269                 }
270         }
271
272         return 0;
273 }
274
275 /* Create a queue used to store the data structure elements that can
276  * be freed later. This queue is referred to as 'defer queue'.
277  */
278 struct rte_rcu_qsbr_dq *
279 rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
280 {
281         struct rte_rcu_qsbr_dq *dq;
282         uint32_t qs_fifo_size;
283         unsigned int flags;
284
285         if (params == NULL || params->free_fn == NULL ||
286                 params->v == NULL || params->name == NULL ||
287                 params->size == 0 || params->esize == 0 ||
288                 (params->esize % 4 != 0)) {
289                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
290                         "%s(): Invalid input parameter\n", __func__);
291                 rte_errno = EINVAL;
292
293                 return NULL;
294         }
295         /* If auto reclamation is configured, reclaim limit
296          * should be a valid value.
297          */
298         if ((params->trigger_reclaim_limit <= params->size) &&
299             (params->max_reclaim_size == 0)) {
300                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
301                         "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
302                         __func__, params->size, params->trigger_reclaim_limit,
303                         params->max_reclaim_size);
304                 rte_errno = EINVAL;
305
306                 return NULL;
307         }
308
309         dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
310                          RTE_CACHE_LINE_SIZE);
311         if (dq == NULL) {
312                 rte_errno = ENOMEM;
313
314                 return NULL;
315         }
316
317         /* Decide the flags for the ring.
318          * If MT safety is requested, use RTS for ring enqueue as most
319          * use cases involve dq-enqueue happening on the control plane.
320          * Ring dequeue is always HTS due to the possibility of revert.
321          */
322         flags = RING_F_MP_RTS_ENQ;
323         if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
324                 flags = RING_F_SP_ENQ;
325         flags |= RING_F_MC_HTS_DEQ;
326         /* round up qs_fifo_size to next power of two that is not less than
327          * max_size.
328          */
329         qs_fifo_size = rte_align32pow2(params->size + 1);
330         /* Add token size to ring element size */
331         dq->r = rte_ring_create_elem(params->name,
332                         __RTE_QSBR_TOKEN_SIZE + params->esize,
333                         qs_fifo_size, SOCKET_ID_ANY, flags);
334         if (dq->r == NULL) {
335                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
336                         "%s(): defer queue create failed\n", __func__);
337                 rte_free(dq);
338                 return NULL;
339         }
340
341         dq->v = params->v;
342         dq->size = params->size;
343         dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
344         dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
345         dq->max_reclaim_size = params->max_reclaim_size;
346         dq->free_fn = params->free_fn;
347         dq->p = params->p;
348
349         return dq;
350 }
351
352 /* Enqueue one resource to the defer queue to free after the grace
353  * period is over.
354  */
355 int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
356 {
357         __rte_rcu_qsbr_dq_elem_t *dq_elem;
358         uint32_t cur_size;
359
360         if (dq == NULL || e == NULL) {
361                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
362                         "%s(): Invalid input parameter\n", __func__);
363                 rte_errno = EINVAL;
364
365                 return 1;
366         }
367
368         char data[dq->esize];
369         dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
370         /* Start the grace period */
371         dq_elem->token = rte_rcu_qsbr_start(dq->v);
372
373         /* Reclaim resources if the queue size has hit the reclaim
374          * limit. This helps the queue from growing too large and
375          * allows time for reader threads to report their quiescent state.
376          */
377         cur_size = rte_ring_count(dq->r);
378         if (cur_size > dq->trigger_reclaim_limit) {
379                 rte_log(RTE_LOG_INFO, rte_rcu_log_type,
380                         "%s(): Triggering reclamation\n", __func__);
381                 rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
382                                                 NULL, NULL, NULL);
383         }
384
385         /* Enqueue the token and resource. Generating the token and
386          * enqueuing (token + resource) on the queue is not an
387          * atomic operation. When the defer queue is shared by multiple
388          * writers, this might result in tokens enqueued out of order
389          * on the queue. So, some tokens might wait longer than they
390          * are required to be reclaimed.
391          */
392         memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
393         /* Check the status as enqueue might fail since the other threads
394          * might have used up the freed space.
395          * Enqueue uses the configured flags when the DQ was created.
396          */
397         if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
398                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
399                         "%s(): Enqueue failed\n", __func__);
400                 /* Note that the token generated above is not used.
401                  * Other than wasting tokens, it should not cause any
402                  * other issues.
403                  */
404                 rte_log(RTE_LOG_INFO, rte_rcu_log_type,
405                         "%s(): Skipped enqueuing token = %"PRIu64"\n",
406                         __func__, dq_elem->token);
407
408                 rte_errno = ENOSPC;
409                 return 1;
410         }
411
412         rte_log(RTE_LOG_INFO, rte_rcu_log_type,
413                 "%s(): Enqueued token = %"PRIu64"\n", __func__, dq_elem->token);
414
415         return 0;
416 }
417
418 /* Reclaim resources from the defer queue. */
419 int
420 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
421                         unsigned int *freed, unsigned int *pending,
422                         unsigned int *available)
423 {
424         uint32_t cnt;
425         __rte_rcu_qsbr_dq_elem_t *dq_elem;
426
427         if (dq == NULL || n == 0) {
428                 rte_log(RTE_LOG_ERR, rte_rcu_log_type,
429                         "%s(): Invalid input parameter\n", __func__);
430                 rte_errno = EINVAL;
431
432                 return 1;
433         }
434
435         cnt = 0;
436
437         char data[dq->esize];
438         /* Check reader threads quiescent state and reclaim resources */
439         while (cnt < n &&
440                 rte_ring_dequeue_bulk_elem_start(dq->r, &data,
441                                         dq->esize, 1, available) != 0) {
442                 dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
443
444                 /* Reclaim the resource */
445                 if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
446                         rte_ring_dequeue_elem_finish(dq->r, 0);
447                         break;
448                 }
449                 rte_ring_dequeue_elem_finish(dq->r, 1);
450
451                 rte_log(RTE_LOG_INFO, rte_rcu_log_type,
452                         "%s(): Reclaimed token = %"PRIu64"\n",
453                         __func__, dq_elem->token);
454
455                 dq->free_fn(dq->p, dq_elem->elem, 1);
456
457                 cnt++;
458         }
459
460         rte_log(RTE_LOG_INFO, rte_rcu_log_type,
461                 "%s(): Reclaimed %u resources\n", __func__, cnt);
462
463         if (freed != NULL)
464                 *freed = cnt;
465         if (pending != NULL)
466                 *pending = rte_ring_count(dq->r);
467
468         return 0;
469 }
470
471 /* Delete a defer queue. */
472 int
473 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
474 {
475         unsigned int pending;
476
477         if (dq == NULL) {
478                 rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
479                         "%s(): Invalid input parameter\n", __func__);
480
481                 return 0;
482         }
483
484         /* Reclaim all the resources */
485         rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
486         if (pending != 0) {
487                 rte_errno = EAGAIN;
488
489                 return 1;
490         }
491
492         rte_ring_free(dq->r);
493         rte_free(dq);
494
495         return 0;
496 }
497
498 RTE_LOG_REGISTER(rte_rcu_log_type, lib.rcu, ERR);