ring: promote new sync modes and peek to stable
[dpdk.git] / lib / ring / rte_ring_rts.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_RTS_H_
11 #define _RTE_RING_RTS_H_
12
13 /**
14  * @file rte_ring_rts.h
15  * It is not recommended to include this file directly.
16  * Please include <rte_ring.h> instead.
17  *
18  * Contains functions for Relaxed Tail Sync (RTS) ring mode.
19  * The main idea remains the same as for our original MP/MC synchronization
20  * mechanism.
21  * The main difference is that tail value is increased not
22  * by every thread that finished enqueue/dequeue,
23  * but only by the current last one doing enqueue/dequeue.
24  * That allows threads to skip spinning on tail value,
25  * leaving actual tail value change to last thread at a given instance.
26  * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation:
27  * one for head update, second for tail update.
28  * As a gain it allows thread to avoid spinning/waiting on tail value.
29  * In comparison original MP/MC algorithm requires one 32-bit CAS
30  * for head update and waiting/spinning on tail value.
31  *
32  * Brief outline:
33  *  - introduce update counter (cnt) for both head and tail.
34  *  - increment head.cnt for each head.value update
35  *  - write head.value and head.cnt atomically (64-bit CAS)
36  *  - move tail.value ahead only when tail.cnt + 1 == head.cnt
37  *    (indicating that this is the last thread updating the tail)
38  *  - increment tail.cnt when each enqueue/dequeue op finishes
39  *    (no matter if tail.value going to change or not)
40  *  - write tail.value and tail.cnt atomically (64-bit CAS)
41  *
42  * To avoid producer/consumer starvation:
43  *  - limit max allowed distance between head and tail value (HTD_MAX).
44  *    I.E. thread is allowed to proceed with changing head.value,
45  *    only when:  head.value - tail.value <= HTD_MAX
46  * HTD_MAX is an optional parameter.
47  * With HTD_MAX == 0 we'll have fully serialized ring -
48  * i.e. only one thread at a time will be able to enqueue/dequeue
49  * to/from the ring.
50  * With HTD_MAX >= ring.capacity - no limitation.
51  * By default HTD_MAX == ring.capacity / 8.
52  */
53
54 #ifdef __cplusplus
55 extern "C" {
56 #endif
57
58 #include <rte_ring_rts_elem_pvt.h>
59
60 /**
61  * Enqueue several objects on the RTS ring (multi-producers safe).
62  *
63  * @param r
64  *   A pointer to the ring structure.
65  * @param obj_table
66  *   A pointer to a table of objects.
67  * @param esize
68  *   The size of ring element, in bytes. It must be a multiple of 4.
69  *   This must be the same value used while creating the ring. Otherwise
70  *   the results are undefined.
71  * @param n
72  *   The number of objects to add in the ring from the obj_table.
73  * @param free_space
74  *   if non-NULL, returns the amount of space in the ring after the
75  *   enqueue operation has finished.
76  * @return
77  *   The number of objects enqueued, either 0 or n
78  */
79 static __rte_always_inline unsigned int
80 rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
81         unsigned int esize, unsigned int n, unsigned int *free_space)
82 {
83         return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
84                         RTE_RING_QUEUE_FIXED, free_space);
85 }
86
87 /**
88  * Dequeue several objects from an RTS ring (multi-consumers safe).
89  *
90  * @param r
91  *   A pointer to the ring structure.
92  * @param obj_table
93  *   A pointer to a table of objects that will be filled.
94  * @param esize
95  *   The size of ring element, in bytes. It must be a multiple of 4.
96  *   This must be the same value used while creating the ring. Otherwise
97  *   the results are undefined.
98  * @param n
99  *   The number of objects to dequeue from the ring to the obj_table.
100  * @param available
101  *   If non-NULL, returns the number of remaining ring entries after the
102  *   dequeue has finished.
103  * @return
104  *   The number of objects dequeued, either 0 or n
105  */
106 static __rte_always_inline unsigned int
107 rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
108         unsigned int esize, unsigned int n, unsigned int *available)
109 {
110         return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
111                         RTE_RING_QUEUE_FIXED, available);
112 }
113
114 /**
115  * Enqueue several objects on the RTS ring (multi-producers safe).
116  *
117  * @param r
118  *   A pointer to the ring structure.
119  * @param obj_table
120  *   A pointer to a table of objects.
121  * @param esize
122  *   The size of ring element, in bytes. It must be a multiple of 4.
123  *   This must be the same value used while creating the ring. Otherwise
124  *   the results are undefined.
125  * @param n
126  *   The number of objects to add in the ring from the obj_table.
127  * @param free_space
128  *   if non-NULL, returns the amount of space in the ring after the
129  *   enqueue operation has finished.
130  * @return
131  *   - n: Actual number of objects enqueued.
132  */
133 static __rte_always_inline unsigned int
134 rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
135         unsigned int esize, unsigned int n, unsigned int *free_space)
136 {
137         return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
138                         RTE_RING_QUEUE_VARIABLE, free_space);
139 }
140
141 /**
142  * Dequeue several objects from an RTS  ring (multi-consumers safe).
143  * When the requested objects are more than the available objects,
144  * only dequeue the actual number of objects.
145  *
146  * @param r
147  *   A pointer to the ring structure.
148  * @param obj_table
149  *   A pointer to a table of objects that will be filled.
150  * @param esize
151  *   The size of ring element, in bytes. It must be a multiple of 4.
152  *   This must be the same value used while creating the ring. Otherwise
153  *   the results are undefined.
154  * @param n
155  *   The number of objects to dequeue from the ring to the obj_table.
156  * @param available
157  *   If non-NULL, returns the number of remaining ring entries after the
158  *   dequeue has finished.
159  * @return
160  *   - n: Actual number of objects dequeued, 0 if ring is empty
161  */
162 static __rte_always_inline unsigned int
163 rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
164         unsigned int esize, unsigned int n, unsigned int *available)
165 {
166         return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
167                         RTE_RING_QUEUE_VARIABLE, available);
168 }
169
170 /**
171  * Enqueue several objects on the RTS ring (multi-producers safe).
172  *
173  * @param r
174  *   A pointer to the ring structure.
175  * @param obj_table
176  *   A pointer to a table of void * pointers (objects).
177  * @param n
178  *   The number of objects to add in the ring from the obj_table.
179  * @param free_space
180  *   if non-NULL, returns the amount of space in the ring after the
181  *   enqueue operation has finished.
182  * @return
183  *   The number of objects enqueued, either 0 or n
184  */
185 static __rte_always_inline unsigned int
186 rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
187                          unsigned int n, unsigned int *free_space)
188 {
189         return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table,
190                         sizeof(uintptr_t), n, free_space);
191 }
192
193 /**
194  * Dequeue several objects from an RTS ring (multi-consumers safe).
195  *
196  * @param r
197  *   A pointer to the ring structure.
198  * @param obj_table
199  *   A pointer to a table of void * pointers (objects) that will be filled.
200  * @param n
201  *   The number of objects to dequeue from the ring to the obj_table.
202  * @param available
203  *   If non-NULL, returns the number of remaining ring entries after the
204  *   dequeue has finished.
205  * @return
206  *   The number of objects dequeued, either 0 or n
207  */
208 static __rte_always_inline unsigned int
209 rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
210                 unsigned int n, unsigned int *available)
211 {
212         return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table,
213                         sizeof(uintptr_t), n, available);
214 }
215
216 /**
217  * Enqueue several objects on the RTS ring (multi-producers safe).
218  *
219  * @param r
220  *   A pointer to the ring structure.
221  * @param obj_table
222  *   A pointer to a table of void * pointers (objects).
223  * @param n
224  *   The number of objects to add in the ring from the obj_table.
225  * @param free_space
226  *   if non-NULL, returns the amount of space in the ring after the
227  *   enqueue operation has finished.
228  * @return
229  *   - n: Actual number of objects enqueued.
230  */
231 static __rte_always_inline unsigned int
232 rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
233                          unsigned int n, unsigned int *free_space)
234 {
235         return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table,
236                         sizeof(uintptr_t), n, free_space);
237 }
238
239 /**
240  * Dequeue several objects from an RTS  ring (multi-consumers safe).
241  * When the requested objects are more than the available objects,
242  * only dequeue the actual number of objects.
243  *
244  * @param r
245  *   A pointer to the ring structure.
246  * @param obj_table
247  *   A pointer to a table of void * pointers (objects) that will be filled.
248  * @param n
249  *   The number of objects to dequeue from the ring to the obj_table.
250  * @param available
251  *   If non-NULL, returns the number of remaining ring entries after the
252  *   dequeue has finished.
253  * @return
254  *   - n: Actual number of objects dequeued, 0 if ring is empty
255  */
256 static __rte_always_inline unsigned int
257 rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
258                 unsigned int n, unsigned int *available)
259 {
260         return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table,
261                         sizeof(uintptr_t), n, available);
262 }
263
264 /**
265  * Return producer max Head-Tail-Distance (HTD).
266  *
267  * @param r
268  *   A pointer to the ring structure.
269  * @return
270  *   Producer HTD value, if producer is set in appropriate sync mode,
271  *   or UINT32_MAX otherwise.
272  */
273 static inline uint32_t
274 rte_ring_get_prod_htd_max(const struct rte_ring *r)
275 {
276         if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
277                 return r->rts_prod.htd_max;
278         return UINT32_MAX;
279 }
280
281 /**
282  * Set producer max Head-Tail-Distance (HTD).
283  * Note that producer has to use appropriate sync mode (RTS).
284  *
285  * @param r
286  *   A pointer to the ring structure.
287  * @param v
288  *   new HTD value to setup.
289  * @return
290  *   Zero on success, or negative error code otherwise.
291  */
292 static inline int
293 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
294 {
295         if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
296                 return -ENOTSUP;
297
298         r->rts_prod.htd_max = v;
299         return 0;
300 }
301
302 /**
303  * Return consumer max Head-Tail-Distance (HTD).
304  *
305  * @param r
306  *   A pointer to the ring structure.
307  * @return
308  *   Consumer HTD value, if consumer is set in appropriate sync mode,
309  *   or UINT32_MAX otherwise.
310  */
311 static inline uint32_t
312 rte_ring_get_cons_htd_max(const struct rte_ring *r)
313 {
314         if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
315                 return r->rts_cons.htd_max;
316         return UINT32_MAX;
317 }
318
319 /**
320  * Set consumer max Head-Tail-Distance (HTD).
321  * Note that consumer has to use appropriate sync mode (RTS).
322  *
323  * @param r
324  *   A pointer to the ring structure.
325  * @param v
326  *   new HTD value to setup.
327  * @return
328  *   Zero on success, or negative error code otherwise.
329  */
330 static inline int
331 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
332 {
333         if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
334                 return -ENOTSUP;
335
336         r->rts_cons.htd_max = v;
337         return 0;
338 }
339
340 #ifdef __cplusplus
341 }
342 #endif
343
344 #endif /* _RTE_RING_RTS_H_ */