X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_rcu%2Frte_rcu_qsbr.c;h=6a429d8b378c84d8d37b90ca895b135b08eaea5a;hb=1d51f154cd93bc8f623985032835d2218e05c893;hp=474675d5105491f2202d9b560b6fd25a76995734;hpb=b36f587f010bba421a8fd0b4c383276ecb796ae3;p=dpdk.git diff --git a/lib/librte_rcu/rte_rcu_qsbr.c b/lib/librte_rcu/rte_rcu_qsbr.c index 474675d510..6a429d8b37 100644 --- a/lib/librte_rcu/rte_rcu_qsbr.c +++ b/lib/librte_rcu/rte_rcu_qsbr.c @@ -1,6 +1,6 @@ /* SPDX-License-Identifier: BSD-3-Clause * - * Copyright (c) 2018 Arm Limited + * Copyright (c) 2018-2020 Arm Limited */ #include @@ -18,8 +18,10 @@ #include #include #include +#include #include "rte_rcu_qsbr.h" +#include "rcu_qsbr_pvt.h" /* Get the memory size of QSBR variable */ size_t @@ -72,6 +74,7 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads) __RTE_QSBR_THRID_ARRAY_ELM_SIZE) / __RTE_QSBR_THRID_ARRAY_ELM_SIZE; v->token = __RTE_QSBR_CNT_INIT; + v->acked_token = __RTE_QSBR_CNT_INIT - 1; return 0; } @@ -244,6 +247,9 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) fprintf(f, " Token = %"PRIu64"\n", __atomic_load_n(&v->token, __ATOMIC_ACQUIRE)); + fprintf(f, " Least Acknowledged Token = %"PRIu64"\n", + __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE)); + fprintf(f, "Quiescent State Counts for readers:\n"); for (i = 0; i < v->num_elems; i++) { bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i), @@ -266,6 +272,229 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v) return 0; } +/* Create a queue used to store the data structure elements that can + * be freed later. This queue is referred to as 'defer queue'. + */ +struct rte_rcu_qsbr_dq * +rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params) +{ + struct rte_rcu_qsbr_dq *dq; + uint32_t qs_fifo_size; + unsigned int flags; + + if (params == NULL || params->free_fn == NULL || + params->v == NULL || params->name == NULL || + params->size == 0 || params->esize == 0 || + (params->esize % 4 != 0)) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return NULL; + } + /* If auto reclamation is configured, reclaim limit + * should be a valid value. + */ + if ((params->trigger_reclaim_limit <= params->size) && + (params->max_reclaim_size == 0)) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n", + __func__, params->size, params->trigger_reclaim_limit, + params->max_reclaim_size); + rte_errno = EINVAL; + + return NULL; + } + + dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq), + RTE_CACHE_LINE_SIZE); + if (dq == NULL) { + rte_errno = ENOMEM; + + return NULL; + } + + /* Decide the flags for the ring. + * If MT safety is requested, use RTS for ring enqueue as most + * use cases involve dq-enqueue happening on the control plane. + * Ring dequeue is always HTS due to the possibility of revert. + */ + flags = RING_F_MP_RTS_ENQ; + if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE) + flags = RING_F_SP_ENQ; + flags |= RING_F_MC_HTS_DEQ; + /* round up qs_fifo_size to next power of two that is not less than + * max_size. + */ + qs_fifo_size = rte_align32pow2(params->size + 1); + /* Add token size to ring element size */ + dq->r = rte_ring_create_elem(params->name, + __RTE_QSBR_TOKEN_SIZE + params->esize, + qs_fifo_size, SOCKET_ID_ANY, flags); + if (dq->r == NULL) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): defer queue create failed\n", __func__); + rte_free(dq); + return NULL; + } + + dq->v = params->v; + dq->size = params->size; + dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize; + dq->trigger_reclaim_limit = params->trigger_reclaim_limit; + dq->max_reclaim_size = params->max_reclaim_size; + dq->free_fn = params->free_fn; + dq->p = params->p; + + return dq; +} + +/* Enqueue one resource to the defer queue to free after the grace + * period is over. + */ +int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e) +{ + __rte_rcu_qsbr_dq_elem_t *dq_elem; + uint32_t cur_size; + + if (dq == NULL || e == NULL) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + char data[dq->esize]; + dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data; + /* Start the grace period */ + dq_elem->token = rte_rcu_qsbr_start(dq->v); + + /* Reclaim resources if the queue size has hit the reclaim + * limit. This helps the queue from growing too large and + * allows time for reader threads to report their quiescent state. + */ + cur_size = rte_ring_count(dq->r); + if (cur_size > dq->trigger_reclaim_limit) { + rte_log(RTE_LOG_INFO, rte_rcu_log_type, + "%s(): Triggering reclamation\n", __func__); + rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size, + NULL, NULL, NULL); + } + + /* Enqueue the token and resource. Generating the token and + * enqueuing (token + resource) on the queue is not an + * atomic operation. When the defer queue is shared by multiple + * writers, this might result in tokens enqueued out of order + * on the queue. So, some tokens might wait longer than they + * are required to be reclaimed. + */ + memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE); + /* Check the status as enqueue might fail since the other threads + * might have used up the freed space. + * Enqueue uses the configured flags when the DQ was created. + */ + if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): Enqueue failed\n", __func__); + /* Note that the token generated above is not used. + * Other than wasting tokens, it should not cause any + * other issues. + */ + rte_log(RTE_LOG_INFO, rte_rcu_log_type, + "%s(): Skipped enqueuing token = %"PRIu64"\n", + __func__, dq_elem->token); + + rte_errno = ENOSPC; + return 1; + } + + rte_log(RTE_LOG_INFO, rte_rcu_log_type, + "%s(): Enqueued token = %"PRIu64"\n", __func__, dq_elem->token); + + return 0; +} + +/* Reclaim resources from the defer queue. */ +int +rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n, + unsigned int *freed, unsigned int *pending, + unsigned int *available) +{ + uint32_t cnt; + __rte_rcu_qsbr_dq_elem_t *dq_elem; + + if (dq == NULL || n == 0) { + rte_log(RTE_LOG_ERR, rte_rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + rte_errno = EINVAL; + + return 1; + } + + cnt = 0; + + char data[dq->esize]; + /* Check reader threads quiescent state and reclaim resources */ + while (cnt < n && + rte_ring_dequeue_bulk_elem_start(dq->r, &data, + dq->esize, 1, available) != 0) { + dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data; + + /* Reclaim the resource */ + if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) { + rte_ring_dequeue_elem_finish(dq->r, 0); + break; + } + rte_ring_dequeue_elem_finish(dq->r, 1); + + rte_log(RTE_LOG_INFO, rte_rcu_log_type, + "%s(): Reclaimed token = %"PRIu64"\n", + __func__, dq_elem->token); + + dq->free_fn(dq->p, dq_elem->elem, 1); + + cnt++; + } + + rte_log(RTE_LOG_INFO, rte_rcu_log_type, + "%s(): Reclaimed %u resources\n", __func__, cnt); + + if (freed != NULL) + *freed = cnt; + if (pending != NULL) + *pending = rte_ring_count(dq->r); + + return 0; +} + +/* Delete a defer queue. */ +int +rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq) +{ + unsigned int pending; + + if (dq == NULL) { + rte_log(RTE_LOG_DEBUG, rte_rcu_log_type, + "%s(): Invalid input parameter\n", __func__); + + return 0; + } + + /* Reclaim all the resources */ + rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL); + if (pending != 0) { + rte_errno = EAGAIN; + + return 1; + } + + rte_ring_free(dq->r); + rte_free(dq); + + return 0; +} + int rte_rcu_log_type; RTE_INIT(rte_rcu_register)