5 * Copyright(c) 2015 Intel Corporation. All rights reserved.
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * * Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * * Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * * Neither the name of Intel Corporation nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 * Some portions of this software is derived from the producer
37 * consumer queues described by Dmitry Vyukov and published here
38 * http://www.1024cores.net
40 * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41 * Redistribution and use in source and binary forms, with or without
42 * modification, are permitted provided that the following conditions
45 * 1. Redistributions of source code must retain the above copyright notice,
46 * this list of conditions and the following disclaimer.
48 * 2. Redistributions in binary form must reproduce the above copyright notice,
49 * this list of conditions and the following disclaimer in the documentation
50 * and/or other materials provided with the distribution.
52 * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57 * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58 * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61 * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62 * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
64 * The views and conclusions contained in the software and documentation are
65 * those of the authors and should not be interpreted as representing official
66 * policies, either expressed or implied, of Dmitry Vyukov.
69 #ifndef LTHREAD_QUEUE_H_
70 #define LTHREAD_QUEUE_H_
78 #include <rte_prefetch.h>
79 #include <rte_per_lcore.h>
81 #include "lthread_int.h"
83 #include "lthread_diag.h"
84 #include "lthread_pool.h"
89 * This file implements an unbounded FIFO queue based on a lock free
92 * The queue is non-intrusive in that it uses intermediate nodes, and does
93 * not require these nodes to be inserted into the object being placed
96 * This is slightly more efficient than the very similar queue in lthread_pool
97 * in that it does not have to swing a stub node as the queue becomes empty.
99 * The queue access functions allocate and free intermediate node
100 * transparently from/to a per scheduler pool ( see lthread_pool.h ).
102 * The queue provides both MPSC and SPSC insert methods
106 * define a queue of lthread nodes
/*
 * Queue descriptor shared by the functions in this file.
 *
 * The insert functions write q->head and the poll function reads q->tail,
 * so head is the producer end and tail is the consumer end.
 *
 * NOTE(review): the functions in this file use a `head` member that is not
 * visible in this view of the struct; presumably it is declared on a line
 * missing from this listing - confirm against the full file.
 */
108 struct lthread_queue {
/* consumer end, read by _lthread_queue_poll(); cache aligned (presumably to keep it off the producers' cache line - TODO confirm) */
110 struct qnode *tail __rte_cache_aligned;
/* not referenced by any code visible in this file */
111 struct lthread_queue *p;
/* queue name; _lthread_queue_create() forces the last byte to 0 */
112 char name[LT_MAX_NAME_SIZE];
/*
 * Diagnostic counters: rd is bumped on removal, wr on insertion, and size
 * is incremented on insertion and decremented on removal. What the
 * DIAG_COUNT_* macros expand to is defined outside this file.
 */
114 DIAG_COUNT_DEFINE(rd);
115 DIAG_COUNT_DEFINE(wr);
116 DIAG_COUNT_DEFINE(size);
118 } __rte_cache_aligned;
/*
 * Create a new, empty queue.
 *
 * Allocates the descriptor with rte_malloc_socket(), obtains one qnode from
 * _qnode_alloc() to act as the stub, copies `name` into the descriptor and
 * initialises the diagnostic counters. An empty queue is represented by
 * head and tail both pointing at the stub node, which is exactly the
 * condition _lthread_queue_empty() tests.
 *
 * name: label copied into the descriptor, truncated to fit name[].
 *
 * Returns a pointer to the new queue.
 * NOTE(review): the declaration of `stub`, the action taken when the
 * descriptor allocation fails, any check of the _qnode_alloc() result and
 * the return statements are not visible in this view - confirm against
 * the full file.
 */
122 static inline struct lthread_queue *
123 _lthread_queue_create(const char *name)
126 struct lthread_queue *new_queue;
128 new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
131 if (new_queue == NULL)
134 /* allocate the stub node */
135 stub = _qnode_alloc();
/*
 * strncpy() does not write a terminator when the source fills the buffer,
 * hence the explicit termination of the last byte that follows.
 */
139 strncpy(new_queue->name, name, sizeof(new_queue->name));
140 new_queue->name[sizeof(new_queue->name)-1] = 0;
142 /* initialize queue as empty: head and tail both reference the stub */
144 new_queue->head = stub;
145 new_queue->tail = stub;
147 DIAG_COUNT_INIT(new_queue, rd);
148 DIAG_COUNT_INIT(new_queue, wr);
149 DIAG_COUNT_INIT(new_queue, size);
155 * Return true if the queue is empty
/*
 * Test whether the queue is empty.
 *
 * Returns non-zero when tail and head reference the same node, 0 otherwise.
 * The test is a plain pointer comparison with no atomic operation or
 * barrier, so the result is only a snapshot if producers are running
 * concurrently.
 */
157 static __rte_always_inline int
158 _lthread_queue_empty(struct lthread_queue *q)
160 return q->tail == q->head;
167 * fail if queue is not empty
/*
 * Destroy a queue; only an empty queue is torn down.
 *
 * The queue is first checked with _lthread_queue_empty(). When it is empty,
 * head and tail reference the same node, so the _qnode_free() call below
 * releases the single node the queue still owns.
 *
 * Returns int.
 * NOTE(review): the actual return values, any NULL check on `q` and the
 * release of the descriptor itself are not visible in this view - confirm
 * against the full file.
 */
