update Intel copyright years to 2014
[dpdk.git] / lib / librte_ring / rte_ring.h
1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 /*
35  * Derived from FreeBSD's bufring.h
36  *
37  **************************************************************************
38  *
39  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
40  * All rights reserved.
41  *
42  * Redistribution and use in source and binary forms, with or without
43  * modification, are permitted provided that the following conditions are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  *    this list of conditions and the following disclaimer.
47  *
48  * 2. The name of Kip Macy nor the names of other
49  *    contributors may be used to endorse or promote products derived from
50  *    this software without specific prior written permission.
51  *
52  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
54  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
55  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
56  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
57  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
58  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
59  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
60  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
61  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
62  * POSSIBILITY OF SUCH DAMAGE.
63  *
64  ***************************************************************************/
65
66 #ifndef _RTE_RING_H_
67 #define _RTE_RING_H_
68
69 /**
70  * @file
71  * RTE Ring
72  *
73  * The Ring Manager is a fixed-size queue, implemented as a table of
74  * pointers. Head and tail pointers are modified atomically, allowing
75  * concurrent access to it. It has the following features:
76  *
77  * - FIFO (First In First Out)
78  * - Maximum size is fixed; the pointers are stored in a table.
79  * - Lockless implementation.
80  * - Multi- or single-consumer dequeue.
81  * - Multi- or single-producer enqueue.
82  * - Bulk dequeue.
83  * - Bulk enqueue.
84  *
85  * Note: the ring implementation is not preemptable. A lcore must not
86  * be interrupted by another task that uses the same ring.
87  *
88  */
89
90 #ifdef __cplusplus
91 extern "C" {
92 #endif
93
94 #include <stdint.h>
95 #include <sys/queue.h>
96 #include <errno.h>
97 #include <rte_common.h>
98 #include <rte_memory.h>
99 #include <rte_lcore.h>
100 #include <rte_atomic.h>
101 #include <rte_branch_prediction.h>
102
103 enum rte_ring_queue_behavior {
104         RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
105         RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items a possible from ring */
106 };
107
108 #ifdef RTE_LIBRTE_RING_DEBUG
109 /**
110  * A structure that stores the ring statistics (per-lcore).
111  */
112 struct rte_ring_debug_stats {
113         uint64_t enq_success_bulk; /**< Successful enqueues number. */
114         uint64_t enq_success_objs; /**< Objects successfully enqueued. */
115         uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
116         uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
117         uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
118         uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
119         uint64_t deq_success_bulk; /**< Successful dequeues number. */
120         uint64_t deq_success_objs; /**< Objects successfully dequeued. */
121         uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
122         uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
123 } __rte_cache_aligned;
124 #endif
125
126 #define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */
127
128 /**
129  * An RTE ring structure.
130  *
131  * The producer and the consumer have a head and a tail index. The particularity
132  * of these index is that they are not between 0 and size(ring). These indexes
133  * are between 0 and 2^32, and we mask their value when we access the ring[]
134  * field. Thanks to this assumption, we can do subtractions between 2 index
135  * values in a modulo-32bit base: that's why the overflow of the indexes is not
136  * a problem.
137  */
138 struct rte_ring {
139         TAILQ_ENTRY(rte_ring) next;      /**< Next in list. */
140
141         char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */
142         int flags;                       /**< Flags supplied at creation. */
143
144         /** Ring producer status. */
145         struct prod {
146                 uint32_t watermark;      /**< Maximum items before EDQUOT. */
147                 uint32_t sp_enqueue;     /**< True, if single producer. */
148                 uint32_t size;           /**< Size of ring. */
149                 uint32_t mask;           /**< Mask (size-1) of ring. */
150                 volatile uint32_t head;  /**< Producer head. */
151                 volatile uint32_t tail;  /**< Producer tail. */
152         } prod __rte_cache_aligned;
153
154         /** Ring consumer status. */
155         struct cons {
156                 uint32_t sc_dequeue;     /**< True, if single consumer. */
157                 uint32_t size;           /**< Size of the ring. */
158                 uint32_t mask;           /**< Mask (size-1) of ring. */
159                 volatile uint32_t head;  /**< Consumer head. */
160                 volatile uint32_t tail;  /**< Consumer tail. */
161 #ifdef RTE_RING_SPLIT_PROD_CONS
162         } cons __rte_cache_aligned;
163 #else
164         } cons;
165 #endif
166
167 #ifdef RTE_LIBRTE_RING_DEBUG
168         struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
169 #endif
170
171         void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
172                                                                                  * not volatile so need to be careful
173                                                                                  * about compiler re-ordering */
174 };
175
176 /* dummy assembly operation to prevent compiler re-ordering of instructions */
177 #define COMPILER_BARRIER() do { asm volatile("" ::: "memory"); } while(0)
178
179 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
180 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
181 #define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceed for burst ops */
182 #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
183
184 /**
185  * @internal When debug is enabled, store ring statistics.
186  * @param r
187  *   A pointer to the ring.
188  * @param name
189  *   The name of the statistics field to increment in the ring.
190  * @param n
191  *   The number to add to the object-oriented statistics.
192  */
193 #ifdef RTE_LIBRTE_RING_DEBUG
194 #define __RING_STAT_ADD(r, name, n) do {                \
195                 unsigned __lcore_id = rte_lcore_id();   \
196                 r->stats[__lcore_id].name##_objs += n;  \
197                 r->stats[__lcore_id].name##_bulk += 1;  \
198         } while(0)
199 #else
200 #define __RING_STAT_ADD(r, name, n) do {} while(0)
201 #endif
202
203 /**
204  * Create a new ring named *name* in memory.
205  *
206  * This function uses ``memzone_reserve()`` to allocate memory. Its size is
207  * set to *count*, which must be a power of two. Water marking is
208  * disabled by default.
209  * Note that the real usable ring size is *count-1* instead of
210  * *count*.
211  *
212  * @param name
213  *   The name of the ring.
214  * @param count
215  *   The size of the ring (must be a power of 2).
216  * @param socket_id
217  *   The *socket_id* argument is the socket identifier in case of
218  *   NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
219  *   constraint for the reserved zone.
220  * @param flags
221  *   An OR of the following:
222  *    - RING_F_SP_ENQ: If this flag is set, the default behavior when
223  *      using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()``
224  *      is "single-producer". Otherwise, it is "multi-producers".
225  *    - RING_F_SC_DEQ: If this flag is set, the default behavior when
226  *      using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()``
227  *      is "single-consumer". Otherwise, it is "multi-consumers".
228  * @return
229  *   On success, the pointer to the new allocated ring. NULL on error with
230  *    rte_errno set appropriately. Possible errno values include:
231  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
232  *    - E_RTE_SECONDARY - function was called from a secondary process instance
233  *    - E_RTE_NO_TAILQ - no tailq list could be got for the ring list
234  *    - EINVAL - count provided is not a power of 2
235  *    - ENOSPC - the maximum number of memzones has already been allocated
236  *    - EEXIST - a memzone with the same name already exists
237  *    - ENOMEM - no appropriate memory area found in which to create memzone
238  */
239 struct rte_ring *rte_ring_create(const char *name, unsigned count,
240                                  int socket_id, unsigned flags);
241
242 /**
243  * Change the high water mark.
244  *
245  * If *count* is 0, water marking is disabled. Otherwise, it is set to the
246  * *count* value. The *count* value must be greater than 0 and less
247  * than the ring size.
248  *
249  * This function can be called at any time (not necessarily at
250  * initialization).
251  *
252  * @param r
253  *   A pointer to the ring structure.
254  * @param count
255  *   The new water mark value.
256  * @return
257  *   - 0: Success; water mark changed.
258  *   - -EINVAL: Invalid water mark value.
259  */
260 int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
261
262 /**
263  * Dump the status of the ring to the console.
264  *
265  * @param r
266  *   A pointer to the ring structure.
267  */
268 void rte_ring_dump(const struct rte_ring *r);
269
270 /* the actual enqueue of pointers on the ring. 
271  * Placed here since identical code needed in both
272  * single and multi producer enqueue functions */
273 #define ENQUEUE_PTRS() do { \
274         const uint32_t size = r->prod.size; \
275         uint32_t idx = prod_head & mask; \
276         if (likely(idx + n < size)) { \
277                 for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
278                         r->ring[idx] = obj_table[i]; \
279                         r->ring[idx+1] = obj_table[i+1]; \
280                         r->ring[idx+2] = obj_table[i+2]; \
281                         r->ring[idx+3] = obj_table[i+3]; \
282                 } \
283                 switch (n & 0x3) { \
284                         case 3: r->ring[idx++] = obj_table[i++]; \
285                         case 2: r->ring[idx++] = obj_table[i++]; \
286                         case 1: r->ring[idx++] = obj_table[i++]; \
287                 } \
288         } else { \
289                 for (i = 0; idx < size; i++, idx++)\
290                         r->ring[idx] = obj_table[i]; \
291                 for (idx = 0; i < n; i++, idx++) \
292                         r->ring[idx] = obj_table[i]; \
293         } \
294 } while(0)
295
296 /* the actual copy of pointers on the ring to obj_table. 
297  * Placed here since identical code needed in both
298  * single and multi consumer dequeue functions */
299 #define DEQUEUE_PTRS() do { \
300         uint32_t idx = cons_head & mask; \
301         const uint32_t size = r->cons.size; \
302         if (likely(idx + n < size)) { \
303                 for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
304                         obj_table[i] = r->ring[idx]; \
305                         obj_table[i+1] = r->ring[idx+1]; \
306                         obj_table[i+2] = r->ring[idx+2]; \
307                         obj_table[i+3] = r->ring[idx+3]; \
308                 } \
309                 switch (n & 0x3) { \
310                         case 3: obj_table[i++] = r->ring[idx++]; \
311                         case 2: obj_table[i++] = r->ring[idx++]; \
312                         case 1: obj_table[i++] = r->ring[idx++]; \
313                 } \
314         } else { \
315                 for (i = 0; idx < size; i++, idx++) \
316                         obj_table[i] = r->ring[idx]; \
317                 for (idx = 0; i < n; i++, idx++) \
318                         obj_table[i] = r->ring[idx]; \
319         } \
320 } while (0)
321
322 /**
323  * @internal Enqueue several objects on the ring (multi-producers safe).
324  *
325  * This function uses a "compare and set" instruction to move the
326  * producer index atomically.
327  *
328  * @param r
329  *   A pointer to the ring structure.
330  * @param obj_table
331  *   A pointer to a table of void * pointers (objects).
332  * @param n
333  *   The number of objects to add in the ring from the obj_table.
334  * @param behavior
335  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
336  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
337  * @return
338  *   Depend on the behavior value
339  *   if behavior = RTE_RING_QUEUE_FIXED
340  *   - 0: Success; objects enqueue.
341  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
342  *     high water mark is exceeded.
343  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
344  *   if behavior = RTE_RING_QUEUE_VARIABLE
345  *   - n: Actual number of objects enqueued.
346  */
347 static inline int __attribute__((always_inline))
348 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
349                          unsigned n, enum rte_ring_queue_behavior behavior)
350 {
351         uint32_t prod_head, prod_next;
352         uint32_t cons_tail, free_entries;
353         const unsigned max = n;
354         int success;
355         unsigned i;
356         uint32_t mask = r->prod.mask;
357         int ret;
358
359         /* move prod.head atomically */
360         do {
361                 /* Reset n to the initial burst count */
362                 n = max;
363
364                 prod_head = r->prod.head;
365                 cons_tail = r->cons.tail;
366                 /* The subtraction is done between two unsigned 32bits value
367                  * (the result is always modulo 32 bits even if we have
368                  * prod_head > cons_tail). So 'free_entries' is always between 0
369                  * and size(ring)-1. */
370                 free_entries = (mask + cons_tail - prod_head);
371
372                 /* check that we have enough room in ring */
373                 if (unlikely(n > free_entries)) {
374                         if (behavior == RTE_RING_QUEUE_FIXED) {
375                                 __RING_STAT_ADD(r, enq_fail, n);
376                                 return -ENOBUFS;
377                         }
378                         else {
379                                 /* No free entry available */
380                                 if (unlikely(free_entries == 0)) {
381                                         __RING_STAT_ADD(r, enq_fail, n);
382                                         return 0;
383                                 }
384
385                                 n = free_entries;
386                         }
387                 }
388
389                 prod_next = prod_head + n;
390                 success = rte_atomic32_cmpset(&r->prod.head, prod_head,
391                                               prod_next);
392         } while (unlikely(success == 0));
393
394         /* write entries in ring */
395         ENQUEUE_PTRS();
396         COMPILER_BARRIER();
397
398         /* if we exceed the watermark */
399         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
400                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
401                                 (int)(n | RTE_RING_QUOT_EXCEED);
402                 __RING_STAT_ADD(r, enq_quota, n);
403         }
404         else {
405                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
406                 __RING_STAT_ADD(r, enq_success, n);
407         }
408
409         /*
410          * If there are other enqueues in progress that preceeded us,
411          * we need to wait for them to complete
412          */
413         while (unlikely(r->prod.tail != prod_head))
414                 rte_pause();
415
416         r->prod.tail = prod_next;
417         return ret;
418 }
419
420 /**
421  * @internal Enqueue several objects on a ring (NOT multi-producers safe).
422  *
423  * @param r
424  *   A pointer to the ring structure.
425  * @param obj_table
426  *   A pointer to a table of void * pointers (objects).
427  * @param n
428  *   The number of objects to add in the ring from the obj_table.
429  * @param behavior
430  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
431  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
432  * @return
433  *   Depend on the behavior value
434  *   if behavior = RTE_RING_QUEUE_FIXED
435  *   - 0: Success; objects enqueue.
436  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
437  *     high water mark is exceeded.
438  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
439  *   if behavior = RTE_RING_QUEUE_VARIABLE
440  *   - n: Actual number of objects enqueued.
441  */
442 static inline int __attribute__((always_inline))
443 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
444                          unsigned n, enum rte_ring_queue_behavior behavior)
445 {
446         uint32_t prod_head, cons_tail;
447         uint32_t prod_next, free_entries;
448         unsigned i;
449         uint32_t mask = r->prod.mask;
450         int ret;
451
452         prod_head = r->prod.head;
453         cons_tail = r->cons.tail;
454         /* The subtraction is done between two unsigned 32bits value
455          * (the result is always modulo 32 bits even if we have
456          * prod_head > cons_tail). So 'free_entries' is always between 0
457          * and size(ring)-1. */
458         free_entries = mask + cons_tail - prod_head;
459
460         /* check that we have enough room in ring */
461         if (unlikely(n > free_entries)) {
462                 if (behavior == RTE_RING_QUEUE_FIXED) {
463                         __RING_STAT_ADD(r, enq_fail, n);
464                         return -ENOBUFS;
465                 }
466                 else {
467                         /* No free entry available */
468                         if (unlikely(free_entries == 0)) {
469                                 __RING_STAT_ADD(r, enq_fail, n);
470                                 return 0;
471                         }
472
473                         n = free_entries;
474                 }
475         }
476
477         prod_next = prod_head + n;
478         r->prod.head = prod_next;
479
480         /* write entries in ring */
481         ENQUEUE_PTRS();
482         COMPILER_BARRIER();
483
484         /* if we exceed the watermark */
485         if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
486                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
487                         (int)(n | RTE_RING_QUOT_EXCEED);
488                 __RING_STAT_ADD(r, enq_quota, n);
489         }
490         else {
491                 ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
492                 __RING_STAT_ADD(r, enq_success, n);
493         }
494
495         r->prod.tail = prod_next;
496         return ret;
497 }
498
499 /**
500  * @internal Dequeue several objects from a ring (multi-consumers safe). When
501  * the request objects are more than the available objects, only dequeue the
502  * actual number of objects
503  *
504  * This function uses a "compare and set" instruction to move the
505  * consumer index atomically.
506  *
507  * @param r
508  *   A pointer to the ring structure.
509  * @param obj_table
510  *   A pointer to a table of void * pointers (objects) that will be filled.
511  * @param n
512  *   The number of objects to dequeue from the ring to the obj_table.
513  * @param behavior
514  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
515  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
516  * @return
517  *   Depend on the behavior value
518  *   if behavior = RTE_RING_QUEUE_FIXED
519  *   - 0: Success; objects dequeued.
520  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
521  *     dequeued.
522  *   if behavior = RTE_RING_QUEUE_VARIABLE
523  *   - n: Actual number of objects dequeued.
524  */
525
526 static inline int __attribute__((always_inline))
527 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
528                  unsigned n, enum rte_ring_queue_behavior behavior)
529 {
530         uint32_t cons_head, prod_tail;
531         uint32_t cons_next, entries;
532         const unsigned max = n;
533         int success;
534         unsigned i;
535         uint32_t mask = r->prod.mask;
536
537         /* move cons.head atomically */
538         do {
539                 /* Restore n as it may change every loop */
540                 n = max;
541
542                 cons_head = r->cons.head;
543                 prod_tail = r->prod.tail;
544                 /* The subtraction is done between two unsigned 32bits value
545                  * (the result is always modulo 32 bits even if we have
546                  * cons_head > prod_tail). So 'entries' is always between 0
547                  * and size(ring)-1. */
548                 entries = (prod_tail - cons_head);
549
550                 /* Set the actual entries for dequeue */
551                 if (n > entries) {
552                         if (behavior == RTE_RING_QUEUE_FIXED) {
553                                 __RING_STAT_ADD(r, deq_fail, n);
554                                 return -ENOENT;
555                         }
556                         else {
557                                 if (unlikely(entries == 0)){
558                                         __RING_STAT_ADD(r, deq_fail, n);
559                                         return 0;
560                                 }
561
562                                 n = entries;
563                         }
564                 }
565
566                 cons_next = cons_head + n;
567                 success = rte_atomic32_cmpset(&r->cons.head, cons_head,
568                                               cons_next);
569         } while (unlikely(success == 0));
570
571         /* copy in table */
572         DEQUEUE_PTRS();
573         COMPILER_BARRIER();
574
575         /*
576          * If there are other dequeues in progress that preceded us,
577          * we need to wait for them to complete
578          */
579         while (unlikely(r->cons.tail != cons_head))
580                 rte_pause();
581
582         __RING_STAT_ADD(r, deq_success, n);
583         r->cons.tail = cons_next;
584
585         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
586 }
587
588 /**
589  * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
590  * When the request objects are more than the available objects, only dequeue
591  * the actual number of objects
592  *
593  * @param r
594  *   A pointer to the ring structure.
595  * @param obj_table
596  *   A pointer to a table of void * pointers (objects) that will be filled.
597  * @param n
598  *   The number of objects to dequeue from the ring to the obj_table.
599  * @param behavior
600  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
601  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
602  * @return
603  *   Depend on the behavior value
604  *   if behavior = RTE_RING_QUEUE_FIXED
605  *   - 0: Success; objects dequeued.
606  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
607  *     dequeued.
608  *   if behavior = RTE_RING_QUEUE_VARIABLE
609  *   - n: Actual number of objects dequeued.
610  */
611 static inline int __attribute__((always_inline))
612 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
613                  unsigned n, enum rte_ring_queue_behavior behavior)
614 {
615         uint32_t cons_head, prod_tail;
616         uint32_t cons_next, entries;
617         unsigned i;
618         uint32_t mask = r->prod.mask;
619
620         cons_head = r->cons.head;
621         prod_tail = r->prod.tail;
622         /* The subtraction is done between two unsigned 32bits value
623          * (the result is always modulo 32 bits even if we have
624          * cons_head > prod_tail). So 'entries' is always between 0
625          * and size(ring)-1. */
626         entries = prod_tail - cons_head;
627
628         if (n > entries) {
629                 if (behavior == RTE_RING_QUEUE_FIXED) {
630                         __RING_STAT_ADD(r, deq_fail, n);
631                         return -ENOENT;
632                 }
633                 else {
634                         if (unlikely(entries == 0)){
635                                 __RING_STAT_ADD(r, deq_fail, n);
636                                 return 0;
637                         }
638
639                         n = entries;
640                 }
641         }
642
643         cons_next = cons_head + n;
644         r->cons.head = cons_next;
645
646         /* copy in table */
647         DEQUEUE_PTRS();
648         COMPILER_BARRIER();
649
650         __RING_STAT_ADD(r, deq_success, n);
651         r->cons.tail = cons_next;
652         return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
653 }
654
655 /**
656  * Enqueue several objects on the ring (multi-producers safe).
657  *
658  * This function uses a "compare and set" instruction to move the
659  * producer index atomically.
660  *
661  * @param r
662  *   A pointer to the ring structure.
663  * @param obj_table
664  *   A pointer to a table of void * pointers (objects).
665  * @param n
666  *   The number of objects to add in the ring from the obj_table.
667  * @return
668  *   - 0: Success; objects enqueue.
669  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
670  *     high water mark is exceeded.
671  *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
672  */
673 static inline int __attribute__((always_inline))
674 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
675                          unsigned n)
676 {
677         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
678 }
679
680 /**
681  * Enqueue several objects on a ring (NOT multi-producers safe).
682  *
683  * @param r
684  *   A pointer to the ring structure.
685  * @param obj_table
686  *   A pointer to a table of void * pointers (objects).
687  * @param n
688  *   The number of objects to add in the ring from the obj_table.
689  * @return
690  *   - 0: Success; objects enqueued.
691  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
692  *     high water mark is exceeded.
693  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
694  */
695 static inline int __attribute__((always_inline))
696 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
697                          unsigned n)
698 {
699         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
700 }
701
702 /**
703  * Enqueue several objects on a ring.
704  *
705  * This function calls the multi-producer or the single-producer
706  * version depending on the default behavior that was specified at
707  * ring creation time (see flags).
708  *
709  * @param r
710  *   A pointer to the ring structure.
711  * @param obj_table
712  *   A pointer to a table of void * pointers (objects).
713  * @param n
714  *   The number of objects to add in the ring from the obj_table.
715  * @return
716  *   - 0: Success; objects enqueued.
717  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
718  *     high water mark is exceeded.
719  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
720  */
721 static inline int __attribute__((always_inline))
722 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
723                       unsigned n)
724 {
725         if (r->prod.sp_enqueue)
726                 return rte_ring_sp_enqueue_bulk(r, obj_table, n);
727         else
728                 return rte_ring_mp_enqueue_bulk(r, obj_table, n);
729 }
730
731 /**
732  * Enqueue one object on a ring (multi-producers safe).
733  *
734  * This function uses a "compare and set" instruction to move the
735  * producer index atomically.
736  *
737  * @param r
738  *   A pointer to the ring structure.
739  * @param obj
740  *   A pointer to the object to be added.
741  * @return
742  *   - 0: Success; objects enqueued.
743  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
744  *     high water mark is exceeded.
745  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
746  */
747 static inline int __attribute__((always_inline))
748 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
749 {
750         return rte_ring_mp_enqueue_bulk(r, &obj, 1);
751 }
752
753 /**
754  * Enqueue one object on a ring (NOT multi-producers safe).
755  *
756  * @param r
757  *   A pointer to the ring structure.
758  * @param obj
759  *   A pointer to the object to be added.
760  * @return
761  *   - 0: Success; objects enqueued.
762  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
763  *     high water mark is exceeded.
764  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
765  */
766 static inline int __attribute__((always_inline))
767 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
768 {
769         return rte_ring_sp_enqueue_bulk(r, &obj, 1);
770 }
771
772 /**
773  * Enqueue one object on a ring.
774  *
775  * This function calls the multi-producer or the single-producer
776  * version, depending on the default behaviour that was specified at
777  * ring creation time (see flags).
778  *
779  * @param r
780  *   A pointer to the ring structure.
781  * @param obj
782  *   A pointer to the object to be added.
783  * @return
784  *   - 0: Success; objects enqueued.
785  *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
786  *     high water mark is exceeded.
787  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
788  */
789 static inline int __attribute__((always_inline))
790 rte_ring_enqueue(struct rte_ring *r, void *obj)
791 {
792         if (r->prod.sp_enqueue)
793                 return rte_ring_sp_enqueue(r, obj);
794         else
795                 return rte_ring_mp_enqueue(r, obj);
796 }
797
798 /**
799  * Dequeue several objects from a ring (multi-consumers safe).
800  *
801  * This function uses a "compare and set" instruction to move the
802  * consumer index atomically.
803  *
804  * @param r
805  *   A pointer to the ring structure.
806  * @param obj_table
807  *   A pointer to a table of void * pointers (objects) that will be filled.
808  * @param n
809  *   The number of objects to dequeue from the ring to the obj_table.
810  * @return
811  *   - 0: Success; objects dequeued.
812  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
813  *     dequeued.
814  */
815 static inline int __attribute__((always_inline))
816 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
817 {
818         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
819 }
820
821 /**
822  * Dequeue several objects from a ring (NOT multi-consumers safe).
823  *
824  * @param r
825  *   A pointer to the ring structure.
826  * @param obj_table
827  *   A pointer to a table of void * pointers (objects) that will be filled.
828  * @param n
829  *   The number of objects to dequeue from the ring to the obj_table,
830  *   must be strictly positive.
831  * @return
832  *   - 0: Success; objects dequeued.
833  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
834  *     dequeued.
835  */
836 static inline int __attribute__((always_inline))
837 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
838 {
839         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
840 }
841
842 /**
843  * Dequeue several objects from a ring.
844  *
845  * This function calls the multi-consumers or the single-consumer
846  * version, depending on the default behaviour that was specified at
847  * ring creation time (see flags).
848  *
849  * @param r
850  *   A pointer to the ring structure.
851  * @param obj_table
852  *   A pointer to a table of void * pointers (objects) that will be filled.
853  * @param n
854  *   The number of objects to dequeue from the ring to the obj_table.
855  * @return
856  *   - 0: Success; objects dequeued.
857  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
858  *     dequeued.
859  */
860 static inline int __attribute__((always_inline))
861 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
862 {
863         if (r->cons.sc_dequeue)
864                 return rte_ring_sc_dequeue_bulk(r, obj_table, n);
865         else
866                 return rte_ring_mc_dequeue_bulk(r, obj_table, n);
867 }
868
869 /**
870  * Dequeue one object from a ring (multi-consumers safe).
871  *
872  * This function uses a "compare and set" instruction to move the
873  * consumer index atomically.
874  *
875  * @param r
876  *   A pointer to the ring structure.
877  * @param obj_p
878  *   A pointer to a void * pointer (object) that will be filled.
879  * @return
880  *   - 0: Success; objects dequeued.
881  *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
882  *     dequeued.
883  */
884 static inline int __attribute__((always_inline))
885 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
886 {
887         return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
888 }
889
890 /**
891  * Dequeue one object from a ring (NOT multi-consumers safe).
892  *
893  * @param r
894  *   A pointer to the ring structure.
895  * @param obj_p
896  *   A pointer to a void * pointer (object) that will be filled.
897  * @return
898  *   - 0: Success; objects dequeued.
899  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
900  *     dequeued.
901  */
902 static inline int __attribute__((always_inline))
903 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
904 {
905         return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
906 }
907
908 /**
909  * Dequeue one object from a ring.
910  *
911  * This function calls the multi-consumers or the single-consumer
912  * version depending on the default behaviour that was specified at
913  * ring creation time (see flags).
914  *
915  * @param r
916  *   A pointer to the ring structure.
917  * @param obj_p
918  *   A pointer to a void * pointer (object) that will be filled.
919  * @return
920  *   - 0: Success, objects dequeued.
921  *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
922  *     dequeued.
923  */
924 static inline int __attribute__((always_inline))
925 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
926 {
927         if (r->cons.sc_dequeue)
928                 return rte_ring_sc_dequeue(r, obj_p);
929         else
930                 return rte_ring_mc_dequeue(r, obj_p);
931 }
932
933 /**
934  * Test if a ring is full.
935  *
936  * @param r
937  *   A pointer to the ring structure.
938  * @return
939  *   - 1: The ring is full.
940  *   - 0: The ring is not full.
941  */
942 static inline int
943 rte_ring_full(const struct rte_ring *r)
944 {
945         uint32_t prod_tail = r->prod.tail;
946         uint32_t cons_tail = r->cons.tail;
947         return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
948 }
949
950 /**
951  * Test if a ring is empty.
952  *
953  * @param r
954  *   A pointer to the ring structure.
955  * @return
956  *   - 1: The ring is empty.
957  *   - 0: The ring is not empty.
958  */
959 static inline int
960 rte_ring_empty(const struct rte_ring *r)
961 {
962         uint32_t prod_tail = r->prod.tail;
963         uint32_t cons_tail = r->cons.tail;
964         return !!(cons_tail == prod_tail);
965 }
966
967 /**
968  * Return the number of entries in a ring.
969  *
970  * @param r
971  *   A pointer to the ring structure.
972  * @return
973  *   The number of entries in the ring.
974  */
975 static inline unsigned
976 rte_ring_count(const struct rte_ring *r)
977 {
978         uint32_t prod_tail = r->prod.tail;
979         uint32_t cons_tail = r->cons.tail;
980         return ((prod_tail - cons_tail) & r->prod.mask);
981 }
982
983 /**
984  * Return the number of free entries in a ring.
985  *
986  * @param r
987  *   A pointer to the ring structure.
988  * @return
989  *   The number of free entries in the ring.
990  */
991 static inline unsigned
992 rte_ring_free_count(const struct rte_ring *r)
993 {
994         uint32_t prod_tail = r->prod.tail;
995         uint32_t cons_tail = r->cons.tail;
996         return ((cons_tail - prod_tail - 1) & r->prod.mask);
997 }
998
999 /**
1000  * Dump the status of all rings on the console
1001  */
1002 void rte_ring_list_dump(void);
1003
1004 /**
1005  * Search a ring from its name
1006  *
1007  * @param name
1008  *   The name of the ring.
1009  * @return
1010  *   The pointer to the ring matching the name, or NULL if not found,
1011  *   with rte_errno set appropriately. Possible rte_errno values include:
1012  *    - ENOENT - required entry not available to return.
1013  */
1014 struct rte_ring *rte_ring_lookup(const char *name);
1015
1016 /**
1017  * Enqueue several objects on the ring (multi-producers safe).
1018  *
1019  * This function uses a "compare and set" instruction to move the
1020  * producer index atomically.
1021  *
1022  * @param r
1023  *   A pointer to the ring structure.
1024  * @param obj_table
1025  *   A pointer to a table of void * pointers (objects).
1026  * @param n
1027  *   The number of objects to add in the ring from the obj_table.
1028  * @return
1029  *   - n: Actual number of objects enqueued.
1030  */
1031 static inline int __attribute__((always_inline))
1032 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1033                          unsigned n)
1034 {
1035         return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1036 }
1037
1038 /**
1039  * Enqueue several objects on a ring (NOT multi-producers safe).
1040  *
1041  * @param r
1042  *   A pointer to the ring structure.
1043  * @param obj_table
1044  *   A pointer to a table of void * pointers (objects).
1045  * @param n
1046  *   The number of objects to add in the ring from the obj_table.
1047  * @return
1048  *   - n: Actual number of objects enqueued.
1049  */
1050 static inline int __attribute__((always_inline))
1051 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1052                          unsigned n)
1053 {
1054         return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1055 }
1056
1057 /**
1058  * Enqueue several objects on a ring.
1059  *
1060  * This function calls the multi-producer or the single-producer
1061  * version depending on the default behavior that was specified at
1062  * ring creation time (see flags).
1063  *
1064  * @param r
1065  *   A pointer to the ring structure.
1066  * @param obj_table
1067  *   A pointer to a table of void * pointers (objects).
1068  * @param n
1069  *   The number of objects to add in the ring from the obj_table.
1070  * @return
1071  *   - n: Actual number of objects enqueued.
1072  */
1073 static inline int __attribute__((always_inline))
1074 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1075                       unsigned n)
1076 {
1077         if (r->prod.sp_enqueue)
1078                 return  rte_ring_sp_enqueue_burst(r, obj_table, n);
1079         else
1080                 return  rte_ring_mp_enqueue_burst(r, obj_table, n);
1081 }
1082
1083 /**
1084  * Dequeue several objects from a ring (multi-consumers safe). When the request
1085  * objects are more than the available objects, only dequeue the actual number
1086  * of objects
1087  *
1088  * This function uses a "compare and set" instruction to move the
1089  * consumer index atomically.
1090  *
1091  * @param r
1092  *   A pointer to the ring structure.
1093  * @param obj_table
1094  *   A pointer to a table of void * pointers (objects) that will be filled.
1095  * @param n
1096  *   The number of objects to dequeue from the ring to the obj_table.
1097  * @return
1098  *   - n: Actual number of objects dequeued, 0 if ring is empty
1099  */
1100 static inline int __attribute__((always_inline))
1101 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1102 {
1103         return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1104 }
1105
1106 /**
1107  * Dequeue several objects from a ring (NOT multi-consumers safe).When the
1108  * request objects are more than the available objects, only dequeue the
1109  * actual number of objects
1110  *
1111  * @param r
1112  *   A pointer to the ring structure.
1113  * @param obj_table
1114  *   A pointer to a table of void * pointers (objects) that will be filled.
1115  * @param n
1116  *   The number of objects to dequeue from the ring to the obj_table.
1117  * @return
1118  *   - n: Actual number of objects dequeued, 0 if ring is empty
1119  */
1120 static inline int __attribute__((always_inline))
1121 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1122 {
1123         return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1124 }
1125
1126 /**
1127  * Dequeue multiple objects from a ring up to a maximum number.
1128  *
1129  * This function calls the multi-consumers or the single-consumer
1130  * version, depending on the default behaviour that was specified at
1131  * ring creation time (see flags).
1132  *
1133  * @param r
1134  *   A pointer to the ring structure.
1135  * @param obj_table
1136  *   A pointer to a table of void * pointers (objects) that will be filled.
1137  * @param n
1138  *   The number of objects to dequeue from the ring to the obj_table.
1139  * @return
1140  *   - Number of objects dequeued, or a negative error code on error
1141  */
1142 static inline int __attribute__((always_inline))
1143 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1144 {
1145         if (r->cons.sc_dequeue)
1146                 return rte_ring_sc_dequeue_burst(r, obj_table, n);
1147         else
1148                 return rte_ring_mc_dequeue_burst(r, obj_table, n);
1149 }
1150
1151 #ifdef __cplusplus
1152 }
1153 #endif
1154
1155 #endif /* _RTE_RING_H_ */