X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_ring%2Frte_ring.h;h=7763f1d5af8d2cdcbb6b7652a38ddac014aa11b6;hb=e25e4d7ef16b;hp=4086c78caf9b43dd103136c8b88f24a20101cc8f;hpb=af75078fece3615088e561357c1e97603e43a5fe;p=dpdk.git diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 4086c78caf..7763f1d5af 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -1,36 +1,34 @@ /*- * BSD LICENSE * - * Copyright(c) 2010-2012 Intel Corporation. All rights reserved. + * Copyright(c) 2010-2013 Intel Corporation. All rights reserved. * All rights reserved. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions * are met: * - * * Redistributions of source code must retain the above copyright + * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in - * the documentation and/or other materials provided with the + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the * distribution. - * * Neither the name of Intel Corporation nor the names of its - * contributors may be used to endorse or promote products derived + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - * version: DPDK.L.1.2.3-3 */ /* @@ -102,6 +100,10 @@ extern "C" { #include #include +enum rte_ring_queue_behavior { + RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items a possible from ring */ +}; #ifdef RTE_LIBRTE_RING_DEBUG /** @@ -141,7 +143,6 @@ struct rte_ring { /** Ring producer status. */ struct prod { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t sp_enqueue; /**< True, if single producer. */ uint32_t size; /**< Size of ring. */ @@ -152,28 +153,36 @@ struct rte_ring { /** Ring consumer status. */ struct cons { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t sc_dequeue; /**< True, if single consumer. */ uint32_t size; /**< Size of the ring. */ uint32_t mask; /**< Mask (size-1) of ring. */ volatile uint32_t head; /**< Consumer head. */ volatile uint32_t tail; /**< Consumer tail. */ +#ifdef RTE_RING_SPLIT_PROD_CONS } cons __rte_cache_aligned; - +#else + } cons; +#endif #ifdef RTE_LIBRTE_RING_DEBUG struct rte_ring_debug_stats stats[RTE_MAX_LCORE]; #endif - void * volatile ring[0] \ - __rte_cache_aligned; /**< Memory space of ring starts here. */ + void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here. + * not volatile so need to be careful + * about compiler re-ordering */ }; +/* dummy assembly operation to prevent compiler re-ordering of instructions */ +#define COMPILER_BARRIER() do { asm volatile("" ::: "memory"); } while(0) + #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +#define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceed for burst ops */ +#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */ /** - * When debug is enabled, store ring statistics. + * @internal When debug is enabled, store ring statistics. * @param r * A pointer to the ring. * @param name @@ -196,7 +205,7 @@ struct rte_ring { * * This function uses ``memzone_reserve()`` to allocate memory. Its size is * set to *count*, which must be a power of two. Water marking is - * disabled by default. The default bulk count is initialized to 1. + * disabled by default. * Note that the real usable ring size is *count-1* instead of * *count*. * @@ -230,45 +239,6 @@ struct rte_ring { struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags); -/** - * Set the default bulk count for enqueue/dequeue. - * - * The parameter *count* is the default number of bulk elements to - * get/put when using ``rte_ring_*_{en,de}queue_bulk()``. It must be - * greater than 0 and less than half of the ring size. - * - * @param r - * A pointer to the ring structure. - * @param count - * A new water mark value. - * @return - * - 0: Success; default_bulk_count changed. - * - -EINVAL: Invalid count value. - */ -static inline int -rte_ring_set_bulk_count(struct rte_ring *r, unsigned count) -{ - if (unlikely(count == 0 || count >= r->prod.size)) - return -EINVAL; - - r->prod.bulk_default = r->cons.bulk_default = count; - return 0; -} - -/** - * Get the default bulk count for enqueue/dequeue. - * - * @param r - * A pointer to the ring structure. - * @return - * The default bulk count for enqueue/dequeue. - */ -static inline unsigned -rte_ring_get_bulk_count(struct rte_ring *r) -{ - return r->prod.bulk_default; -} - /** * Change the high water mark. * @@ -276,7 +246,7 @@ rte_ring_get_bulk_count(struct rte_ring *r) * *count* value. The *count* value must be greater than 0 and less * than the ring size. * - * This function can be called at any time (not necessarilly at + * This function can be called at any time (not necessarily at * initialization). * * @param r @@ -297,8 +267,60 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count); */ void rte_ring_dump(const struct rte_ring *r); +/* the actual enqueue of pointers on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions */ +#define ENQUEUE_PTRS() do { \ + const uint32_t size = r->prod.size; \ + uint32_t idx = prod_head & mask; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \ + r->ring[idx] = obj_table[i]; \ + r->ring[idx+1] = obj_table[i+1]; \ + r->ring[idx+2] = obj_table[i+2]; \ + r->ring[idx+3] = obj_table[i+3]; \ + } \ + switch (n & 0x3) { \ + case 3: r->ring[idx++] = obj_table[i++]; \ + case 2: r->ring[idx++] = obj_table[i++]; \ + case 1: r->ring[idx++] = obj_table[i++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++)\ + r->ring[idx] = obj_table[i]; \ + for (idx = 0; i < n; i++, idx++) \ + r->ring[idx] = obj_table[i]; \ + } \ +} while(0) + +/* the actual copy of pointers on the ring to obj_table. + * Placed here since identical code needed in both + * single and multi consumer dequeue functions */ +#define DEQUEUE_PTRS() do { \ + uint32_t idx = cons_head & mask; \ + const uint32_t size = r->cons.size; \ + if (likely(idx + n < size)) { \ + for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\ + obj_table[i] = r->ring[idx]; \ + obj_table[i+1] = r->ring[idx+1]; \ + obj_table[i+2] = r->ring[idx+2]; \ + obj_table[i+3] = r->ring[idx+3]; \ + } \ + switch (n & 0x3) { \ + case 3: obj_table[i++] = r->ring[idx++]; \ + case 2: obj_table[i++] = r->ring[idx++]; \ + case 1: obj_table[i++] = r->ring[idx++]; \ + } \ + } else { \ + for (i = 0; idx < size; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + for (idx = 0; i < n; i++, idx++) \ + obj_table[i] = r->ring[idx]; \ + } \ +} while (0) + /** - * Enqueue several objects on the ring (multi-producers safe). + * @internal Enqueue several objects on the ring (multi-producers safe). * * This function uses a "compare and set" instruction to move the * producer index atomically. @@ -308,20 +330,27 @@ void rte_ring_dump(const struct rte_ring *r); * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ -static inline int -rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +static inline int __attribute__((always_inline)) +__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; + const unsigned max = n; int success; unsigned i; uint32_t mask = r->prod.mask; @@ -329,6 +358,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* move prod.head atomically */ do { + /* Reset n to the initial burst count */ + n = max; + prod_head = r->prod.head; cons_tail = r->cons.tail; /* The subtraction is done between two unsigned 32bits value @@ -339,8 +371,19 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -349,17 +392,17 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } while (unlikely(success == 0)); /* write entries in ring */ - for (i = 0; likely(i < n); i++) - r->ring[(prod_head + i) & mask] = obj_table[i]; - rte_wmb(); + ENQUEUE_PTRS(); + COMPILER_BARRIER(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -375,24 +418,30 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } /** - * Enqueue several objects on a ring (NOT multi-producers safe). + * @internal Enqueue several objects on a ring (NOT multi-producers safe). * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return - * - 0: Success; objects enqueued. + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. - * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ -static inline int -rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +static inline int __attribute__((always_inline)) +__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, cons_tail; uint32_t prod_next, free_entries; @@ -410,25 +459,36 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; r->prod.head = prod_next; /* write entries in ring */ - for (i = 0; likely(i < n); i++) - r->ring[(prod_head + i) & mask] = obj_table[i]; - rte_wmb(); + ENQUEUE_PTRS(); + COMPILER_BARRIER(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -436,6 +496,209 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, return ret; } +/** + * @internal Dequeue several objects from a ring (multi-consumers safe). When + * the request objects are more than the available objects, only dequeue the + * actual number of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ + +static inline int __attribute__((always_inline)) +__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + const unsigned max = n; + int success; + unsigned i; + uint32_t mask = r->prod.mask; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = (prod_tail - cons_head); + + /* Set the actual entries for dequeue */ + if (n > entries) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + success = rte_atomic32_cmpset(&r->cons.head, cons_head, + cons_next); + } while (unlikely(success == 0)); + + /* copy in table */ + DEQUEUE_PTRS(); + COMPILER_BARRIER(); + + /* + * If there are other dequeues in progress that preceded us, + * we need to wait for them to complete + */ + while (unlikely(r->cons.tail != cons_head)) + rte_pause(); + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * @internal Dequeue several objects from a ring (NOT multi-consumers safe). + * When the request objects are more than the available objects, only dequeue + * the actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ +static inline int __attribute__((always_inline)) +__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + unsigned i; + uint32_t mask = r->prod.mask; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = prod_tail - cons_head; + + if (n > entries) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + DEQUEUE_PTRS(); + COMPILER_BARRIER(); + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueue. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueued. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + /** * Enqueue several objects on a ring. * @@ -455,7 +718,7 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, unsigned n) { @@ -481,7 +744,7 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mp_enqueue(struct rte_ring *r, void *obj) { return rte_ring_mp_enqueue_bulk(r, &obj, 1); @@ -500,7 +763,7 @@ rte_ring_mp_enqueue(struct rte_ring *r, void *obj) * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sp_enqueue(struct rte_ring *r, void *obj) { return rte_ring_sp_enqueue_bulk(r, &obj, 1); @@ -523,7 +786,7 @@ rte_ring_sp_enqueue(struct rte_ring *r, void *obj) * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_enqueue(struct rte_ring *r, void *obj) { if (r->prod.sp_enqueue) @@ -543,59 +806,16 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) * @param obj_table * A pointer to a table of void * pointers (objects) that will be filled. * @param n - * The number of objects to dequeue from the ring to the obj_table, - * must be strictly positive + * The number of objects to dequeue from the ring to the obj_table. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move cons.head atomically */ - do { - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = (prod_tail - cons_head); - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - success = rte_atomic32_cmpset(&r->cons.head, cons_head, - cons_next); - } while (unlikely(success == 0)); - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - /* - * If there are other dequeues in progress that preceeded us, - * we need to wait for them to complete - */ - while (unlikely(r->cons.tail != cons_head)) - rte_pause(); - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -613,40 +833,10 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - unsigned i; - uint32_t mask = r->prod.mask; - - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = prod_tail - cons_head; - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - r->cons.head = cons_next; - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -667,7 +857,7 @@ rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { if (r->cons.sc_dequeue) @@ -691,7 +881,7 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) * - -ENOENT: Not enough entries in the ring to dequeue; no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) { return rte_ring_mc_dequeue_bulk(r, obj_p, 1); @@ -709,7 +899,7 @@ rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) { return rte_ring_sc_dequeue_bulk(r, obj_p, 1); @@ -731,7 +921,7 @@ rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p) * - -ENOENT: Not enough entries in the ring to dequeue, no object is * dequeued. */ -static inline int +static inline int __attribute__((always_inline)) rte_ring_dequeue(struct rte_ring *r, void **obj_p) { if (r->cons.sc_dequeue) @@ -823,6 +1013,141 @@ void rte_ring_list_dump(void); */ struct rte_ring *rte_ring_lookup(const char *name); +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int __attribute__((always_inline)) +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + if (r->prod.sp_enqueue) + return rte_ring_sp_enqueue_burst(r, obj_table, n); + else + return rte_ring_mp_enqueue_burst(r, obj_table, n); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int __attribute__((always_inline)) +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int __attribute__((always_inline)) +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - Number of objects dequeued, or a negative error code on error + */ +static inline int __attribute__((always_inline)) +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + if (r->cons.sc_dequeue) + return rte_ring_sc_dequeue_burst(r, obj_table, n); + else + return rte_ring_mc_dequeue_burst(r, obj_table, n); +} + #ifdef __cplusplus } #endif