mbuf: extend meaning of QinQ stripped bit
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.c
index ce7f93d..aebfdb0 100644 (file)
@@ -1,6 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  *
- * Copyright (c) 2018 Arm Limited
+ * Copyright (c) 2018-2020 Arm Limited
  */
 
 #include <stdio.h>
 #include <rte_memory.h>
 #include <rte_malloc.h>
 #include <rte_eal.h>
-#include <rte_eal_memconfig.h>
 #include <rte_atomic.h>
 #include <rte_per_lcore.h>
 #include <rte_lcore.h>
 #include <rte_errno.h>
+#include <rte_ring_elem.h>
 
 #include "rte_rcu_qsbr.h"
+#include "rcu_qsbr_pvt.h"
 
 /* Get the memory size of QSBR variable */
 size_t
@@ -73,6 +74,7 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE) /
                        __RTE_QSBR_THRID_ARRAY_ELM_SIZE;
        v->token = __RTE_QSBR_CNT_INIT;
+       v->acked_token = __RTE_QSBR_CNT_INIT - 1;
 
        return 0;
 }
@@ -158,7 +160,7 @@ rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
        /* Check if the thread is already unregistered */
        old_bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
                                        __ATOMIC_RELAXED);
-       if (old_bmap & ~(1UL << id))
+       if (!(old_bmap & (1UL << id)))
                return 0;
 
        do {
@@ -175,7 +177,7 @@ rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
                if (success)
                        __atomic_fetch_sub(&v->num_threads,
                                                1, __ATOMIC_RELAXED);
-               else if (old_bmap & ~(1UL << id))
+               else if (!(old_bmap & (1UL << id)))
                        /* Someone else unregistered this thread.
                         * Counter should not be incremented.
                         */
@@ -245,6 +247,9 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
        fprintf(f, "  Token = %"PRIu64"\n",
                        __atomic_load_n(&v->token, __ATOMIC_ACQUIRE));
 
+       fprintf(f, "  Least Acknowledged Token = %"PRIu64"\n",
+                       __atomic_load_n(&v->acked_token, __ATOMIC_ACQUIRE));
+
        fprintf(f, "Quiescent State Counts for readers:\n");
        for (i = 0; i < v->num_elems; i++) {
                bmap = __atomic_load_n(__RTE_QSBR_THRID_ARRAY_ELM(v, i),
@@ -267,11 +272,227 @@ rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
        return 0;
 }
 
-int rte_rcu_log_type;
+/* Create a queue used to store the data structure elements that can
+ * be freed later. This queue is referred to as 'defer queue'.
+ */
+struct rte_rcu_qsbr_dq *
+rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
+{
+       struct rte_rcu_qsbr_dq *dq;
+       uint32_t qs_fifo_size;
+       unsigned int flags;
+
+       if (params == NULL || params->free_fn == NULL ||
+               params->v == NULL || params->name == NULL ||
+               params->size == 0 || params->esize == 0 ||
+               (params->esize % 4 != 0)) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return NULL;
+       }
+       /* If auto reclamation is configured, reclaim limit
+        * should be a valid value.
+        */
+       if ((params->trigger_reclaim_limit <= params->size) &&
+           (params->max_reclaim_size == 0)) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter, size = %u, trigger_reclaim_limit = %u, max_reclaim_size = %u\n",
+                       __func__, params->size, params->trigger_reclaim_limit,
+                       params->max_reclaim_size);
+               rte_errno = EINVAL;
+
+               return NULL;
+       }
+
+       dq = rte_zmalloc(NULL, sizeof(struct rte_rcu_qsbr_dq),
+                        RTE_CACHE_LINE_SIZE);
+       if (dq == NULL) {
+               rte_errno = ENOMEM;
+
+               return NULL;
+       }
+
+       /* Decide the flags for the ring.
+        * If MT safety is requested, use RTS for ring enqueue as most
+        * use cases involve dq-enqueue happening on the control plane.
+        * Ring dequeue is always HTS due to the possibility of revert.
+        */
+       flags = RING_F_MP_RTS_ENQ;
+       if (params->flags & RTE_RCU_QSBR_DQ_MT_UNSAFE)
+               flags = RING_F_SP_ENQ;
+       flags |= RING_F_MC_HTS_DEQ;
+       /* round up qs_fifo_size to next power of two that is not less than
+        * max_size.
+        */
+       qs_fifo_size = rte_align32pow2(params->size + 1);
+       /* Add token size to ring element size */
+       dq->r = rte_ring_create_elem(params->name,
+                       __RTE_QSBR_TOKEN_SIZE + params->esize,
+                       qs_fifo_size, SOCKET_ID_ANY, flags);
+       if (dq->r == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): defer queue create failed\n", __func__);
+               rte_free(dq);
+               return NULL;
+       }
+
+       dq->v = params->v;
+       dq->size = params->size;
+       dq->esize = __RTE_QSBR_TOKEN_SIZE + params->esize;
+       dq->trigger_reclaim_limit = params->trigger_reclaim_limit;
+       dq->max_reclaim_size = params->max_reclaim_size;
+       dq->free_fn = params->free_fn;
+       dq->p = params->p;
 
