5 * Copyright(c) 2015 Intel Corporation. All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * * Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * * Neither the name of Intel Corporation nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 * Some portions of this software is derived from the producer
37 * consumer queues described by Dmitry Vyukov and published here
38 * http://www.1024cores.net
40 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41 * Redistribution and use in source and binary forms, with or without
42 * modification, are permitted provided that the following conditions
45 * 1. Redistributions of source code must retain the above copyright notice,
46 * this list of conditions and the following disclaimer.
48 * 2. Redistributions in binary form must reproduce the above copyright notice,
49 * this list of conditions and the following disclaimer in the documentation
50 * and/or other materials provided with the distribution.
52 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
64 * The views and conclusions contained in the software and documentation are
65 * those of the authors and should not be interpreted as representing official
66 * policies, either expressed or implied, of Dmitry Vyukov.
69 #ifndef LTHREAD_POOL_H_
70 #define LTHREAD_POOL_H_
76 #include <rte_malloc.h>
77 #include <rte_per_lcore.h>
80 #include "lthread_int.h"
81 #include "lthread_diag.h"
84 * This file implements pool of queue nodes used by the queue implemented
87 * The pool is an intrusive lock free MPSC queue.
89 * The pool is created empty and populated lazily, i.e. on first attempt to
90 * allocate a node from the pool.
92 * Whenever the pool is empty more nodes are added to the pool
93 * The number of nodes preallocated in this way is a parameter of
94 * _qnode_pool_create. Freeing an object returns it to the pool.
96 * Each lthread scheduler maintains its own pool of nodes. L-threads must always
97 * allocate from this local pool ( because it is a single consumer queue ).
98 * L-threads can free nodes to any pool (because it is a multi producer queue)
99 * This enables threads that have affined to a different scheduler to free
107 * define intermediate node
/*
 * NOTE(review): the opening of this struct (its tag line and leading
 * fields, e.g. the `next` link used by _qnode_pool_insert/_pool_remove)
 * is not visible in this view.
 * `pool` back-links each node to the per-scheduler pool that allocated
 * it, so _qnode_free() can return the node to its owning pool (see the
 * `n->pool` dereference in _qnode_free below).
 */
112 struct qnode_pool *pool;
113 } __rte_cache_aligned;
/*
 * NOTE(review): the opening of this struct (tag line and the producer-side
 * `head` pointer that _qnode_pool_insert exchanges) is not visible here.
 */
/* single-slot local cache: _qnode_alloc() consumes it, _qnode_free()
 * presumably refills it when freeing to the local pool -- the refill
 * store is outside this view, TODO confirm */
121 struct qnode *fast_alloc;
/* consumer end of the MPSC queue; cache-aligned, presumably to keep the
 * consumer-written tail off the producers' cache line -- confirm against
 * the (elided) head field */
122 struct qnode *tail __rte_cache_aligned;
/* pool name, NUL-terminated by _qnode_pool_create() */
124 char name[LT_MAX_NAME_SIZE];
/* optional diagnostic counters (compiled in via DIAG_COUNT_DEFINE):
 * reads, writes, nodes available, prealloc events, total capacity */
126 DIAG_COUNT_DEFINE(rd);
127 DIAG_COUNT_DEFINE(wr);
128 DIAG_COUNT_DEFINE(available);
129 DIAG_COUNT_DEFINE(prealloc);
130 DIAG_COUNT_DEFINE(capacity);
131 } __rte_cache_aligned;
134 * Create a pool of qnodes
/*
 * Allocate and initialise a qnode pool and its stub node.
 *
 * name          - label copied (truncated if needed) into p->name.
 * prealloc_size - number of nodes _qnode_alloc() adds in bulk when the
 *                 pool runs empty (stored in p->pre_alloc).
 *
 * Presumably returns the new pool, or NULL on allocation failure -- the
 * return statement and the NULL checks after rte_malloc_socket() are not
 * visible in this view; TODO confirm.
 */
