X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_ring%2Frte_ring.h;h=943c97cf4a8c664b93d925bd11808c94ceac34b6;hb=693f715da45c48ec1ec0fe4ba2f3b5ffd11ba53e;hp=4086c78caf9b43dd103136c8b88f24a20101cc8f;hpb=af75078fece3615088e561357c1e97603e43a5fe;p=dpdk.git diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 4086c78caf..943c97cf4a 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -1,36 +1,34 @@ /*- * BSD LICENSE - * - * Copyright(c) 2010-2012 Intel Corporation. All rights reserved. + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions * are met: - * - * * Redistributions of source code must retain the above copyright + * + * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the * distribution. - * * Neither the name of Intel Corporation nor the names of its - * contributors may be used to endorse or promote products derived + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * version: DPDK.L.1.2.3-3 */ /* @@ -93,6 +91,7 @@ extern "C" { #endif +#include #include #include #include @@ -102,6 +101,12 @@ extern "C" { #include #include +#define RTE_TAILQ_RING_NAME "RTE_RING" + +enum rte_ring_queue_behavior { + RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items a possible from ring */ +}; #ifdef RTE_LIBRTE_RING_DEBUG /** @@ -122,6 +127,14 @@ struct rte_ring_debug_stats { #endif #define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */ +#define RTE_RING_MZ_PREFIX "RG_" + +#ifndef RTE_RING_PAUSE_REP_COUNT +#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield + * if RTE_RING_PAUSE_REP not defined. */ +#endif + +struct rte_memzone; /* forward declaration, so as not to require memzone.h */ /** * An RTE ring structure. @@ -134,14 +147,13 @@ struct rte_ring_debug_stats { * a problem. */ struct rte_ring { - TAILQ_ENTRY(rte_ring) next; /**< Next in list. */ - char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */ int flags; /**< Flags supplied at creation. */ + const struct rte_memzone *memzone; + /**< Memzone, if any, containing the rte_ring */ /** Ring producer status. */ struct prod { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t sp_enqueue; /**< True, if single producer. */ uint32_t size; /**< Size of ring. */ @@ -152,28 +164,33 @@ struct rte_ring { /** Ring consumer status. */ struct cons { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t sc_dequeue; /**< True, if single consumer. */ uint32_t size; /**< Size of the ring. */ uint32_t mask; /**< Mask (size-1) of ring. */ volatile uint32_t head; /**< Consumer head. */ volatile uint32_t tail; /**< Consumer tail. */ +#ifdef RTE_RING_SPLIT_PROD_CONS } cons __rte_cache_aligned; - +#else + } cons; +#endif #ifdef RTE_LIBRTE_RING_DEBUG struct rte_ring_debug_stats stats[RTE_MAX_LCORE]; #endif - void * volatile ring[0] \ - __rte_cache_aligned; /**< Memory space of ring starts here. */ + void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here. + * not volatile so need to be careful + * about compiler re-ordering */ }; #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +#define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceed for burst ops */ +#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */ /** - * When debug is enabled, store ring statistics. + * @internal When debug is enabled, store ring statistics. * @param r * A pointer to the ring. * @param name @@ -182,23 +199,82 @@ struct rte_ring { * The number to add to the object-oriented statistics. */ #ifdef RTE_LIBRTE_RING_DEBUG -#define __RING_STAT_ADD(r, name, n) do { \ - unsigned __lcore_id = rte_lcore_id(); \ - r->stats[__lcore_id].name##_objs += n; \ - r->stats[__lcore_id].name##_bulk += 1; \ +#define __RING_STAT_ADD(r, name, n) do { \ + unsigned __lcore_id = rte_lcore_id(); \ + if (__lcore_id < RTE_MAX_LCORE) { \ + r->stats[__lcore_id].name##_objs += n; \ + r->stats[__lcore_id].name##_bulk += 1; \ + } \ } while(0) #else #define __RING_STAT_ADD(r, name, n) do {} while(0) #endif +/** + * Calculate the memory size needed for a ring + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it. This value is the sum of the size of + * the structure rte_ring and the size of the memory needed by the + * objects pointers. The value is aligned to a cache line size. + * + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size needed for the ring on success. + * - -EINVAL if count is not a power of 2. + */ +ssize_t rte_ring_get_memsize(unsigned count); + +/** + * Initialize a ring structure. + * + * Initialize a ring structure in memory pointed by "r". The size of the + * memory area must be large enough to store the ring structure and the + * object table. It is advised to use rte_ring_get_memsize() to get the + * appropriate size. + * + * The ring size is set to *count*, which must be a power of two. Water + * marking is disabled by default. The real usable ring size is + * *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is not added in RTE_TAILQ_RING global list. Indeed, the + * memory given by the caller may not be shareable among dpdk + * processes. + * + * @param r + * The pointer to the ring structure followed by the objects table. + * @param name + * The name of the ring. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * 0 on success, or a negative value on error. + */ +int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, + unsigned flags); + /** * Create a new ring named *name* in memory. * - * This function uses ``memzone_reserve()`` to allocate memory. Its size is - * set to *count*, which must be a power of two. Water marking is - * disabled by default. The default bulk count is initialized to 1. - * Note that the real usable ring size is *count-1* instead of - * *count*. + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. * * @param name * The name of the ring. @@ -221,7 +297,6 @@ struct rte_ring { * rte_errno set appropriately. Possible errno values include: * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure * - E_RTE_SECONDARY - function was called from a secondary process instance - * - E_RTE_NO_TAILQ - no tailq list could be got for the ring list * - EINVAL - count provided is not a power of 2 * - ENOSPC - the maximum number of memzones has already been allocated * - EEXIST - a memzone with the same name already exists @@ -229,45 +304,13 @@ struct rte_ring { */ struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags); - -/** - * Set the default bulk count for enqueue/dequeue. - * - * The parameter *count* is the default number of bulk elements to - * get/put when using ``rte_ring_*_{en,de}queue_bulk()``. It must be - * greater than 0 and less than half of the ring size. - * - * @param r - * A pointer to the ring structure. - * @param count - * A new water mark value. - * @return - * - 0: Success; default_bulk_count changed. - * - -EINVAL: Invalid count value. - */ -static inline int -rte_ring_set_bulk_count(struct rte_ring *r, unsigned count) -{ - if (unlikely(count == 0 || count >= r->prod.size)) - return -EINVAL; - - r->prod.bulk_default = r->cons.bulk_default = count; - return 0; -} - /** - * Get the default bulk count for enqueue/dequeue. + * De-allocate all memory used by the ring. * * @param r - * A pointer to the ring structure. - * @return - * The default bulk count for enqueue/dequeue. + * Ring to free */ -static inline unsigned -rte_ring_get_bulk_count(struct rte_ring *r) -{ - return r->prod.bulk_default; -} +void rte_ring_free(struct rte_ring *r); /** * Change the high water mark. @@ -276,7 +319,7 @@ rte_ring_get_bulk_count(struct rte_ring *r) * *count* value. The *count* value must be greater than 0 and less * than the ring size. * - * This function can be called at any time (not necessarilly at + * This function can be called at any time (not necessarily at * initialization). * * @param r @@ -292,13 +335,67 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count); /** * Dump the status of the ring to the console. * + * @param f + * A pointer to a file for output * @param r * A pointer to the ring structure. */ -void rte_ring_dump(const struct rte_ring *r); +void rte_ring_dump(FILE *f, const struct rte_ring *r); + +/* the actual enqueue of pointers on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions */ +#define ENQUEUE_PTRS() do { \ + const uint32_t size = r->prod.size; \ + uint32_t idx = prod_head & mask; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ + r->ring[idx] = obj_table[i]; \ + r->ring[idx+1] = obj_table[i+1]; \ + r->ring[idx+2] = obj_table[i+2]; \ + r->ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: r->ring[idx++] = obj_table[i++]; \ + case 2: r->ring[idx++] = obj_table[i++]; \ + case 1: r->ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + r->ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + r->ring[idx] = obj_table[i]; \ + } \ +} while(0) + +/* the actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both + * single and multi consumer dequeue functions */ +#define DEQUEUE_PTRS() do { \ + uint32_t idx = cons_head & mask; \ + const uint32_t size = r->cons.size; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ + obj_table[i] = r->ring[idx]; \ + obj_table[i+1] = r->ring[idx+1]; \ + obj_table[i+2] = r->ring[idx+2]; \ + obj_table[i+3] = r->ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: obj_table[i++] = r->ring[idx++]; \ + case 2: obj_table[i++] = r->ring[idx++]; \ + case 1: obj_table[i++] = r->ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + } \ +} while (0) /** - * Enqueue several objects on the ring (multi-producers safe). + * @internal Enqueue several objects on the ring (multi-producers safe). * * This function uses a "compare and set" instruction to move the * producer index atomically. @@ -308,27 +405,37 @@ void rte_ring_dump(const struct rte_ring *r); * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ -static inline int -rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +static inline int __attribute__((always_inline)) +__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; + const unsigned max = n; int success; - unsigned i; + unsigned i, rep = 0; uint32_t mask = r->prod.mask; int ret; /* move prod.head atomically */ do { + /* Reset n to the initial burst count */ + n = max; + prod_head = r->prod.head; cons_tail = r->cons.tail; /* The subtraction is done between two unsigned 32bits value @@ -339,8 +446,19 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -349,50 +467,65 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } while (unlikely(success == 0)); /* write entries in ring */ - for (i = 0; likely(i < n); i++) - r->ring[(prod_head + i) & mask] = obj_table[i]; - rte_wmb(); + ENQUEUE_PTRS(); + rte_smp_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } /* - * If there are other enqueues in progress that preceeded us, + * If there are other enqueues in progress that preceded us, * we need to wait for them to complete */ - while (unlikely(r->prod.tail != prod_head)) + while (unlikely(r->prod.tail != prod_head)) { rte_pause(); + /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting + * for other thread finish. It gives pre-empted thread a chance + * to proceed and finish with ring dequeue operation. */ + if (RTE_RING_PAUSE_REP_COUNT && + ++rep == RTE_RING_PAUSE_REP_COUNT) { + rep = 0; + sched_yield(); + } + } r->prod.tail = prod_next; return ret; } /** - * Enqueue several objects on a ring (NOT multi-producers safe). + * @internal Enqueue several objects on a ring (NOT multi-producers safe). * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return - * - 0: Success; objects enqueued. + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. - * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ -static inline int -rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +static inline int __attribute__((always_inline)) +__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, cons_tail; uint32_t prod_next, free_entries; @@ -410,25 +543,36 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; r->prod.head = prod_next; /* write entries in ring */ - for (i = 0; likely(i < n); i++) - r->ring[(prod_head + i) & mask] = obj_table[i]; - rte_wmb(); + ENQUEUE_PTRS(); + rte_smp_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -436,6 +580,218 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, return ret; } +/** + * @internal Dequeue several objects from a ring (multi-consumers safe). When + * the request objects are more than the available objects, only dequeue the + * actual number of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ + +static inline int __attribute__((always_inline)) +__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + const unsigned max = n; + int success; + unsigned i, rep = 0; + uint32_t mask = r->prod.mask; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = (prod_tail - cons_head); + + /* Set the actual entries for dequeue */ + if (n > entries) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + success = rte_atomic32_cmpset(&r->cons.head, cons_head, + cons_next); + } while (unlikely(success == 0)); + + /* copy in table */ + DEQUEUE_PTRS(); + rte_smp_rmb(); + + /* + * If there are other dequeues in progress that preceded us, + * we need to wait for them to complete + */ + while (unlikely(r->cons.tail != cons_head)) { + rte_pause(); + + /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting + * for other thread finish. It gives pre-empted thread a chance + * to proceed and finish with ring dequeue operation. */ + if (RTE_RING_PAUSE_REP_COUNT && + ++rep == RTE_RING_PAUSE_REP_COUNT) { + rep = 0; + sched_yield(); + } + } + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * @internal Dequeue several objects from a ring (NOT multi-consumers safe). + * When the request objects are more than the available objects, only dequeue + * the actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ +static inline int __attribute__((always_inline)) +__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + unsigned i; + uint32_t mask = r->prod.mask; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = prod_tail - cons_head; + + if (n > entries) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + DEQUEUE_PTRS(); + rte_smp_rmb(); + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueue. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueued. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + /** * Enqueue several objects on a ring. * @@ -455,7 +811,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned n) { @@ -481,7 +837,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mp_enqueue(struct rte_ring *r, void *obj) { return rte_ring_mp_enqueue_bulk(r, &obj, 1); @@ -500,7 +856,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj) * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sp_enqueue(struct rte_ring *r, void *obj) { return rte_ring_sp_enqueue_bulk(r, &obj, 1); @@ -523,7 +879,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj) * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_enqueue(struct rte_ring *r, void *obj) { if (r->prod.sp_enqueue) @@ -543,59 +899,16 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) * @param obj_table * A pointer to a table of void * pointers (objects) that will be filled. * @param n - * The number of objects to dequeue from the ring to the obj_table, - * must be strictly positive + * The number of objects to dequeue from the ring to the obj_table. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move cons.head atomically */ - do { - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = (prod_tail - cons_head); - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - success = rte_atomic32_cmpset(&r->cons.head, cons_head, - cons_next); - } while (unlikely(success == 0)); - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - /* - * If there are other dequeues in progress that preceeded us, - * we need to wait for them to complete - */ - while (unlikely(r->cons.tail != cons_head)) - rte_pause(); - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -613,40 +926,10 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - unsigned i; - uint32_t mask = r->prod.mask; - - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = prod_tail - cons_head; - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - r->cons.head = cons_next; - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -667,7 +950,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { if (r->cons.sc_dequeue) @@ -691,7 +974,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) { return rte_ring_mc_dequeue_bulk(r, obj_p, 1); @@ -709,7 +992,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) { return rte_ring_sc_dequeue_bulk(r, obj_p, 1); @@ -731,7 +1014,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_dequeue(struct rte_ring *r, void **obj_p) { if (r->cons.sc_dequeue) @@ -754,7 +1037,7 @@ rte_ring_full(const struct rte_ring *r) { uint32_t prod_tail = r->prod.tail; uint32_t cons_tail = r->cons.tail; - return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0); + return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0; } /** @@ -787,7 +1070,7 @@ rte_ring_count(const struct rte_ring *r) { uint32_t prod_tail = r->prod.tail; uint32_t cons_tail = r->cons.tail; - return ((prod_tail - cons_tail) & r->prod.mask); + return (prod_tail - cons_tail) & r->prod.mask; } /** @@ -803,13 +1086,16 @@ rte_ring_free_count(const struct rte_ring *r) { uint32_t prod_tail = r->prod.tail; uint32_t cons_tail = r->cons.tail; - return ((cons_tail - prod_tail - 1) & r->prod.mask); + return (cons_tail - prod_tail - 1) & r->prod.mask; } /** * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output */ -void rte_ring_list_dump(void); +void rte_ring_list_dump(FILE *f); /** * Search a ring from its name @@ -823,6 +1109,141 @@ void rte_ring_list_dump(void); */ struct rte_ring *rte_ring_lookup(const char *name); +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + if (r->prod.sp_enqueue) + return rte_ring_sp_enqueue_burst(r, obj_table, n); + else + return rte_ring_mp_enqueue_burst(r, obj_table, n); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - Number of objects dequeued + */ +static inline unsigned __attribute__((always_inline)) +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + if (r->cons.sc_dequeue) + return rte_ring_sc_dequeue_burst(r, obj_table, n); + else + return rte_ring_mc_dequeue_burst(r, obj_table, n); +} + #ifdef __cplusplus } #endif