ring: remove split cacheline build setting
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 /*
35  * Derived from FreeBSD's bufring.h
36  *
37  **************************************************************************
38  *
39  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
40  * All rights reserved.
41  *
42  * Redistribution and use in source and binary forms, with or without
43  * modification, are permitted provided that the following conditions are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  *    this list of conditions and the following disclaimer.
47  *
48  * 2. The name of Kip Macy nor the names of other
49  *    contributors may be used to endorse or promote products derived from
50  *    this software without specific prior written permission.
51  *
52  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
54  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
55  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
56  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
57  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
58  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
59  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
60  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
61  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
62  * POSSIBILITY OF SUCH DAMAGE.
63  *
64  ***************************************************************************/
65
66 #ifndef _RTE_RING_H_
67 #define _RTE_RING_H_
68
69 /**
70  * @file
71  * RTE Ring
72  *
73  * The Ring Manager is a fixed-size queue, implemented as a table of
74  * pointers. Head and tail pointers are modified atomically, allowing
75  * concurrent access to it. It has the following features:
76  *
77  * - FIFO (First In First Out)
78  * - Maximum size is fixed; the pointers are stored in a table.
79  * - Lockless implementation.
80  * - Multi- or single-consumer dequeue.
81  * - Multi- or single-producer enqueue.
82  * - Bulk dequeue.
83  * - Bulk enqueue.
84  *
85  * Note: the ring implementation is not preemptable. A lcore must not
86  * be interrupted by another task that uses the same ring.
87  *
88  */
89
90 #ifdef __cplusplus
91 extern "C" {
92 #endif
93
94 #include <stdio.h>
95 #include <stdint.h>
96 #include <sys/queue.h>
97 #include <errno.h>
98 #include <rte_common.h>
99 #include <rte_memory.h>
100 #include <rte_lcore.h>
101 #include <rte_atomic.h>
102 #include <rte_branch_prediction.h>
103 #include <rte_memzone.h>
104
105 #define RTE_TAILQ_RING_NAME "RTE_RING"
106
107 enum rte_ring_queue_behavior {
108         RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
109         RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
110 };
111
112 #ifdef RTE_LIBRTE_RING_DEBUG
113 /**
114  * A structure that stores the ring statistics (per-lcore).
115  */
116 struct rte_ring_debug_stats {
117         uint64_t enq_success_bulk; /**< Successful enqueues number. */
118         uint64_t enq_success_objs; /**< Objects successfully enqueued. */
119         uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
120         uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
121         uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
122         uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
123         uint64_t deq_success_bulk; /**< Successful dequeues number. */
124         uint64_t deq_success_objs; /**< Objects successfully dequeued. */
125         uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
126         uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
127 } __rte_cache_aligned;
128 #endif
129
130 #define RTE_RING_MZ_PREFIX "RG_"
131 /**< The maximum length of a ring name. */
132 #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
133                            sizeof(RTE_RING_MZ_PREFIX) + 1)
134
135 #ifndef RTE_RING_PAUSE_REP_COUNT
136 #define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
137                                     *   if RTE_RING_PAUSE_REP not defined. */
138 #endif
139
140 struct rte_memzone; /* forward declaration, so as not to require memzone.h */
141
142 #if RTE_CACHE_LINE_SIZE < 128
143 #define PROD_ALIGN (RTE_CACHE_LINE_SIZE * 2)
144 #define CONS_ALIGN (RTE_CACHE_LINE_SIZE * 2)
145 #else
146 #define PROD_ALIGN RTE_CACHE_LINE_SIZE
147 #define CONS_ALIGN RTE_CACHE_LINE_SIZE
148 #endif
149
150 /**
151  * An RTE ring structure.
152  *
153  * The producer and the consumer have a head and a tail index. The particularity
154  * of these index is that they are not between 0 and size(ring). These indexes
155  * are between 0 and 2^32, and we mask their value when we access the ring[]
156  * field. Thanks to this assumption, we can do subtractions between 2 index
157  * values in a modulo-32bit base: that's why the overflow of the indexes is not
158  * a problem.
159  */
160 struct rte_ring {
161         /*
162          * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
163          * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
164          * next time the ABI changes
165          */
166         char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
167         int flags;                       /**< Flags supplied at creation. */
168         const struct rte_memzone *memzone;
169                         /**< Memzone, if any, containing the rte_ring */
170
171         /** Ring producer status. */
172         struct prod {
173                 uint32_t watermark;      /**< Maximum items before EDQUOT. */
174                 uint32_t sp_enqueue;     /**< True, if single producer. */
175                 uint32_t size;           /**< Size of ring. */
176                 uint32_t mask;           /**< Mask (size-1) of ring. */
177                 volatile uint32_t head;  /**< Producer head. */
178                 volatile uint32_t tail;  /**< Producer tail. */
179         } prod __rte_aligned(PROD_ALIGN);
180
181         /** Ring consumer status. */
182         struct cons {
183                 uint32_t sc_dequeue;     /**< True, if single consumer. */
184                 uint32_t size;           /**< Size of the ring. */
185                 uint32_t mask;           /**< Mask (size-1) of ring. */
186                 volatile uint32_t head;  /**< Consumer head. */
187                 volatile uint32_t tail;  /**< Consumer tail. */
188         } cons __rte_aligned(CONS_ALIGN);
189
190 #ifdef RTE_LIBRTE_RING_DEBUG
191         struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
192 #endif
193
194         void *ring[] __rte_cache_aligned;   /**< Memory space of ring starts here.
195                                              * not volatile so need to be careful
196                                              * about compiler re-ordering */
197 };
198
199 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
200 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
201 #define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceed for burst ops */
202 #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
203
204 /**
205  * @internal When debug is enabled, store ring statistics.
206  * @param r
207  *   A pointer to the ring.
208  * @param name
209  *   The name of the statistics field to increment in the ring.
210  * @param n
211  *   The number to add to the object-oriented statistics.
212  */
213 #ifdef RTE_LIBRTE_RING_DEBUG
214 #define __RING_STAT_ADD(r, name, n) do {                        \
215                 unsigned __lcore_id = rte_lcore_id();           \
216                 if (__lcore_id < RTE_MAX_LCORE) {               \
217                         r->stats[__lcore_id].name##_objs += n;  \
218                         r->stats[__lcore_id].name##_bulk += 1;  \
219                 }                                               \
220         } while(0)
221 #else
222 #define __RING_STAT_ADD(r, name, n) do {} while(0)
223 #endif
224
225 /**
226  * Calculate the memory size needed for a ring
227  *
228  * This function returns the number of bytes needed for a ring, given
229  * the number of elements in it. This value is the sum of the size of
230  * the structure rte_ring and the size of the memory needed by the
231  * objects pointers. The value is aligned to a cache line size.
232  *
233  * @param count
234  *   The number of elements in the ring (must be a power of 2).
235  * @return
236  *   - The memory size needed for the ring on success.
237  *   - -EINVAL if count is not a power of 2.
238  */
239 ssize_t rte_ring_get_memsize(unsigned count);
240
241 /**
242  * Initialize a ring structure.
243  *
244  * Initialize a ring structure in memory pointed by "r". The size of the
245  * memory area must be large enough to store the ring structure and the
246  * object table. It is advised to use rte_ring_get_memsize() to get the
247  * appropriate size.
248  *
249  * The ring size is set to *count*, which must be a power of two. Water
250  * marking is disabled by default. The real usable ring size is
251  * *count-1* instead of *count* to differentiate a free ring from an
252  * empty ring.
253  *
254  * The ring is not added in RTE_TAILQ_RING global list. Indeed, the
255  * memory given by the caller may not be shareable among dpdk
256  * processes.
257  *
258  * @param r
259  *   The pointer to the ring structure followed by the objects table.
260  * @param name
261  *   The name of the ring.
262  * @param count
263  *   The number of elements in the ring (must be a power of 2).
264  * @param flags
265  *   An OR of the following:
266  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
267  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
268  *      is "single-producer". Otherwise, it is "multi-producers".
269  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
270  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
271  *      is "single-consumer". Otherwise, it is "multi-consumers".
272  * @return
273  *   0 on success, or a negative value on error.
274  */
275 int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
276         unsigned flags);
277
278 /**
279  * Create a new ring named *name* in memory.
280  *
281  * This function uses ``memzone_reserve()`` to allocate memory. Then it
282  * calls rte_ring_init() to initialize an empty ring.
283  *
284  * The new ring size is set to *count*, which must be a power of
285  * two. Water marking is disabled by default. The real usable ring size
286  * is *count-1* instead of *count* to differentiate a free ring from an
287  * empty ring.
288  *
289  * The ring is added in RTE_TAILQ_RING list.
290  *
291  * @param name
292  *   The name of the ring.
293  * @param count
294  *   The size of the ring (must be a power of 2).
295  * @param socket_id
296  *   The *socket_id* argument is the socket identifier in case of
297  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
298  *   constraint for the reserved zone.
299  * @param flags
300  *   An OR of the following:
301  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
302  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
303  *      is "single-producer". Otherwise, it is "multi-producers".
304  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
305  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
306  *      is "single-consumer". Otherwise, it is "multi-consumers".
307  * @return
308  *   On success, the pointer to the new allocated ring. NULL on error with
309  *    rte_errno set appropriately. Possible errno values include:
310  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
311  *    - E_RTE_SECONDARY - function was called from a secondary process instance
312  *    - EINVAL - count provided is not a power of 2
313  *    - ENOSPC - the maximum number of memzones has already been allocated
314  *    - EEXIST - a memzone with the same name already exists
315  *    - ENOMEM - no appropriate memory area found in which to create memzone
316  */
317 struct rte_ring *rte_ring_create(const char *name, unsigned count,
318                                  int socket_id, unsigned flags);
319 /**
320  * De-allocate all memory used by the ring.
321  *
322  * @param r
323  *   Ring to free
324  */
325 void rte_ring_free(struct rte_ring *r);
326
327 /**
328  * Change the high water mark.
329  *
330  * If *count* is 0, water marking is disabled. Otherwise, it is set to the
331  * *count* value. The *count* value must be greater than 0 and less
332  * than the ring size.
333  *
334  * This function can be called at any time (not necessarily at
335  * initialization).
336  *
337  * @param r
338  *   A pointer to the ring structure.
339  * @param count
340  *   The new water mark value.
341  * @return
342  *   - 0: Success; water mark changed.
343  *   - -EINVAL: Invalid water mark value.
344  */
345 int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
346
347 /**
348  * Dump the status of the ring to a file.
349  *
350  * @param f
351  *   A pointer to a file for output
352  * @param r
353  *   A pointer to the ring structure.
354  */
355 void rte_ring_dump(FILE *f, const struct rte_ring *r);
356
357 /* the actual enqueue of pointers on the ring.
358  * Placed here since identical code needed in both
359  * single and multi producer enqueue functions */
360 #define ENQUEUE_PTRS() do { \
361         const uint32_t size = r->prod.size; \
362         uint32_t idx = prod_head & mask; \
363         if (likely(idx + n < size)) { \
364                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
365                         r->ring[idx] = obj_table[i]; \
366                         r->ring[idx+1] = obj_table[i+1]; \
367                         r->ring[idx+2] = obj_table[i+2]; \
368                         r->ring[idx+3] = obj_table[i+3]; \
369                 } \
370                 switch (n & 0x3) { \
371                         case 3: r->ring[idx++] = obj_table[i++]; \
372                         case 2: r->ring[idx++] = obj_table[i++]; \
373                         case 1: r->ring[idx++] = obj_table[i++]; \
374                 } \
375         } else { \
376                 for (i = 0; idx < size; i++, idx++)\
377                         r->ring[idx] = obj_table[i]; \
378                 for (idx = 0; i < n; i++, idx++) \
379                         r->ring[idx] = obj_table[i]; \
380         } \
381 } while(0)
382
383 /* the actual copy of pointers on the ring to obj_table.
384  * Placed here since identical code needed in both
385  * single and multi consumer dequeue functions */
386 #define DEQUEUE_PTRS() do { \
387         uint32_t idx = cons_head & mask; \
388         const uint32_t size = r->cons.size; \
389         if (likely(idx + n < size)) { \
390                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
391                         obj_table[i] = r->ring[idx]; \
392                         obj_table[i+1] = r->ring[idx+1]; \
393                         obj_table[i+2] = r->ring[idx+2]; \
394                         obj_table[i+3] = r->ring[idx+3]; \
395                 } \
396                 switch (n & 0x3) { \
397                         case 3: obj_table[i++] = r->ring[idx++]; \
398                         case 2: obj_table[i++] = r->ring[idx++]; \
399                         case 1: obj_table[i++] = r->ring[idx++]; \
400                 } \
401         } else { \
402                 for (i = 0; idx < size; i++, idx++) \
403                         obj_table[i] = r->ring[idx]; \
404                 for (idx = 0; i < n; i++, idx++) \
405                         obj_table[i] = r->ring[idx]; \
406         } \
407 } while (0)
408
409 /**
410  * @internal Enqueue several objects on the ring (multi-producers safe).
411  *
412  * This function uses a "compare and set" instruction to move the
413  * producer index atomically.
414  *
415  * @param r
416  *   A pointer to the ring structure.
417  * @param obj_table
418  *   A pointer to a table of void * pointers (objects).
419  * @param n
420  *   The number of objects to add in the ring from the obj_table.
421  * @param behavior
422  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
423  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
424  * @return
425  *   Depend on the behavior value
426  *   if behavior = RTE_RING_QUEUE_FIXED
427  *   - 0: Success; objects enqueue.
428  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
429  *     high water mark is exceeded.
430  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
431  *   if behavior = RTE_RING_QUEUE_VARIABLE
432  *   - n: Actual number of objects enqueued.
433  */
434 static inline int __attribute__((always_inline))
435 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
436                          unsigned n, enum rte_ring_queue_behavior behavior)
437 {
438         uint32_t prod_head, prod_next;
439         uint32_t cons_tail, free_entries;
440         const unsigned max = n;
441         int success;
442         unsigned i, rep = 0;
443         uint32_t mask = r->prod.mask;
444         int ret;
445
446         /* Avoid the unnecessary cmpset operation below, which is also
447          * potentially harmful when n equals 0. */
448         if (n == 0)
449                 return 0;
450
451         /* move prod.head atomically */
452         do {
453                 /* Reset n to the initial burst count */
454                 n = max;
455
456                 prod_head = r->prod.head;
457                 cons_tail = r->cons.tail;
458                 /* The subtraction is done between two unsigned 32bits value
459                  * (the result is always modulo 32 bits even if we have
460                  * prod_head > cons_tail). So 'free_entries' is always between 0
461                  * and size(ring)-1. */
462                 free_entries = (mask + cons_tail - prod_head);
463
464                 /* check that we have enough room in ring */
465                 if (unlikely(n > free_entries)) {
466                         if (behavior == RTE_RING_QUEUE_FIXED) {
467                                 __RING_STAT_ADD(r, enq_fail, n);
468                                 return -ENOBUFS;
469                         }
470                         else {
471                                 /* No free entry available */
472                                 if (unlikely(free_entries == 0)) {
473                                         __RING_STAT_ADD(r, enq_fail, n);
474                                         return 0;
475                                 }
476
477                                 n = free_entries;
478                         }
479                 }
480
481                 prod_next = prod_head + n;
482                 success = rte_atomic32_cmpset(&r->prod.head, prod_head,
483                                               prod_next);
484         } while (unlikely(success == 0));
485
486         /* write entries in ring */
487         ENQUEUE_PTRS();
488         rte_smp_wmb();
489
490         /* if we exceed the watermark */
491         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
492                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
493                                 (int)(n | RTE_RING_QUOT_EXCEED);
494                 __RING_STAT_ADD(r, enq_quota, n);
495         }
496         else {
497                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
498                 __RING_STAT_ADD(r, enq_success, n);
499         }
500
501         /*
502          * If there are other enqueues in progress that preceded us,
503          * we need to wait for them to complete
504          */
505         while (unlikely(r->prod.tail != prod_head)) {
506                 rte_pause();
507
508                 /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
509                  * for other thread finish. It gives pre-empted thread a chance
510                  * to proceed and finish with ring dequeue operation. */
511                 if (RTE_RING_PAUSE_REP_COUNT &&
512                     ++rep == RTE_RING_PAUSE_REP_COUNT) {
513                         rep = 0;
514                         sched_yield();
515                 }
516         }
517         r->prod.tail = prod_next;
518         return ret;
519 }
520
521 /**
522  * @internal Enqueue several objects on a ring (NOT multi-producers safe).
523  *
524  * @param r
525  *   A pointer to the ring structure.
526  * @param obj_table
527  *   A pointer to a table of void * pointers (objects).
528  * @param n
529  *   The number of objects to add in the ring from the obj_table.
530  * @param behavior
531  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
532  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
533  * @return
534  *   Depend on the behavior value
535  *   if behavior = RTE_RING_QUEUE_FIXED
536  *   - 0: Success; objects enqueue.
537  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
538  *     high water mark is exceeded.
539  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
540  *   if behavior = RTE_RING_QUEUE_VARIABLE
541  *   - n: Actual number of objects enqueued.
542  */
543 static inline int __attribute__((always_inline))
544 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
545                          unsigned n, enum rte_ring_queue_behavior behavior)
546 {
547         uint32_t prod_head, cons_tail;
548         uint32_t prod_next, free_entries;
549         unsigned i;
550         uint32_t mask = r->prod.mask;
551         int ret;
552
553         prod_head = r->prod.head;
554         cons_tail = r->cons.tail;
555         /* The subtraction is done between two unsigned 32bits value
556          * (the result is always modulo 32 bits even if we have
557          * prod_head > cons_tail). So 'free_entries' is always between 0
558          * and size(ring)-1. */
559         free_entries = mask + cons_tail - prod_head;
560
561         /* check that we have enough room in ring */
562         if (unlikely(n > free_entries)) {
563                 if (behavior == RTE_RING_QUEUE_FIXED) {
564                         __RING_STAT_ADD(r, enq_fail, n);
565                         return -ENOBUFS;
566                 }
567                 else {
568                         /* No free entry available */
569                         if (unlikely(free_entries == 0)) {
570                                 __RING_STAT_ADD(r, enq_fail, n);
571                                 return 0;
572                         }
573
574                         n = free_entries;
575                 }
576         }
577
578         prod_next = prod_head + n;
579         r->prod.head = prod_next;
580
581         /* write entries in ring */
582         ENQUEUE_PTRS();
583         rte_smp_wmb();
584
585         /* if we exceed the watermark */
586         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
587                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
588                         (int)(n | RTE_RING_QUOT_EXCEED);
589                 __RING_STAT_ADD(r, enq_quota, n);
590         }
591         else {
592                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
593                 __RING_STAT_ADD(r, enq_success, n);
594         }
595
596         r->prod.tail = prod_next;
597         return ret;
598 }
599
600 /**
601  * @internal Dequeue several objects from a ring (multi-consumers safe). When
602  * the request objects are more than the available objects, only dequeue the
603  * actual number of objects
604  *
605  * This function uses a "compare and set" instruction to move the
606  * consumer index atomically.
607  *
608  * @param r
609  *   A pointer to the ring structure.
610  * @param obj_table
611  *   A pointer to a table of void * pointers (objects) that will be filled.
612  * @param n
613  *   The number of objects to dequeue from the ring to the obj_table.
614  * @param behavior
615  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
616  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
617  * @return
618  *   Depend on the behavior value
619  *   if behavior = RTE_RING_QUEUE_FIXED
620  *   - 0: Success; objects dequeued.
621  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
622  *     dequeued.
623  *   if behavior = RTE_RING_QUEUE_VARIABLE
624  *   - n: Actual number of objects dequeued.
625  */
626
627 static inline int __attribute__((always_inline))
628 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
629                  unsigned n, enum rte_ring_queue_behavior behavior)
630 {
631         uint32_t cons_head, prod_tail;
632         uint32_t cons_next, entries;
633         const unsigned max = n;
634         int success;
635         unsigned i, rep = 0;
636         uint32_t mask = r->prod.mask;
637
638         /* Avoid the unnecessary cmpset operation below, which is also
639          * potentially harmful when n equals 0. */
640         if (n == 0)
641                 return 0;
642
643         /* move cons.head atomically */
644         do {
645                 /* Restore n as it may change every loop */
646                 n = max;
647
648                 cons_head = r->cons.head;
649                 prod_tail = r->prod.tail;
650                 /* The subtraction is done between two unsigned 32bits value
651                  * (the result is always modulo 32 bits even if we have
652                  * cons_head > prod_tail). So 'entries' is always between 0
653                  * and size(ring)-1. */
654                 entries = (prod_tail - cons_head);
655
656                 /* Set the actual entries for dequeue */
657                 if (n > entries) {
658                         if (behavior == RTE_RING_QUEUE_FIXED) {
659                                 __RING_STAT_ADD(r, deq_fail, n);
660                                 return -ENOENT;
661                         }
662                         else {
663                                 if (unlikely(entries == 0)){
664                                         __RING_STAT_ADD(r, deq_fail, n);
665                                         return 0;
666                                 }
667
668                                 n = entries;
669                         }
670                 }
671
672                 cons_next = cons_head + n;
673                 success = rte_atomic32_cmpset(&r->cons.head, cons_head,
674                                               cons_next);
675         } while (unlikely(success == 0));
676
677         /* copy in table */
678         DEQUEUE_PTRS();
679         rte_smp_rmb();
680
681         /*
682          * If there are other dequeues in progress that preceded us,
683          * we need to wait for them to complete
684          */
685         while (unlikely(r->cons.tail != cons_head)) {
686                 rte_pause();
687
688                 /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
689                  * for other thread finish. It gives pre-empted thread a chance
690                  * to proceed and finish with ring dequeue operation. */
691                 if (RTE_RING_PAUSE_REP_COUNT &&
692                     ++rep == RTE_RING_PAUSE_REP_COUNT) {
693                         rep = 0;
694                         sched_yield();
695                 }
696         }
697         __RING_STAT_ADD(r, deq_success, n);
698         r->cons.tail = cons_next;
699
700         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
701 }
702
703 /**
704  * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
705  * When the request objects are more than the available objects, only dequeue
706  * the actual number of objects
707  *
708  * @param r
709  *   A pointer to the ring structure.
710  * @param obj_table
711  *   A pointer to a table of void * pointers (objects) that will be filled.
712  * @param n
713  *   The number of objects to dequeue from the ring to the obj_table.
714  * @param behavior
715  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
716  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
717  * @return
718  *   Depend on the behavior value
719  *   if behavior = RTE_RING_QUEUE_FIXED
720  *   - 0: Success; objects dequeued.
721  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
722  *     dequeued.
723  *   if behavior = RTE_RING_QUEUE_VARIABLE
724  *   - n: Actual number of objects dequeued.
725  */
726 static inline int __attribute__((always_inline))
727 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
728                  unsigned n, enum rte_ring_queue_behavior behavior)
729 {
730         uint32_t cons_head, prod_tail;
731         uint32_t cons_next, entries;
732         unsigned i;
733         uint32_t mask = r->prod.mask;
734
735         cons_head = r->cons.head;
736         prod_tail = r->prod.tail;
737         /* The subtraction is done between two unsigned 32bits value
738          * (the result is always modulo 32 bits even if we have
739          * cons_head > prod_tail). So 'entries' is always between 0
740          * and size(ring)-1. */
741         entries = prod_tail - cons_head;
742
743         if (n > entries) {
744                 if (behavior == RTE_RING_QUEUE_FIXED) {
745                         __RING_STAT_ADD(r, deq_fail, n);
746                         return -ENOENT;
747                 }
748                 else {
749                         if (unlikely(entries == 0)){
750                                 __RING_STAT_ADD(r, deq_fail, n);
751                                 return 0;
752                         }
753
754                         n = entries;
755                 }
756         }
757
758         cons_next = cons_head + n;
759         r->cons.head = cons_next;
760
761         /* copy in table */
762         DEQUEUE_PTRS();
763         rte_smp_rmb();
764
765         __RING_STAT_ADD(r, deq_success, n);
766         r->cons.tail = cons_next;
767         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
768 }
769
770 /**
771  * Enqueue several objects on the ring (multi-producers safe).
772  *
773  * This function uses a "compare and set" instruction to move the
774  * producer index atomically.
775  *
776  * @param r
777  *   A pointer to the ring structure.
778  * @param obj_table
779  *   A pointer to a table of void * pointers (objects).
780  * @param n
781  *   The number of objects to add in the ring from the obj_table.
782  * @return
783  *   - 0: Success; objects enqueue.
784  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
785  *     high water mark is exceeded.
786  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
787  */
788 static inline int __attribute__((always_inline))
789 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
790                          unsigned n)
791 {
792         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
793 }
794
795 /**
796  * Enqueue several objects on a ring (NOT multi-producers safe).
797  *
798  * @param r
799  *   A pointer to the ring structure.
800  * @param obj_table
801  *   A pointer to a table of void * pointers (objects).
802  * @param n
803  *   The number of objects to add in the ring from the obj_table.
804  * @return
805  *   - 0: Success; objects enqueued.
806  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
807  *     high water mark is exceeded.
808  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
809  */
810 static inline int __attribute__((always_inline))
811 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
812                          unsigned n)
813 {
814         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
815 }
816
817 /**
818  * Enqueue several objects on a ring.
819  *
820  * This function calls the multi-producer or the single-producer
821  * version depending on the default behavior that was specified at
822  * ring creation time (see flags).
823  *
824  * @param r
825  *   A pointer to the ring structure.
826  * @param obj_table
827  *   A pointer to a table of void * pointers (objects).
828  * @param n
829  *   The number of objects to add in the ring from the obj_table.
830  * @return
831  *   - 0: Success; objects enqueued.
832  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
833  *     high water mark is exceeded.
834  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
835  */
836 static inline int __attribute__((always_inline))
837 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
838                       unsigned n)
839 {
840         if (r->prod.sp_enqueue)
841                 return rte_ring_sp_enqueue_bulk(r, obj_table, n);
842         else
843                 return rte_ring_mp_enqueue_bulk(r, obj_table, n);
844 }
845
846 /**
847  * Enqueue one object on a ring (multi-producers safe).
848  *
849  * This function uses a "compare and set" instruction to move the
850  * producer index atomically.
851  *
852  * @param r
853  *   A pointer to the ring structure.
854  * @param obj
855  *   A pointer to the object to be added.
856  * @return
857  *   - 0: Success; objects enqueued.
858  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
859  *     high water mark is exceeded.
860  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
861  */
862 static inline int __attribute__((always_inline))
863 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
864 {
865         return rte_ring_mp_enqueue_bulk(r, &obj, 1);
866 }
867
868 /**
869  * Enqueue one object on a ring (NOT multi-producers safe).
870  *
871  * @param r
872  *   A pointer to the ring structure.
873  * @param obj
874  *   A pointer to the object to be added.
875  * @return
876  *   - 0: Success; objects enqueued.
877  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
878  *     high water mark is exceeded.
879  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
880  */
881 static inline int __attribute__((always_inline))
882 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
883 {
884         return rte_ring_sp_enqueue_bulk(r, &obj, 1);
885 }
886
887 /**
888  * Enqueue one object on a ring.
889  *
890  * This function calls the multi-producer or the single-producer
891  * version, depending on the default behaviour that was specified at
892  * ring creation time (see flags).
893  *
894  * @param r
895  *   A pointer to the ring structure.
896  * @param obj
897  *   A pointer to the object to be added.
898  * @return
899  *   - 0: Success; objects enqueued.
900  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
901  *     high water mark is exceeded.
902  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
903  */
904 static inline int __attribute__((always_inline))
905 rte_ring_enqueue(struct rte_ring *r, void *obj)
906 {
907         if (r->prod.sp_enqueue)
908                 return rte_ring_sp_enqueue(r, obj);
909         else
910                 return rte_ring_mp_enqueue(r, obj);
911 }
912
913 /**
914  * Dequeue several objects from a ring (multi-consumers safe).
915  *
916  * This function uses a "compare and set" instruction to move the
917  * consumer index atomically.
918  *
919  * @param r
920  *   A pointer to the ring structure.
921  * @param obj_table
922  *   A pointer to a table of void * pointers (objects) that will be filled.
923  * @param n
924  *   The number of objects to dequeue from the ring to the obj_table.
925  * @return
926  *   - 0: Success; objects dequeued.
927  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
928  *     dequeued.
929  */
930 static inline int __attribute__((always_inline))
931 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
932 {
933         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
934 }
935
936 /**
937  * Dequeue several objects from a ring (NOT multi-consumers safe).
938  *
939  * @param r
940  *   A pointer to the ring structure.
941  * @param obj_table
942  *   A pointer to a table of void * pointers (objects) that will be filled.
943  * @param n
944  *   The number of objects to dequeue from the ring to the obj_table,
945  *   must be strictly positive.
946  * @return
947  *   - 0: Success; objects dequeued.
948  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
949  *     dequeued.
950  */
951 static inline int __attribute__((always_inline))
952 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
953 {
954         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
955 }
956
957 /**
958  * Dequeue several objects from a ring.
959  *
960  * This function calls the multi-consumers or the single-consumer
961  * version, depending on the default behaviour that was specified at
962  * ring creation time (see flags).
963  *
964  * @param r
965  *   A pointer to the ring structure.
966  * @param obj_table
967  *   A pointer to a table of void * pointers (objects) that will be filled.
968  * @param n
969  *   The number of objects to dequeue from the ring to the obj_table.
970  * @return
971  *   - 0: Success; objects dequeued.
972  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
973  *     dequeued.
974  */
975 static inline int __attribute__((always_inline))
976 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
977 {
978         if (r->cons.sc_dequeue)
979                 return rte_ring_sc_dequeue_bulk(r, obj_table, n);
980         else
981                 return rte_ring_mc_dequeue_bulk(r, obj_table, n);
982 }
983
984 /**
985  * Dequeue one object from a ring (multi-consumers safe).
986  *
987  * This function uses a "compare and set" instruction to move the
988  * consumer index atomically.
989  *
990  * @param r
991  *   A pointer to the ring structure.
992  * @param obj_p
993  *   A pointer to a void * pointer (object) that will be filled.
994  * @return
995  *   - 0: Success; objects dequeued.
996  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
997  *     dequeued.
998  */
999 static inline int __attribute__((always_inline))
1000 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
1001 {
1002         return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
1003 }
1004
1005 /**
1006  * Dequeue one object from a ring (NOT multi-consumers safe).
1007  *
1008  * @param r
1009  *   A pointer to the ring structure.
1010  * @param obj_p
1011  *   A pointer to a void * pointer (object) that will be filled.
1012  * @return
1013  *   - 0: Success; objects dequeued.
1014  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
1015  *     dequeued.
1016  */
1017 static inline int __attribute__((always_inline))
1018 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
1019 {
1020         return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
1021 }
1022
1023 /**
1024  * Dequeue one object from a ring.
1025  *
1026  * This function calls the multi-consumers or the single-consumer
1027  * version depending on the default behaviour that was specified at
1028  * ring creation time (see flags).
1029  *
1030  * @param r
1031  *   A pointer to the ring structure.
1032  * @param obj_p
1033  *   A pointer to a void * pointer (object) that will be filled.
1034  * @return
1035  *   - 0: Success, objects dequeued.
1036  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
1037  *     dequeued.
1038  */
1039 static inline int __attribute__((always_inline))
1040 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
1041 {
1042         if (r->cons.sc_dequeue)
1043                 return rte_ring_sc_dequeue(r, obj_p);
1044         else
1045                 return rte_ring_mc_dequeue(r, obj_p);
1046 }
1047
1048 /**
1049  * Test if a ring is full.
1050  *
1051  * @param r
1052  *   A pointer to the ring structure.
1053  * @return
1054  *   - 1: The ring is full.
1055  *   - 0: The ring is not full.
1056  */
1057 static inline int
1058 rte_ring_full(const struct rte_ring *r)
1059 {
1060         uint32_t prod_tail = r->prod.tail;
1061         uint32_t cons_tail = r->cons.tail;
1062         return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
1063 }
1064
1065 /**
1066  * Test if a ring is empty.
1067  *
1068  * @param r
1069  *   A pointer to the ring structure.
1070  * @return
1071  *   - 1: The ring is empty.
1072  *   - 0: The ring is not empty.
1073  */
1074 static inline int
1075 rte_ring_empty(const struct rte_ring *r)
1076 {
1077         uint32_t prod_tail = r->prod.tail;
1078         uint32_t cons_tail = r->cons.tail;
1079         return !!(cons_tail == prod_tail);
1080 }
1081
1082 /**
1083  * Return the number of entries in a ring.
1084  *
1085  * @param r
1086  *   A pointer to the ring structure.
1087  * @return
1088  *   The number of entries in the ring.
1089  */
1090 static inline unsigned
1091 rte_ring_count(const struct rte_ring *r)
1092 {
1093         uint32_t prod_tail = r->prod.tail;
1094         uint32_t cons_tail = r->cons.tail;
1095         return (prod_tail - cons_tail) & r->prod.mask;
1096 }
1097
1098 /**
1099  * Return the number of free entries in a ring.
1100  *
1101  * @param r
1102  *   A pointer to the ring structure.
1103  * @return
1104  *   The number of free entries in the ring.
1105  */
1106 static inline unsigned
1107 rte_ring_free_count(const struct rte_ring *r)
1108 {
1109         uint32_t prod_tail = r->prod.tail;
1110         uint32_t cons_tail = r->cons.tail;
1111         return (cons_tail - prod_tail - 1) & r->prod.mask;
1112 }
1113
1114 /**
1115  * Return the size of the ring.
1116  *
1117  * @param r
1118  *   A pointer to the ring structure.
1119  * @return
1120  *   The number of elements which can be stored in the ring.
1121  */
1122 static inline unsigned int
1123 rte_ring_get_size(const struct rte_ring *r)
1124 {
1125         return r->prod.size;
1126 }
1127
1128 /**
1129  * Dump the status of all rings on the console
1130  *
1131  * @param f
1132  *   A pointer to a file for output
1133  */
1134 void rte_ring_list_dump(FILE *f);
1135
1136 /**
1137  * Search a ring from its name
1138  *
1139  * @param name
1140  *   The name of the ring.
1141  * @return
1142  *   The pointer to the ring matching the name, or NULL if not found,
1143  *   with rte_errno set appropriately. Possible rte_errno values include:
1144  *    - ENOENT - required entry not available to return.
1145  */
1146 struct rte_ring *rte_ring_lookup(const char *name);
1147
1148 /**
1149  * Enqueue several objects on the ring (multi-producers safe).
1150  *
1151  * This function uses a "compare and set" instruction to move the
1152  * producer index atomically.
1153  *
1154  * @param r
1155  *   A pointer to the ring structure.
1156  * @param obj_table
1157  *   A pointer to a table of void * pointers (objects).
1158  * @param n
1159  *   The number of objects to add in the ring from the obj_table.
1160  * @return
1161  *   - n: Actual number of objects enqueued.
1162  */
1163 static inline unsigned __attribute__((always_inline))
1164 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1165                          unsigned n)
1166 {
1167         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1168 }
1169
1170 /**
1171  * Enqueue several objects on a ring (NOT multi-producers safe).
1172  *
1173  * @param r
1174  *   A pointer to the ring structure.
1175  * @param obj_table
1176  *   A pointer to a table of void * pointers (objects).
1177  * @param n
1178  *   The number of objects to add in the ring from the obj_table.
1179  * @return
1180  *   - n: Actual number of objects enqueued.
1181  */
1182 static inline unsigned __attribute__((always_inline))
1183 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1184                          unsigned n)
1185 {
1186         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1187 }
1188
1189 /**
1190  * Enqueue several objects on a ring.
1191  *
1192  * This function calls the multi-producer or the single-producer
1193  * version depending on the default behavior that was specified at
1194  * ring creation time (see flags).
1195  *
1196  * @param r
1197  *   A pointer to the ring structure.
1198  * @param obj_table
1199  *   A pointer to a table of void * pointers (objects).
1200  * @param n
1201  *   The number of objects to add in the ring from the obj_table.
1202  * @return
1203  *   - n: Actual number of objects enqueued.
1204  */
1205 static inline unsigned __attribute__((always_inline))
1206 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1207                       unsigned n)
1208 {
1209         if (r->prod.sp_enqueue)
1210                 return rte_ring_sp_enqueue_burst(r, obj_table, n);
1211         else
1212                 return rte_ring_mp_enqueue_burst(r, obj_table, n);
1213 }
1214
1215 /**
1216  * Dequeue several objects from a ring (multi-consumers safe). When the request
1217  * objects are more than the available objects, only dequeue the actual number
1218  * of objects
1219  *
1220  * This function uses a "compare and set" instruction to move the
1221  * consumer index atomically.
1222  *
1223  * @param r
1224  *   A pointer to the ring structure.
1225  * @param obj_table
1226  *   A pointer to a table of void * pointers (objects) that will be filled.
1227  * @param n
1228  *   The number of objects to dequeue from the ring to the obj_table.
1229  * @return
1230  *   - n: Actual number of objects dequeued, 0 if ring is empty
1231  */
1232 static inline unsigned __attribute__((always_inline))
1233 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1234 {
1235         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1236 }
1237
1238 /**
1239  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
1240  * request objects are more than the available objects, only dequeue the
1241  * actual number of objects
1242  *
1243  * @param r
1244  *   A pointer to the ring structure.
1245  * @param obj_table
1246  *   A pointer to a table of void * pointers (objects) that will be filled.
1247  * @param n
1248  *   The number of objects to dequeue from the ring to the obj_table.
1249  * @return
1250  *   - n: Actual number of objects dequeued, 0 if ring is empty
1251  */
1252 static inline unsigned __attribute__((always_inline))
1253 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1254 {
1255         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1256 }
1257
1258 /**
1259  * Dequeue multiple objects from a ring up to a maximum number.
1260  *
1261  * This function calls the multi-consumers or the single-consumer
1262  * version, depending on the default behaviour that was specified at
1263  * ring creation time (see flags).
1264  *
1265  * @param r
1266  *   A pointer to the ring structure.
1267  * @param obj_table
1268  *   A pointer to a table of void * pointers (objects) that will be filled.
1269  * @param n
1270  *   The number of objects to dequeue from the ring to the obj_table.
1271  * @return
1272  *   - Number of objects dequeued
1273  */
1274 static inline unsigned __attribute__((always_inline))
1275 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1276 {
1277         if (r->cons.sc_dequeue)
1278                 return rte_ring_sc_dequeue_burst(r, obj_table, n);
1279         else
1280                 return rte_ring_mc_dequeue_burst(r, obj_table, n);
1281 }
1282
1283 #ifdef __cplusplus
1284 }
1285 #endif
1286
1287 #endif /* _RTE_RING_H_ */