ring: introduce RTS ring mode
[dpdk.git] / lib / librte_ring / rte_ring_rts.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_RTS_H_
11 #define _RTE_RING_RTS_H_
12
13 /**
14  * @file rte_ring_rts.h
15  * @b EXPERIMENTAL: this API may change without prior notice
16  * It is not recommended to include this file directly.
17  * Please include <rte_ring.h> instead.
18  *
19  * Contains functions for Relaxed Tail Sync (RTS) ring mode.
20  * The main idea remains the same as for our original MP/MC synchronization
21  * mechanism.
22  * The main difference is that tail value is increased not
23  * by every thread that finished enqueue/dequeue,
24  * but only by the current last one doing enqueue/dequeue.
25  * That allows threads to skip spinning on tail value,
26  * leaving actual tail value change to last thread at a given instance.
27  * RTS requires 2 64-bit CAS for each enqueue(/dequeue) operation:
28  * one for head update, second for tail update.
29  * As a gain it allows thread to avoid spinning/waiting on tail value.
30  * In comparison original MP/MC algorithm requires one 32-bit CAS
31  * for head update and waiting/spinning on tail value.
32  *
33  * Brief outline:
34  *  - introduce update counter (cnt) for both head and tail.
35  *  - increment head.cnt for each head.value update
36  *  - write head.value and head.cnt atomically (64-bit CAS)
37  *  - move tail.value ahead only when tail.cnt + 1 == head.cnt
38  *    (indicating that this is the last thread updating the tail)
39  *  - increment tail.cnt when each enqueue/dequeue op finishes
40  *    (no matter if tail.value going to change or not)
41  *  - write tail.value and tail.cnt atomically (64-bit CAS)
42  *
43  * To avoid producer/consumer starvation:
44  *  - limit max allowed distance between head and tail value (HTD_MAX).
45  *    I.E. thread is allowed to proceed with changing head.value,
46  *    only when:  head.value - tail.value <= HTD_MAX
47  * HTD_MAX is an optional parameter.
48  * With HTD_MAX == 0 we'll have fully serialized ring -
49  * i.e. only one thread at a time will be able to enqueue/dequeue
50  * to/from the ring.
51  * With HTD_MAX >= ring.capacity - no limitation.
52  * By default HTD_MAX == ring.capacity / 8.
53  */
54
55 #ifdef __cplusplus
56 extern "C" {
57 #endif
58
59 #include <rte_ring_rts_c11_mem.h>
60
61 /**
62  * @internal Enqueue several objects on the RTS ring.
63  *
64  * @param r
65  *   A pointer to the ring structure.
66  * @param obj_table
67  *   A pointer to a table of objects.
68  * @param esize
69  *   The size of ring element, in bytes. It must be a multiple of 4.
70  *   This must be the same value used while creating the ring. Otherwise
71  *   the results are undefined.
72  * @param n
73  *   The number of objects to add in the ring from the obj_table.
74  * @param behavior
75  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
76  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
77  * @param free_space
78  *   returns the amount of space after the enqueue operation has finished
79  * @return
80  *   Actual number of objects enqueued.
81  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
82  */
83 static __rte_always_inline unsigned int
84 __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
85         uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
86         uint32_t *free_space)
87 {
88         uint32_t free, head;
89
90         n =  __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
91
92         if (n != 0) {
93                 __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
94                 __rte_ring_rts_update_tail(&r->rts_prod);
95         }
96
97         if (free_space != NULL)
98                 *free_space = free - n;
99         return n;
100 }
101
102 /**
103  * @internal Dequeue several objects from the RTS ring.
104  *
105  * @param r
106  *   A pointer to the ring structure.
107  * @param obj_table
108  *   A pointer to a table of objects.
109  * @param esize
110  *   The size of ring element, in bytes. It must be a multiple of 4.
111  *   This must be the same value used while creating the ring. Otherwise
112  *   the results are undefined.
113  * @param n
114  *   The number of objects to pull from the ring.
115  * @param behavior
116  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
117  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
118  * @param available
119  *   returns the number of remaining ring entries after the dequeue has finished
120  * @return
121  *   - Actual number of objects dequeued.
122  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
123  */
124 static __rte_always_inline unsigned int
125 __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
126         uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
127         uint32_t *available)
128 {
129         uint32_t entries, head;
130
131         n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
132
133         if (n != 0) {
134                 __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
135                 __rte_ring_rts_update_tail(&r->rts_cons);
136         }
137
138         if (available != NULL)
139                 *available = entries - n;
140         return n;
141 }
142
143 /**
144  * Enqueue several objects on the RTS ring (multi-producers safe).
145  *
146  * @param r
147  *   A pointer to the ring structure.
148  * @param obj_table
149  *   A pointer to a table of objects.
150  * @param esize
151  *   The size of ring element, in bytes. It must be a multiple of 4.
152  *   This must be the same value used while creating the ring. Otherwise
153  *   the results are undefined.
154  * @param n
155  *   The number of objects to add in the ring from the obj_table.
156  * @param free_space
157  *   if non-NULL, returns the amount of space in the ring after the
158  *   enqueue operation has finished.
159  * @return
160  *   The number of objects enqueued, either 0 or n
161  */
162 __rte_experimental
163 static __rte_always_inline unsigned int
164 rte_ring_mp_rts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table,
165         unsigned int esize, unsigned int n, unsigned int *free_space)
166 {
167         return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
168                         RTE_RING_QUEUE_FIXED, free_space);
169 }
170
171 /**
172  * Dequeue several objects from an RTS ring (multi-consumers safe).
173  *
174  * @param r
175  *   A pointer to the ring structure.
176  * @param obj_table
177  *   A pointer to a table of objects that will be filled.
178  * @param esize
179  *   The size of ring element, in bytes. It must be a multiple of 4.
180  *   This must be the same value used while creating the ring. Otherwise
181  *   the results are undefined.
182  * @param n
183  *   The number of objects to dequeue from the ring to the obj_table.
184  * @param available
185  *   If non-NULL, returns the number of remaining ring entries after the
186  *   dequeue has finished.
187  * @return
188  *   The number of objects dequeued, either 0 or n
189  */
190 __rte_experimental
191 static __rte_always_inline unsigned int
192 rte_ring_mc_rts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table,
193         unsigned int esize, unsigned int n, unsigned int *available)
194 {
195         return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
196                         RTE_RING_QUEUE_FIXED, available);
197 }
198
199 /**
200  * Enqueue several objects on the RTS ring (multi-producers safe).
201  *
202  * @param r
203  *   A pointer to the ring structure.
204  * @param obj_table
205  *   A pointer to a table of objects.
206  * @param esize
207  *   The size of ring element, in bytes. It must be a multiple of 4.
208  *   This must be the same value used while creating the ring. Otherwise
209  *   the results are undefined.
210  * @param n
211  *   The number of objects to add in the ring from the obj_table.
212  * @param free_space
213  *   if non-NULL, returns the amount of space in the ring after the
214  *   enqueue operation has finished.
215  * @return
216  *   - n: Actual number of objects enqueued.
217  */
218 __rte_experimental
219 static __rte_always_inline unsigned
220 rte_ring_mp_rts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table,
221         unsigned int esize, unsigned int n, unsigned int *free_space)
222 {
223         return __rte_ring_do_rts_enqueue_elem(r, obj_table, esize, n,
224                         RTE_RING_QUEUE_VARIABLE, free_space);
225 }
226
227 /**
228  * Dequeue several objects from an RTS  ring (multi-consumers safe).
229  * When the requested objects are more than the available objects,
230  * only dequeue the actual number of objects.
231  *
232  * @param r
233  *   A pointer to the ring structure.
234  * @param obj_table
235  *   A pointer to a table of objects that will be filled.
236  * @param esize
237  *   The size of ring element, in bytes. It must be a multiple of 4.
238  *   This must be the same value used while creating the ring. Otherwise
239  *   the results are undefined.
240  * @param n
241  *   The number of objects to dequeue from the ring to the obj_table.
242  * @param available
243  *   If non-NULL, returns the number of remaining ring entries after the
244  *   dequeue has finished.
245  * @return
246  *   - n: Actual number of objects dequeued, 0 if ring is empty
247  */
248 __rte_experimental
249 static __rte_always_inline unsigned
250 rte_ring_mc_rts_dequeue_burst_elem(struct rte_ring *r, void *obj_table,
251         unsigned int esize, unsigned int n, unsigned int *available)
252 {
253         return __rte_ring_do_rts_dequeue_elem(r, obj_table, esize, n,
254                         RTE_RING_QUEUE_VARIABLE, available);
255 }
256
257 /**
258  * Enqueue several objects on the RTS ring (multi-producers safe).
259  *
260  * @param r
261  *   A pointer to the ring structure.
262  * @param obj_table
263  *   A pointer to a table of void * pointers (objects).
264  * @param n
265  *   The number of objects to add in the ring from the obj_table.
266  * @param free_space
267  *   if non-NULL, returns the amount of space in the ring after the
268  *   enqueue operation has finished.
269  * @return
270  *   The number of objects enqueued, either 0 or n
271  */
272 __rte_experimental
273 static __rte_always_inline unsigned int
274 rte_ring_mp_rts_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
275                          unsigned int n, unsigned int *free_space)
276 {
277         return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table,
278                         sizeof(uintptr_t), n, free_space);
279 }
280
281 /**
282  * Dequeue several objects from an RTS ring (multi-consumers safe).
283  *
284  * @param r
285  *   A pointer to the ring structure.
286  * @param obj_table
287  *   A pointer to a table of void * pointers (objects) that will be filled.
288  * @param n
289  *   The number of objects to dequeue from the ring to the obj_table.
290  * @param available
291  *   If non-NULL, returns the number of remaining ring entries after the
292  *   dequeue has finished.
293  * @return
294  *   The number of objects dequeued, either 0 or n
295  */
296 __rte_experimental
297 static __rte_always_inline unsigned int
298 rte_ring_mc_rts_dequeue_bulk(struct rte_ring *r, void **obj_table,
299                 unsigned int n, unsigned int *available)
300 {
301         return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table,
302                         sizeof(uintptr_t), n, available);
303 }
304
305 /**
306  * Enqueue several objects on the RTS ring (multi-producers safe).
307  *
308  * @param r
309  *   A pointer to the ring structure.
310  * @param obj_table
311  *   A pointer to a table of void * pointers (objects).
312  * @param n
313  *   The number of objects to add in the ring from the obj_table.
314  * @param free_space
315  *   if non-NULL, returns the amount of space in the ring after the
316  *   enqueue operation has finished.
317  * @return
318  *   - n: Actual number of objects enqueued.
319  */
320 __rte_experimental
321 static __rte_always_inline unsigned
322 rte_ring_mp_rts_enqueue_burst(struct rte_ring *r, void * const *obj_table,
323                          unsigned int n, unsigned int *free_space)
324 {
325         return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table,
326                         sizeof(uintptr_t), n, free_space);
327 }
328
329 /**
330  * Dequeue several objects from an RTS  ring (multi-consumers safe).
331  * When the requested objects are more than the available objects,
332  * only dequeue the actual number of objects.
333  *
334  * @param r
335  *   A pointer to the ring structure.
336  * @param obj_table
337  *   A pointer to a table of void * pointers (objects) that will be filled.
338  * @param n
339  *   The number of objects to dequeue from the ring to the obj_table.
340  * @param available
341  *   If non-NULL, returns the number of remaining ring entries after the
342  *   dequeue has finished.
343  * @return
344  *   - n: Actual number of objects dequeued, 0 if ring is empty
345  */
346 __rte_experimental
347 static __rte_always_inline unsigned
348 rte_ring_mc_rts_dequeue_burst(struct rte_ring *r, void **obj_table,
349                 unsigned int n, unsigned int *available)
350 {
351         return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table,
352                         sizeof(uintptr_t), n, available);
353 }
354
355 /**
356  * Return producer max Head-Tail-Distance (HTD).
357  *
358  * @param r
359  *   A pointer to the ring structure.
360  * @return
361  *   Producer HTD value, if producer is set in appropriate sync mode,
362  *   or UINT32_MAX otherwise.
363  */
364 __rte_experimental
365 static inline uint32_t
366 rte_ring_get_prod_htd_max(const struct rte_ring *r)
367 {
368         if (r->prod.sync_type == RTE_RING_SYNC_MT_RTS)
369                 return r->rts_prod.htd_max;
370         return UINT32_MAX;
371 }
372
373 /**
374  * Set producer max Head-Tail-Distance (HTD).
375  * Note that producer has to use appropriate sync mode (RTS).
376  *
377  * @param r
378  *   A pointer to the ring structure.
379  * @param v
380  *   new HTD value to setup.
381  * @return
382  *   Zero on success, or negative error code otherwise.
383  */
384 __rte_experimental
385 static inline int
386 rte_ring_set_prod_htd_max(struct rte_ring *r, uint32_t v)
387 {
388         if (r->prod.sync_type != RTE_RING_SYNC_MT_RTS)
389                 return -ENOTSUP;
390
391         r->rts_prod.htd_max = v;
392         return 0;
393 }
394
395 /**
396  * Return consumer max Head-Tail-Distance (HTD).
397  *
398  * @param r
399  *   A pointer to the ring structure.
400  * @return
401  *   Consumer HTD value, if consumer is set in appropriate sync mode,
402  *   or UINT32_MAX otherwise.
403  */
404 __rte_experimental
405 static inline uint32_t
406 rte_ring_get_cons_htd_max(const struct rte_ring *r)
407 {
408         if (r->cons.sync_type == RTE_RING_SYNC_MT_RTS)
409                 return r->rts_cons.htd_max;
410         return UINT32_MAX;
411 }
412
413 /**
414  * Set consumer max Head-Tail-Distance (HTD).
415  * Note that consumer has to use appropriate sync mode (RTS).
416  *
417  * @param r
418  *   A pointer to the ring structure.
419  * @param v
420  *   new HTD value to setup.
421  * @return
422  *   Zero on success, or negative error code otherwise.
423  */
424 __rte_experimental
425 static inline int
426 rte_ring_set_cons_htd_max(struct rte_ring *r, uint32_t v)
427 {
428         if (r->cons.sync_type != RTE_RING_SYNC_MT_RTS)
429                 return -ENOTSUP;
430
431         r->rts_cons.htd_max = v;
432         return 0;
433 }
434
435 #ifdef __cplusplus
436 }
437 #endif
438
439 #endif /* _RTE_RING_RTS_H_ */