-RTE_INIT(rte_rcu_register)
+       return dq;
+}
+
+/* Enqueue one resource to the defer queue to free after the grace
+ * period is over.
+ */
+int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
 {
-       rte_rcu_log_type = rte_log_register("lib.rcu");
-       if (rte_rcu_log_type >= 0)
-               rte_log_set_level(rte_rcu_log_type, RTE_LOG_ERR);
+       __rte_rcu_qsbr_dq_elem_t *dq_elem;
+       uint32_t cur_size;
+
+       if (dq == NULL || e == NULL) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       char data[dq->esize];
+       dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
+       /* Start the grace period */
+       dq_elem->token = rte_rcu_qsbr_start(dq->v);
+
+       /* Reclaim resources if the queue size has hit the reclaim
+        * limit. This helps the queue from growing too large and
+        * allows time for reader threads to report their quiescent state.
+        */
+       cur_size = rte_ring_count(dq->r);
+       if (cur_size > dq->trigger_reclaim_limit) {
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Triggering reclamation\n", __func__);
+               rte_rcu_qsbr_dq_reclaim(dq, dq->max_reclaim_size,
+                                               NULL, NULL, NULL);
+       }
+
+       /* Enqueue the token and resource. Generating the token and
+        * enqueuing (token + resource) on the queue is not an
+        * atomic operation. When the defer queue is shared by multiple
+        * writers, this might result in tokens enqueued out of order
+        * on the queue. So, some tokens might wait longer than they
+        * are required to be reclaimed.
+        */
+       memcpy(dq_elem->elem, e, dq->esize - __RTE_QSBR_TOKEN_SIZE);
+       /* Check the status as enqueue might fail since the other threads
+        * might have used up the freed space.
+        * Enqueue uses the configured flags when the DQ was created.
+        */
+       if (rte_ring_enqueue_elem(dq->r, data, dq->esize) != 0) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Enqueue failed\n", __func__);
+               /* Note that the token generated above is not used.
+                * Other than wasting tokens, it should not cause any
+                * other issues.
+                */
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Skipped enqueuing token = %"PRIu64"\n",
+                       __func__, dq_elem->token);
+
+               rte_errno = ENOSPC;
+               return 1;
+       }
+
+       rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+               "%s(): Enqueued token = %"PRIu64"\n", __func__, dq_elem->token);
+
+       return 0;
 }
+
+/* Reclaim resources from the defer queue. */
+int
+rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
+                       unsigned int *freed, unsigned int *pending,
+                       unsigned int *available)
+{
+       uint32_t cnt;
+       __rte_rcu_qsbr_dq_elem_t *dq_elem;
+
+       if (dq == NULL || n == 0) {
+               rte_log(RTE_LOG_ERR, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+               rte_errno = EINVAL;
+
+               return 1;
+       }
+
+       cnt = 0;
+
+       char data[dq->esize];
+       /* Check reader threads quiescent state and reclaim resources */
+       while (cnt < n &&
+               rte_ring_dequeue_bulk_elem_start(dq->r, &data,
+                                       dq->esize, 1, available) != 0) {
+               dq_elem = (__rte_rcu_qsbr_dq_elem_t *)data;
+
+               /* Reclaim the resource */
+               if (rte_rcu_qsbr_check(dq->v, dq_elem->token, false) != 1) {
+                       rte_ring_dequeue_elem_finish(dq->r, 0);
+                       break;
+               }
+               rte_ring_dequeue_elem_finish(dq->r, 1);
+
+               rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+                       "%s(): Reclaimed token = %"PRIu64"\n",
+                       __func__, dq_elem->token);
+
+               dq->free_fn(dq->p, dq_elem->elem, 1);
+
+               cnt++;
+       }
+
+       rte_log(RTE_LOG_INFO, rte_rcu_log_type,
+               "%s(): Reclaimed %u resources\n", __func__, cnt);
+
+       if (freed != NULL)
+               *freed = cnt;
+       if (pending != NULL)
+               *pending = rte_ring_count(dq->r);
+
+       return 0;
+}
+
+/* Delete a defer queue. */
+int
+rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
+{
+       unsigned int pending;
+
+       if (dq == NULL) {
+               rte_log(RTE_LOG_DEBUG, rte_rcu_log_type,
+                       "%s(): Invalid input parameter\n", __func__);
+
+               return 0;
+       }
+
+       /* Reclaim all the resources */
+       rte_rcu_qsbr_dq_reclaim(dq, ~0, NULL, &pending, NULL);
+       if (pending != 0) {
+               rte_errno = EAGAIN;
+
+               return 1;
+       }
+
+       rte_ring_free(dq->r);
+       rte_free(dq);
+
+       return 0;
+}
+
+RTE_LOG_REGISTER(rte_rcu_log_type, lib.rcu, ERR);