/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>

#include <rte_common.h>
#include <rte_log.h>
#include <rte_memory.h>
#include <rte_malloc.h>
#include <rte_errno.h>
#include <rte_ring_elem.h>

#include "rte_rcu_qsbr.h"
#include "rcu_qsbr_pvt.h"

/* Get the memory size of QSBR variable */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads)
{
	size_t sz;

	if (max_threads == 0) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid max_threads %u\n",
			__func__, max_threads);
		rte_errno = EINVAL;

		return 1;
	}

	sz = sizeof(struct rte_rcu_qsbr);

	/* Add the size of quiescent state counter array */
	sz += sizeof(struct rte_rcu_qsbr_cnt) * max_threads;

	/* Add the size of the registered thread ID bitmap array */
	sz += __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);

	return sz;
}

/* Initialize a quiescent state variable */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
{
	size_t sz;

	if (v == NULL) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	sz = rte_rcu_qsbr_get_memsize(max_threads);
	if (sz == 1)
		return 1;

	/* Set all the threads to offline */
	memset(v, 0, sz);
	v->max_threads = max_threads;
	v->num_elems = RTE_ALIGN_MUL_CEIL(max_threads,
			__RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
			__RTE_QSBR_THRID_ARRAY_ELM_SIZE;
	v->token = __RTE_QSBR_CNT_INIT;
	v->acked_token = __RTE_QSBR_CNT_INIT - 1;

	return 0;
}
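
/* Usage sketch (illustrative only, not part of this file): a writer
 * typically sizes and allocates the QS variable with
 * rte_rcu_qsbr_get_memsize() before calling rte_rcu_qsbr_init().
 * The RTE_MAX_LCORE bound is an arbitrary example choice.
 *
 *	struct rte_rcu_qsbr *v;
 *	size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
 *	v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	if (v == NULL || rte_rcu_qsbr_init(v, RTE_MAX_LCORE) != 0)
 *		rte_panic("RCU QSBR init failed\n");
 */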

/* Register a reader thread to report its quiescent state
 * on a QS variable.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	unsigned int i, id, success;
	uint64_t old_bmap, new_bmap;

	if (v == NULL || thread_id >= v->max_threads) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	id = thread_id & __RTE_QSBR_THRID_MASK;
	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
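
	/* Worked example (added note): with the 64-bit bitmap elements used
	 * here, thread_id = 70 maps to array element i = 70 >> 6 = 1 and
	 * bit id = 70 & 0x3f = 6 within that element.
	 */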

	/* Make sure that the counter for registered threads does not
	 * go out of sync. Hence, additional checks are required.
	 */
	/* Check if the thread is already registered */
	old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					__ATOMIC_RELAXED);
	if (old_bmap & (1UL << id))
		return 0;

	do {
		new_bmap = old_bmap | (1UL << id);
		success = __atomic_compare_exchange(
					__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					&old_bmap, &new_bmap, 0,
					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

		if (success)
			__atomic_fetch_add(&v->num_threads,
						1, __ATOMIC_RELAXED);
		else if (old_bmap & (1UL << id))
			/* Someone else registered this thread.
			 * Counter should not be incremented.
			 */
			return 0;
	} while (success == 0);

	return 0;
}
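
/* Usage sketch (illustrative only): a reader thread registers once and
 * then reports its quiescent state from its poll loop. process_packets()
 * is a hypothetical application function.
 *
 *	rte_rcu_qsbr_thread_register(v, thread_id);
 *	rte_rcu_qsbr_thread_online(v, thread_id);
 *	while (!done) {
 *		process_packets();
 *		rte_rcu_qsbr_quiescent(v, thread_id);
 *	}
 *	rte_rcu_qsbr_thread_offline(v, thread_id);
 *	rte_rcu_qsbr_thread_unregister(v, thread_id);
 */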

/* Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	unsigned int i, id, success;
	uint64_t old_bmap, new_bmap;

	if (v == NULL || thread_id >= v->max_threads) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	id = thread_id & __RTE_QSBR_THRID_MASK;
	i = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;

	/* Make sure that the counter for registered threads does not
	 * go out of sync. Hence, additional checks are required.
	 */
	/* Check if the thread is already unregistered */
	old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					__ATOMIC_RELAXED);
	if (!(old_bmap & (1UL << id)))
		return 0;

	do {
		new_bmap = old_bmap & ~(1UL << id);
		/* Make sure any loads of the shared data structure are
		 * completed before removal of the thread from the list of
		 * reporting threads.
		 */
		success = __atomic_compare_exchange(
					__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					&old_bmap, &new_bmap, 0,
					__ATOMIC_RELEASE, __ATOMIC_RELAXED);

		if (success)
			__atomic_fetch_sub(&v->num_threads,
						1, __ATOMIC_RELAXED);
		else if (!(old_bmap & (1UL << id)))
			/* Someone else unregistered this thread.
			 * Counter should not be decremented.
			 */
			return 0;
	} while (success == 0);

	return 0;
}

/* Wait till the reader threads have entered quiescent state. */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	t = rte_rcu_qsbr_start(v);

	/* If the current thread has a read-side critical section,
	 * update its quiescent state status.
	 */
	if (thread_id != RTE_QSBR_THRID_INVALID)
		rte_rcu_qsbr_quiescent(v, thread_id);

	/* Wait for other readers to enter quiescent state */
	rte_rcu_qsbr_check(v, t, true);
}
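
/* Usage sketch (illustrative only): a writer removes an element from the
 * shared structure, waits out the grace period, then frees it.
 * remove_elem() is a hypothetical application function.
 *
 *	void *e = remove_elem(shared_struct, key);
 *	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *	rte_free(e);
 */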

/* Dump the details of a single quiescent state variable to a file. */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
{
	uint64_t bmap;
	uint32_t i, t, id;

	if (v == NULL || f == NULL) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	fprintf(f, "\nQuiescent State Variable @%p\n", v);

	fprintf(f, "  QS variable memory size = %zu\n",
		rte_rcu_qsbr_get_memsize(v->max_threads));
	fprintf(f, "  Given # max threads = %u\n", v->max_threads);
	fprintf(f, "  Current # threads = %u\n", v->num_threads);

	fprintf(f, "  Registered thread IDs = ");
	for (i = 0; i < v->num_elems; i++) {
		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					__ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
		while (bmap) {
			t = __builtin_ctzl(bmap);
			fprintf(f, "%u ", id + t);

			bmap &= ~(1UL << t);
		}
	}

	fprintf(f, "\n");

	fprintf(f, "  Token = %" PRIu64 "\n",
		__atomic_load_n(&v->token, __ATOMIC_ACQUIRE));

	fprintf(f, "  Least Acknowledged Token = %" PRIu64 "\n",
		__atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));

	fprintf(f, "Quiescent State Counts for readers:\n");
	for (i = 0; i < v->num_elems; i++) {
		bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
					__ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
		while (bmap) {
			t = __builtin_ctzl(bmap);
			fprintf(f, "thread ID = %u, count = %" PRIu64 ", lock count = %u\n",
				id + t,
				__atomic_load_n(
					&v->qsbr_cnt[id + t].cnt,
					__ATOMIC_RELAXED),
				__atomic_load_n(
					&v->qsbr_cnt[id + t].lock_cnt,
					__ATOMIC_RELAXED));
			bmap &= ~(1UL << t);
		}
	}

	return 0;
}
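
/* Usage sketch (illustrative only): dump the state to stdout for debugging.
 * Following the format strings above, with threads 0 and 1 registered the
 * output contains a line like "  Registered thread IDs = 0 1" followed by
 * one count line per thread.
 *
 *	rte_rcu_qsbr_dump(stdout, v);
 */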

/* Create a queue used to store the data structure elements that can
 * be freed later. This queue is referred to as 'defer queue'.
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
{
	struct rte_rcu_qsbr_dq *dq;
	uint32_t qs_fifo_size;
	uint32_t flags;

	if (params == NULL || params->free_fn == NULL ||
		params->v == NULL || params->name == NULL ||
		params->size == 0 || params->esize == 0 ||
		(params->esize % 4 != 0)) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return NULL;
	}

	/* If auto reclamation is configured, reclaim limit
	 * should be a valid value.
	 */
	if ((params->trigger_reclaim_limit <= params->size) &&
	    (params->max_reclaim_size == 0)) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
			__func__, params->size, params->trigger_reclaim_limit,
			params->max_reclaim_size);
		rte_errno = EINVAL;

		return NULL;
	}

	dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
			 RTE_CACHE_LINE_SIZE);
	if (dq == NULL) {
		rte_errno = ENOMEM;

		return NULL;
	}

	/* Decide the flags for the ring.
	 * If MT safety is requested, use RTS for ring enqueue as most
	 * use cases involve dq-enqueue happening on the control plane.
	 * Ring dequeue is always HTS due to the possibility of revert.
	 */
	flags = RING_F_MP_RTS_ENQ;
	if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
		flags = RING_F_SP_ENQ;
	flags |= RING_F_MC_HTS_DEQ;

	/* Round up qs_fifo_size to the next power of two that is not less
	 * than (size + 1); the ring needs one extra slot to distinguish
	 * a full ring from an empty one.
	 */
	qs_fifo_size = rte_align32pow2(params->size + 1);
	/* Add token size to ring element size */
	dq->r = rte_ring_create_elem(params->name,
			__RTE_QSBR_TOKEN_SIZE + params->esize,
			qs_fifo_size, SOCKET_ID_ANY, flags);
	if (dq->r == NULL) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): defer queue create failed\n", __func__);
		rte_free(dq);

		return NULL;
	}

	dq->v = params->v;
	dq->size = params->size;
	dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
	dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
	dq->max_reclaim_size = params->max_reclaim_size;
	dq->free_fn = params->free_fn;
	dq->p = params->p;

	return dq;
}
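
/* Usage sketch (illustrative only): free_fn_cb and app_ctx are hypothetical
 * application names; the parameter values are example choices.
 *
 *	struct rte_rcu_qsbr_dq_parameters params = {0};
 *	params.name = "defer_queue";
 *	params.v = v;
 *	params.size = 1024;
 *	params.esize = 8;
 *	params.trigger_reclaim_limit = 256;
 *	params.max_reclaim_size = 32;
 *	params.free_fn = free_fn_cb;
 *	params.p = app_ctx;
 *	struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 */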

/* Enqueue one resource to the defer queue to free after the grace
 * period is over.
 */
int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
{
	__rte_rcu_qsbr_dq_elem_t *dq_elem;
	uint32_t cur_size;

	if (dq == NULL || e == NULL) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	char data[dq->esize];
	dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
	/* Start the grace period */
	dq_elem->token = rte_rcu_qsbr_start(dq->v);

	/* Reclaim resources if the queue size has hit the reclaim
	 * limit. This keeps the queue from growing too large and
	 * allows time for reader threads to report their quiescent state.
	 */
	cur_size = rte_ring_count(dq->r);
	if (cur_size > dq->trigger_reclaim_limit) {
		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
			"%s(): Triggering reclamation\n", __func__);
		rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
				NULL, NULL, NULL);
	}

	/* Enqueue the token and resource. Generating the token and
	 * enqueuing (token + resource) on the queue is not an
	 * atomic operation. When the defer queue is shared by multiple
	 * writers, this might result in tokens enqueued out of order
	 * on the queue. So, some tokens might wait longer than
	 * necessary before they are reclaimed.
	 */
	memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
	/* Check the status as enqueue might fail since the other threads
	 * might have used up the freed space.
	 * Enqueue uses the configured flags when the DQ was created.
	 */
	if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Enqueue failed\n", __func__);
		/* Note that the token generated above is not used.
		 * Other than wasting tokens, it should not cause any
		 * error.
		 */
		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
			"%s(): Skipped enqueuing token = %" PRIu64 "\n",
			__func__, dq_elem->token);

		rte_errno = ENOSPC;
		return 1;
	}

	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
		"%s(): Enqueued token = %" PRIu64 "\n",
		__func__, dq_elem->token);

	return 0;
}
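
/* Usage sketch (illustrative only): instead of freeing a removed element
 * immediately, hand it to the defer queue; the registered callback frees it
 * once all readers have passed through a quiescent state. With esize = 8,
 * the enqueued element is the pointer itself. remove_elem() and
 * handle_enqueue_failure() are hypothetical application functions.
 *
 *	void *e = remove_elem(shared_struct, key);
 *	if (rte_rcu_qsbr_dq_enqueue(dq, &e) != 0)
 *		handle_enqueue_failure(e);
 */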

/* Reclaim resources from the defer queue. */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
			unsigned int *freed, unsigned int *pending,
			unsigned int *available)
{
	uint32_t cnt;
	__rte_rcu_qsbr_dq_elem_t *dq_elem;

	if (dq == NULL || n == 0) {
		rte_log(RTE_LOG_ERR, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);
		rte_errno = EINVAL;

		return 1;
	}

	cnt = 0;

	char data[dq->esize];
	/* Check reader threads quiescent state and reclaim resources */
	while (cnt < n &&
		rte_ring_dequeue_bulk_elem_start(dq->r, &data,
					dq->esize, 1, available) != 0) {
		dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;

		/* Reclaim the resource */
		if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
			/* Grace period not over yet; revert the dequeue */
			rte_ring_dequeue_elem_finish(dq->r, 0);
			break;
		}
		rte_ring_dequeue_elem_finish(dq->r, 1);

		rte_log(RTE_LOG_INFO, rte_rcu_log_type,
			"%s(): Reclaimed token = %" PRIu64 "\n",
			__func__, dq_elem->token);

		dq->free_fn(dq->p, dq_elem->elem, 1);

		cnt++;
	}

	rte_log(RTE_LOG_INFO, rte_rcu_log_type,
		"%s(): Reclaimed %u resources\n", __func__, cnt);

	if (freed != NULL)
		*freed = cnt;
	if (pending != NULL)
		*pending = rte_ring_count(dq->r);

	return 0;
}
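
/* Usage sketch (illustrative only): reclaim in batches from the control
 * plane, independently of the auto-reclaim triggered by enqueue.
 *
 *	unsigned int freed, pending;
 *	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, NULL);
 */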

/* Delete a defer queue. */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
{
	unsigned int pending;

	if (dq == NULL) {
		rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
			"%s(): Invalid input parameter\n", __func__);

		return 0;
	}

	/* Reclaim all the resources */
	rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
	if (pending != 0) {
		/* Some entries have not completed a grace period yet;
		 * the caller must retry the delete.
		 */
		rte_errno = EAGAIN;

		return 1;
	}

	rte_ring_free(dq->r);
	rte_free(dq);

	return 0;
}

RTE_LOG_REGISTER_DEFAULT(rte_rcu_log_type, ERR);