net/ixgbe: fix statistics in flow control mode
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9
10 #ifndef _RTE_RING_H_
11 #define _RTE_RING_H_
12
13 /**
14  * @file
15  * RTE Ring
16  *
17  * The Ring Manager is a fixed-size queue, implemented as a table of
18  * pointers. Head and tail pointers are modified atomically, allowing
19  * concurrent access to it. It has the following features:
20  *
21  * - FIFO (First In First Out)
22  * - Maximum size is fixed; the pointers are stored in a table.
23  * - Lockless implementation.
24  * - Multi- or single-consumer dequeue.
25  * - Multi- or single-producer enqueue.
26  * - Bulk dequeue.
27  * - Bulk enqueue.
28  * - Ability to select different sync modes for producer/consumer.
29  * - Dequeue start/finish (depending on consumer sync modes).
30  * - Enqueue start/finish (depending on producer sync mode).
31  *
32  * Note: the ring implementation is not preemptible. Refer to Programmer's
33  * guide/Environment Abstraction Layer/Multiple pthread/Known Issues/rte_ring
34  * for more information.
35  *
36  */
37
38 #ifdef __cplusplus
39 extern "C" {
40 #endif
41
42 #include <rte_ring_core.h>
43
44 /**
45  * Calculate the memory size needed for a ring
46  *
47  * This function returns the number of bytes needed for a ring, given
48  * the number of elements in it. This value is the sum of the size of
49  * the structure rte_ring and the size of the memory needed by the
50  * objects pointers. The value is aligned to a cache line size.
51  *
52  * @param count
53  *   The number of elements in the ring (must be a power of 2).
54  * @return
55  *   - The memory size needed for the ring on success.
56  *   - -EINVAL if count is not a power of 2.
57  */
58 ssize_t rte_ring_get_memsize(unsigned count);
59
60 /**
61  * Initialize a ring structure.
62  *
63  * Initialize a ring structure in memory pointed by "r". The size of the
64  * memory area must be large enough to store the ring structure and the
65  * object table. It is advised to use rte_ring_get_memsize() to get the
66  * appropriate size.
67  *
68  * The ring size is set to *count*, which must be a power of two. Water
69  * marking is disabled by default. The real usable ring size is
70  * *count-1* instead of *count* to differentiate a free ring from an
71  * empty ring.
72  *
73  * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
74  * memory given by the caller may not be shareable among dpdk
75  * processes.
76  *
77  * @param r
78  *   The pointer to the ring structure followed by the objects table.
79  * @param name
80  *   The name of the ring.
81  * @param count
82  *   The number of elements in the ring (must be a power of 2).
83  * @param flags
84  *   An OR of the following:
85  *   - One of mutually exclusive flags that define producer behavior:
86  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
87  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
88  *        is "single-producer".
89  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
90  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
91  *        is "multi-producer RTS mode".
92  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
93  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
94  *        is "multi-producer HTS mode".
95  *     If none of these flags is set, then default "multi-producer"
96  *     behavior is selected.
97  *   - One of mutually exclusive flags that define consumer behavior:
98  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
99  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
100  *        is "single-consumer". Otherwise, it is "multi-consumers".
101  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
102  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
103  *        is "multi-consumer RTS mode".
104  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
105  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
106  *        is "multi-consumer HTS mode".
107  *     If none of these flags is set, then default "multi-consumer"
108  *     behavior is selected.
109  * @return
110  *   0 on success, or a negative value on error.
111  */
112 int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
113         unsigned flags);
114
115 /**
116  * Create a new ring named *name* in memory.
117  *
118  * This function uses ``memzone_reserve()`` to allocate memory. Then it
119  * calls rte_ring_init() to initialize an empty ring.
120  *
121  * The new ring size is set to *count*, which must be a power of
122  * two. Water marking is disabled by default. The real usable ring size
123  * is *count-1* instead of *count* to differentiate a free ring from an
124  * empty ring.
125  *
126  * The ring is added in RTE_TAILQ_RING list.
127  *
128  * @param name
129  *   The name of the ring.
130  * @param count
131  *   The size of the ring (must be a power of 2).
132  * @param socket_id
133  *   The *socket_id* argument is the socket identifier in case of
134  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
135  *   constraint for the reserved zone.
136  * @param flags
137  *   An OR of the following:
138  *   - One of mutually exclusive flags that define producer behavior:
139  *      - RING_F_SP_ENQ: If this flag is set, the default behavior when
140  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
141  *        is "single-producer".
142  *      - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when
143  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
144  *        is "multi-producer RTS mode".
145  *      - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when
146  *        using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
147  *        is "multi-producer HTS mode".
148  *     If none of these flags is set, then default "multi-producer"
149  *     behavior is selected.
150  *   - One of mutually exclusive flags that define consumer behavior:
151  *      - RING_F_SC_DEQ: If this flag is set, the default behavior when
152  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
153  *        is "single-consumer". Otherwise, it is "multi-consumers".
154  *      - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when
155  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
156  *        is "multi-consumer RTS mode".
157  *      - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when
158  *        using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
159  *        is "multi-consumer HTS mode".
160  *     If none of these flags is set, then default "multi-consumer"
161  *     behavior is selected.
162  * @return
163  *   On success, the pointer to the new allocated ring. NULL on error with
164  *    rte_errno set appropriately. Possible errno values include:
165  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
166  *    - E_RTE_SECONDARY - function was called from a secondary process instance
167  *    - EINVAL - count provided is not a power of 2
168  *    - ENOSPC - the maximum number of memzones has already been allocated
169  *    - EEXIST - a memzone with the same name already exists
170  *    - ENOMEM - no appropriate memory area found in which to create memzone
171  */
172 struct rte_ring *rte_ring_create(const char *name, unsigned count,
173                                  int socket_id, unsigned flags);
174
175 /**
176  * De-allocate all memory used by the ring.
177  *
178  * @param r
179  *   Ring to free
180  */
181 void rte_ring_free(struct rte_ring *r);
182
183 /**
184  * Dump the status of the ring to a file.
185  *
186  * @param f
187  *   A pointer to a file for output
188  * @param r
189  *   A pointer to the ring structure.
190  */
191 void rte_ring_dump(FILE *f, const struct rte_ring *r);
192
193 /* the actual enqueue of pointers on the ring.
194  * Placed here since identical code needed in both
195  * single and multi producer enqueue functions */
196 #define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
197         unsigned int i; \
198         const uint32_t size = (r)->size; \
199         uint32_t idx = prod_head & (r)->mask; \
200         obj_type *ring = (obj_type *)ring_start; \
201         if (likely(idx + n < size)) { \
202                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
203                         ring[idx] = obj_table[i]; \
204                         ring[idx+1] = obj_table[i+1]; \
205                         ring[idx+2] = obj_table[i+2]; \
206                         ring[idx+3] = obj_table[i+3]; \
207                 } \
208                 switch (n & 0x3) { \
209                 case 3: \
210                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
211                 case 2: \
212                         ring[idx++] = obj_table[i++]; /* fallthrough */ \
213                 case 1: \
214                         ring[idx++] = obj_table[i++]; \
215                 } \
216         } else { \
217                 for (i = 0; idx < size; i++, idx++)\
218                         ring[idx] = obj_table[i]; \
219                 for (idx = 0; i < n; i++, idx++) \
220                         ring[idx] = obj_table[i]; \
221         } \
222 } while (0)
223
224 /* the actual copy of pointers on the ring to obj_table.
225  * Placed here since identical code needed in both
226  * single and multi consumer dequeue functions */
227 #define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
228         unsigned int i; \
229         uint32_t idx = cons_head & (r)->mask; \
230         const uint32_t size = (r)->size; \
231         obj_type *ring = (obj_type *)ring_start; \
232         if (likely(idx + n < size)) { \
233                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
234                         obj_table[i] = ring[idx]; \
235                         obj_table[i+1] = ring[idx+1]; \
236                         obj_table[i+2] = ring[idx+2]; \
237                         obj_table[i+3] = ring[idx+3]; \
238                 } \
239                 switch (n & 0x3) { \
240                 case 3: \
241                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
242                 case 2: \
243                         obj_table[i++] = ring[idx++]; /* fallthrough */ \
244                 case 1: \
245                         obj_table[i++] = ring[idx++]; \
246                 } \
247         } else { \
248                 for (i = 0; idx < size; i++, idx++) \
249                         obj_table[i] = ring[idx]; \
250                 for (idx = 0; i < n; i++, idx++) \
251                         obj_table[i] = ring[idx]; \
252         } \
253 } while (0)
254
255 /* Between load and load. there might be cpu reorder in weak model
256  * (powerpc/arm).
257  * There are 2 choices for the users
258  * 1.use rmb() memory barrier
259  * 2.use one-direction load_acquire/store_release barrier,defined by
260  * CONFIG_RTE_USE_C11_MEM_MODEL=y
261  * It depends on performance test results.
262  * By default, move common functions to rte_ring_generic.h
263  */
264 #ifdef RTE_USE_C11_MEM_MODEL
265 #include "rte_ring_c11_mem.h"
266 #else
267 #include "rte_ring_generic.h"
268 #endif
269
270 /**
271  * @internal Enqueue several objects on the ring
272  *
273   * @param r
274  *   A pointer to the ring structure.
275  * @param obj_table
276  *   A pointer to a table of void * pointers (objects).
277  * @param n
278  *   The number of objects to add in the ring from the obj_table.
279  * @param behavior
280  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
281  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring
282  * @param is_sp
283  *   Indicates whether to use single producer or multi-producer head update
284  * @param free_space
285  *   returns the amount of space after the enqueue operation has finished
286  * @return
287  *   Actual number of objects enqueued.
288  *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
289  */
290 static __rte_always_inline unsigned int
291 __rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
292                  unsigned int n, enum rte_ring_queue_behavior behavior,
293                  unsigned int is_sp, unsigned int *free_space)
294 {
295         uint32_t prod_head, prod_next;
296         uint32_t free_entries;
297
298         n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
299                         &prod_head, &prod_next, &free_entries);
300         if (n == 0)
301                 goto end;
302
303         ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);
304
305         update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
306 end:
307         if (free_space != NULL)
308                 *free_space = free_entries - n;
309         return n;
310 }
311
312 /**
313  * @internal Dequeue several objects from the ring
314  *
315  * @param r
316  *   A pointer to the ring structure.
317  * @param obj_table
318  *   A pointer to a table of void * pointers (objects).
319  * @param n
320  *   The number of objects to pull from the ring.
321  * @param behavior
322  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
323  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring
324  * @param is_sc
325  *   Indicates whether to use single consumer or multi-consumer head update
326  * @param available
327  *   returns the number of remaining ring entries after the dequeue has finished
328  * @return
329  *   - Actual number of objects dequeued.
330  *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
331  */
332 static __rte_always_inline unsigned int
333 __rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
334                  unsigned int n, enum rte_ring_queue_behavior behavior,
335                  unsigned int is_sc, unsigned int *available)
336 {
337         uint32_t cons_head, cons_next;
338         uint32_t entries;
339
340         n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
341                         &cons_head, &cons_next, &entries);
342         if (n == 0)
343                 goto end;
344
345         DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);
346
347         update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
348
349 end:
350         if (available != NULL)
351                 *available = entries - n;
352         return n;
353 }
354
355 /**
356  * Enqueue several objects on the ring (multi-producers safe).
357  *
358  * This function uses a "compare and set" instruction to move the
359  * producer index atomically.
360  *
361  * @param r
362  *   A pointer to the ring structure.
363  * @param obj_table
364  *   A pointer to a table of void * pointers (objects).
365  * @param n
366  *   The number of objects to add in the ring from the obj_table.
367  * @param free_space
368  *   if non-NULL, returns the amount of space in the ring after the
369  *   enqueue operation has finished.
370  * @return
371  *   The number of objects enqueued, either 0 or n
372  */
373 static __rte_always_inline unsigned int
374 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
375                          unsigned int n, unsigned int *free_space)
376 {
377         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
378                         RTE_RING_SYNC_MT, free_space);
379 }
380
381 /**
382  * Enqueue several objects on a ring (NOT multi-producers safe).
383  *
384  * @param r
385  *   A pointer to the ring structure.
386  * @param obj_table
387  *   A pointer to a table of void * pointers (objects).
388  * @param n
389  *   The number of objects to add in the ring from the obj_table.
390  * @param free_space
391  *   if non-NULL, returns the amount of space in the ring after the
392  *   enqueue operation has finished.
393  * @return
394  *   The number of objects enqueued, either 0 or n
395  */
396 static __rte_always_inline unsigned int
397 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
398                          unsigned int n, unsigned int *free_space)
399 {
400         return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
401                         RTE_RING_SYNC_ST, free_space);
402 }
403
404 #ifdef ALLOW_EXPERIMENTAL_API
405 #include <rte_ring_elem.h>
406 #endif
407
408 /**
409  * Enqueue several objects on a ring.
410  *
411  * This function calls the multi-producer or the single-producer
412  * version depending on the default behavior that was specified at
413  * ring creation time (see flags).
414  *
415  * @param r
416  *   A pointer to the ring structure.
417  * @param obj_table
418  *   A pointer to a table of void * pointers (objects).
419  * @param n
420  *   The number of objects to add in the ring from the obj_table.
421  * @param free_space
422  *   if non-NULL, returns the amount of space in the ring after the
423  *   enqueue operation has finished.
424  * @return
425  *   The number of objects enqueued, either 0 or n
426  */
427 static __rte_always_inline unsigned int
428 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
429                       unsigned int n, unsigned int *free_space)
430 {
431         switch (r->prod.sync_type) {
432         case RTE_RING_SYNC_MT:
433                 return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
434         case RTE_RING_SYNC_ST:
435                 return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
436 #ifdef ALLOW_EXPERIMENTAL_API
437         case RTE_RING_SYNC_MT_RTS:
438                 return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
439                         free_space);
440         case RTE_RING_SYNC_MT_HTS:
441                 return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n,
442                         free_space);
443 #endif
444         }
445
446         /* valid ring should never reach this point */
447         RTE_ASSERT(0);
448         return 0;
449 }
450
451 /**
452  * Enqueue one object on a ring (multi-producers safe).
453  *
454  * This function uses a "compare and set" instruction to move the
455  * producer index atomically.
456  *
457  * @param r
458  *   A pointer to the ring structure.
459  * @param obj
460  *   A pointer to the object to be added.
461  * @return
462  *   - 0: Success; objects enqueued.
463  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
464  */
465 static __rte_always_inline int
466 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
467 {
468         return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
469 }
470
471 /**
472  * Enqueue one object on a ring (NOT multi-producers safe).
473  *
474  * @param r
475  *   A pointer to the ring structure.
476  * @param obj
477  *   A pointer to the object to be added.
478  * @return
479  *   - 0: Success; objects enqueued.
480  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
481  */
482 static __rte_always_inline int
483 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
484 {
485         return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
486 }
487
488 /**
489  * Enqueue one object on a ring.
490  *
491  * This function calls the multi-producer or the single-producer
492  * version, depending on the default behaviour that was specified at
493  * ring creation time (see flags).
494  *
495  * @param r
496  *   A pointer to the ring structure.
497  * @param obj
498  *   A pointer to the object to be added.
499  * @return
500  *   - 0: Success; objects enqueued.
501  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
502  */
503 static __rte_always_inline int
504 rte_ring_enqueue(struct rte_ring *r, void *obj)
505 {
506         return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
507 }
508
509 /**
510  * Dequeue several objects from a ring (multi-consumers safe).
511  *
512  * This function uses a "compare and set" instruction to move the
513  * consumer index atomically.
514  *
515  * @param r
516  *   A pointer to the ring structure.
517  * @param obj_table
518  *   A pointer to a table of void * pointers (objects) that will be filled.
519  * @param n
520  *   The number of objects to dequeue from the ring to the obj_table.
521  * @param available
522  *   If non-NULL, returns the number of remaining ring entries after the
523  *   dequeue has finished.
524  * @return
525  *   The number of objects dequeued, either 0 or n
526  */
527 static __rte_always_inline unsigned int
528 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
529                 unsigned int n, unsigned int *available)
530 {
531         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
532                         RTE_RING_SYNC_MT, available);
533 }
534
535 /**
536  * Dequeue several objects from a ring (NOT multi-consumers safe).
537  *
538  * @param r
539  *   A pointer to the ring structure.
540  * @param obj_table
541  *   A pointer to a table of void * pointers (objects) that will be filled.
542  * @param n
543  *   The number of objects to dequeue from the ring to the obj_table,
544  *   must be strictly positive.
545  * @param available
546  *   If non-NULL, returns the number of remaining ring entries after the
547  *   dequeue has finished.
548  * @return
549  *   The number of objects dequeued, either 0 or n
550  */
551 static __rte_always_inline unsigned int
552 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
553                 unsigned int n, unsigned int *available)
554 {
555         return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
556                         RTE_RING_SYNC_ST, available);
557 }
558
559 /**
560  * Dequeue several objects from a ring.
561  *
562  * This function calls the multi-consumers or the single-consumer
563  * version, depending on the default behaviour that was specified at
564  * ring creation time (see flags).
565  *
566  * @param r
567  *   A pointer to the ring structure.
568  * @param obj_table
569  *   A pointer to a table of void * pointers (objects) that will be filled.
570  * @param n
571  *   The number of objects to dequeue from the ring to the obj_table.
572  * @param available
573  *   If non-NULL, returns the number of remaining ring entries after the
574  *   dequeue has finished.
575  * @return
576  *   The number of objects dequeued, either 0 or n
577  */
578 static __rte_always_inline unsigned int
579 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
580                 unsigned int *available)
581 {
582         switch (r->cons.sync_type) {
583         case RTE_RING_SYNC_MT:
584                 return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
585         case RTE_RING_SYNC_ST:
586                 return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
587 #ifdef ALLOW_EXPERIMENTAL_API
588         case RTE_RING_SYNC_MT_RTS:
589                 return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available);
590         case RTE_RING_SYNC_MT_HTS:
591                 return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available);
592 #endif
593         }
594
595         /* valid ring should never reach this point */
596         RTE_ASSERT(0);
597         return 0;
598 }
599
600 /**
601  * Dequeue one object from a ring (multi-consumers safe).
602  *
603  * This function uses a "compare and set" instruction to move the
604  * consumer index atomically.
605  *
606  * @param r
607  *   A pointer to the ring structure.
608  * @param obj_p
609  *   A pointer to a void * pointer (object) that will be filled.
610  * @return
611  *   - 0: Success; objects dequeued.
612  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
613  *     dequeued.
614  */
615 static __rte_always_inline int
616 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
617 {
618         return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL)  ? 0 : -ENOENT;
619 }
620
621 /**
622  * Dequeue one object from a ring (NOT multi-consumers safe).
623  *
624  * @param r
625  *   A pointer to the ring structure.
626  * @param obj_p
627  *   A pointer to a void * pointer (object) that will be filled.
628  * @return
629  *   - 0: Success; objects dequeued.
630  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
631  *     dequeued.
632  */
633 static __rte_always_inline int
634 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
635 {
636         return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
637 }
638
639 /**
640  * Dequeue one object from a ring.
641  *
642  * This function calls the multi-consumers or the single-consumer
643  * version depending on the default behaviour that was specified at
644  * ring creation time (see flags).
645  *
646  * @param r
647  *   A pointer to the ring structure.
648  * @param obj_p
649  *   A pointer to a void * pointer (object) that will be filled.
650  * @return
651  *   - 0: Success, objects dequeued.
652  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
653  *     dequeued.
654  */
655 static __rte_always_inline int
656 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
657 {
658         return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
659 }
660
661 /**
662  * Flush a ring.
663  *
664  * This function flush all the elements in a ring
665  *
666  * @b EXPERIMENTAL: this API may change without prior notice
667  *
668  * @warning
669  * Make sure the ring is not in use while calling this function.
670  *
671  * @param r
672  *   A pointer to the ring structure.
673  */
674 __rte_experimental
675 void
676 rte_ring_reset(struct rte_ring *r);
677
678 /**
679  * Return the number of entries in a ring.
680  *
681  * @param r
682  *   A pointer to the ring structure.
683  * @return
684  *   The number of entries in the ring.
685  */
686 static inline unsigned
687 rte_ring_count(const struct rte_ring *r)
688 {
689         uint32_t prod_tail = r->prod.tail;
690         uint32_t cons_tail = r->cons.tail;
691         uint32_t count = (prod_tail - cons_tail) & r->mask;
692         return (count > r->capacity) ? r->capacity : count;
693 }
694
695 /**
696  * Return the number of free entries in a ring.
697  *
698  * @param r
699  *   A pointer to the ring structure.
700  * @return
701  *   The number of free entries in the ring.
702  */
703 static inline unsigned
704 rte_ring_free_count(const struct rte_ring *r)
705 {
706         return r->capacity - rte_ring_count(r);
707 }
708
709 /**
710  * Test if a ring is full.
711  *
712  * @param r
713  *   A pointer to the ring structure.
714  * @return
715  *   - 1: The ring is full.
716  *   - 0: The ring is not full.
717  */
718 static inline int
719 rte_ring_full(const struct rte_ring *r)
720 {
721         return rte_ring_free_count(r) == 0;
722 }
723
724 /**
725  * Test if a ring is empty.
726  *
727  * @param r
728  *   A pointer to the ring structure.
729  * @return
730  *   - 1: The ring is empty.
731  *   - 0: The ring is not empty.
732  */
733 static inline int
734 rte_ring_empty(const struct rte_ring *r)
735 {
736         return rte_ring_count(r) == 0;
737 }
738
739 /**
740  * Return the size of the ring.
741  *
742  * @param r
743  *   A pointer to the ring structure.
744  * @return
745  *   The size of the data store used by the ring.
746  *   NOTE: this is not the same as the usable space in the ring. To query that
747  *   use ``rte_ring_get_capacity()``.
748  */
749 static inline unsigned int
750 rte_ring_get_size(const struct rte_ring *r)
751 {
752         return r->size;
753 }
754
755 /**
756  * Return the number of elements which can be stored in the ring.
757  *
758  * @param r
759  *   A pointer to the ring structure.
760  * @return
761  *   The usable size of the ring.
762  */
763 static inline unsigned int
764 rte_ring_get_capacity(const struct rte_ring *r)
765 {
766         return r->capacity;
767 }
768
769 /**
770  * Return sync type used by producer in the ring.
771  *
772  * @param r
773  *   A pointer to the ring structure.
774  * @return
775  *   Producer sync type value.
776  */
777 static inline enum rte_ring_sync_type
778 rte_ring_get_prod_sync_type(const struct rte_ring *r)
779 {
780         return r->prod.sync_type;
781 }
782
783 /**
784  * Check is the ring for single producer.
785  *
786  * @param r
787  *   A pointer to the ring structure.
788  * @return
789  *   true if ring is SP, zero otherwise.
790  */
791 static inline int
792 rte_ring_is_prod_single(const struct rte_ring *r)
793 {
794         return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
795 }
796
797 /**
798  * Return sync type used by consumer in the ring.
799  *
800  * @param r
801  *   A pointer to the ring structure.
802  * @return
803  *   Consumer sync type value.
804  */
805 static inline enum rte_ring_sync_type
806 rte_ring_get_cons_sync_type(const struct rte_ring *r)
807 {
808         return r->cons.sync_type;
809 }
810
811 /**
812  * Check is the ring for single consumer.
813  *
814  * @param r
815  *   A pointer to the ring structure.
816  * @return
817  *   true if ring is SC, zero otherwise.
818  */
819 static inline int
820 rte_ring_is_cons_single(const struct rte_ring *r)
821 {
822         return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
823 }
824
825 /**
826  * Dump the status of all rings on the console
827  *
828  * @param f
829  *   A pointer to a file for output
830  */
831 void rte_ring_list_dump(FILE *f);
832
833 /**
834  * Search a ring from its name
835  *
836  * @param name
837  *   The name of the ring.
838  * @return
839  *   The pointer to the ring matching the name, or NULL if not found,
840  *   with rte_errno set appropriately. Possible rte_errno values include:
841  *    - ENOENT - required entry not available to return.
842  */
843 struct rte_ring *rte_ring_lookup(const char *name);
844
845 /**
846  * Enqueue several objects on the ring (multi-producers safe).
847  *
848  * This function uses a "compare and set" instruction to move the
849  * producer index atomically.
850  *
851  * @param r
852  *   A pointer to the ring structure.
853  * @param obj_table
854  *   A pointer to a table of void * pointers (objects).
855  * @param n
856  *   The number of objects to add in the ring from the obj_table.
857  * @param free_space
858  *   if non-NULL, returns the amount of space in the ring after the
859  *   enqueue operation has finished.
860  * @return
861  *   - n: Actual number of objects enqueued.
862  */
863 static __rte_always_inline unsigned
864 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
865                          unsigned int n, unsigned int *free_space)
866 {
867         return __rte_ring_do_enqueue(r, obj_table, n,
868                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
869 }
870
871 /**
872  * Enqueue several objects on a ring (NOT multi-producers safe).
873  *
874  * @param r
875  *   A pointer to the ring structure.
876  * @param obj_table
877  *   A pointer to a table of void * pointers (objects).
878  * @param n
879  *   The number of objects to add in the ring from the obj_table.
880  * @param free_space
881  *   if non-NULL, returns the amount of space in the ring after the
882  *   enqueue operation has finished.
883  * @return
884  *   - n: Actual number of objects enqueued.
885  */
886 static __rte_always_inline unsigned
887 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
888                          unsigned int n, unsigned int *free_space)
889 {
890         return __rte_ring_do_enqueue(r, obj_table, n,
891                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
892 }
893
894 /**
895  * Enqueue several objects on a ring.
896  *
897  * This function calls the multi-producer or the single-producer
898  * version depending on the default behavior that was specified at
899  * ring creation time (see flags).
900  *
901  * @param r
902  *   A pointer to the ring structure.
903  * @param obj_table
904  *   A pointer to a table of void * pointers (objects).
905  * @param n
906  *   The number of objects to add in the ring from the obj_table.
907  * @param free_space
908  *   if non-NULL, returns the amount of space in the ring after the
909  *   enqueue operation has finished.
910  * @return
911  *   - n: Actual number of objects enqueued.
912  */
913 static __rte_always_inline unsigned
914 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
915                       unsigned int n, unsigned int *free_space)
916 {
917         switch (r->prod.sync_type) {
918         case RTE_RING_SYNC_MT:
919                 return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
920         case RTE_RING_SYNC_ST:
921                 return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
922 #ifdef ALLOW_EXPERIMENTAL_API
923         case RTE_RING_SYNC_MT_RTS:
924                 return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
925                         free_space);
926         case RTE_RING_SYNC_MT_HTS:
927                 return rte_ring_mp_hts_enqueue_burst(r, obj_table, n,
928                         free_space);
929 #endif
930         }
931
932         /* valid ring should never reach this point */
933         RTE_ASSERT(0);
934         return 0;
935 }
936
937 /**
938  * Dequeue several objects from a ring (multi-consumers safe). When the request
939  * objects are more than the available objects, only dequeue the actual number
940  * of objects
941  *
942  * This function uses a "compare and set" instruction to move the
943  * consumer index atomically.
944  *
945  * @param r
946  *   A pointer to the ring structure.
947  * @param obj_table
948  *   A pointer to a table of void * pointers (objects) that will be filled.
949  * @param n
950  *   The number of objects to dequeue from the ring to the obj_table.
951  * @param available
952  *   If non-NULL, returns the number of remaining ring entries after the
953  *   dequeue has finished.
954  * @return
955  *   - n: Actual number of objects dequeued, 0 if ring is empty
956  */
957 static __rte_always_inline unsigned
958 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
959                 unsigned int n, unsigned int *available)
960 {
961         return __rte_ring_do_dequeue(r, obj_table, n,
962                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
963 }
964
965 /**
966  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
967  * request objects are more than the available objects, only dequeue the
968  * actual number of objects
969  *
970  * @param r
971  *   A pointer to the ring structure.
972  * @param obj_table
973  *   A pointer to a table of void * pointers (objects) that will be filled.
974  * @param n
975  *   The number of objects to dequeue from the ring to the obj_table.
976  * @param available
977  *   If non-NULL, returns the number of remaining ring entries after the
978  *   dequeue has finished.
979  * @return
980  *   - n: Actual number of objects dequeued, 0 if ring is empty
981  */
982 static __rte_always_inline unsigned
983 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
984                 unsigned int n, unsigned int *available)
985 {
986         return __rte_ring_do_dequeue(r, obj_table, n,
987                         RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
988 }
989
990 /**
991  * Dequeue multiple objects from a ring up to a maximum number.
992  *
993  * This function calls the multi-consumers or the single-consumer
994  * version, depending on the default behaviour that was specified at
995  * ring creation time (see flags).
996  *
997  * @param r
998  *   A pointer to the ring structure.
999  * @param obj_table
1000  *   A pointer to a table of void * pointers (objects) that will be filled.
1001  * @param n
1002  *   The number of objects to dequeue from the ring to the obj_table.
1003  * @param available
1004  *   If non-NULL, returns the number of remaining ring entries after the
1005  *   dequeue has finished.
1006  * @return
1007  *   - Number of objects dequeued
1008  */
1009 static __rte_always_inline unsigned
1010 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
1011                 unsigned int n, unsigned int *available)
1012 {
1013         switch (r->cons.sync_type) {
1014         case RTE_RING_SYNC_MT:
1015                 return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
1016         case RTE_RING_SYNC_ST:
1017                 return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
1018 #ifdef ALLOW_EXPERIMENTAL_API
1019         case RTE_RING_SYNC_MT_RTS:
1020                 return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
1021                         available);
1022         case RTE_RING_SYNC_MT_HTS:
1023                 return rte_ring_mc_hts_dequeue_burst(r, obj_table, n,
1024                         available);
1025 #endif
1026         }
1027
1028         /* valid ring should never reach this point */
1029         RTE_ASSERT(0);
1030         return 0;
1031 }
1032
1033 #ifdef __cplusplus
1034 }
1035 #endif
1036
1037 #endif /* _RTE_RING_H_ */