/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright 2015 Intel Corporation.
 * Copyright 2010-2011 Dmitry Vyukov
 */
7 #ifndef LTHREAD_QUEUE_H_
8 #define LTHREAD_QUEUE_H_
#include <stdint.h>
#include <string.h>

#include <rte_prefetch.h>
#include <rte_per_lcore.h>

#include "lthread_int.h"
#include "lthread_diag.h"
#include "lthread_pool.h"
/*
 * This file implements an unbounded FIFO queue based on a lock-free
 * linked list.
 *
 * The queue is non-intrusive in that it uses intermediate nodes, and does
 * not require these nodes to be inserted into the object being placed
 * in the queue.
 *
 * This is slightly more efficient than the very similar queue in lthread_pool
 * in that it does not have to swing a stub node as the queue becomes empty.
 *
 * The queue access functions allocate and free intermediate nodes
 * transparently from/to a per-scheduler pool (see lthread_pool.h).
 *
 * The queue provides both MPSC and SPSC insert methods.
 */

/*
 * Define a queue of lthread nodes.
 */
46 struct lthread_queue {
48 struct qnode *tail __rte_cache_aligned;
49 struct lthread_queue *p;
50 char name[LT_MAX_NAME_SIZE];
52 DIAG_COUNT_DEFINE(rd);
53 DIAG_COUNT_DEFINE(wr);
54 DIAG_COUNT_DEFINE(size);
56 } __rte_cache_aligned;
60 static inline struct lthread_queue *
61 _lthread_queue_create(const char *name)
64 struct lthread_queue *new_queue;
66 new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
69 if (new_queue == NULL)
72 /* allocated stub node */
73 stub = _qnode_alloc();
77 strncpy(new_queue->name, name, sizeof(new_queue->name));
78 new_queue->name[sizeof(new_queue->name)-1] = 0;
80 /* initialize queue as empty */
82 new_queue->head = stub;
83 new_queue->tail = stub;
85 DIAG_COUNT_INIT(new_queue, rd);
86 DIAG_COUNT_INIT(new_queue, wr);
87 DIAG_COUNT_INIT(new_queue, size);
93 * Return true if the queue is empty
95 static __rte_always_inline int
96 _lthread_queue_empty(struct lthread_queue *q)
98 return q->tail == q->head;
105 * fail if queue is not empty
107 static inline int _lthread_queue_destroy(struct lthread_queue *q)
112 if (!_lthread_queue_empty(q))
115 _qnode_free(q->head);
120 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
123 * Insert a node into a queue
124 * this implementation is multi producer safe
126 static __rte_always_inline struct qnode *
127 _lthread_queue_insert_mp(struct lthread_queue
131 struct qnode *n = _qnode_alloc();
136 /* set object in node */
140 /* this is an MPSC method, perform a locked update */
143 (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
145 /* there is a window of inconsistency until prev next is set,
146 * which is why remove must retry
150 DIAG_COUNT_INC(q, wr);
151 DIAG_COUNT_INC(q, size);
157 * Insert an node into a queue in single producer mode
158 * this implementation is NOT mult producer safe
160 static __rte_always_inline struct qnode *
161 _lthread_queue_insert_sp(struct lthread_queue
164 /* allocate a queue node */
166 struct qnode *n = _qnode_alloc();
171 /* set data in node */
175 /* this is an SPSC method, no need for locked exchange operation */
177 prev->next = q->head = n;
179 DIAG_COUNT_INC(q, wr);
180 DIAG_COUNT_INC(q, size);
186 * Remove a node from a queue
188 static __rte_always_inline void *
189 _lthread_queue_poll(struct lthread_queue *q)
192 struct qnode *tail = q->tail;
193 struct qnode *next = (struct qnode *)tail->next;
195 * There is a small window of inconsistency between producer and
196 * consumer whereby the queue may appear empty if consumer and
197 * producer access it at the same time.
198 * The consumer must handle this by retrying
201 if (likely(next != NULL)) {
203 tail->data = next->data;
209 DIAG_COUNT_INC(q, rd);
210 DIAG_COUNT_DEC(q, size);
217 * Remove a node from a queue
219 static __rte_always_inline void *
220 _lthread_queue_remove(struct lthread_queue *q)
225 * There is a small window of inconsistency between producer and
226 * consumer whereby the queue may appear empty if consumer and
227 * producer access it at the same time. We handle this by retrying
230 data = _lthread_queue_poll(q);
232 if (likely(data != NULL)) {
234 DIAG_COUNT_INC(q, rd);
235 DIAG_COUNT_DEC(q, size);
238 rte_compiler_barrier();
239 } while (unlikely(!_lthread_queue_empty(q)));
247 #endif /* LTHREAD_QUEUE_H_ */