mempool: fix slow allocation of large mempools
[dpdk.git] / lib / librte_rcu / rte_rcu_qsbr.h
index 9727f49..0b55859 100644 (file)
@@ -24,6 +24,7 @@
 extern "C" {
 #endif
 
+#include <stdbool.h>
 #include <stdio.h>
 #include <stdint.h>
 #include <inttypes.h>
@@ -82,6 +83,7 @@ struct rte_rcu_qsbr_cnt {
 
 #define __RTE_QSBR_CNT_THR_OFFLINE 0
 #define __RTE_QSBR_CNT_INIT 1
+#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
 
 /* RTE Quiescent State variable structure.
  * This structure has two elements that vary in size based on the
@@ -92,6 +94,10 @@ struct rte_rcu_qsbr_cnt {
 struct rte_rcu_qsbr {
        uint64_t token __rte_cache_aligned;
        /**< Counter to allow for multiple concurrent quiescent state queries */
+       uint64_t acked_token;
+       /**< Least token acked by all the threads in the last call to
+        *   rte_rcu_qsbr_check API.
+        */
 
        uint32_t num_elems __rte_cache_aligned;
        /**< Number of elements in the thread ID array */
@@ -122,7 +128,8 @@ struct rte_rcu_qsbr {
  *   Possible rte_errno codes are:
  *   - EINVAL - max_threads is 0
  */
-size_t __rte_experimental
+__rte_experimental
+size_t
 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
 
 /**
@@ -143,7 +150,8 @@ rte_rcu_qsbr_get_memsize(uint32_t max_threads);
  *   - EINVAL - max_threads is 0 or 'v' is NULL.
  *
  */
-int __rte_experimental
+__rte_experimental
+int
 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
 
 /**
@@ -169,7 +177,8 @@ rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
  *   the QS variable. thread_id is a value between 0 and (max_threads - 1).
  *   'max_threads' is the parameter passed in 'rte_rcu_qsbr_init' API.
  */
-int __rte_experimental
+__rte_experimental
+int
 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
 
 /**
@@ -190,7 +199,8 @@ rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
  *   Reader thread with this thread ID will stop reporting its quiescent
  *   state on the QS variable.
  */
-int __rte_experimental
+__rte_experimental
+int
 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
 
 /**
@@ -207,7 +217,7 @@ rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
  * call this API before calling rte_rcu_qsbr_quiescent. This can be called
  * during initialization or as part of the packet processing loop.
  *
- * The reader thread must call rte_rcu_thread_offline API, before
+ * The reader thread must call rte_rcu_qsbr_thread_offline API, before
  * calling any functions that block, to ensure that rte_rcu_qsbr_check
  * API does not wait indefinitely for the reader thread to update its QS.
  *
@@ -221,7 +231,8 @@ rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
  *   Reader thread with this thread ID will report its quiescent state on
  *   the QS variable.
  */
-static __rte_always_inline void __rte_experimental
+__rte_experimental
+static __rte_always_inline void
 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
        uint64_t t;
@@ -272,7 +283,7 @@ rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
  * This can be called during initialization or as part of the packet
  * processing loop.
  *
- * The reader thread must call rte_rcu_thread_offline API, before
+ * The reader thread must call rte_rcu_qsbr_thread_offline API, before
  * calling any functions that block, to ensure that rte_rcu_qsbr_check
  * API does not wait indefinitely for the reader thread to update its QS.
  *
@@ -282,7 +293,8 @@ rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
  *   rte_rcu_qsbr_check API will not wait for the reader thread with
  *   this thread ID to report its quiescent state on the QS variable.
  */
-static __rte_always_inline void __rte_experimental
+__rte_experimental
+static __rte_always_inline void
 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);
@@ -322,7 +334,8 @@ rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
  * @param thread_id
  *   Reader thread id
  */
-static __rte_always_inline void __rte_experimental
+__rte_experimental
+static __rte_always_inline void
 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
 {
@@ -358,7 +371,8 @@ rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
  * @param thread_id
  *   Reader thread id
  */
-static __rte_always_inline void __rte_experimental
+__rte_experimental
+static __rte_always_inline void
 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
                        __rte_unused unsigned int thread_id)
 {
@@ -391,7 +405,8 @@ rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
  *   - This is the token for this call of the API. This should be
  *     passed to rte_rcu_qsbr_check API.
  */
-static __rte_always_inline uint64_t __rte_experimental
+__rte_experimental
+static __rte_always_inline uint64_t
 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
 {
        uint64_t t;
@@ -423,7 +438,8 @@ rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
  * @param thread_id
  *   Update the quiescent state for the reader with this thread ID.
  */
-static __rte_always_inline void __rte_experimental
+__rte_experimental
+static __rte_always_inline void
 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
 {
        uint64_t t;
@@ -440,12 +456,14 @@ rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
         */
        t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
 
-       /* Inform the writer that updates are visible to this reader.
+       /* Check if there are updates available from the writer.
+        * Inform the writer that updates are visible to this reader.
         * Prior loads of the shared data structure should not move
         * beyond this store. Hence use store-release.
         */
-       __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
-                        t, __ATOMIC_RELEASE);
+       if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
+               __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
+                                        t, __ATOMIC_RELEASE);
 
        __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
                __func__, t, thread_id);
@@ -461,6 +479,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
        uint64_t bmap;
        uint64_t c;
        uint64_t *reg_thread_id;
+       uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
        for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
                i < v->num_elems;
@@ -482,6 +501,7 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
                                __func__, t, wait, c, id+j);
+
                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
@@ -501,10 +521,25 @@ __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
                                continue;
                        }
 
