/*
 * SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */

#ifndef LTHREAD_POOL_H_
#define LTHREAD_POOL_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <string.h>

#include <rte_malloc.h>
#include <rte_per_lcore.h>

#include "lthread_int.h"
#include "lthread_diag.h"
/*
 * This file implements a pool of the queue nodes used by the queue
 * implemented in lthread_queue.h.
 *
 * The pool is an intrusive lock-free MPSC queue.
 *
 * The pool is created empty and populated lazily, i.e. on the first
 * attempt to allocate from the pool.
 *
 * Whenever the pool is empty, more nodes are added to it. The number of
 * nodes preallocated in this way is a parameter of _qnode_pool_create().
 * Freeing an object returns it to the pool.
 *
 * Each lthread scheduler maintains its own pool of nodes. L-threads must
 * always allocate from this local pool (because it is a single consumer
 * queue). L-threads can free nodes to any pool (because it is a multi
 * producer queue). This enables threads affined to a different scheduler
 * to free nodes cleanly.
 */
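/*
 * In addition to the queue, each pool keeps a single-entry fast_alloc
 * slot: a node freed by its owning scheduler is parked there and handed
 * straight back on the next allocation, so the common alloc/free pairing
 * on the local scheduler avoids touching the queue at all.
 */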
/*
 * define an intermediate node
 */
struct qnode {
	struct qnode *next;		/* next node in the pool/queue */
	struct lthread *data;		/* payload carried when queued */
	struct qnode_pool *pool;	/* pool this node was allocated from */
} __rte_cache_aligned;
/*
 * a pool structure
 */
struct qnode_pool {
	struct qnode *head;		/* insertion (producer) end */
	struct qnode *stub;		/* permanent stub node */
	struct qnode *fast_alloc;	/* single-entry local free cache */
	struct qnode *tail __rte_cache_aligned;	/* removal (consumer) end */
	int pre_alloc;			/* nodes added per refill */
	char name[LT_MAX_NAME_SIZE];

	DIAG_COUNT_DEFINE(rd);
	DIAG_COUNT_DEFINE(wr);
	DIAG_COUNT_DEFINE(available);
	DIAG_COUNT_DEFINE(prealloc);
	DIAG_COUNT_DEFINE(capacity);
} __rte_cache_aligned;
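/*
 * Illustrative call sequence only; the pool name and preallocation count
 * below are hypothetical values, not part of this header. Scheduler init
 * code creates the pool and stores it in THIS_SCHED->qnode_pool; after
 * that lthreads allocate locally and may free from any scheduler:
 *
 *	(THIS_SCHED)->qnode_pool = _qnode_pool_create("sched-pool", 256);
 *	struct qnode *n = _qnode_alloc();	allocate from the local pool
 *	_qnode_free(n);				return it to n->pool
 *	_qnode_pool_destroy((THIS_SCHED)->qnode_pool);
 */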
/*
 * Create a pool of qnodes
 */
static inline struct qnode_pool *
_qnode_pool_create(const char *name, int prealloc_size)
{
	struct qnode_pool *p = rte_malloc_socket(NULL,
					sizeof(struct qnode_pool),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());

	RTE_ASSERT(p);

	p->stub = rte_malloc_socket(NULL,
				sizeof(struct qnode),
				RTE_CACHE_LINE_SIZE,
				rte_socket_id());

	RTE_ASSERT(p->stub);

	if (name != NULL)
		strncpy(p->name, name, LT_MAX_NAME_SIZE);
	p->name[sizeof(p->name)-1] = 0;

	/* the queue starts with only the stub node in it */
	p->stub->pool = p;
	p->stub->next = NULL;
	p->tail = p->stub;
	p->head = p->stub;
	p->pre_alloc = prealloc_size;

	DIAG_COUNT_INIT(p, rd);
	DIAG_COUNT_INIT(p, wr);
	DIAG_COUNT_INIT(p, available);
	DIAG_COUNT_INIT(p, prealloc);
	DIAG_COUNT_INIT(p, capacity);

	return p;
}
/*
 * Insert a node into the pool
 */
static __rte_always_inline void
_qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
{
	n->next = NULL;
	struct qnode *prev = n;
	/* We insert at the head */
	prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
							(uint64_t) prev);
	/* there is a window of inconsistency until prev->next is set,
	 * which is why remove must retry
	 */
	prev->next = n;
}
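/*
 * Note: this is the producer side of Dmitry Vyukov's intrusive MPSC queue
 * (credited in the copyright above). Producers only touch p->head, with a
 * single atomic exchange followed by one plain store, so any number of
 * schedulers can insert concurrently without locks.
 */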
/*
 * Remove a node from the pool
 *
 * There is a race with _qnode_pool_insert() whereby the queue could appear
 * empty during a concurrent insert; this is handled by retrying.
 *
 * The queue uses a stub node, which must be swung as the queue becomes
 * empty. This requires an insert of the stub, which means that removing the
 * last item from the queue incurs the penalty of an atomic exchange. Since
 * the pool is maintained with a bulk pre-allocation, the cost of this is
 * amortised.
 */
static __rte_always_inline struct qnode *
_pool_remove(struct qnode_pool *p)
{
	struct qnode *head;
	struct qnode *tail = p->tail;
	struct qnode *next = tail->next;

	/* we remove from the tail */
	if (tail == p->stub) {
		if (next == NULL)
			return NULL;
		/* advance the tail */
		p->tail = next;
		tail = next;
		next = next->next;
	}
	if (likely(next != NULL)) {
		p->tail = next;
		return tail;
	}

	head = p->head;
	if (tail == head)
		return NULL;

	/* swing stub node */
	_qnode_pool_insert(p, p->stub);

	next = tail->next;
	if (next != NULL) {
		p->tail = next;
		return tail;
	}
	return NULL;
}
/*
 * This adds a retry to the _pool_remove function
 * defined above
 */
static __rte_always_inline struct qnode *
_qnode_pool_remove(struct qnode_pool *p)
{
	struct qnode *n;

	do {
		n = _pool_remove(p);
		if (likely(n != NULL))
			return n;

		rte_compiler_barrier();
	} while ((p->head != p->tail) &&
			(p->tail != p->stub));
	return NULL;
}
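/*
 * The retry loop above covers the short window in _qnode_pool_insert()
 * between the atomic exchange of p->head and the store of prev->next; it
 * exits with NULL once head == tail or only the stub remains at the tail,
 * at which point the caller treats the pool as empty and preallocates
 * more nodes.
 */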
/*
 * Allocate a node from the pool
 * If the pool is empty, add more nodes
 */
static __rte_always_inline struct qnode *
_qnode_alloc(void)
{
	struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
	int prealloc_size = p->pre_alloc;
	struct qnode *n;
	int i;

	/* fast path: reuse the node cached by the last local free */
	if (likely(p->fast_alloc != NULL)) {
		n = p->fast_alloc;
		p->fast_alloc = NULL;
		return n;
	}

	n = _qnode_pool_remove(p);

	if (unlikely(n == NULL)) {
		/* pool is empty: bulk preallocate, then retry the remove */
		DIAG_COUNT_INC(p, prealloc);
		for (i = 0; i < prealloc_size; i++) {
			n = rte_malloc_socket(NULL,
					sizeof(struct qnode),
					RTE_CACHE_LINE_SIZE,
					rte_socket_id());
			if (n == NULL)
				return NULL;

			DIAG_COUNT_INC(p, available);
			DIAG_COUNT_INC(p, capacity);

			n->pool = p;
			_qnode_pool_insert(p, n);
		}
		n = _qnode_pool_remove(p);
	}
	n->pool = p;
	DIAG_COUNT_INC(p, rd);
	DIAG_COUNT_DEC(p, available);
	return n;
}
/*
 * free a queue node to the per scheduler pool from which it came
 */
static __rte_always_inline void
_qnode_free(struct qnode *n)
{
	struct qnode_pool *p = n->pool;

	/* if the fast_alloc slot is taken, or the node belongs to another
	 * scheduler's pool, push it onto its owning pool's queue
	 */
	if (unlikely(p->fast_alloc != NULL) ||
			unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
		DIAG_COUNT_INC(p, wr);
		DIAG_COUNT_INC(p, available);
		_qnode_pool_insert(p, n);
		return;
	}
	/* otherwise cache it for the next local allocation */
	p->fast_alloc = n;
}
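/*
 * Only a node owned by the local pool is ever parked in fast_alloc: the
 * slot is read and written without atomics, which is safe solely because
 * allocation, and a local free, run on the owning scheduler's lcore.
 */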
/*
 * Destroy a qnode pool
 * queue must be empty when this is called
 */
static inline int
_qnode_pool_destroy(struct qnode_pool *p)
{
	rte_free(p->stub);
	rte_free(p);
	return 0;
}

#ifdef __cplusplus
}
#endif

#endif /* LTHREAD_POOL_H_ */