ring: introduce RTS ring mode
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_H_
11 #define _RTE_RING_H_
12
13 /**
14  * @file
15  * RTE Ring
16  *
17  * The Ring Manager is a fixed-size queue, implemented as a table of
18  * pointers. Head and tail pointers are modified atomically, allowing
19  * concurrent access to it. It has the following features:
20  *
21  * - FIFO (First In First Out)
22  * - Maximum size is fixed; the pointers are stored in a table.
23  * - Lockless implementation.
24  * - Multi- or single-consumer dequeue.
25  * - Multi- or single-producer enqueue.
26  * - Bulk dequeue.
27  * - Bulk enqueue.
28  *
29  * Note: the ring implementation is not preemptible. Refer to Programmer's
30  * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
31  * for more information.
32  *
33  */
34
35 #ifdef __cplusplus
36 extern "C" {
37 #endif
38
39 #include <rte_ring_core.h>
40
41 /**
42  * Calculate the memory size needed for a ring
43  *
44  * This function returns the number of bytes needed for a ring, given
45  * the number of elements in it. This value is the sum of the size of
46  * the structure rte_ring and the size of the memory needed by the
47  * objects pointers. The value is aligned to a cache line size.
48  *
49  * @param count
50  *   The number of elements in the ring (must be a power of 2).
51  * @return
52  *   - The memory size needed for the ring on success.
53  *   - -EINVAL if count is not a power of 2.
54  */
55 ssize_t rte_ring_get_memsize(unsigned count);
56
57 /**
58  * Initialize a ring structure.
59  *
60  * Initialize a ring structure in memory pointed by "r". The size of the
61  * memory area must be large enough to store the ring structure and the
62  * object table. It is advised to use rte_ring_get_memsize() to get the
63  * appropriate size.
64  *
65  * The ring size is set to *count*, which must be a power of two. Water
66  * marking is disabled by default. The real usable ring size is
67  * *count-1* instead of *count* to differentiate a free ring from an
68  * empty ring.
69  *
70  * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
71  * memory given by the caller may not be shareable among dpdk
72  * processes.
73  *
74  * @param r
75  *   The pointer to the ring structure followed by the objects table.
76  * @param name
77  *   The name of the ring.
78  * @param count
79  *   The number of elements in the ring (must be a power of 2).
80  * @param flags
81  *   An OR of the following:
82  *   - One of mutually exclusive flags that define producer behavior:
83  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
84  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
85  *        is "single-producer".
86  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
87  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
88  *        is "multi-producer RTS mode".
89  *     If none of these flags is set, then default "multi-producer"
90  *     behavior is selected.
91  *   - One of mutually exclusive flags that define consumer behavior:
92  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
93  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
94  *        is "single-consumer". Otherwise, it is "multi-consumers".
95  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
96  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
97  *        is "multi-consumer RTS mode".
98  *     If none of these flags is set, then default "multi-consumer"
99  *     behavior is selected.
100  * @return
101  *   0 on success, or a negative value on error.
102  */
103 int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
104         unsigned flags);
105
106 /**
107  * Create a new ring named *name* in memory.
108  *
109  * This function uses ``memzone_reserve()`` to allocate memory. Then it
110  * calls rte_ring_init() to initialize an empty ring.
111  *
112  * The new ring size is set to *count*, which must be a power of
113  * two. Water marking is disabled by default. The real usable ring size
114  * is *count-1* instead of *count* to differentiate a free ring from an
115  * empty ring.
116  *
117  * The ring is added in RTE_TAILQ_RING list.
118  *
119  * @param name
120  *   The name of the ring.
121  * @param count
122  *   The size of the ring (must be a power of 2).
123  * @param socket_id
124  *   The *socket_id* argument is the socket identifier in case of
125  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
126  *   constraint for the reserved zone.
127  * @param flags
128  *   An OR of the following:
129  *   - One of mutually exclusive flags that define producer behavior:
130  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
131  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
132  *        is "single-producer".
133  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
134  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
135  *        is "multi-producer RTS mode".
136  *     If none of these flags is set, then default "multi-producer"
137  *     behavior is selected.
138  *   - One of mutually exclusive flags that define consumer behavior:
139  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
140  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
141  *        is "single-consumer". Otherwise, it is "multi-consumers".
142  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
143  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
144  *        is "multi-consumer RTS mode".
145  *     If none of these flags is set, then default "multi-consumer"
146  *     behavior is selected.
147  * @return
148  *   On success, the pointer to the new allocated ring. NULL on error with
149  *    rte_errno set appropriately. Possible errno values include:
150  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
151  *    - E_RTE_SECONDARY - function was called from a secondary process instance
152  *    - EINVAL - count provided is not a power of 2
153  *    - ENOSPC - the maximum number of memzones has already been allocated
154  *    - EEXIST - a memzone with the same name already exists
155  *    - ENOMEM - no appropriate memory area found in which to create memzone
156  */
157 struct rte_ring *rte_ring_create(const char *name, unsigned count,
158                                  int socket_id, unsigned flags);
159
160 /**
161  * De-allocate all memory used by the ring.
162  *
163  * @param r
164  *   Ring to free
165  */
166 void rte_ring_free(struct rte_ring *r);
167
168 /**
169  * Dump the status of the ring to a file.
170  *
171  * @param f
172  *   A pointer to a file for output
173  * @param r
174  *   A pointer to the ring structure.
175  */
176 void rte_ring_dump(FILE *f, const struct rte_ring *r);
177
178 /* the actual enqueue of pointers on the ring.
179  * Placed here since identical code needed in both
180  * single and multi producer enqueue functions */
181 #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
182         unsigned int i; \
183         const uint32_t size = (r)->size; \
184         uint32_t idx = prod_head & (r)->mask; \
185         obj_type *ring = (obj_type *)ring_start; \
186         if (likely(idx + n < size)) { \
187                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
188                         ring[idx] = obj_table[i]; \
189                         ring[idx+1] = obj_table[i+1]; \
190                         ring[idx+2] = obj_table[i+2]; \
191                         ring[idx+3] = obj_table[i+3]; \
192                 } \
193                 switch (n & 0x3) { \
194                 case 3: \
195                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
196                 case 2: \
197                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
198                 case 1: \
199                         ring[idx++] = obj_table[i++]; \
200                 } \
201         } else { \
202                 for (i = 0; idx < size; i++, idx++)\
203                         ring[idx] = obj_table[i]; \
204                 for (idx = 0; i < n; i++, idx++) \
205                         ring[idx] = obj_table[i]; \
206         } \
207 } while (0)
208
209 /* the actual copy of pointers on the ring to obj_table.
210  * Placed here since identical code needed in both
211  * single and multi consumer dequeue functions */
212 #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
213         unsigned int i; \
214         uint32_t idx = cons_head & (r)->mask; \
215         const uint32_t size = (r)->size; \
216         obj_type *ring = (obj_type *)ring_start; \
217         if (likely(idx + n < size)) { \
218                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
219                         obj_table[i] = ring[idx]; \
220                         obj_table[i+1] = ring[idx+1]; \
221                         obj_table[i+2] = ring[idx+2]; \
222                         obj_table[i+3] = ring[idx+3]; \
223                 } \
224                 switch (n & 0x3) { \
225                 case 3: \
226                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
227                 case 2: \
228                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
229                 case 1: \
230                         obj_table[i++] = ring[idx++]; \
231                 } \
232         } else { \
233                 for (i = 0; idx < size; i++, idx++) \
234                         obj_table[i] = ring[idx]; \
235                 for (idx = 0; i < n; i++, idx++) \
236                         obj_table[i] = ring[idx]; \
237         } \
238 } while (0)
239
240 /* Between load and load. there might be cpu reorder in weak model
241  * (powerpc/arm).
242  * There are 2 choices for the users
243  * 1.use rmb() memory barrier
244  * 2.use one-direction load_acquire/store_release barrier,defined by
245  * CONFIG_RTE_USE_C11_MEM_MODEL=y
246  * It depends on performance test results.
247  * By default, move common functions to rte_ring_generic.h
248  */
249 #ifdef RTE_USE_C11_MEM_MODEL
250 #include "rte_ring_c11_mem.h"
251 #else
252 #include "rte_ring_generic.h"
253 #endif
254
255 /**
256  * @internal Enqueue several objects on the ring
257  *
258   * @param r
259  *   A pointer to the ring structure.
260  * @param obj_table
261  *   A pointer to a table of void * pointers (objects).
262  * @param n
263  *   The number of objects to add in the ring from the obj_table.
264  * @param behavior
265  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
266  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
267  * @param is_sp
268  *   Indicates whether to use single producer or multi-producer head update
269  * @param free_space
270  *   returns the amount of space after the enqueue operation has finished
271  * @return
272  *   Actual number of objects enqueued.
273  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
274  */
275 static __rte_always_inline unsigned int
276 __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
277                  unsigned int n, enum rte_ring_queue_behavior behavior,
278                  unsigned int is_sp, unsigned int *free_space)
279 {
280         uint32_t prod_head, prod_next;
281         uint32_t free_entries;
282
283         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
284                         &prod_head, &prod_next, &free_entries);
285         if (n == 0)
286                 goto end;
287
288         ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
289
290         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
291 end:
292         if (free_space != NULL)
293                 *free_space = free_entries - n;
294         return n;
295 }
296
297 /**
298  * @internal Dequeue several objects from the ring
299  *
300  * @param r
301  *   A pointer to the ring structure.
302  * @param obj_table
303  *   A pointer to a table of void * pointers (objects).
304  * @param n
305  *   The number of objects to pull from the ring.
306  * @param behavior
307  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
308  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
309  * @param is_sc
310  *   Indicates whether to use single consumer or multi-consumer head update
311  * @param available
312  *   returns the number of remaining ring entries after the dequeue has finished
313  * @return
314  *   - Actual number of objects dequeued.
315  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
316  */
317 static __rte_always_inline unsigned int
318 __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
319                  unsigned int n, enum rte_ring_queue_behavior behavior,
320                  unsigned int is_sc, unsigned int *available)
321 {
322         uint32_t cons_head, cons_next;
323         uint32_t entries;
324
325         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
326                         &cons_head, &cons_next, &entries);
327         if (n == 0)
328                 goto end;
329
330         DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
331
332         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
333
334 end:
335         if (available != NULL)
336                 *available = entries - n;
337         return n;
338 }
339
340 /**
341  * Enqueue several objects on the ring (multi-producers safe).
342  *
343  * This function uses a "compare and set" instruction to move the
344  * producer index atomically.
345  *
346  * @param r
347  *   A pointer to the ring structure.
348  * @param obj_table
349  *   A pointer to a table of void * pointers (objects).
350  * @param n
351  *   The number of objects to add in the ring from the obj_table.
352  * @param free_space
353  *   if non-NULL, returns the amount of space in the ring after the
354  *   enqueue operation has finished.
355  * @return
356  *   The number of objects enqueued, either 0 or n
357  */
358 static __rte_always_inline unsigned int
359 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
360                          unsigned int n, unsigned int *free_space)
361 {
362         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
363                         RTE_RING_SYNC_MT, free_space);
364 }
365
366 /**
367  * Enqueue several objects on a ring (NOT multi-producers safe).
368  *
369  * @param r
370  *   A pointer to the ring structure.
371  * @param obj_table
372  *   A pointer to a table of void * pointers (objects).
373  * @param n
374  *   The number of objects to add in the ring from the obj_table.
375  * @param free_space
376  *   if non-NULL, returns the amount of space in the ring after the
377  *   enqueue operation has finished.
378  * @return
379  *   The number of objects enqueued, either 0 or n
380  */
381 static __rte_always_inline unsigned int
382 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
383                          unsigned int n, unsigned int *free_space)
384 {
385         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
386                         RTE_RING_SYNC_ST, free_space);
387 }
388
389 #ifdef ALLOW_EXPERIMENTAL_API
390 #include <rte_ring_elem.h>
391 #endif
392
393 /**
394  * Enqueue several objects on a ring.
395  *
396  * This function calls the multi-producer or the single-producer
397  * version depending on the default behavior that was specified at
398  * ring creation time (see flags).
399  *
400  * @param r
401  *   A pointer to the ring structure.
402  * @param obj_table
403  *   A pointer to a table of void * pointers (objects).
404  * @param n
405  *   The number of objects to add in the ring from the obj_table.
406  * @param free_space
407  *   if non-NULL, returns the amount of space in the ring after the
408  *   enqueue operation has finished.
409  * @return
410  *   The number of objects enqueued, either 0 or n
411  */
412 static __rte_always_inline unsigned int
413 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
414                       unsigned int n, unsigned int *free_space)
415 {
416         switch (r->prod.sync_type) {
417         case RTE_RING_SYNC_MT:
418                 return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
419         case RTE_RING_SYNC_ST:
420                 return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
421 #ifdef ALLOW_EXPERIMENTAL_API
422         case RTE_RING_SYNC_MT_RTS:
423                 return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
424                         free_space);
425 #endif
426         }
427
428         /* valid ring should never reach this point */
429         RTE_ASSERT(0);
430         return 0;
431 }
432
433 /**
434  * Enqueue one object on a ring (multi-producers safe).
435  *
436  * This function uses a "compare and set" instruction to move the
437  * producer index atomically.
438  *
439  * @param r
440  *   A pointer to the ring structure.
441  * @param obj
442  *   A pointer to the object to be added.
443  * @return
444  *   - 0: Success; objects enqueued.
445  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
446  */
447 static __rte_always_inline int
448 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
449 {
450         return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
451 }
452
453 /**
454  * Enqueue one object on a ring (NOT multi-producers safe).
455  *
456  * @param r
457  *   A pointer to the ring structure.
458  * @param obj
459  *   A pointer to the object to be added.
460  * @return
461  *   - 0: Success; objects enqueued.
462  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
463  */
464 static __rte_always_inline int
465 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
466 {
467         return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
468 }
469
470 /**
471  * Enqueue one object on a ring.
472  *
473  * This function calls the multi-producer or the single-producer
474  * version, depending on the default behaviour that was specified at
475  * ring creation time (see flags).
476  *
477  * @param r
478  *   A pointer to the ring structure.
479  * @param obj
480  *   A pointer to the object to be added.
481  * @return
482  *   - 0: Success; objects enqueued.
483  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
484  */
485 static __rte_always_inline int
486 rte_ring_enqueue(struct rte_ring *r, void *obj)
487 {
488         return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
489 }
490
491 /**
492  * Dequeue several objects from a ring (multi-consumers safe).
493  *
494  * This function uses a "compare and set" instruction to move the
495  * consumer index atomically.
496  *
497  * @param r
498  *   A pointer to the ring structure.
499  * @param obj_table
500  *   A pointer to a table of void * pointers (objects) that will be filled.
501  * @param n
502  *   The number of objects to dequeue from the ring to the obj_table.
503  * @param available
504  *   If non-NULL, returns the number of remaining ring entries after the
505  *   dequeue has finished.
506  * @return
507  *   The number of objects dequeued, either 0 or n
508  */
509 static __rte_always_inline unsigned int
510 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
511                 unsigned int n, unsigned int *available)
512 {
513         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
514                         RTE_RING_SYNC_MT, available);
515 }
516
517 /**
518  * Dequeue several objects from a ring (NOT multi-consumers safe).
519  *
520  * @param r
521  *   A pointer to the ring structure.
522  * @param obj_table
523  *   A pointer to a table of void * pointers (objects) that will be filled.
524  * @param n
525  *   The number of objects to dequeue from the ring to the obj_table,
526  *   must be strictly positive.
527  * @param available
528  *   If non-NULL, returns the number of remaining ring entries after the
529  *   dequeue has finished.
530  * @return
531  *   The number of objects dequeued, either 0 or n
532  */
533 static __rte_always_inline unsigned int
534 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
535                 unsigned int n, unsigned int *available)
536 {
537         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
538                         RTE_RING_SYNC_ST, available);
539 }
540
541 /**
542  * Dequeue several objects from a ring.
543  *
544  * This function calls the multi-consumers or the single-consumer
545  * version, depending on the default behaviour that was specified at
546  * ring creation time (see flags).
547  *
548  * @param r
549  *   A pointer to the ring structure.
550  * @param obj_table
551  *   A pointer to a table of void * pointers (objects) that will be filled.
552  * @param n
553  *   The number of objects to dequeue from the ring to the obj_table.
554  * @param available
555  *   If non-NULL, returns the number of remaining ring entries after the
556  *   dequeue has finished.
557  * @return
558  *   The number of objects dequeued, either 0 or n
559  */
560 static __rte_always_inline unsigned int
561 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
562                 unsigned int *available)
563 {
564         switch (r->cons.sync_type) {
565         case RTE_RING_SYNC_MT:
566                 return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
567         case RTE_RING_SYNC_ST:
568                 return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
569 #ifdef ALLOW_EXPERIMENTAL_API
570         case RTE_RING_SYNC_MT_RTS:
571                 return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available);
572 #endif
573         }
574
575         /* valid ring should never reach this point */
576         RTE_ASSERT(0);
577         return 0;
578 }
579
580 /**
581  * Dequeue one object from a ring (multi-consumers safe).
582  *
583  * This function uses a "compare and set" instruction to move the
584  * consumer index atomically.
585  *
586  * @param r
587  *   A pointer to the ring structure.
588  * @param obj_p
589  *   A pointer to a void * pointer (object) that will be filled.
590  * @return
591  *   - 0: Success; objects dequeued.
592  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
593  *     dequeued.
594  */
595 static __rte_always_inline int
596 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
597 {
598         return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL)  ? 0 : -ENOENT;
599 }
600
601 /**
602  * Dequeue one object from a ring (NOT multi-consumers safe).
603  *
604  * @param r
605  *   A pointer to the ring structure.
606  * @param obj_p
607  *   A pointer to a void * pointer (object) that will be filled.
608  * @return
609  *   - 0: Success; objects dequeued.
610  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
611  *     dequeued.
612  */
613 static __rte_always_inline int
614 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
615 {
616         return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
617 }
618
619 /**
620  * Dequeue one object from a ring.
621  *
622  * This function calls the multi-consumers or the single-consumer
623  * version depending on the default behaviour that was specified at
624  * ring creation time (see flags).
625  *
626  * @param r
627  *   A pointer to the ring structure.
628  * @param obj_p
629  *   A pointer to a void * pointer (object) that will be filled.
630  * @return
631  *   - 0: Success, objects dequeued.
632  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
633  *     dequeued.
634  */
635 static __rte_always_inline int
636 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
637 {
638         return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
639 }
640
641 /**
642  * Flush a ring.
643  *
644  * This function flush all the elements in a ring
645  *
646  * @b EXPERIMENTAL: this API may change without prior notice
647  *
648  * @warning
649  * Make sure the ring is not in use while calling this function.
650  *
651  * @param r
652  *   A pointer to the ring structure.
653  */
654 __rte_experimental
655 void
656 rte_ring_reset(struct rte_ring *r);
657
658 /**
659  * Return the number of entries in a ring.
660  *
661  * @param r
662  *   A pointer to the ring structure.
663  * @return
664  *   The number of entries in the ring.
665  */
666 static inline unsigned
667 rte_ring_count(const struct rte_ring *r)
668 {
669         uint32_t prod_tail = r->prod.tail;
670         uint32_t cons_tail = r->cons.tail;
671         uint32_t count = (prod_tail - cons_tail) & r->mask;
672         return (count > r->capacity) ? r->capacity : count;
673 }
674
675 /**
676  * Return the number of free entries in a ring.
677  *
678  * @param r
679  *   A pointer to the ring structure.
680  * @return
681  *   The number of free entries in the ring.
682  */
683 static inline unsigned
684 rte_ring_free_count(const struct rte_ring *r)
685 {
686         return r->capacity - rte_ring_count(r);
687 }
688
689 /**
690  * Test if a ring is full.
691  *
692  * @param r
693  *   A pointer to the ring structure.
694  * @return
695  *   - 1: The ring is full.
696  *   - 0: The ring is not full.
697  */
698 static inline int
699 rte_ring_full(const struct rte_ring *r)
700 {
701         return rte_ring_free_count(r) == 0;
702 }
703
704 /**
705  * Test if a ring is empty.
706  *
707  * @param r
708  *   A pointer to the ring structure.
709  * @return
710  *   - 1: The ring is empty.
711  *   - 0: The ring is not empty.
712  */
713 static inline int
714 rte_ring_empty(const struct rte_ring *r)
715 {
716         return rte_ring_count(r) == 0;
717 }
718
719 /**
720  * Return the size of the ring.
721  *
722  * @param r
723  *   A pointer to the ring structure.
724  * @return
725  *   The size of the data store used by the ring.
726  *   NOTE: this is not the same as the usable space in the ring. To query that
727  *   use ``rte_ring_get_capacity()``.
728  */
729 static inline unsigned int
730 rte_ring_get_size(const struct rte_ring *r)
731 {
732         return r->size;
733 }
734
735 /**
736  * Return the number of elements which can be stored in the ring.
737  *
738  * @param r
739  *   A pointer to the ring structure.
740  * @return
741  *   The usable size of the ring.
742  */
743 static inline unsigned int
744 rte_ring_get_capacity(const struct rte_ring *r)
745 {
746         return r->capacity;
747 }
748
749 /**
750  * Return sync type used by producer in the ring.
751  *
752  * @param r
753  *   A pointer to the ring structure.
754  * @return
755  *   Producer sync type value.
756  */
757 static inline enum rte_ring_sync_type
758 rte_ring_get_prod_sync_type(const struct rte_ring *r)
759 {
760         return r->prod.sync_type;
761 }
762
763 /**
764  * Check is the ring for single producer.
765  *
766  * @param r
767  *   A pointer to the ring structure.
768  * @return
769  *   true if ring is SP, zero otherwise.
770  */
771 static inline int
772 rte_ring_is_prod_single(const struct rte_ring *r)
773 {
774         return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
775 }
776
777 /**
778  * Return sync type used by consumer in the ring.
779  *
780  * @param r
781  *   A pointer to the ring structure.
782  * @return
783  *   Consumer sync type value.
784  */
785 static inline enum rte_ring_sync_type
786 rte_ring_get_cons_sync_type(const struct rte_ring *r)
787 {
788         return r->cons.sync_type;
789 }
790
791 /**
792  * Check is the ring for single consumer.
793  *
794  * @param r
795  *   A pointer to the ring structure.
796  * @return
797  *   true if ring is SC, zero otherwise.
798  */
799 static inline int
800 rte_ring_is_cons_single(const struct rte_ring *r)
801 {
802         return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
803 }
804
805 /**
806  * Dump the status of all rings on the console
807  *
808  * @param f
809  *   A pointer to a file for output
810  */
811 void rte_ring_list_dump(FILE *f);
812
813 /**
814  * Search a ring from its name
815  *
816  * @param name
817  *   The name of the ring.
818  * @return
819  *   The pointer to the ring matching the name, or NULL if not found,
820  *   with rte_errno set appropriately. Possible rte_errno values include:
821  *    - ENOENT - required entry not available to return.
822  */
823 struct rte_ring *rte_ring_lookup(const char *name);
824
825 /**
826  * Enqueue several objects on the ring (multi-producers safe).
827  *
828  * This function uses a "compare and set" instruction to move the
829  * producer index atomically.
830  *
831  * @param r
832  *   A pointer to the ring structure.
833  * @param obj_table
834  *   A pointer to a table of void * pointers (objects).
835  * @param n
836  *   The number of objects to add in the ring from the obj_table.
837  * @param free_space
838  *   if non-NULL, returns the amount of space in the ring after the
839  *   enqueue operation has finished.
840  * @return
841  *   - n: Actual number of objects enqueued.
842  */
843 static __rte_always_inline unsigned
844 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
845                          unsigned int n, unsigned int *free_space)
846 {
847         return __rte_ring_do_enqueue(r, obj_table, n,
848                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
849 }
850
851 /**
852  * Enqueue several objects on a ring (NOT multi-producers safe).
853  *
854  * @param r
855  *   A pointer to the ring structure.
856  * @param obj_table
857  *   A pointer to a table of void * pointers (objects).
858  * @param n
859  *   The number of objects to add in the ring from the obj_table.
860  * @param free_space
861  *   if non-NULL, returns the amount of space in the ring after the
862  *   enqueue operation has finished.
863  * @return
864  *   - n: Actual number of objects enqueued.
865  */
866 static __rte_always_inline unsigned
867 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
868                          unsigned int n, unsigned int *free_space)
869 {
870         return __rte_ring_do_enqueue(r, obj_table, n,
871                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
872 }
873
874 /**
875  * Enqueue several objects on a ring.
876  *
877  * This function calls the multi-producer or the single-producer
878  * version depending on the default behavior that was specified at
879  * ring creation time (see flags).
880  *
881  * @param r
882  *   A pointer to the ring structure.
883  * @param obj_table
884  *   A pointer to a table of void * pointers (objects).
885  * @param n
886  *   The number of objects to add in the ring from the obj_table.
887  * @param free_space
888  *   if non-NULL, returns the amount of space in the ring after the
889  *   enqueue operation has finished.
890  * @return
891  *   - n: Actual number of objects enqueued.
892  */
893 static __rte_always_inline unsigned
894 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
895                       unsigned int n, unsigned int *free_space)
896 {
897         switch (r->prod.sync_type) {
898         case RTE_RING_SYNC_MT:
899                 return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
900         case RTE_RING_SYNC_ST:
901                 return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
902 #ifdef ALLOW_EXPERIMENTAL_API
903         case RTE_RING_SYNC_MT_RTS:
904                 return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
905                         free_space);
906 #endif
907         }
908
909         /* valid ring should never reach this point */
910         RTE_ASSERT(0);
911         return 0;
912 }
913
914 /**
915  * Dequeue several objects from a ring (multi-consumers safe). When the request
916  * objects are more than the available objects, only dequeue the actual number
917  * of objects
918  *
919  * This function uses a "compare and set" instruction to move the
920  * consumer index atomically.
921  *
922  * @param r
923  *   A pointer to the ring structure.
924  * @param obj_table
925  *   A pointer to a table of void * pointers (objects) that will be filled.
926  * @param n
927  *   The number of objects to dequeue from the ring to the obj_table.
928  * @param available
929  *   If non-NULL, returns the number of remaining ring entries after the
930  *   dequeue has finished.
931  * @return
932  *   - n: Actual number of objects dequeued, 0 if ring is empty
933  */
934 static __rte_always_inline unsigned
935 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
936                 unsigned int n, unsigned int *available)
937 {
938         return __rte_ring_do_dequeue(r, obj_table, n,
939                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
940 }
941
942 /**
943  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
944  * request objects are more than the available objects, only dequeue the
945  * actual number of objects
946  *
947  * @param r
948  *   A pointer to the ring structure.
949  * @param obj_table
950  *   A pointer to a table of void * pointers (objects) that will be filled.
951  * @param n
952  *   The number of objects to dequeue from the ring to the obj_table.
953  * @param available
954  *   If non-NULL, returns the number of remaining ring entries after the
955  *   dequeue has finished.
956  * @return
957  *   - n: Actual number of objects dequeued, 0 if ring is empty
958  */
959 static __rte_always_inline unsigned
960 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
961                 unsigned int n, unsigned int *available)
962 {
963         return __rte_ring_do_dequeue(r, obj_table, n,
964                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
965 }
966
967 /**
968  * Dequeue multiple objects from a ring up to a maximum number.
969  *
970  * This function calls the multi-consumers or the single-consumer
971  * version, depending on the default behaviour that was specified at
972  * ring creation time (see flags).
973  *
974  * @param r
975  *   A pointer to the ring structure.
976  * @param obj_table
977  *   A pointer to a table of void * pointers (objects) that will be filled.
978  * @param n
979  *   The number of objects to dequeue from the ring to the obj_table.
980  * @param available
981  *   If non-NULL, returns the number of remaining ring entries after the
982  *   dequeue has finished.
983  * @return
984  *   - Number of objects dequeued
985  */
986 static __rte_always_inline unsigned
987 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
988                 unsigned int n, unsigned int *available)
989 {
990         switch (r->cons.sync_type) {
991         case RTE_RING_SYNC_MT:
992                 return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
993         case RTE_RING_SYNC_ST:
994                 return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
995 #ifdef ALLOW_EXPERIMENTAL_API
996         case RTE_RING_SYNC_MT_RTS:
997                 return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
998                         available);
999 #endif
1000         }
1001
1002         /* valid ring should never reach this point */
1003         RTE_ASSERT(0);
1004         return 0;
1005 }
1006
1007 #ifdef __cplusplus
1008 }
1009 #endif
1010
1011 #endif /* _RTE_RING_H_ */