ring: return free space when enqueuing
[dpdk.git] / lib / librte_ring / rte_ring.h
index d650215..439698b 100644 (file)
@@ -109,34 +109,11 @@ enum rte_ring_queue_behavior {
        RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
 };
 
-#ifdef RTE_LIBRTE_RING_DEBUG
-/**
- * A structure that stores the ring statistics (per-lcore).
- */
-struct rte_ring_debug_stats {
-       uint64_t enq_success_bulk; /**< Successful enqueues number. */
-       uint64_t enq_success_objs; /**< Objects successfully enqueued. */
-       uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
-       uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
-       uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
-       uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
-       uint64_t deq_success_bulk; /**< Successful dequeues number. */
-       uint64_t deq_success_objs; /**< Objects successfully dequeued. */
-       uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
-       uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
-} __rte_cache_aligned;
-#endif
-
 #define RTE_RING_MZ_PREFIX "RG_"
 /**< The maximum length of a ring name. */
 #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
                           sizeof(RTE_RING_MZ_PREFIX) + 1)
 
-#ifndef RTE_RING_PAUSE_REP_COUNT
-#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
-                                    *   if RTE_RING_PAUSE_REP not defined. */
-#endif
-
 struct rte_memzone; /* forward declaration, so as not to require memzone.h */
 
 #if RTE_CACHE_LINE_SIZE < 128
@@ -176,7 +153,6 @@ struct rte_ring {
                        /**< Memzone, if any, containing the rte_ring */
        uint32_t size;           /**< Size of ring. */
        uint32_t mask;           /**< Mask (size-1) of ring. */
-       uint32_t watermark;      /**< Max items before EDQUOT in producer. */
 
        /** Ring producer status. */
        struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);
@@ -184,10 +160,6 @@ struct rte_ring {
        /** Ring consumer status. */
        struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN);
 
-#ifdef RTE_LIBRTE_RING_DEBUG
-       struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
-#endif
-
        void *ring[] __rte_cache_aligned;   /**< Memory space of ring starts here.
                                             * not volatile so need to be careful
                                             * about compiler re-ordering */
@@ -195,30 +167,8 @@ struct rte_ring {
 
 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
-#define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceed for burst ops */
 #define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask */
 
-/**
- * @internal When debug is enabled, store ring statistics.
- * @param r
- *   A pointer to the ring.
- * @param name
- *   The name of the statistics field to increment in the ring.
- * @param n
- *   The number to add to the object-oriented statistics.
- */
-#ifdef RTE_LIBRTE_RING_DEBUG
-#define __RING_STAT_ADD(r, name, n) do {                        \
-               unsigned __lcore_id = rte_lcore_id();           \
-               if (__lcore_id < RTE_MAX_LCORE) {               \
-                       r->stats[__lcore_id].name##_objs += n;  \
-                       r->stats[__lcore_id].name##_bulk += 1;  \
-               }                                               \
-       } while(0)
-#else
-#define __RING_STAT_ADD(r, name, n) do {} while(0)
-#endif
-
 /**
  * Calculate the memory size needed for a ring
  *
@@ -321,26 +271,6 @@ struct rte_ring *rte_ring_create(const char *name, unsigned count,
  */
 void rte_ring_free(struct rte_ring *r);
 
