7cf0465288dd76a61aaef9a4bf8e7a45ff888941
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_H_
11 #define _RTE_RING_H_
12
13 /**
14  * @file
15  * RTE Ring
16  *
17  * The Ring Manager is a fixed-size queue, implemented as a table of
18  * pointers. Head and tail pointers are modified atomically, allowing
19  * concurrent access to it. It has the following features:
20  *
21  * - FIFO (First In First Out)
22  * - Maximum size is fixed; the pointers are stored in a table.
23  * - Lockless implementation.
24  * - Multi- or single-consumer dequeue.
25  * - Multi- or single-producer enqueue.
26  * - Bulk dequeue.
27  * - Bulk enqueue.
28  *
29  * Note: the ring implementation is not preemptible. Refer to Programmer's
30  * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
31  * for more information.
32  *
33  */
34
35 #ifdef __cplusplus
36 extern "C" {
37 #endif
38
39 #include <rte_ring_core.h>
40
41 /**
42  * Calculate the memory size needed for a ring
43  *
44  * This function returns the number of bytes needed for a ring, given
45  * the number of elements in it. This value is the sum of the size of
46  * the structure rte_ring and the size of the memory needed by the
47  * objects pointers. The value is aligned to a cache line size.
48  *
49  * @param count
50  *   The number of elements in the ring (must be a power of 2).
51  * @return
52  *   - The memory size needed for the ring on success.
53  *   - -EINVAL if count is not a power of 2.
54  */
55 ssize_t rte_ring_get_memsize(unsigned count);
56
57 /**
58  * Initialize a ring structure.
59  *
60  * Initialize a ring structure in memory pointed by "r". The size of the
61  * memory area must be large enough to store the ring structure and the
62  * object table. It is advised to use rte_ring_get_memsize() to get the
63  * appropriate size.
64  *
65  * The ring size is set to *count*, which must be a power of two. Water
66  * marking is disabled by default. The real usable ring size is
67  * *count-1* instead of *count* to differentiate a free ring from an
68  * empty ring.
69  *
70  * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
71  * memory given by the caller may not be shareable among dpdk
72  * processes.
73  *
74  * @param r
75  *   The pointer to the ring structure followed by the objects table.
76  * @param name
77  *   The name of the ring.
78  * @param count
79  *   The number of elements in the ring (must be a power of 2).
80  * @param flags
81  *   An OR of the following:
82  *   - One of mutually exclusive flags that define producer behavior:
83  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
84  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
85  *        is "single-producer".
86  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
87  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
88  *        is "multi-producer RTS mode".
89  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
90  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
91  *        is "multi-producer HTS mode".
92  *     If none of these flags is set, then default "multi-producer"
93  *     behavior is selected.
94  *   - One of mutually exclusive flags that define consumer behavior:
95  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
96  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
97  *        is "single-consumer". Otherwise, it is "multi-consumers".
98  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
99  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
100  *        is "multi-consumer RTS mode".
101  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
102  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
103  *        is "multi-consumer HTS mode".
104  *     If none of these flags is set, then default "multi-consumer"
105  *     behavior is selected.
106  * @return
107  *   0 on success, or a negative value on error.
108  */
109 int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
110         unsigned flags);
111
112 /**
113  * Create a new ring named *name* in memory.
114  *
115  * This function uses ``memzone_reserve()`` to allocate memory. Then it
116  * calls rte_ring_init() to initialize an empty ring.
117  *
118  * The new ring size is set to *count*, which must be a power of
119  * two. Water marking is disabled by default. The real usable ring size
120  * is *count-1* instead of *count* to differentiate a free ring from an
121  * empty ring.
122  *
123  * The ring is added in RTE_TAILQ_RING list.
124  *
125  * @param name
126  *   The name of the ring.
127  * @param count
128  *   The size of the ring (must be a power of 2).
129  * @param socket_id
130  *   The *socket_id* argument is the socket identifier in case of
131  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
132  *   constraint for the reserved zone.
133  * @param flags
134  *   An OR of the following:
135  *   - One of mutually exclusive flags that define producer behavior:
136  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
137  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
138  *        is "single-producer".
139  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
140  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
141  *        is "multi-producer RTS mode".
142  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
143  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
144  *        is "multi-producer HTS mode".
145  *     If none of these flags is set, then default "multi-producer"
146  *     behavior is selected.
147  *   - One of mutually exclusive flags that define consumer behavior:
148  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
149  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
150  *        is "single-consumer". Otherwise, it is "multi-consumers".
151  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
152  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
153  *        is "multi-consumer RTS mode".
154  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
155  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
156  *        is "multi-consumer HTS mode".
157  *     If none of these flags is set, then default "multi-consumer"
158  *     behavior is selected.
159  * @return
160  *   On success, the pointer to the new allocated ring. NULL on error with
161  *    rte_errno set appropriately. Possible errno values include:
162  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
163  *    - E_RTE_SECONDARY - function was called from a secondary process instance
164  *    - EINVAL - count provided is not a power of 2
165  *    - ENOSPC - the maximum number of memzones has already been allocated
166  *    - EEXIST - a memzone with the same name already exists
167  *    - ENOMEM - no appropriate memory area found in which to create memzone
168  */
169 struct rte_ring *rte_ring_create(const char *name, unsigned count,
170                                  int socket_id, unsigned flags);
171
172 /**
173  * De-allocate all memory used by the ring.
174  *
175  * @param r
176  *   Ring to free
177  */
178 void rte_ring_free(struct rte_ring *r);
179
180 /**
181  * Dump the status of the ring to a file.
182  *
183  * @param f
184  *   A pointer to a file for output
185  * @param r
186  *   A pointer to the ring structure.
187  */
188 void rte_ring_dump(FILE *f, const struct rte_ring *r);
189
190 /* the actual enqueue of pointers on the ring.
191  * Placed here since identical code needed in both
192  * single and multi producer enqueue functions */
193 #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
194         unsigned int i; \
195         const uint32_t size = (r)->size; \
196         uint32_t idx = prod_head & (r)->mask; \
197         obj_type *ring = (obj_type *)ring_start; \
198         if (likely(idx + n < size)) { \
199                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
200                         ring[idx] = obj_table[i]; \
201                         ring[idx+1] = obj_table[i+1]; \
202                         ring[idx+2] = obj_table[i+2]; \
203                         ring[idx+3] = obj_table[i+3]; \
204                 } \
205                 switch (n & 0x3) { \
206                 case 3: \
207                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
208                 case 2: \
209                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
210                 case 1: \
211                         ring[idx++] = obj_table[i++]; \
212                 } \
213         } else { \
214                 for (i = 0; idx < size; i++, idx++)\
215                         ring[idx] = obj_table[i]; \
216                 for (idx = 0; i < n; i++, idx++) \
217                         ring[idx] = obj_table[i]; \
218         } \
219 } while (0)
220
221 /* the actual copy of pointers on the ring to obj_table.
222  * Placed here since identical code needed in both
223  * single and multi consumer dequeue functions */
224 #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
225         unsigned int i; \
226         uint32_t idx = cons_head & (r)->mask; \
227         const uint32_t size = (r)->size; \
228         obj_type *ring = (obj_type *)ring_start; \
229         if (likely(idx + n < size)) { \
230                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
231                         obj_table[i] = ring[idx]; \
232                         obj_table[i+1] = ring[idx+1]; \
233                         obj_table[i+2] = ring[idx+2]; \
234                         obj_table[i+3] = ring[idx+3]; \
235                 } \
236                 switch (n & 0x3) { \
237                 case 3: \
238                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
239                 case 2: \
240                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
241                 case 1: \
242                         obj_table[i++] = ring[idx++]; \
243                 } \
244         } else { \
245                 for (i = 0; idx < size; i++, idx++) \
246                         obj_table[i] = ring[idx]; \
247                 for (idx = 0; i < n; i++, idx++) \
248                         obj_table[i] = ring[idx]; \
249         } \
250 } while (0)
251
252 /* Between load and load. there might be cpu reorder in weak model
253  * (powerpc/arm).
254  * There are 2 choices for the users
255  * 1.use rmb() memory barrier
256  * 2.use one-direction load_acquire/store_release barrier,defined by
257  * CONFIG_RTE_USE_C11_MEM_MODEL=y
258  * It depends on performance test results.
259  * By default, move common functions to rte_ring_generic.h
260  */
261 #ifdef RTE_USE_C11_MEM_MODEL
262 #include "rte_ring_c11_mem.h"
263 #else
264 #include "rte_ring_generic.h"
265 #endif
266
267 /**
268  * @internal Enqueue several objects on the ring
269  *
270   * @param r
271  *   A pointer to the ring structure.
272  * @param obj_table
273  *   A pointer to a table of void * pointers (objects).
274  * @param n
275  *   The number of objects to add in the ring from the obj_table.
276  * @param behavior
277  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
278  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
279  * @param is_sp
280  *   Indicates whether to use single producer or multi-producer head update
281  * @param free_space
282  *   returns the amount of space after the enqueue operation has finished
283  * @return
284  *   Actual number of objects enqueued.
285  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
286  */
287 static __rte_always_inline unsigned int
288 __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
289                  unsigned int n, enum rte_ring_queue_behavior behavior,
290                  unsigned int is_sp, unsigned int *free_space)
291 {
292         uint32_t prod_head, prod_next;
293         uint32_t free_entries;
294
295         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
296                         &prod_head, &prod_next, &free_entries);
297         if (n == 0)
298                 goto end;
299
300         ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
301
302         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
303 end:
304         if (free_space != NULL)
305                 *free_space = free_entries - n;
306         return n;
307 }
308
309 /**
310  * @internal Dequeue several objects from the ring
311  *
312  * @param r
313  *   A pointer to the ring structure.
314  * @param obj_table
315  *   A pointer to a table of void * pointers (objects).
316  * @param n
317  *   The number of objects to pull from the ring.
318  * @param behavior
319  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
320  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
321  * @param is_sc
322  *   Indicates whether to use single consumer or multi-consumer head update
323  * @param available
324  *   returns the number of remaining ring entries after the dequeue has finished
325  * @return
326  *   - Actual number of objects dequeued.
327  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
328  */
329 static __rte_always_inline unsigned int
330 __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
331                  unsigned int n, enum rte_ring_queue_behavior behavior,
332                  unsigned int is_sc, unsigned int *available)
333 {
334         uint32_t cons_head, cons_next;
335         uint32_t entries;
336
337         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
338                         &cons_head, &cons_next, &entries);
339         if (n == 0)
340                 goto end;
341
342         DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
343
344         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
345
346 end:
347         if (available != NULL)
348                 *available = entries - n;
349         return n;
350 }
351
352 /**
353  * Enqueue several objects on the ring (multi-producers safe).
354  *
355  * This function uses a "compare and set" instruction to move the
356  * producer index atomically.
357  *
358  * @param r
359  *   A pointer to the ring structure.
360  * @param obj_table
361  *   A pointer to a table of void * pointers (objects).
362  * @param n
363  *   The number of objects to add in the ring from the obj_table.
364  * @param free_space
365  *   if non-NULL, returns the amount of space in the ring after the
366  *   enqueue operation has finished.
367  * @return
368  *   The number of objects enqueued, either 0 or n
369  */
370 static __rte_always_inline unsigned int
371 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
372                          unsigned int n, unsigned int *free_space)
373 {
374         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
375                         RTE_RING_SYNC_MT, free_space);
376 }
377
378 /**
379  * Enqueue several objects on a ring (NOT multi-producers safe).
380  *
381  * @param r
382  *   A pointer to the ring structure.
383  * @param obj_table
384  *   A pointer to a table of void * pointers (objects).
385  * @param n
386  *   The number of objects to add in the ring from the obj_table.
387  * @param free_space
388  *   if non-NULL, returns the amount of space in the ring after the
389  *   enqueue operation has finished.
390  * @return
391  *   The number of objects enqueued, either 0 or n
392  */
393 static __rte_always_inline unsigned int
394 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
395                          unsigned int n, unsigned int *free_space)
396 {
397         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
398                         RTE_RING_SYNC_ST, free_space);
399 }
400
401 #ifdef ALLOW_EXPERIMENTAL_API
402 #include <rte_ring_elem.h>
403 #endif
404
405 /**
406  * Enqueue several objects on a ring.
407  *
408  * This function calls the multi-producer or the single-producer
409  * version depending on the default behavior that was specified at
410  * ring creation time (see flags).
411  *
412  * @param r
413  *   A pointer to the ring structure.
414  * @param obj_table
415  *   A pointer to a table of void * pointers (objects).
416  * @param n
417  *   The number of objects to add in the ring from the obj_table.
418  * @param free_space
419  *   if non-NULL, returns the amount of space in the ring after the
420  *   enqueue operation has finished.
421  * @return
422  *   The number of objects enqueued, either 0 or n
423  */
424 static __rte_always_inline unsigned int
425 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
426                       unsigned int n, unsigned int *free_space)
427 {
428         switch (r->prod.sync_type) {
429         case RTE_RING_SYNC_MT:
430                 return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
431         case RTE_RING_SYNC_ST:
432                 return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
433 #ifdef ALLOW_EXPERIMENTAL_API
434         case RTE_RING_SYNC_MT_RTS:
435                 return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
436                         free_space);
437         case RTE_RING_SYNC_MT_HTS:
438                 return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n,
439                         free_space);
440 #endif
441         }
442
443         /* valid ring should never reach this point */
444         RTE_ASSERT(0);
445         return 0;
446 }
447
448 /**
449  * Enqueue one object on a ring (multi-producers safe).
450  *
451  * This function uses a "compare and set" instruction to move the
452  * producer index atomically.
453  *
454  * @param r
455  *   A pointer to the ring structure.
456  * @param obj
457  *   A pointer to the object to be added.
458  * @return
459  *   - 0: Success; objects enqueued.
460  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
461  */
462 static __rte_always_inline int
463 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
464 {
465         return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
466 }
467
468 /**
469  * Enqueue one object on a ring (NOT multi-producers safe).
470  *
471  * @param r
472  *   A pointer to the ring structure.
473  * @param obj
474  *   A pointer to the object to be added.
475  * @return
476  *   - 0: Success; objects enqueued.
477  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
478  */
479 static __rte_always_inline int
480 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
481 {
482         return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
483 }
484
485 /**
486  * Enqueue one object on a ring.
487  *
488  * This function calls the multi-producer or the single-producer
489  * version, depending on the default behaviour that was specified at
490  * ring creation time (see flags).
491  *
492  * @param r
493  *   A pointer to the ring structure.
494  * @param obj
495  *   A pointer to the object to be added.
496  * @return
497  *   - 0: Success; objects enqueued.
498  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
499  */
500 static __rte_always_inline int
501 rte_ring_enqueue(struct rte_ring *r, void *obj)
502 {
503         return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
504 }
505
506 /**
507  * Dequeue several objects from a ring (multi-consumers safe).
508  *
509  * This function uses a "compare and set" instruction to move the
510  * consumer index atomically.
511  *
512  * @param r
513  *   A pointer to the ring structure.
514  * @param obj_table
515  *   A pointer to a table of void * pointers (objects) that will be filled.
516  * @param n
517  *   The number of objects to dequeue from the ring to the obj_table.
518  * @param available
519  *   If non-NULL, returns the number of remaining ring entries after the
520  *   dequeue has finished.
521  * @return
522  *   The number of objects dequeued, either 0 or n
523  */
524 static __rte_always_inline unsigned int
525 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
526                 unsigned int n, unsigned int *available)
527 {
528         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
529                         RTE_RING_SYNC_MT, available);
530 }
531
532 /**
533  * Dequeue several objects from a ring (NOT multi-consumers safe).
534  *
535  * @param r
536  *   A pointer to the ring structure.
537  * @param obj_table
538  *   A pointer to a table of void * pointers (objects) that will be filled.
539  * @param n
540  *   The number of objects to dequeue from the ring to the obj_table,
541  *   must be strictly positive.
542  * @param available
543  *   If non-NULL, returns the number of remaining ring entries after the
544  *   dequeue has finished.
545  * @return
546  *   The number of objects dequeued, either 0 or n
547  */
548 static __rte_always_inline unsigned int
549 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
550                 unsigned int n, unsigned int *available)
551 {
552         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
553                         RTE_RING_SYNC_ST, available);
554 }
555
556 /**
557  * Dequeue several objects from a ring.
558  *
559  * This function calls the multi-consumers or the single-consumer
560  * version, depending on the default behaviour that was specified at
561  * ring creation time (see flags).
562  *
563  * @param r
564  *   A pointer to the ring structure.
565  * @param obj_table
566  *   A pointer to a table of void * pointers (objects) that will be filled.
567  * @param n
568  *   The number of objects to dequeue from the ring to the obj_table.
569  * @param available
570  *   If non-NULL, returns the number of remaining ring entries after the
571  *   dequeue has finished.
572  * @return
573  *   The number of objects dequeued, either 0 or n
574  */
575 static __rte_always_inline unsigned int
576 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
577                 unsigned int *available)
578 {
579         switch (r->cons.sync_type) {
580         case RTE_RING_SYNC_MT:
581                 return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
582         case RTE_RING_SYNC_ST:
583                 return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
584 #ifdef ALLOW_EXPERIMENTAL_API
585         case RTE_RING_SYNC_MT_RTS:
586                 return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available);
587         case RTE_RING_SYNC_MT_HTS:
588                 return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available);
589 #endif
590         }
591
592         /* valid ring should never reach this point */
593         RTE_ASSERT(0);
594         return 0;
595 }
596
597 /**
598  * Dequeue one object from a ring (multi-consumers safe).
599  *
600  * This function uses a "compare and set" instruction to move the
601  * consumer index atomically.
602  *
603  * @param r
604  *   A pointer to the ring structure.
605  * @param obj_p
606  *   A pointer to a void * pointer (object) that will be filled.
607  * @return
608  *   - 0: Success; objects dequeued.
609  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
610  *     dequeued.
611  */
612 static __rte_always_inline int
613 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
614 {
615         return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL)  ? 0 : -ENOENT;
616 }
617
618 /**
619  * Dequeue one object from a ring (NOT multi-consumers safe).
620  *
621  * @param r
622  *   A pointer to the ring structure.
623  * @param obj_p
624  *   A pointer to a void * pointer (object) that will be filled.
625  * @return
626  *   - 0: Success; objects dequeued.
627  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
628  *     dequeued.
629  */
630 static __rte_always_inline int
631 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
632 {
633         return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
634 }
635
636 /**
637  * Dequeue one object from a ring.
638  *
639  * This function calls the multi-consumers or the single-consumer
640  * version depending on the default behaviour that was specified at
641  * ring creation time (see flags).
642  *
643  * @param r
644  *   A pointer to the ring structure.
645  * @param obj_p
646  *   A pointer to a void * pointer (object) that will be filled.
647  * @return
648  *   - 0: Success, objects dequeued.
649  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
650  *     dequeued.
651  */
652 static __rte_always_inline int
653 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
654 {
655         return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
656 }
657
658 /**
659  * Flush a ring.
660  *
661  * This function flush all the elements in a ring
662  *
663  * @b EXPERIMENTAL: this API may change without prior notice
664  *
665  * @warning
666  * Make sure the ring is not in use while calling this function.
667  *
668  * @param r
669  *   A pointer to the ring structure.
670  */
671 __rte_experimental
672 void
673 rte_ring_reset(struct rte_ring *r);
674
675 /**
676  * Return the number of entries in a ring.
677  *
678  * @param r
679  *   A pointer to the ring structure.
680  * @return
681  *   The number of entries in the ring.
682  */
683 static inline unsigned
684 rte_ring_count(const struct rte_ring *r)
685 {
686         uint32_t prod_tail = r->prod.tail;
687         uint32_t cons_tail = r->cons.tail;
688         uint32_t count = (prod_tail - cons_tail) & r->mask;
689         return (count > r->capacity) ? r->capacity : count;
690 }
691
692 /**
693  * Return the number of free entries in a ring.
694  *
695  * @param r
696  *   A pointer to the ring structure.
697  * @return
698  *   The number of free entries in the ring.
699  */
700 static inline unsigned
701 rte_ring_free_count(const struct rte_ring *r)
702 {
703         return r->capacity - rte_ring_count(r);
704 }
705
706 /**
707  * Test if a ring is full.
708  *
709  * @param r
710  *   A pointer to the ring structure.
711  * @return
712  *   - 1: The ring is full.
713  *   - 0: The ring is not full.
714  */
715 static inline int
716 rte_ring_full(const struct rte_ring *r)
717 {
718         return rte_ring_free_count(r) == 0;
719 }
720
721 /**
722  * Test if a ring is empty.
723  *
724  * @param r
725  *   A pointer to the ring structure.
726  * @return
727  *   - 1: The ring is empty.
728  *   - 0: The ring is not empty.
729  */
730 static inline int
731 rte_ring_empty(const struct rte_ring *r)
732 {
733         return rte_ring_count(r) == 0;
734 }
735
736 /**
737  * Return the size of the ring.
738  *
739  * @param r
740  *   A pointer to the ring structure.
741  * @return
742  *   The size of the data store used by the ring.
743  *   NOTE: this is not the same as the usable space in the ring. To query that
744  *   use ``rte_ring_get_capacity()``.
745  */
746 static inline unsigned int
747 rte_ring_get_size(const struct rte_ring *r)
748 {
749         return r->size;
750 }
751
752 /**
753  * Return the number of elements which can be stored in the ring.
754  *
755  * @param r
756  *   A pointer to the ring structure.
757  * @return
758  *   The usable size of the ring.
759  */
760 static inline unsigned int
761 rte_ring_get_capacity(const struct rte_ring *r)
762 {
763         return r->capacity;
764 }
765
766 /**
767  * Return sync type used by producer in the ring.
768  *
769  * @param r
770  *   A pointer to the ring structure.
771  * @return
772  *   Producer sync type value.
773  */
774 static inline enum rte_ring_sync_type
775 rte_ring_get_prod_sync_type(const struct rte_ring *r)
776 {
777         return r->prod.sync_type;
778 }
779
780 /**
781  * Check is the ring for single producer.
782  *
783  * @param r
784  *   A pointer to the ring structure.
785  * @return
786  *   true if ring is SP, zero otherwise.
787  */
788 static inline int
789 rte_ring_is_prod_single(const struct rte_ring *r)
790 {
791         return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
792 }
793
794 /**
795  * Return sync type used by consumer in the ring.
796  *
797  * @param r
798  *   A pointer to the ring structure.
799  * @return
800  *   Consumer sync type value.
801  */
802 static inline enum rte_ring_sync_type
803 rte_ring_get_cons_sync_type(const struct rte_ring *r)
804 {
805         return r->cons.sync_type;
806 }
807
808 /**
809  * Check is the ring for single consumer.
810  *
811  * @param r
812  *   A pointer to the ring structure.
813  * @return
814  *   true if ring is SC, zero otherwise.
815  */
816 static inline int
817 rte_ring_is_cons_single(const struct rte_ring *r)
818 {
819         return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
820 }
821
822 /**
823  * Dump the status of all rings on the console
824  *
825  * @param f
826  *   A pointer to a file for output
827  */
828 void rte_ring_list_dump(FILE *f);
829
830 /**
831  * Search a ring from its name
832  *
833  * @param name
834  *   The name of the ring.
835  * @return
836  *   The pointer to the ring matching the name, or NULL if not found,
837  *   with rte_errno set appropriately. Possible rte_errno values include:
838  *    - ENOENT - required entry not available to return.
839  */
840 struct rte_ring *rte_ring_lookup(const char *name);
841
842 /**
843  * Enqueue several objects on the ring (multi-producers safe).
844  *
845  * This function uses a "compare and set" instruction to move the
846  * producer index atomically.
847  *
848  * @param r
849  *   A pointer to the ring structure.
850  * @param obj_table
851  *   A pointer to a table of void * pointers (objects).
852  * @param n
853  *   The number of objects to add in the ring from the obj_table.
854  * @param free_space
855  *   if non-NULL, returns the amount of space in the ring after the
856  *   enqueue operation has finished.
857  * @return
858  *   - n: Actual number of objects enqueued.
859  */
860 static __rte_always_inline unsigned
861 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
862                          unsigned int n, unsigned int *free_space)
863 {
864         return __rte_ring_do_enqueue(r, obj_table, n,
865                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
866 }
867
868 /**
869  * Enqueue several objects on a ring (NOT multi-producers safe).
870  *
871  * @param r
872  *   A pointer to the ring structure.
873  * @param obj_table
874  *   A pointer to a table of void * pointers (objects).
875  * @param n
876  *   The number of objects to add in the ring from the obj_table.
877  * @param free_space
878  *   if non-NULL, returns the amount of space in the ring after the
879  *   enqueue operation has finished.
880  * @return
881  *   - n: Actual number of objects enqueued.
882  */
883 static __rte_always_inline unsigned
884 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
885                          unsigned int n, unsigned int *free_space)
886 {
887         return __rte_ring_do_enqueue(r, obj_table, n,
888                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
889 }
890
891 /**
892  * Enqueue several objects on a ring.
893  *
894  * This function calls the multi-producer or the single-producer
895  * version depending on the default behavior that was specified at
896  * ring creation time (see flags).
897  *
898  * @param r
899  *   A pointer to the ring structure.
900  * @param obj_table
901  *   A pointer to a table of void * pointers (objects).
902  * @param n
903  *   The number of objects to add in the ring from the obj_table.
904  * @param free_space
905  *   if non-NULL, returns the amount of space in the ring after the
906  *   enqueue operation has finished.
907  * @return
908  *   - n: Actual number of objects enqueued.
909  */
910 static __rte_always_inline unsigned
911 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
912                       unsigned int n, unsigned int *free_space)
913 {
914         switch (r->prod.sync_type) {
915         case RTE_RING_SYNC_MT:
916                 return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
917         case RTE_RING_SYNC_ST:
918                 return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
919 #ifdef ALLOW_EXPERIMENTAL_API
920         case RTE_RING_SYNC_MT_RTS:
921                 return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
922                         free_space);
923         case RTE_RING_SYNC_MT_HTS:
924                 return rte_ring_mp_hts_enqueue_burst(r, obj_table, n,
925                         free_space);
926 #endif
927         }
928
929         /* valid ring should never reach this point */
930         RTE_ASSERT(0);
931         return 0;
932 }
933
934 /**
935  * Dequeue several objects from a ring (multi-consumers safe). When the request
936  * objects are more than the available objects, only dequeue the actual number
937  * of objects
938  *
939  * This function uses a "compare and set" instruction to move the
940  * consumer index atomically.
941  *
942  * @param r
943  *   A pointer to the ring structure.
944  * @param obj_table
945  *   A pointer to a table of void * pointers (objects) that will be filled.
946  * @param n
947  *   The number of objects to dequeue from the ring to the obj_table.
948  * @param available
949  *   If non-NULL, returns the number of remaining ring entries after the
950  *   dequeue has finished.
951  * @return
952  *   - n: Actual number of objects dequeued, 0 if ring is empty
953  */
954 static __rte_always_inline unsigned
955 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
956                 unsigned int n, unsigned int *available)
957 {
958         return __rte_ring_do_dequeue(r, obj_table, n,
959                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
960 }
961
962 /**
963  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
964  * request objects are more than the available objects, only dequeue the
965  * actual number of objects
966  *
967  * @param r
968  *   A pointer to the ring structure.
969  * @param obj_table
970  *   A pointer to a table of void * pointers (objects) that will be filled.
971  * @param n
972  *   The number of objects to dequeue from the ring to the obj_table.
973  * @param available
974  *   If non-NULL, returns the number of remaining ring entries after the
975  *   dequeue has finished.
976  * @return
977  *   - n: Actual number of objects dequeued, 0 if ring is empty
978  */
979 static __rte_always_inline unsigned
980 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
981                 unsigned int n, unsigned int *available)
982 {
983         return __rte_ring_do_dequeue(r, obj_table, n,
984                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
985 }
986
987 /**
988  * Dequeue multiple objects from a ring up to a maximum number.
989  *
990  * This function calls the multi-consumers or the single-consumer
991  * version, depending on the default behaviour that was specified at
992  * ring creation time (see flags).
993  *
994  * @param r
995  *   A pointer to the ring structure.
996  * @param obj_table
997  *   A pointer to a table of void * pointers (objects) that will be filled.
998  * @param n
999  *   The number of objects to dequeue from the ring to the obj_table.
1000  * @param available
1001  *   If non-NULL, returns the number of remaining ring entries after the
1002  *   dequeue has finished.
1003  * @return
1004  *   - Number of objects dequeued
1005  */
1006 static __rte_always_inline unsigned
1007 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
1008                 unsigned int n, unsigned int *available)
1009 {
1010         switch (r->cons.sync_type) {
1011         case RTE_RING_SYNC_MT:
1012                 return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
1013         case RTE_RING_SYNC_ST:
1014                 return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
1015 #ifdef ALLOW_EXPERIMENTAL_API
1016         case RTE_RING_SYNC_MT_RTS:
1017                 return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
1018                         available);
1019         case RTE_RING_SYNC_MT_HTS:
1020                 return rte_ring_mc_hts_dequeue_burst(r, obj_table, n,
1021                         available);
1022 #endif
1023         }
1024
1025         /* valid ring should never reach this point */
1026         RTE_ASSERT(0);
1027         return 0;
1028 }
1029
1030 #ifdef __cplusplus
1031 }
1032 #endif
1033
1034 #endif /* _RTE_RING_H_ */