+                       /* This thread is in quiescent state. Use the counter
+                        * to find the least acknowledged token among all the
+                        * readers.
+                        */
+                       if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
+                               acked_token = c;
+
                        bmap &= ~(1UL << j);
                }
        }
 
+       /* All readers are checked, update least acknowledged token.
+        * There might be multiple writers trying to update this. There is
+        * no need to update this very accurately using compare-and-swap.
+        */
+       if (acked_token != __RTE_QSBR_CNT_MAX)
+               __atomic_store_n(&v->acked_token, acked_token,
+                       __ATOMIC_RELAXED);
+
        return 1;
 }
 
@@ -517,6 +552,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
        uint32_t i;
        struct rte_rcu_qsbr_cnt *cnt;
        uint64_t c;
+       uint64_t acked_token = __RTE_QSBR_CNT_MAX;
 
        for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
                __RTE_RCU_DP_LOG(DEBUG,
@@ -527,6 +563,7 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
                                __func__, t, wait, c, i);
+
                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
@@ -539,8 +576,22 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 
                        rte_pause();
                }
+
+               /* This thread is in quiescent state. Use the counter to find
+                * the least acknowledged token among all the readers.
+                */
+               if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
+                       acked_token = c;
        }
 
+       /* All readers are checked, update least acknowledged token.
+        * There might be multiple writers trying to update this. There is
+        * no need to update this very accurately using compare-and-swap.
+        */
+       if (acked_token != __RTE_QSBR_CNT_MAX)
+               __atomic_store_n(&v->acked_token, acked_token,
+                       __ATOMIC_RELAXED);
+
        return 1;
 }
 
@@ -578,11 +629,16 @@ __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
  *   - 1 if all reader threads have passed through specified number
  *     of quiescent states.
  */
-static __rte_always_inline int __rte_experimental
+__rte_experimental
+static __rte_always_inline int
 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
 {
        RTE_ASSERT(v != NULL);
 
+       /* Check if all the readers have already acknowledged this token */
+       if (likely(t <= v->acked_token))
+               return 1;
+
        if (likely(v->num_threads == v->max_threads))
                return __rte_rcu_qsbr_check_all(v, t, wait);
        else
@@ -610,7 +666,8 @@ rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
  *   on this QS variable (i.e. the calling thread is also part of the
  *   readside critical section). If not, pass RTE_QSBR_THRID_INVALID.
  */
-void __rte_experimental
+__rte_experimental
+void
 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
 
 /**
@@ -631,7 +688,8 @@ rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
  *   Possible rte_errno codes are:
  *   - EINVAL - NULL parameters are passed
  */
-int __rte_experimental
+__rte_experimental
+int
 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
 
 #ifdef __cplusplus