-/**
- * Change the high water mark.
- *
- * If *count* is 0, water marking is disabled. Otherwise, it is set to the
- * *count* value. The *count* value must be greater than 0 and less
- * than the ring size.
- *
- * This function can be called at any time (not necessarily at
- * initialization).
- *
- * @param r
- *   A pointer to the ring structure.
- * @param count
- *   The new water mark value.
- * @return
- *   - 0: Success; water mark changed.
- *   - -EINVAL: Invalid water mark value.
- */
-int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
-
 /**
  * Dump the status of the ring to a file.
  *
@@ -419,31 +349,20 @@ void rte_ring_dump(FILE *f, const struct rte_ring *r);
  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
+                        unsigned int n, enum rte_ring_queue_behavior behavior,
+                        unsigned int *free_space)
 {
        uint32_t prod_head, prod_next;
        uint32_t cons_tail, free_entries;
-       const unsigned max = n;
+       const unsigned int max = n;
        int success;
-       unsigned i, rep = 0;
+       unsigned int i;
        uint32_t mask = r->mask;
-       int ret;
-
-       /* Avoid the unnecessary cmpset operation below, which is also
-        * potentially harmful when n equals 0. */
-       if (n == 0)
-               return 0;
 
        /* move prod.head atomically */
        do {
@@ -459,21 +378,12 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                free_entries = (mask + cons_tail - prod_head);
 
                /* check that we have enough room in ring */
-               if (unlikely(n > free_entries)) {
-                       if (behavior == RTE_RING_QUEUE_FIXED) {
-                               __RING_STAT_ADD(r, enq_fail, n);
-                               return -ENOBUFS;
-                       }
-                       else {
-                               /* No free entry available */
-                               if (unlikely(free_entries == 0)) {
-                                       __RING_STAT_ADD(r, enq_fail, n);
-                                       return 0;
-                               }
+               if (unlikely(n > free_entries))
+                       n = (behavior == RTE_RING_QUEUE_FIXED) ?
+                                       0 : free_entries;
 
-                               n = free_entries;
-                       }
-               }
+               if (n == 0)
+                       goto end;
 
                prod_next = prod_head + n;
                success = rte_atomic32_cmpset(&r->prod.head, prod_head,
@@ -484,35 +394,18 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        ENQUEUE_PTRS();
        rte_smp_wmb();
 
-       /* if we exceed the watermark */
-       if (unlikely(((mask + 1) - free_entries + n) > r->watermark)) {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-                               (int)(n | RTE_RING_QUOT_EXCEED);
-               __RING_STAT_ADD(r, enq_quota, n);
-       }
-       else {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-               __RING_STAT_ADD(r, enq_success, n);
-       }
-
        /*
         * If there are other enqueues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->prod.tail != prod_head)) {
+       while (unlikely(r->prod.tail != prod_head))
                rte_pause();
 
-               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-                * for other thread finish. It gives pre-empted thread a chance
-                * to proceed and finish with ring dequeue operation. */
-               if (RTE_RING_PAUSE_REP_COUNT &&
-                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
-                       rep = 0;
-                       sched_yield();
-               }
-       }
        r->prod.tail = prod_next;
-       return ret;
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
+       return n;
 }
 
 /**
@@ -528,24 +421,18 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   RTE_RING_QUEUE_FIXED:    Enqueue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects enqueued.
+ *   Actual number of objects enqueued.
+ *   If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
-                        unsigned n, enum rte_ring_queue_behavior behavior)
+                        unsigned int n, enum rte_ring_queue_behavior behavior,
+                        unsigned int *free_space)
 {
        uint32_t prod_head, cons_tail;
        uint32_t prod_next, free_entries;
-       unsigned i;
+       unsigned int i;
        uint32_t mask = r->mask;
-       int ret;
 
        prod_head = r->prod.head;
        cons_tail = r->cons.tail;
@@ -556,21 +443,12 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        free_entries = mask + cons_tail - prod_head;
 
        /* check that we have enough room in ring */
-       if (unlikely(n > free_entries)) {
-               if (behavior == RTE_RING_QUEUE_FIXED) {
-                       __RING_STAT_ADD(r, enq_fail, n);
-                       return -ENOBUFS;
-               }
-               else {
-                       /* No free entry available */
-                       if (unlikely(free_entries == 0)) {
-                               __RING_STAT_ADD(r, enq_fail, n);
-                               return 0;
-                       }
+       if (unlikely(n > free_entries))
+               n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : free_entries;
+
+       if (n == 0)
+               goto end;
 
-                       n = free_entries;
-               }
-       }
 
        prod_next = prod_head + n;
        r->prod.head = prod_next;