169 static inline int _lthread_queue_destroy(struct lthread_queue *q)
174 if (!_lthread_queue_empty(q))
/* queue is empty here: head == tail, free the one remaining node */
177 _qnode_free(q->head);
/*
 * Declaration of the per-lcore variable `this_sched`, a pointer to
 * struct lthread_sched. It is not referenced by any line visible in this
 * file; presumably it is needed by helpers pulled in from the included
 * headers - TODO confirm.
 */
182 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
185 * Insert a node into a queue
186 * this implementation is multi producer safe
/*
 * Multi-producer insert.
 *
 * Obtains a node from _qnode_alloc(), then swaps it into q->head with the
 * atomic exchange __sync_lock_test_and_set(). According to the comment at
 * the end of this block, the previous head's next pointer is set after the
 * exchange, which leaves a short window in which the new node is not yet
 * reachable from the tail; consumers cope with that by retrying.
 * The wr and size diagnostic counters are incremented afterwards.
 *
 * Returns struct qnode *.
 * NOTE(review): the remaining parameters, the store of the payload into
 * the node, the linking of the previous head and the return statement are
 * not visible in this view - confirm against the full file.
 * NOTE(review): the exchange casts &q->head to uint64_t *, which assumes
 * pointers are 64 bits wide. GCC documents __sync_lock_test_and_set() as an
 * acquire barrier only; confirm that this is sufficient ordering for
 * publishing the node contents.
 */
188 static __rte_always_inline struct qnode *
189 _lthread_queue_insert_mp(struct lthread_queue
193 struct qnode *n = _qnode_alloc();
198 /* set object in node */
202 /* this is an MPSC method, perform a locked update */
205 (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
207 /* there is a window of inconsistency until prev next is set,
208 * which is why remove must retry
212 DIAG_COUNT_INC(q, wr);
213 DIAG_COUNT_INC(q, size);
219 * Insert a node into a queue in single producer mode
220 * this implementation is NOT multi producer safe
/*
 * Single-producer insert.
 *
 * Obtains a node from _qnode_alloc() and makes it the new head using plain
 * stores: one statement both updates q->head and links the node behind
 * `prev`. No atomic exchange is used, so only one producer may call this
 * on a given queue at a time. The wr and size diagnostic counters are
 * incremented afterwards.
 *
 * Returns struct qnode *.
 * NOTE(review): the remaining parameters, the declaration and
 * initialisation of `prev` (presumably the head as it was before the
 * insert), the store of the payload into the node and the return statement
 * are not visible in this view - confirm against the full file.
 */
222 static __rte_always_inline struct qnode *
223 _lthread_queue_insert_sp(struct lthread_queue
226 /* allocate a queue node */
228 struct qnode *n = _qnode_alloc();
233 /* set data in node */
237 /* this is an SPSC method, no need for locked exchange operation */
239 prev->next = q->head = n;
241 DIAG_COUNT_INC(q, wr);
242 DIAG_COUNT_INC(q, size);
248 * Remove a node from a queue
/*
 * Make one attempt to take an item from the consumer end of the queue.
 *
 * Reads the current tail node and its successor. A NULL successor means
 * nothing can be taken right now; as the text below explains, this can
 * also happen transiently while a producer is in the middle of an insert,
 * so the caller is expected to retry. When a successor exists, its payload
 * is copied into the current tail node and the rd / size diagnostic
 * counters are adjusted.
 *
 * Returns void *.
 * NOTE(review): the advance of q->tail, the release of the old tail node
 * and the return statements are not visible in this view; presumably the
 * payload is returned on success and NULL otherwise - confirm against the
 * full file.
 */
250 static __rte_always_inline void *
251 _lthread_queue_poll(struct lthread_queue *q)
254 struct qnode *tail = q->tail;
255 struct qnode *next = (struct qnode *)tail->next;
257 * There is a small window of inconsistency between producer and
258 * consumer whereby the queue may appear empty if consumer and
259 * producer access it at the same time.
260 * The consumer must handle this by retrying
263 if (likely(next != NULL)) {
/* move the payload of the successor into the current tail node */
265 tail->data = next->data;
271 DIAG_COUNT_INC(q, rd);
272 DIAG_COUNT_DEC(q, size);
279 * Remove a node from a queue
/*
 * Take an item from the queue, retrying while the queue is not empty.
 *
 * Calls _lthread_queue_poll() in a loop. A non-NULL result ends the search
 * and the rd / size diagnostic counters are adjusted. A NULL result is
 * followed by a compiler barrier and another attempt for as long as
 * _lthread_queue_empty() reports that the queue still holds something,
 * which covers the window in which a producer has swapped the head but not
 * yet linked the new node.
 *
 * Returns void *.
 * NOTE(review): the declaration of `data`, the opening of the loop and the
 * return statements are not visible in this view; presumably the payload
 * is returned on success and NULL once the queue is empty - confirm
 * against the full file.
 * NOTE(review): _lthread_queue_poll() already increments rd and decrements
 * size when it finds an item, so the counter updates here look like a
 * double count per removed item - confirm and drop one of the two.
 */
281 static __rte_always_inline void *
282 _lthread_queue_remove(struct lthread_queue *q)
287 * There is a small window of inconsistency between producer and
288 * consumer whereby the queue may appear empty if consumer and
289 * producer access it at the same time. We handle this by retrying
292 data = _lthread_queue_poll(q);
294 if (likely(data != NULL)) {
296 DIAG_COUNT_INC(q, rd);
297 DIAG_COUNT_DEC(q, size);
/* keep the compiler from reusing stale queue state on the next attempt */
300 rte_compiler_barrier();
301 } while (unlikely(!_lthread_queue_empty(q)));
309 #endif /* LTHREAD_QUEUE_H_ */