X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_ring%2Frte_ring.h;h=b0f9860f0f5fb3e09757f40f86094e595df5bec6;hb=782a303ddce843352686cccabbbf7e66a87519af;hp=27bc60a4fb5b0a27b65e3353085d3a88e7a5743d;hpb=dada9ef6edc59015b6674b5a95258787c71401b0;p=dpdk.git diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 27bc60a4fb..b0f9860f0f 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -101,6 +101,10 @@ extern "C" { #include #include +enum rte_ring_queue_behavior { + RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items a possible from ring */ +}; #ifdef RTE_LIBRTE_RING_DEBUG /** @@ -140,7 +144,6 @@ struct rte_ring { /** Ring producer status. */ struct prod { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t sp_enqueue; /**< True, if single producer. */ uint32_t size; /**< Size of ring. */ @@ -151,7 +154,6 @@ struct rte_ring { /** Ring consumer status. */ struct cons { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t sc_dequeue; /**< True, if single consumer. */ uint32_t size; /**< Size of the ring. */ uint32_t mask; /**< Mask (size-1) of ring. */ @@ -170,9 +172,11 @@ struct rte_ring { #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +#define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceed for burst ops */ +#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */ /** - * When debug is enabled, store ring statistics. + * @internal When debug is enabled, store ring statistics. * @param r * A pointer to the ring. * @param name @@ -195,7 +199,7 @@ struct rte_ring { * * This function uses ``memzone_reserve()`` to allocate memory. Its size is * set to *count*, which must be a power of two. Water marking is - * disabled by default. The default bulk count is initialized to 1. + * disabled by default. * Note that the real usable ring size is *count-1* instead of * *count*. * @@ -229,45 +233,6 @@ struct rte_ring { struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags); -/** - * Set the default bulk count for enqueue/dequeue. - * - * The parameter *count* is the default number of bulk elements to - * get/put when using ``rte_ring_*_{en,de}queue_bulk()``. It must be - * greater than 0 and less than half of the ring size. - * - * @param r - * A pointer to the ring structure. - * @param count - * A new water mark value. - * @return - * - 0: Success; default_bulk_count changed. - * - -EINVAL: Invalid count value. - */ -static inline int -rte_ring_set_bulk_count(struct rte_ring *r, unsigned count) -{ - if (unlikely(count == 0 || count >= r->prod.size)) - return -EINVAL; - - r->prod.bulk_default = r->cons.bulk_default = count; - return 0; -} - -/** - * Get the default bulk count for enqueue/dequeue. - * - * @param r - * A pointer to the ring structure. - * @return - * The default bulk count for enqueue/dequeue. - */ -static inline unsigned -rte_ring_get_bulk_count(struct rte_ring *r) -{ - return r->prod.bulk_default; -} - /** * Change the high water mark. * @@ -275,7 +240,7 @@ rte_ring_get_bulk_count(struct rte_ring *r) * *count* value. The *count* value must be greater than 0 and less * than the ring size. * - * This function can be called at any time (not necessarilly at + * This function can be called at any time (not necessarily at * initialization). * * @param r @@ -297,7 +262,7 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count); void rte_ring_dump(const struct rte_ring *r); /** - * Enqueue several objects on the ring (multi-producers safe). + * @internal Enqueue several objects on the ring (multi-producers safe). * * This function uses a "compare and set" instruction to move the * producer index atomically. @@ -307,20 +272,27 @@ void rte_ring_dump(const struct rte_ring *r); * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ static inline int -rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; + const unsigned max = n; int success; unsigned i; uint32_t mask = r->prod.mask; @@ -328,6 +300,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* move prod.head atomically */ do { + /* Reset n to the initial burst count */ + n = max; + prod_head = r->prod.head; cons_tail = r->cons.tail; /* The subtraction is done between two unsigned 32bits value @@ -338,8 +313,19 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -352,13 +338,14 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, r->ring[(prod_head + i) & mask] = obj_table[i]; rte_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -374,24 +361,30 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } /** - * Enqueue several objects on a ring (NOT multi-producers safe). + * @internal Enqueue several objects on a ring (NOT multi-producers safe). * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return - * - 0: Success; objects enqueued. + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. - * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ static inline int -rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, cons_tail; uint32_t prod_next, free_entries; @@ -409,8 +402,19 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -421,13 +425,14 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, r->ring[(prod_head + i) & mask] = obj_table[i]; rte_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -435,6 +440,213 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, return ret; } +/** + * @internal Dequeue several objects from a ring (multi-consumers safe). When + * the request objects are more than the available objects, only dequeue the + * actual number of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ + +static inline int +__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + const unsigned max = n; + int success; + unsigned i; + uint32_t mask = r->prod.mask; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = (prod_tail - cons_head); + + /* Set the actual entries for dequeue */ + if (unlikely(n > entries)) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + success = rte_atomic32_cmpset(&r->cons.head, cons_head, + cons_next); + } while (unlikely(success == 0)); + + /* copy in table */ + rte_rmb(); + for (i = 0; likely(i < n); i++) { + obj_table[i] = r->ring[(cons_head + i) & mask]; + } + + /* + * If there are other dequeues in progress that preceded us, + * we need to wait for them to complete + */ + while (unlikely(r->cons.tail != cons_head)) + rte_pause(); + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * @internal Dequeue several objects from a ring (NOT multi-consumers safe). + * When the request objects are more than the available objects, only dequeue + * the actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ +static inline int +__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + unsigned i; + uint32_t mask = r->prod.mask; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = prod_tail - cons_head; + + if (unlikely(n > entries)) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + rte_rmb(); + for (i = 0; likely(i < n); i++) { + obj_table[i] = r->ring[(cons_head + i) & mask]; + } + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueue. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + */ +static inline int +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueued. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static inline int +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + /** * Enqueue several objects on a ring. * @@ -542,8 +754,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) * @param obj_table * A pointer to a table of void * pointers (objects) that will be filled. * @param n - * The number of objects to dequeue from the ring to the obj_table, - * must be strictly positive + * The number of objects to dequeue from the ring to the obj_table. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue; no object is @@ -552,49 +763,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) static inline int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move cons.head atomically */ - do { - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = (prod_tail - cons_head); - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - success = rte_atomic32_cmpset(&r->cons.head, cons_head, - cons_next); - } while (unlikely(success == 0)); - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - /* - * If there are other dequeues in progress that preceeded us, - * we need to wait for them to complete - */ - while (unlikely(r->cons.tail != cons_head)) - rte_pause(); - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -615,37 +784,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) static inline int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - unsigned i; - uint32_t mask = r->prod.mask; - - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = prod_tail - cons_head; - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - r->cons.head = cons_next; - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -822,6 +961,141 @@ void rte_ring_list_dump(void); */ struct rte_ring *rte_ring_lookup(const char *name); +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + if (r->prod.sp_enqueue) + return rte_ring_sp_enqueue_burst(r, obj_table, n); + else + return rte_ring_mp_enqueue_burst(r, obj_table, n); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - Number of objects dequeued, or a negative error code on error + */ +static inline int +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + if (r->cons.sc_dequeue) + return rte_ring_sc_dequeue_burst(r, obj_table, n); + else + return rte_ring_mc_dequeue_burst(r, obj_table, n); +} + #ifdef __cplusplus } #endif