@@ -579,19 +457,11 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        ENQUEUE_PTRS();
        rte_smp_wmb();
 
-       /* if we exceed the watermark */
-       if (unlikely(((mask + 1) - free_entries + n) > r->watermark)) {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
-                       (int)(n | RTE_RING_QUOT_EXCEED);
-               __RING_STAT_ADD(r, enq_quota, n);
-       }
-       else {
-               ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
-               __RING_STAT_ADD(r, enq_success, n);
-       }
-
        r->prod.tail = prod_next;
-       return ret;
+end:
+       if (free_space != NULL)
+               *free_space = free_entries - n;
+       return n;
 }
 
 /**
@@ -612,16 +482,11 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
 
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
                 unsigned n, enum rte_ring_queue_behavior behavior)
 {
@@ -629,7 +494,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
        uint32_t cons_next, entries;
        const unsigned max = n;
        int success;
-       unsigned i, rep = 0;
+       unsigned int i;
        uint32_t mask = r->mask;
 
        /* Avoid the unnecessary cmpset operation below, which is also
@@ -652,16 +517,11 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 
                /* Set the actual entries for dequeue */
                if (n > entries) {
-                       if (behavior == RTE_RING_QUEUE_FIXED) {
-                               __RING_STAT_ADD(r, deq_fail, n);
-                               return -ENOENT;
-                       }
+                       if (behavior == RTE_RING_QUEUE_FIXED)
+                               return 0;
                        else {
-                               if (unlikely(entries == 0)){
-                                       __RING_STAT_ADD(r, deq_fail, n);
+                               if (unlikely(entries == 0))
                                        return 0;
-                               }
-
                                n = entries;
                        }
                }
@@ -679,22 +539,12 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
         * If there are other dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->cons.tail != cons_head)) {
+       while (unlikely(r->cons.tail != cons_head))
                rte_pause();
 
-               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
-                * for other thread finish. It gives pre-empted thread a chance
-                * to proceed and finish with ring dequeue operation. */
-               if (RTE_RING_PAUSE_REP_COUNT &&
-                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
-                       rep = 0;
-                       sched_yield();
-               }
-       }
-       __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
 
