2c55fcec72828851c82656a2a329dd39f21054b1
[dpdk.git] / examples / performance-thread / common / lthread_queue.h
1 /*
2  *-
3  *   BSD LICENSE
4  *
5  *   Copyright(c) 2015 Intel Corporation. All rights reserved.
6  *   All rights reserved.
7  *
8  *   Redistribution and use in source and binary forms, with or without
9  *   modification, are permitted provided that the following conditions
10  *   are met:
11  *
12  *     * Redistributions of source code must retain the above copyright
13  *       notice, this list of conditions and the following disclaimer.
14  *     * Redistributions in binary form must reproduce the above copyright
15  *       notice, this list of conditions and the following disclaimer in
16  *       the documentation and/or other materials provided with the
17  *       distribution.
18  *     * Neither the name of Intel Corporation nor the names of its
19  *       contributors may be used to endorse or promote products derived
20  *       from this software without specific prior written permission.
21  *
22  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
25  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
26  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
27  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
28  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
32  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33  */
34
35 /*
36  * Some portions of this software is derived from the producer
37  * consumer queues described by Dmitry Vyukov and published  here
38  * http://www.1024cores.net
39  *
40  * Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
41  * Redistribution and use in source and binary forms, with or without
42  * modification, are permitted provided that the following conditions
43  * are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  * this list of conditions and the following disclaimer.
47  *
48  * 2. Redistributions in binary form must reproduce the above copyright notice,
49  * this list of conditions and the following disclaimer in the documentation
50  * and/or other materials provided with the distribution.
51  *
52  * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
54  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
55  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS
56  * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
57  * OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
58  * OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
59  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
60  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
61  * OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
62  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
63  *
64  * The views and conclusions contained in the software and documentation are
65  * those of the authors and should not be interpreted as representing official
66  * policies, either expressed or implied, of Dmitry Vyukov.
67  */
68
69 #ifndef LTHREAD_QUEUE_H_
70 #define LTHREAD_QUEUE_H_
71
72 #include <string.h>
73
74 #include <rte_prefetch.h>
75 #include <rte_per_lcore.h>
76
77 #include "lthread_int.h"
78 #include "lthread.h"
79 #include "lthread_diag.h"
80 #include "lthread_pool.h"
81
82 struct lthread_queue;
83
84 /*
85  * This file implements an unbounded FIFO queue based on a lock free
86  * linked list.
87  *
88  * The queue is non-intrusive in that it uses intermediate nodes, and does
89  * not require these nodes to be inserted into the object being placed
90  * in the queue.
91  *
92  * This is slightly more efficient than the very similar queue in lthread_pool
93  * in that it does not have to swing a stub node as the queue becomes empty.
94  *
95  * The queue access functions allocate and free intermediate node
96  * transparently from/to a per scheduler pool ( see lthread_pool.h ).
97  *
98  * The queue provides both MPSC and SPSC insert methods
99  */
100
101 /*
102  * define a queue of lthread nodes
103  */
104 struct lthread_queue {
105         struct qnode *head;
106         struct qnode *tail __rte_cache_aligned;
107         struct lthread_queue *p;
108         char name[LT_MAX_NAME_SIZE];
109
110         DIAG_COUNT_DEFINE(rd);
111         DIAG_COUNT_DEFINE(wr);
112         DIAG_COUNT_DEFINE(size);
113
114 } __rte_cache_aligned;
115
116
117
118 static inline struct lthread_queue *
119 _lthread_queue_create(const char *name)
120 {
121         struct qnode *stub;
122         struct lthread_queue *new_queue;
123
124         new_queue = rte_malloc_socket(NULL, sizeof(struct lthread_queue),
125                                         RTE_CACHE_LINE_SIZE,
126                                         rte_socket_id());
127         if (new_queue == NULL)
128                 return NULL;
129
130         /* allocated stub node */
131         stub = _qnode_alloc();
132         RTE_ASSERT(stub);
133
134         if (name != NULL)
135                 strncpy(new_queue->name, name, sizeof(new_queue->name));
136         new_queue->name[sizeof(new_queue->name)-1] = 0;
137
138         /* initialize queue as empty */
139         stub->next = NULL;
140         new_queue->head = stub;
141         new_queue->tail = stub;
142
143         DIAG_COUNT_INIT(new_queue, rd);
144         DIAG_COUNT_INIT(new_queue, wr);
145         DIAG_COUNT_INIT(new_queue, size);
146
147         return new_queue;
148 }
149
150 /**
151  * Return true if the queue is empty
152  */
153 static inline int __attribute__ ((always_inline))
154 _lthread_queue_empty(struct lthread_queue *q)
155 {
156         return q->tail == q->head;
157 }
158
159
160
161 /**
162  * Destroy a queue
163  * fail if queue is not empty
164  */
165 static inline int _lthread_queue_destroy(struct lthread_queue *q)
166 {
167         if (q == NULL)
168                 return -1;
169
170         if (!_lthread_queue_empty(q))
171                 return -1;
172
173         _qnode_free(q->head);
174         rte_free(q);
175         return 0;
176 }
177
178 RTE_DECLARE_PER_LCORE(struct lthread_sched *, this_sched);
179
180 /*
181  * Insert a node into a queue
182  * this implementation is multi producer safe
183  */
184 static inline struct qnode *__attribute__ ((always_inline))
185 _lthread_queue_insert_mp(struct lthread_queue
186                                                           *q, void *data)
187 {
188         struct qnode *prev;
189         struct qnode *n = _qnode_alloc();
190
191         if (n == NULL)
192                 return NULL;
193
194         /* set object in node */
195         n->data = data;
196         n->next = NULL;
197
198         /* this is an MPSC method, perform a locked update */
199         prev = n;
200         prev =
201             (struct qnode *)__sync_lock_test_and_set((uint64_t *) &(q)->head,
202                                                (uint64_t) prev);
203         /* there is a window of inconsistency until prev next is set,
204          * which is why remove must retry
205          */
206         prev->next = n;
207
208         DIAG_COUNT_INC(q, wr);
209         DIAG_COUNT_INC(q, size);
210
211         return n;
212 }
213
214 /*
215  * Insert an node into a queue in single producer mode
216  * this implementation is NOT mult producer safe
217  */
218 static inline struct qnode *__attribute__ ((always_inline))
219 _lthread_queue_insert_sp(struct lthread_queue
220                                                           *q, void *data)
221 {
222         /* allocate a queue node */
223         struct qnode *prev;
224         struct qnode *n = _qnode_alloc();
225
226         if (n == NULL)
227                 return NULL;
228
229         /* set data in node */
230         n->data = data;
231         n->next = NULL;
232
233         /* this is an SPSC method, no need for locked exchange operation */
234         prev = q->head;
235         prev->next = q->head = n;
236
237         DIAG_COUNT_INC(q, wr);
238         DIAG_COUNT_INC(q, size);
239
240         return n;
241 }
242
243 /*
244  * Remove a node from a queue
245  */
246 static inline void *__attribute__ ((always_inline))
247 _lthread_queue_poll(struct lthread_queue *q)
248 {
249         void *data = NULL;
250         struct qnode *tail = q->tail;
251         struct qnode *next = (struct qnode *)tail->next;
252         /*
253          * There is a small window of inconsistency between producer and
254          * consumer whereby the queue may appear empty if consumer and
255          * producer access it at the same time.
256          * The consumer must handle this by retrying
257          */
258
259         if (likely(next != NULL)) {
260                 q->tail = next;
261                 tail->data = next->data;
262                 data = tail->data;
263
264                 /* free the node */
265                 _qnode_free(tail);
266
267                 DIAG_COUNT_INC(q, rd);
268                 DIAG_COUNT_DEC(q, size);
269                 return data;
270         }
271         return NULL;
272 }
273
274 /*
275  * Remove a node from a queue
276  */
277 static inline void *__attribute__ ((always_inline))
278 _lthread_queue_remove(struct lthread_queue *q)
279 {
280         void *data = NULL;
281
282         /*
283          * There is a small window of inconsistency between producer and
284          * consumer whereby the queue may appear empty if consumer and
285          * producer access it at the same time. We handle this by retrying
286          */
287         do {
288                 data = _lthread_queue_poll(q);
289
290                 if (likely(data != NULL)) {
291
292                         DIAG_COUNT_INC(q, rd);
293                         DIAG_COUNT_DEC(q, size);
294                         return data;
295                 }
296                 rte_compiler_barrier();
297         } while (unlikely(!_lthread_queue_empty(q)));
298         return NULL;
299 }
300
301
302 #endif                          /* LTHREAD_QUEUE_H_ */