eal: add rte_compiler_barrier() macro
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 /*
35  * Derived from FreeBSD's bufring.h
36  *
37  **************************************************************************
38  *
39  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
40  * All rights reserved.
41  *
42  * Redistribution and use in source and binary forms, with or without
43  * modification, are permitted provided that the following conditions are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  *    this list of conditions and the following disclaimer.
47  *
48  * 2. The name of Kip Macy nor the names of other
49  *    contributors may be used to endorse or promote products derived from
50  *    this software without specific prior written permission.
51  *
52  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
54  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
55  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
56  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
57  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
58  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
59  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
60  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
61  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
62  * POSSIBILITY OF SUCH DAMAGE.
63  *
64  ***************************************************************************/
65
66 #ifndef _RTE_RING_H_
67 #define _RTE_RING_H_
68
69 /**
70  * @file
71  * RTE Ring
72  *
73  * The Ring Manager is a fixed-size queue, implemented as a table of
74  * pointers. Head and tail pointers are modified atomically, allowing
75  * concurrent access to it. It has the following features:
76  *
77  * - FIFO (First In First Out)
78  * - Maximum size is fixed; the pointers are stored in a table.
79  * - Lockless implementation.
80  * - Multi- or single-consumer dequeue.
81  * - Multi- or single-producer enqueue.
82  * - Bulk dequeue.
83  * - Bulk enqueue.
84  *
85  * Note: the ring implementation is not preemptable. A lcore must not
86  * be interrupted by another task that uses the same ring.
87  *
88  */
89
90 #ifdef __cplusplus
91 extern "C" {
92 #endif
93
94 #include <stdint.h>
95 #include <sys/queue.h>
96 #include <errno.h>
97 #include <rte_common.h>
98 #include <rte_memory.h>
99 #include <rte_lcore.h>
100 #include <rte_atomic.h>
101 #include <rte_branch_prediction.h>
102
103 enum rte_ring_queue_behavior {
104         RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
105         RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items a possible from ring */
106 };
107
108 #ifdef RTE_LIBRTE_RING_DEBUG
109 /**
110  * A structure that stores the ring statistics (per-lcore).
111  */
112 struct rte_ring_debug_stats {
113         uint64_t enq_success_bulk; /**< Successful enqueues number. */
114         uint64_t enq_success_objs; /**< Objects successfully enqueued. */
115         uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
116         uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
117         uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
118         uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
119         uint64_t deq_success_bulk; /**< Successful dequeues number. */
120         uint64_t deq_success_objs; /**< Objects successfully dequeued. */
121         uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
122         uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
123 } __rte_cache_aligned;
124 #endif
125
126 #define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */
127
128 /**
129  * An RTE ring structure.
130  *
131  * The producer and the consumer have a head and a tail index. The particularity
132  * of these index is that they are not between 0 and size(ring). These indexes
133  * are between 0 and 2^32, and we mask their value when we access the ring[]
134  * field. Thanks to this assumption, we can do subtractions between 2 index
135  * values in a modulo-32bit base: that's why the overflow of the indexes is not
136  * a problem.
137  */
138 struct rte_ring {
139         TAILQ_ENTRY(rte_ring) next;      /**< Next in list. */
140
141         char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */
142         int flags;                       /**< Flags supplied at creation. */
143
144         /** Ring producer status. */
145         struct prod {
146                 uint32_t watermark;      /**< Maximum items before EDQUOT. */
147                 uint32_t sp_enqueue;     /**< True, if single producer. */
148                 uint32_t size;           /**< Size of ring. */
149                 uint32_t mask;           /**< Mask (size-1) of ring. */
150                 volatile uint32_t head;  /**< Producer head. */
151                 volatile uint32_t tail;  /**< Producer tail. */
152         } prod __rte_cache_aligned;
153
154         /** Ring consumer status. */
155         struct cons {
156                 uint32_t sc_dequeue;     /**< True, if single consumer. */
157                 uint32_t size;           /**< Size of the ring. */
158                 uint32_t mask;           /**< Mask (size-1) of ring. */
159                 volatile uint32_t head;  /**< Consumer head. */
160                 volatile uint32_t tail;  /**< Consumer tail. */
161 #ifdef RTE_RING_SPLIT_PROD_CONS
162         } cons __rte_cache_aligned;
163 #else
164         } cons;
165 #endif
166
167 #ifdef RTE_LIBRTE_RING_DEBUG
168         struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
169 #endif
170
171         void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
172                                                                                  * not volatile so need to be careful
173                                                                                  * about compiler re-ordering */
174 };
175
176 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
177 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
178 #define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceed for burst ops */
179 #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
180
181 /**
182  * @internal When debug is enabled, store ring statistics.
183  * @param r
184  *   A pointer to the ring.
185  * @param name
186  *   The name of the statistics field to increment in the ring.
187  * @param n
188  *   The number to add to the object-oriented statistics.
189  */
190 #ifdef RTE_LIBRTE_RING_DEBUG
191 #define __RING_STAT_ADD(r, name, n) do {                \
192                 unsigned __lcore_id = rte_lcore_id();   \
193                 r->stats[__lcore_id].name##_objs += n;  \
194                 r->stats[__lcore_id].name##_bulk += 1;  \
195         } while(0)
196 #else
197 #define __RING_STAT_ADD(r, name, n) do {} while(0)
198 #endif
199
200 /**
201  * Create a new ring named *name* in memory.
202  *
203  * This function uses ``memzone_reserve()`` to allocate memory. Its size is
204  * set to *count*, which must be a power of two. Water marking is
205  * disabled by default.
206  * Note that the real usable ring size is *count-1* instead of
207  * *count*.
208  *
209  * @param name
210  *   The name of the ring.
211  * @param count
212  *   The size of the ring (must be a power of 2).
213  * @param socket_id
214  *   The *socket_id* argument is the socket identifier in case of
215  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
216  *   constraint for the reserved zone.
217  * @param flags
218  *   An OR of the following:
219  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
220  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
221  *      is "single-producer". Otherwise, it is "multi-producers".
222  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
223  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
224  *      is "single-consumer". Otherwise, it is "multi-consumers".
225  * @return
226  *   On success, the pointer to the new allocated ring. NULL on error with
227  *    rte_errno set appropriately. Possible errno values include:
228  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
229  *    - E_RTE_SECONDARY - function was called from a secondary process instance
230  *    - E_RTE_NO_TAILQ - no tailq list could be got for the ring list
231  *    - EINVAL - count provided is not a power of 2
232  *    - ENOSPC - the maximum number of memzones has already been allocated
233  *    - EEXIST - a memzone with the same name already exists
234  *    - ENOMEM - no appropriate memory area found in which to create memzone
235  */
236 struct rte_ring *rte_ring_create(const char *name, unsigned count,
237                                  int socket_id, unsigned flags);
238
239 /**
240  * Change the high water mark.
241  *
242  * If *count* is 0, water marking is disabled. Otherwise, it is set to the
243  * *count* value. The *count* value must be greater than 0 and less
244  * than the ring size.
245  *
246  * This function can be called at any time (not necessarily at
247  * initialization).
248  *
249  * @param r
250  *   A pointer to the ring structure.
251  * @param count
252  *   The new water mark value.
253  * @return
254  *   - 0: Success; water mark changed.
255  *   - -EINVAL: Invalid water mark value.
256  */
257 int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
258
259 /**
260  * Dump the status of the ring to the console.
261  *
262  * @param r
263  *   A pointer to the ring structure.
264  */
265 void rte_ring_dump(const struct rte_ring *r);
266
267 /* the actual enqueue of pointers on the ring. 
268  * Placed here since identical code needed in both
269  * single and multi producer enqueue functions */
270 #define ENQUEUE_PTRS() do { \
271         const uint32_t size = r->prod.size; \
272         uint32_t idx = prod_head & mask; \
273         if (likely(idx + n < size)) { \
274                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
275                         r->ring[idx] = obj_table[i]; \
276                         r->ring[idx+1] = obj_table[i+1]; \
277                         r->ring[idx+2] = obj_table[i+2]; \
278                         r->ring[idx+3] = obj_table[i+3]; \
279                 } \
280                 switch (n & 0x3) { \
281                         case 3: r->ring[idx++] = obj_table[i++]; \
282                         case 2: r->ring[idx++] = obj_table[i++]; \
283                         case 1: r->ring[idx++] = obj_table[i++]; \
284                 } \
285         } else { \
286                 for (i = 0; idx < size; i++, idx++)\
287                         r->ring[idx] = obj_table[i]; \
288                 for (idx = 0; i < n; i++, idx++) \
289                         r->ring[idx] = obj_table[i]; \
290         } \
291 } while(0)
292
293 /* the actual copy of pointers on the ring to obj_table. 
294  * Placed here since identical code needed in both
295  * single and multi consumer dequeue functions */
296 #define DEQUEUE_PTRS() do { \
297         uint32_t idx = cons_head & mask; \
298         const uint32_t size = r->cons.size; \
299         if (likely(idx + n < size)) { \
300                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
301                         obj_table[i] = r->ring[idx]; \
302                         obj_table[i+1] = r->ring[idx+1]; \
303                         obj_table[i+2] = r->ring[idx+2]; \
304                         obj_table[i+3] = r->ring[idx+3]; \
305                 } \
306                 switch (n & 0x3) { \
307                         case 3: obj_table[i++] = r->ring[idx++]; \
308                         case 2: obj_table[i++] = r->ring[idx++]; \
309                         case 1: obj_table[i++] = r->ring[idx++]; \
310                 } \
311         } else { \
312                 for (i = 0; idx < size; i++, idx++) \
313                         obj_table[i] = r->ring[idx]; \
314                 for (idx = 0; i < n; i++, idx++) \
315                         obj_table[i] = r->ring[idx]; \
316         } \
317 } while (0)
318
319 /**
320  * @internal Enqueue several objects on the ring (multi-producers safe).
321  *
322  * This function uses a "compare and set" instruction to move the
323  * producer index atomically.
324  *
325  * @param r
326  *   A pointer to the ring structure.
327  * @param obj_table
328  *   A pointer to a table of void * pointers (objects).
329  * @param n
330  *   The number of objects to add in the ring from the obj_table.
331  * @param behavior
332  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
333  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
334  * @return
335  *   Depend on the behavior value
336  *   if behavior = RTE_RING_QUEUE_FIXED
337  *   - 0: Success; objects enqueue.
338  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
339  *     high water mark is exceeded.
340  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
341  *   if behavior = RTE_RING_QUEUE_VARIABLE
342  *   - n: Actual number of objects enqueued.
343  */
344 static inline int __attribute__((always_inline))
345 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
346                          unsigned n, enum rte_ring_queue_behavior behavior)
347 {
348         uint32_t prod_head, prod_next;
349         uint32_t cons_tail, free_entries;
350         const unsigned max = n;
351         int success;
352         unsigned i;
353         uint32_t mask = r->prod.mask;
354         int ret;
355
356         /* move prod.head atomically */
357         do {
358                 /* Reset n to the initial burst count */
359                 n = max;
360
361                 prod_head = r->prod.head;
362                 cons_tail = r->cons.tail;
363                 /* The subtraction is done between two unsigned 32bits value
364                  * (the result is always modulo 32 bits even if we have
365                  * prod_head > cons_tail). So 'free_entries' is always between 0
366                  * and size(ring)-1. */
367                 free_entries = (mask + cons_tail - prod_head);
368
369                 /* check that we have enough room in ring */
370                 if (unlikely(n > free_entries)) {
371                         if (behavior == RTE_RING_QUEUE_FIXED) {
372                                 __RING_STAT_ADD(r, enq_fail, n);
373                                 return -ENOBUFS;
374                         }
375                         else {
376                                 /* No free entry available */
377                                 if (unlikely(free_entries == 0)) {
378                                         __RING_STAT_ADD(r, enq_fail, n);
379                                         return 0;
380                                 }
381
382                                 n = free_entries;
383                         }
384                 }
385
386                 prod_next = prod_head + n;
387                 success = rte_atomic32_cmpset(&r->prod.head, prod_head,
388                                               prod_next);
389         } while (unlikely(success == 0));
390
391         /* write entries in ring */
392         ENQUEUE_PTRS();
393         rte_compiler_barrier();
394
395         /* if we exceed the watermark */
396         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
397                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
398                                 (int)(n | RTE_RING_QUOT_EXCEED);
399                 __RING_STAT_ADD(r, enq_quota, n);
400         }
401         else {
402                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
403                 __RING_STAT_ADD(r, enq_success, n);
404         }
405
406         /*
407          * If there are other enqueues in progress that preceeded us,
408          * we need to wait for them to complete
409          */
410         while (unlikely(r->prod.tail != prod_head))
411                 rte_pause();
412
413         r->prod.tail = prod_next;
414         return ret;
415 }
416
417 /**
418  * @internal Enqueue several objects on a ring (NOT multi-producers safe).
419  *
420  * @param r
421  *   A pointer to the ring structure.
422  * @param obj_table
423  *   A pointer to a table of void * pointers (objects).
424  * @param n
425  *   The number of objects to add in the ring from the obj_table.
426  * @param behavior
427  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
428  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
429  * @return
430  *   Depend on the behavior value
431  *   if behavior = RTE_RING_QUEUE_FIXED
432  *   - 0: Success; objects enqueue.
433  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
434  *     high water mark is exceeded.
435  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
436  *   if behavior = RTE_RING_QUEUE_VARIABLE
437  *   - n: Actual number of objects enqueued.
438  */
439 static inline int __attribute__((always_inline))
440 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
441                          unsigned n, enum rte_ring_queue_behavior behavior)
442 {
443         uint32_t prod_head, cons_tail;
444         uint32_t prod_next, free_entries;
445         unsigned i;
446         uint32_t mask = r->prod.mask;
447         int ret;
448
449         prod_head = r->prod.head;
450         cons_tail = r->cons.tail;
451         /* The subtraction is done between two unsigned 32bits value
452          * (the result is always modulo 32 bits even if we have
453          * prod_head > cons_tail). So 'free_entries' is always between 0
454          * and size(ring)-1. */
455         free_entries = mask + cons_tail - prod_head;
456
457         /* check that we have enough room in ring */
458         if (unlikely(n > free_entries)) {
459                 if (behavior == RTE_RING_QUEUE_FIXED) {
460                         __RING_STAT_ADD(r, enq_fail, n);
461                         return -ENOBUFS;
462                 }
463                 else {
464                         /* No free entry available */
465                         if (unlikely(free_entries == 0)) {
466                                 __RING_STAT_ADD(r, enq_fail, n);
467                                 return 0;
468                         }
469
470                         n = free_entries;
471                 }
472         }
473
474         prod_next = prod_head + n;
475         r->prod.head = prod_next;
476
477         /* write entries in ring */
478         ENQUEUE_PTRS();
479         rte_compiler_barrier();
480
481         /* if we exceed the watermark */
482         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
483                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
484                         (int)(n | RTE_RING_QUOT_EXCEED);
485                 __RING_STAT_ADD(r, enq_quota, n);
486         }
487         else {
488                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
489                 __RING_STAT_ADD(r, enq_success, n);
490         }
491
492         r->prod.tail = prod_next;
493         return ret;
494 }
495
496 /**
497  * @internal Dequeue several objects from a ring (multi-consumers safe). When
498  * the request objects are more than the available objects, only dequeue the
499  * actual number of objects
500  *
501  * This function uses a "compare and set" instruction to move the
502  * consumer index atomically.
503  *
504  * @param r
505  *   A pointer to the ring structure.
506  * @param obj_table
507  *   A pointer to a table of void * pointers (objects) that will be filled.
508  * @param n
509  *   The number of objects to dequeue from the ring to the obj_table.
510  * @param behavior
511  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
512  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
513  * @return
514  *   Depend on the behavior value
515  *   if behavior = RTE_RING_QUEUE_FIXED
516  *   - 0: Success; objects dequeued.
517  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
518  *     dequeued.
519  *   if behavior = RTE_RING_QUEUE_VARIABLE
520  *   - n: Actual number of objects dequeued.
521  */
522
523 static inline int __attribute__((always_inline))
524 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
525                  unsigned n, enum rte_ring_queue_behavior behavior)
526 {
527         uint32_t cons_head, prod_tail;
528         uint32_t cons_next, entries;
529         const unsigned max = n;
530         int success;
531         unsigned i;
532         uint32_t mask = r->prod.mask;
533
534         /* move cons.head atomically */
535         do {
536                 /* Restore n as it may change every loop */
537                 n = max;
538
539                 cons_head = r->cons.head;
540                 prod_tail = r->prod.tail;
541                 /* The subtraction is done between two unsigned 32bits value
542                  * (the result is always modulo 32 bits even if we have
543                  * cons_head > prod_tail). So 'entries' is always between 0
544                  * and size(ring)-1. */
545                 entries = (prod_tail - cons_head);
546
547                 /* Set the actual entries for dequeue */
548                 if (n > entries) {
549                         if (behavior == RTE_RING_QUEUE_FIXED) {
550                                 __RING_STAT_ADD(r, deq_fail, n);
551                                 return -ENOENT;
552                         }
553                         else {
554                                 if (unlikely(entries == 0)){
555                                         __RING_STAT_ADD(r, deq_fail, n);
556                                         return 0;
557                                 }
558
559                                 n = entries;
560                         }
561                 }
562
563                 cons_next = cons_head + n;
564                 success = rte_atomic32_cmpset(&r->cons.head, cons_head,
565                                               cons_next);
566         } while (unlikely(success == 0));
567
568         /* copy in table */
569         DEQUEUE_PTRS();
570         rte_compiler_barrier();
571
572         /*
573          * If there are other dequeues in progress that preceded us,
574          * we need to wait for them to complete
575          */
576         while (unlikely(r->cons.tail != cons_head))
577                 rte_pause();
578
579         __RING_STAT_ADD(r, deq_success, n);
580         r->cons.tail = cons_next;
581
582         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
583 }
584
585 /**
586  * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
587  * When the request objects are more than the available objects, only dequeue
588  * the actual number of objects
589  *
590  * @param r
591  *   A pointer to the ring structure.
592  * @param obj_table
593  *   A pointer to a table of void * pointers (objects) that will be filled.
594  * @param n
595  *   The number of objects to dequeue from the ring to the obj_table.
596  * @param behavior
597  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
598  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
599  * @return
600  *   Depend on the behavior value
601  *   if behavior = RTE_RING_QUEUE_FIXED
602  *   - 0: Success; objects dequeued.
603  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
604  *     dequeued.
605  *   if behavior = RTE_RING_QUEUE_VARIABLE
606  *   - n: Actual number of objects dequeued.
607  */
608 static inline int __attribute__((always_inline))
609 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
610                  unsigned n, enum rte_ring_queue_behavior behavior)
611 {
612         uint32_t cons_head, prod_tail;
613         uint32_t cons_next, entries;
614         unsigned i;
615         uint32_t mask = r->prod.mask;
616
617         cons_head = r->cons.head;
618         prod_tail = r->prod.tail;
619         /* The subtraction is done between two unsigned 32bits value
620          * (the result is always modulo 32 bits even if we have
621          * cons_head > prod_tail). So 'entries' is always between 0
622          * and size(ring)-1. */
623         entries = prod_tail - cons_head;
624
625         if (n > entries) {
626                 if (behavior == RTE_RING_QUEUE_FIXED) {
627                         __RING_STAT_ADD(r, deq_fail, n);
628                         return -ENOENT;
629                 }
630                 else {
631                         if (unlikely(entries == 0)){
632                                 __RING_STAT_ADD(r, deq_fail, n);
633                                 return 0;
634                         }
635
636                         n = entries;
637                 }
638         }
639
640         cons_next = cons_head + n;
641         r->cons.head = cons_next;
642
643         /* copy in table */
644         DEQUEUE_PTRS();
645         rte_compiler_barrier();
646
647         __RING_STAT_ADD(r, deq_success, n);
648         r->cons.tail = cons_next;
649         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
650 }
651
652 /**
653  * Enqueue several objects on the ring (multi-producers safe).
654  *
655  * This function uses a "compare and set" instruction to move the
656  * producer index atomically.
657  *
658  * @param r
659  *   A pointer to the ring structure.
660  * @param obj_table
661  *   A pointer to a table of void * pointers (objects).
662  * @param n
663  *   The number of objects to add in the ring from the obj_table.
664  * @return
665  *   - 0: Success; objects enqueue.
666  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
667  *     high water mark is exceeded.
668  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
669  */
670 static inline int __attribute__((always_inline))
671 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
672                          unsigned n)
673 {
674         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
675 }
676
677 /**
678  * Enqueue several objects on a ring (NOT multi-producers safe).
679  *
680  * @param r
681  *   A pointer to the ring structure.
682  * @param obj_table
683  *   A pointer to a table of void * pointers (objects).
684  * @param n
685  *   The number of objects to add in the ring from the obj_table.
686  * @return
687  *   - 0: Success; objects enqueued.
688  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
689  *     high water mark is exceeded.
690  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
691  */
692 static inline int __attribute__((always_inline))
693 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
694                          unsigned n)
695 {
696         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
697 }
698
699 /**
700  * Enqueue several objects on a ring.
701  *
702  * This function calls the multi-producer or the single-producer
703  * version depending on the default behavior that was specified at
704  * ring creation time (see flags).
705  *
706  * @param r
707  *   A pointer to the ring structure.
708  * @param obj_table
709  *   A pointer to a table of void * pointers (objects).
710  * @param n
711  *   The number of objects to add in the ring from the obj_table.
712  * @return
713  *   - 0: Success; objects enqueued.
714  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
715  *     high water mark is exceeded.
716  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
717  */
718 static inline int __attribute__((always_inline))
719 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
720                       unsigned n)
721 {
722         if (r->prod.sp_enqueue)
723                 return rte_ring_sp_enqueue_bulk(r, obj_table, n);
724         else
725                 return rte_ring_mp_enqueue_bulk(r, obj_table, n);
726 }
727
728 /**
729  * Enqueue one object on a ring (multi-producers safe).
730  *
731  * This function uses a "compare and set" instruction to move the
732  * producer index atomically.
733  *
734  * @param r
735  *   A pointer to the ring structure.
736  * @param obj
737  *   A pointer to the object to be added.
738  * @return
739  *   - 0: Success; objects enqueued.
740  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
741  *     high water mark is exceeded.
742  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
743  */
744 static inline int __attribute__((always_inline))
745 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
746 {
747         return rte_ring_mp_enqueue_bulk(r, &obj, 1);
748 }
749
750 /**
751  * Enqueue one object on a ring (NOT multi-producers safe).
752  *
753  * @param r
754  *   A pointer to the ring structure.
755  * @param obj
756  *   A pointer to the object to be added.
757  * @return
758  *   - 0: Success; objects enqueued.
759  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
760  *     high water mark is exceeded.
761  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
762  */
763 static inline int __attribute__((always_inline))
764 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
765 {
766         return rte_ring_sp_enqueue_bulk(r, &obj, 1);
767 }
768
769 /**
770  * Enqueue one object on a ring.
771  *
772  * This function calls the multi-producer or the single-producer
773  * version, depending on the default behaviour that was specified at
774  * ring creation time (see flags).
775  *
776  * @param r
777  *   A pointer to the ring structure.
778  * @param obj
779  *   A pointer to the object to be added.
780  * @return
781  *   - 0: Success; objects enqueued.
782  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
783  *     high water mark is exceeded.
784  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
785  */
786 static inline int __attribute__((always_inline))
787 rte_ring_enqueue(struct rte_ring *r, void *obj)
788 {
789         if (r->prod.sp_enqueue)
790                 return rte_ring_sp_enqueue(r, obj);
791         else
792                 return rte_ring_mp_enqueue(r, obj);
793 }
794
795 /**
796  * Dequeue several objects from a ring (multi-consumers safe).
797  *
798  * This function uses a "compare and set" instruction to move the
799  * consumer index atomically.
800  *
801  * @param r
802  *   A pointer to the ring structure.
803  * @param obj_table
804  *   A pointer to a table of void * pointers (objects) that will be filled.
805  * @param n
806  *   The number of objects to dequeue from the ring to the obj_table.
807  * @return
808  *   - 0: Success; objects dequeued.
809  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
810  *     dequeued.
811  */
812 static inline int __attribute__((always_inline))
813 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
814 {
815         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
816 }
817
818 /**
819  * Dequeue several objects from a ring (NOT multi-consumers safe).
820  *
821  * @param r
822  *   A pointer to the ring structure.
823  * @param obj_table
824  *   A pointer to a table of void * pointers (objects) that will be filled.
825  * @param n
826  *   The number of objects to dequeue from the ring to the obj_table,
827  *   must be strictly positive.
828  * @return
829  *   - 0: Success; objects dequeued.
830  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
831  *     dequeued.
832  */
833 static inline int __attribute__((always_inline))
834 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
835 {
836         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
837 }
838
839 /**
840  * Dequeue several objects from a ring.
841  *
842  * This function calls the multi-consumers or the single-consumer
843  * version, depending on the default behaviour that was specified at
844  * ring creation time (see flags).
845  *
846  * @param r
847  *   A pointer to the ring structure.
848  * @param obj_table
849  *   A pointer to a table of void * pointers (objects) that will be filled.
850  * @param n
851  *   The number of objects to dequeue from the ring to the obj_table.
852  * @return
853  *   - 0: Success; objects dequeued.
854  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
855  *     dequeued.
856  */
857 static inline int __attribute__((always_inline))
858 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
859 {
860         if (r->cons.sc_dequeue)
861                 return rte_ring_sc_dequeue_bulk(r, obj_table, n);
862         else
863                 return rte_ring_mc_dequeue_bulk(r, obj_table, n);
864 }
865
866 /**
867  * Dequeue one object from a ring (multi-consumers safe).
868  *
869  * This function uses a "compare and set" instruction to move the
870  * consumer index atomically.
871  *
872  * @param r
873  *   A pointer to the ring structure.
874  * @param obj_p
875  *   A pointer to a void * pointer (object) that will be filled.
876  * @return
877  *   - 0: Success; objects dequeued.
878  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
879  *     dequeued.
880  */
881 static inline int __attribute__((always_inline))
882 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
883 {
884         return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
885 }
886
887 /**
888  * Dequeue one object from a ring (NOT multi-consumers safe).
889  *
890  * @param r
891  *   A pointer to the ring structure.
892  * @param obj_p
893  *   A pointer to a void * pointer (object) that will be filled.
894  * @return
895  *   - 0: Success; objects dequeued.
896  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
897  *     dequeued.
898  */
899 static inline int __attribute__((always_inline))
900 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
901 {
902         return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
903 }
904
905 /**
906  * Dequeue one object from a ring.
907  *
908  * This function calls the multi-consumers or the single-consumer
909  * version depending on the default behaviour that was specified at
910  * ring creation time (see flags).
911  *
912  * @param r
913  *   A pointer to the ring structure.
914  * @param obj_p
915  *   A pointer to a void * pointer (object) that will be filled.
916  * @return
917  *   - 0: Success, objects dequeued.
918  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
919  *     dequeued.
920  */
921 static inline int __attribute__((always_inline))
922 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
923 {
924         if (r->cons.sc_dequeue)
925                 return rte_ring_sc_dequeue(r, obj_p);
926         else
927                 return rte_ring_mc_dequeue(r, obj_p);
928 }
929
930 /**
931  * Test if a ring is full.
932  *
933  * @param r
934  *   A pointer to the ring structure.
935  * @return
936  *   - 1: The ring is full.
937  *   - 0: The ring is not full.
938  */
939 static inline int
940 rte_ring_full(const struct rte_ring *r)
941 {
942         uint32_t prod_tail = r->prod.tail;
943         uint32_t cons_tail = r->cons.tail;
944         return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
945 }
946
947 /**
948  * Test if a ring is empty.
949  *
950  * @param r
951  *   A pointer to the ring structure.
952  * @return
953  *   - 1: The ring is empty.
954  *   - 0: The ring is not empty.
955  */
956 static inline int
957 rte_ring_empty(const struct rte_ring *r)
958 {
959         uint32_t prod_tail = r->prod.tail;
960         uint32_t cons_tail = r->cons.tail;
961         return !!(cons_tail == prod_tail);
962 }
963
964 /**
965  * Return the number of entries in a ring.
966  *
967  * @param r
968  *   A pointer to the ring structure.
969  * @return
970  *   The number of entries in the ring.
971  */
972 static inline unsigned
973 rte_ring_count(const struct rte_ring *r)
974 {
975         uint32_t prod_tail = r->prod.tail;
976         uint32_t cons_tail = r->cons.tail;
977         return ((prod_tail - cons_tail) & r->prod.mask);
978 }
979
980 /**
981  * Return the number of free entries in a ring.
982  *
983  * @param r
984  *   A pointer to the ring structure.
985  * @return
986  *   The number of free entries in the ring.
987  */
988 static inline unsigned
989 rte_ring_free_count(const struct rte_ring *r)
990 {
991         uint32_t prod_tail = r->prod.tail;
992         uint32_t cons_tail = r->cons.tail;
993         return ((cons_tail - prod_tail - 1) & r->prod.mask);
994 }
995
996 /**
997  * Dump the status of all rings on the console
998  */
999 void rte_ring_list_dump(void);
1000
1001 /**
1002  * Search a ring from its name
1003  *
1004  * @param name
1005  *   The name of the ring.
1006  * @return
1007  *   The pointer to the ring matching the name, or NULL if not found,
1008  *   with rte_errno set appropriately. Possible rte_errno values include:
1009  *    - ENOENT - required entry not available to return.
1010  */
1011 struct rte_ring *rte_ring_lookup(const char *name);
1012
1013 /**
1014  * Enqueue several objects on the ring (multi-producers safe).
1015  *
1016  * This function uses a "compare and set" instruction to move the
1017  * producer index atomically.
1018  *
1019  * @param r
1020  *   A pointer to the ring structure.
1021  * @param obj_table
1022  *   A pointer to a table of void * pointers (objects).
1023  * @param n
1024  *   The number of objects to add in the ring from the obj_table.
1025  * @return
1026  *   - n: Actual number of objects enqueued.
1027  */
1028 static inline int __attribute__((always_inline))
1029 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1030                          unsigned n)
1031 {
1032         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1033 }
1034
1035 /**
1036  * Enqueue several objects on a ring (NOT multi-producers safe).
1037  *
1038  * @param r
1039  *   A pointer to the ring structure.
1040  * @param obj_table
1041  *   A pointer to a table of void * pointers (objects).
1042  * @param n
1043  *   The number of objects to add in the ring from the obj_table.
1044  * @return
1045  *   - n: Actual number of objects enqueued.
1046  */
1047 static inline int __attribute__((always_inline))
1048 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1049                          unsigned n)
1050 {
1051         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1052 }
1053
1054 /**
1055  * Enqueue several objects on a ring.
1056  *
1057  * This function calls the multi-producer or the single-producer
1058  * version depending on the default behavior that was specified at
1059  * ring creation time (see flags).
1060  *
1061  * @param r
1062  *   A pointer to the ring structure.
1063  * @param obj_table
1064  *   A pointer to a table of void * pointers (objects).
1065  * @param n
1066  *   The number of objects to add in the ring from the obj_table.
1067  * @return
1068  *   - n: Actual number of objects enqueued.
1069  */
1070 static inline int __attribute__((always_inline))
1071 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1072                       unsigned n)
1073 {
1074         if (r->prod.sp_enqueue)
1075                 return  rte_ring_sp_enqueue_burst(r, obj_table, n);
1076         else
1077                 return  rte_ring_mp_enqueue_burst(r, obj_table, n);
1078 }
1079
1080 /**
1081  * Dequeue several objects from a ring (multi-consumers safe). When the request
1082  * objects are more than the available objects, only dequeue the actual number
1083  * of objects
1084  *
1085  * This function uses a "compare and set" instruction to move the
1086  * consumer index atomically.
1087  *
1088  * @param r
1089  *   A pointer to the ring structure.
1090  * @param obj_table
1091  *   A pointer to a table of void * pointers (objects) that will be filled.
1092  * @param n
1093  *   The number of objects to dequeue from the ring to the obj_table.
1094  * @return
1095  *   - n: Actual number of objects dequeued, 0 if ring is empty
1096  */
1097 static inline int __attribute__((always_inline))
1098 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1099 {
1100         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1101 }
1102
1103 /**
1104  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
1105  * request objects are more than the available objects, only dequeue the
1106  * actual number of objects
1107  *
1108  * @param r
1109  *   A pointer to the ring structure.
1110  * @param obj_table
1111  *   A pointer to a table of void * pointers (objects) that will be filled.
1112  * @param n
1113  *   The number of objects to dequeue from the ring to the obj_table.
1114  * @return
1115  *   - n: Actual number of objects dequeued, 0 if ring is empty
1116  */
1117 static inline int __attribute__((always_inline))
1118 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1119 {
1120         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1121 }
1122
1123 /**
1124  * Dequeue multiple objects from a ring up to a maximum number.
1125  *
1126  * This function calls the multi-consumers or the single-consumer
1127  * version, depending on the default behaviour that was specified at
1128  * ring creation time (see flags).
1129  *
1130  * @param r
1131  *   A pointer to the ring structure.
1132  * @param obj_table
1133  *   A pointer to a table of void * pointers (objects) that will be filled.
1134  * @param n
1135  *   The number of objects to dequeue from the ring to the obj_table.
1136  * @return
1137  *   - Number of objects dequeued, or a negative error code on error
1138  */
1139 static inline int __attribute__((always_inline))
1140 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1141 {
1142         if (r->cons.sc_dequeue)
1143                 return rte_ring_sc_dequeue_burst(r, obj_table, n);
1144         else
1145                 return rte_ring_mc_dequeue_burst(r, obj_table, n);
1146 }
1147
1148 #ifdef __cplusplus
1149 }
1150 #endif
1151
1152 #endif /* _RTE_RING_H_ */