-       return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+       return n;
 }
 
 /**
@@ -712,21 +562,16 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   RTE_RING_QUEUE_FIXED:    Dequeue a fixed number of items from a ring
  *   RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
  * @return
- *   Depend on the behavior value
- *   if behavior = RTE_RING_QUEUE_FIXED
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
- *   if behavior = RTE_RING_QUEUE_VARIABLE
- *   - n: Actual number of objects dequeued.
+ *   - Actual number of objects dequeued.
+ *     If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only.
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
                 unsigned n, enum rte_ring_queue_behavior behavior)
 {
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
-       unsigned i;
+       unsigned int i;
        uint32_t mask = r->mask;
 
        cons_head = r->cons.head;
@@ -738,16 +583,11 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
        entries = prod_tail - cons_head;
 
        if (n > entries) {
-               if (behavior == RTE_RING_QUEUE_FIXED) {
-                       __RING_STAT_ADD(r, deq_fail, n);
-                       return -ENOENT;
-               }
+               if (behavior == RTE_RING_QUEUE_FIXED)
+                       return 0;
                else {
-                       if (unlikely(entries == 0)){
-                               __RING_STAT_ADD(r, deq_fail, n);
+                       if (unlikely(entries == 0))
                                return 0;
-                       }
-
                        n = entries;
                }
        }
@@ -759,9 +599,8 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
        DEQUEUE_PTRS();
        rte_smp_rmb();
 
-       __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
-       return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+       return n;
 }
 
 /**
@@ -776,17 +615,18 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
- *   - 0: Success; objects enqueue.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       free_space);
 }
 
 /**
@@ -798,17 +638,18 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
- *   - 0: Success; objects enqueued.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
+                       free_space);
 }
 
 /**
@@ -824,20 +665,20 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
- *   - 0: Success; objects enqueued.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
- *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ *   The number of objects enqueued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
-                     unsigned n)
+                     unsigned int n, unsigned int *free_space)
 {
        if (r->prod.single)
-               return rte_ring_sp_enqueue_bulk(r, obj_table, n);
+               return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
        else
-               return rte_ring_mp_enqueue_bulk(r, obj_table, n);
+               return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
 }
 
 /**
@@ -852,14 +693,12 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
  *   A pointer to the object to be added.
  * @return
  *   - 0: Success; objects enqueued.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
 {
-       return rte_ring_mp_enqueue_bulk(r, &obj, 1);
+       return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -871,14 +710,12 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
  *   A pointer to the object to be added.
  * @return
  *   - 0: Success; objects enqueued.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
 {
-       return rte_ring_sp_enqueue_bulk(r, &obj, 1);
+       return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -894,17 +731,12 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
  *   A pointer to the object to be added.
  * @return
  *   - 0: Success; objects enqueued.
- *   - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
- *     high water mark is exceeded.
  *   - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
  */
 static inline int __attribute__((always_inline))
 rte_ring_enqueue(struct rte_ring *r, void *obj)
 {
-       if (r->prod.single)
-               return rte_ring_sp_enqueue(r, obj);
-       else
-               return rte_ring_mp_enqueue(r, obj);
+       return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -920,11 +752,9 @@ rte_ring_enqueue(struct rte_ring *r, void *obj)
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
        return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -941,11 +771,9 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  *   The number of objects to dequeue from the ring to the obj_table,
  *   must be strictly positive.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue; no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
        return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
@@ -965,11 +793,9 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
  * @param n
  *   The number of objects to dequeue from the ring to the obj_table.
  * @return
- *   - 0: Success; objects dequeued.
- *   - -ENOENT: Not enough entries in the ring to dequeue, no object is
- *     dequeued.
+ *   The number of objects dequeued, either 0 or n
  */
-static inline int __attribute__((always_inline))
+static inline unsigned int __attribute__((always_inline))
 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 {
        if (r->cons.single)
@@ -996,7 +822,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
 static inline int __attribute__((always_inline))
 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 {
-       return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
+       return rte_ring_mc_dequeue_bulk(r, obj_p, 1)  ? 0 : -ENOBUFS;
 }
 
 /**
@@ -1014,7 +840,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 {
-       return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
+       return rte_ring_sc_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -1036,10 +862,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
 static inline int __attribute__((always_inline))
 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
 {
-       if (r->cons.single)
-               return rte_ring_sc_dequeue(r, obj_p);
-       else
-               return rte_ring_mc_dequeue(r, obj_p);
+       return rte_ring_dequeue_bulk(r, obj_p, 1) ? 0 : -ENOBUFS;
 }
 
 /**
@@ -1154,14 +977,18 @@ struct rte_ring *rte_ring_lookup(const char *name);
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_mp_do_enqueue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1173,14 +1000,18 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                        unsigned n)
+                        unsigned int n, unsigned int *free_space)
 {
-       return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+       return __rte_ring_sp_do_enqueue(r, obj_table, n,
+                       RTE_RING_QUEUE_VARIABLE, free_space);
 }
 
 /**
@@ -1196,17 +1027,20 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
  *   A pointer to a table of void * pointers (objects).
  * @param n
  *   The number of objects to add in the ring from the obj_table.
+ * @param free_space
+ *   if non-NULL, returns the amount of space in the ring after the
+ *   enqueue operation has finished.
  * @return
  *   - n: Actual number of objects enqueued.
  */
 static inline unsigned __attribute__((always_inline))
 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
-                     unsigned n)
+                     unsigned int n, unsigned int *free_space)
 {
        if (r->prod.single)
-               return rte_ring_sp_enqueue_burst(r, obj_table, n);
+               return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
        else
-               return rte_ring_mp_enqueue_burst(r, obj_table, n);
+               return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
 }
 
 /**