137 static inline struct qnode_pool *
138 _qnode_pool_create(const char *name, int prealloc_size) {
140 struct qnode_pool *p = rte_malloc_socket(NULL,
141 sizeof(struct qnode_pool),
/* stub node: the MPSC queue is never truly empty, the stub occupies it
 * when no real nodes are queued (see _pool_remove) */
147 p->stub = rte_malloc_socket(NULL,
148 sizeof(struct qnode),
/* strncpy may leave the buffer unterminated on truncation; the next
 * line forces NUL-termination */
155 strncpy(p->name, name, LT_MAX_NAME_SIZE);
156 p->name[sizeof(p->name)-1] = 0;
159 p->stub->next = NULL;
162 p->pre_alloc = prealloc_size;
/* diagnostic counters start at zero */
164 DIAG_COUNT_INIT(p, rd);
165 DIAG_COUNT_INIT(p, wr);
166 DIAG_COUNT_INIT(p, available);
167 DIAG_COUNT_INIT(p, prealloc);
168 DIAG_COUNT_INIT(p, capacity);
175 * Insert a node into the pool
/*
 * Producer side of the MPSC queue: atomically swap the new node into
 * p->head, then (on a line elided from this view) link the previous
 * head to it.  Safe for multiple concurrent producers because the swap
 * is a single atomic exchange.
 *
 * NOTE(review): __sync_lock_test_and_set is documented by GCC as an
 * acquire barrier only; the original presumably relies on this being
 * sufficient here -- confirm against the full file.
 */
177 static inline void __attribute__ ((always_inline))
178 _qnode_pool_insert(struct qnode_pool *p, struct qnode *n)
181 struct qnode *prev = n;
182 /* We insert at the head */
183 prev = (struct qnode *) __sync_lock_test_and_set((uint64_t *)&p->head,
185 /* there is a window of inconsistency until prev next is set */
186 /* which is why remove must retry */
191 * Remove a node from the pool
193 * There is a race with _qnode_pool_insert() whereby the queue could appear
194 * empty during a concurrent insert, this is handled by retrying
196 * The queue uses a stub node, which must be swung as the queue becomes
197 * empty, this requires an insert of the stub, which means that removing the
198 * last item from the queue incurs the penalty of an atomic exchange. Since the
199 * pool is maintained with a bulk pre-allocation the cost of this is amortised.
/*
 * Single-consumer dequeue from the tail.  May return NULL when the queue
 * looks empty (caller retries via _qnode_pool_remove).
 *
 * NOTE(review): several interior lines (the stub-skip path, the tail
 * advance, and the return statements) are not visible in this view;
 * comments below describe only what is shown.
 */
201 static inline struct qnode *__attribute__ ((always_inline))
202 _pool_remove(struct qnode_pool *p)
205 struct qnode *tail = p->tail;
206 struct qnode *next = tail->next;
208 /* we remove from the tail */
/* the stub is never handed out to callers; when it is at the tail it
 * must be skipped (skip logic elided here) */
209 if (tail == p->stub) {
212 /* advance the tail */
217 if (likely(next != NULL)) {
/* queue drained down to the last real node: re-insert the stub so the
 * queue is never empty, at the cost of one atomic exchange */
226 /* swing stub node */
227 _qnode_pool_insert(p, p->stub);
239 * This adds a retry to the _pool_remove function
/*
 * Consumer-side wrapper: retry _pool_remove() while the queue appears
 * non-empty (head != tail, and tail not parked on the stub), covering
 * the producer's window between the head swap and the next-pointer
 * store.  The loop header and the _pool_remove() call are elided from
 * this view.
 */
242 static inline struct qnode *__attribute__ ((always_inline))
243 _qnode_pool_remove(struct qnode_pool *p)
249 if (likely(n != NULL))
/* prevent the compiler caching head/tail across retries */
252 rte_compiler_barrier();
253 } while ((p->head != p->tail) &&
254 (p->tail != p->stub));
259 * Allocate a node from the pool
260 * If the pool is empty add more nodes
/*
 * Allocate one qnode for the current scheduler.
 * Fast path: take the single cached node in p->fast_alloc.
 * Slow path: dequeue from the local pool; if that fails, bulk-allocate
 * p->pre_alloc fresh nodes, insert them, and dequeue again.
 *
 * NOTE(review): the function name line is elided from this view; the
 * identifier _qnode_alloc is inferred from the comment and from the
 * reference in _qnode_free's documentation -- confirm against the full
 * file.  Must be called from the owning scheduler: the pool is a
 * single-consumer queue (see the file header).
 */
262 static inline struct qnode *__attribute__ ((always_inline))
265 struct qnode_pool *p = (THIS_SCHED)->qnode_pool;
266 int prealloc_size = p->pre_alloc;
/* fast path: consume the one-slot cache */
270 if (likely(p->fast_alloc != NULL)) {
272 p->fast_alloc = NULL;
276 n = _qnode_pool_remove(p);
/* pool empty: grow it by pre_alloc nodes before retrying */
278 if (unlikely(n == NULL)) {
279 DIAG_COUNT_INC(p, prealloc);
280 for (i = 0; i < prealloc_size; i++) {
281 n = rte_malloc_socket(NULL,
282 sizeof(struct qnode),
288 DIAG_COUNT_INC(p, available);
289 DIAG_COUNT_INC(p, capacity);
292 _qnode_pool_insert(p, n);
294 n = _qnode_pool_remove(p);
297 DIAG_COUNT_INC(p, rd);
298 DIAG_COUNT_DEC(p, available);
305 * free a queue node to the per scheduler pool from which it came
/*
 * Return a node to its owning pool (n->pool), which may belong to a
 * different scheduler -- insertion is multi-producer safe.
 * If the node belongs to the current scheduler's pool and the fast_alloc
 * slot is free, the node is presumably stashed there instead (that store
 * is elided from this view; TODO confirm).
 */
307 static inline void __attribute__ ((always_inline))
308 _qnode_free(struct qnode *n)
310 struct qnode_pool *p = n->pool;
/* full insert needed when the cache slot is occupied or the node came
 * from another scheduler's pool */
313 if (unlikely(p->fast_alloc != NULL) ||
314 unlikely(n->pool != (THIS_SCHED)->qnode_pool)) {
315 DIAG_COUNT_INC(p, wr);
316 DIAG_COUNT_INC(p, available);
317 _qnode_pool_insert(p, n);
324 * Destroy a qnode pool
325 * queue must be empty when this is called
/*
 * NOTE(review): the return type line and the entire function body
 * (presumably freeing the queued nodes, the stub, and the pool struct
 * via rte_free) are not visible in this view -- confirm against the
 * full file.  Caller must guarantee no concurrent producers/consumers.
 */
328 _qnode_pool_destroy(struct qnode_pool *p)
339 #endif /* LTHREAD_POOL_H_ */