X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;ds=sidebyside;f=lib%2Flibrte_ring%2Frte_ring_elem.h;h=a5a4c46f9d15a54f5767be9fee5fb29963ab781d;hb=5c76111f068c4ef8aff4f5b4e2e641a21ff733a0;hp=15d79bf2ac0d5db171155e03ea793471e239bfe2;hpb=cc4b218790f6e91336e51a27f1ee9ce3cc73dd85;p=dpdk.git diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 15d79bf2ac..a5a4c46f9d 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -20,21 +20,7 @@ extern "C" { #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rte_ring.h" +#include /** * @warning @@ -88,12 +74,30 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * constraint for the reserved zone. * @param flags * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. Possible errno values include: @@ -110,8 +114,8 @@ struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, int socket_id, unsigned int flags); static __rte_always_inline void -enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, - const void *obj_table, uint32_t n) +__rte_ring_enqueue_elems_32(struct rte_ring *r, const uint32_t size, + uint32_t idx, const void *obj_table, uint32_t n) { unsigned int i; uint32_t *ring = (uint32_t *)&r[1]; @@ -153,14 +157,14 @@ enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, } static __rte_always_inline void -enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, +__rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, const void *obj_table, uint32_t n) { unsigned int i; const uint32_t size = r->size; uint32_t idx = prod_head & r->mask; uint64_t *ring = (uint64_t *)&r[1]; - const uint64_t *obj = (const uint64_t *)obj_table; + const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { ring[idx] = obj[i]; @@ -186,7 +190,7 @@ enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, } static __rte_always_inline void -enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, +__rte_ring_enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, const void *obj_table, uint32_t n) { unsigned int i; @@ -219,16 +223,16 @@ enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, * single and multi producer enqueue functions. */ static __rte_always_inline void -enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table, - uint32_t esize, uint32_t num) +__rte_ring_enqueue_elems(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t esize, uint32_t num) { /* 8B and 16B copies implemented individually to retain * the current performance. */ if (esize == 8) - enqueue_elems_64(r, prod_head, obj_table, num); + __rte_ring_enqueue_elems_64(r, prod_head, obj_table, num); else if (esize == 16) - enqueue_elems_128(r, prod_head, obj_table, num); + __rte_ring_enqueue_elems_128(r, prod_head, obj_table, num); else { uint32_t idx, scale, nr_idx, nr_num, nr_size; @@ -238,13 +242,14 @@ enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table, idx = prod_head & r->mask; nr_idx = idx * scale; nr_size = r->size * scale; - enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num); + __rte_ring_enqueue_elems_32(r, nr_size, nr_idx, + obj_table, nr_num); } } static __rte_always_inline void -dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, - void *obj_table, uint32_t n) +__rte_ring_dequeue_elems_32(struct rte_ring *r, const uint32_t size, + uint32_t idx, void *obj_table, uint32_t n) { unsigned int i; uint32_t *ring = (uint32_t *)&r[1]; @@ -286,14 +291,14 @@ dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, } static __rte_always_inline void -dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, +__rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, void *obj_table, uint32_t n) { unsigned int i; const uint32_t size = r->size; uint32_t idx = prod_head & r->mask; uint64_t *ring = (uint64_t *)&r[1]; - uint64_t *obj = (uint64_t *)obj_table; + unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { obj[i] = ring[idx]; @@ -319,7 +324,7 @@ dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, } static __rte_always_inline void -dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, +__rte_ring_dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, void *obj_table, uint32_t n) { unsigned int i; @@ -348,16 +353,16 @@ dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, * single and multi producer enqueue functions. */ static __rte_always_inline void -dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table, - uint32_t esize, uint32_t num) +__rte_ring_dequeue_elems(struct rte_ring *r, uint32_t cons_head, + void *obj_table, uint32_t esize, uint32_t num) { /* 8B and 16B copies implemented individually to retain * the current performance. */ if (esize == 8) - dequeue_elems_64(r, cons_head, obj_table, num); + __rte_ring_dequeue_elems_64(r, cons_head, obj_table, num); else if (esize == 16) - dequeue_elems_128(r, cons_head, obj_table, num); + __rte_ring_dequeue_elems_128(r, cons_head, obj_table, num); else { uint32_t idx, scale, nr_idx, nr_num, nr_size; @@ -367,7 +372,8 @@ dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table, idx = cons_head & r->mask; nr_idx = idx * scale; nr_size = r->size * scale; - dequeue_elems_32(r, nr_size, nr_idx, obj_table, nr_num); + __rte_ring_dequeue_elems_32(r, nr_size, nr_idx, + obj_table, nr_num); } } @@ -392,7 +398,7 @@ dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -424,7 +430,7 @@ __rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table, if (n == 0) goto end; - enqueue_elems(r, prod_head, obj_table, esize, n); + __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n); update_tail(&r->prod, prod_head, prod_next, is_sp, 1); end: @@ -439,7 +445,7 @@ end: * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -471,7 +477,7 @@ __rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, if (n == 0) goto end; - dequeue_elems(r, cons_head, obj_table, esize, n); + __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n); update_tail(&r->cons, cons_head, cons_next, is_sc, 0); @@ -490,7 +496,7 @@ end: * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -508,7 +514,7 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space); } /** @@ -519,7 +525,7 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -537,9 +543,14 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space); } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#include +#endif + /** * Enqueue several objects on a ring. * @@ -550,7 +561,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -568,7 +579,30 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->prod.single, free_space); + RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space); + + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -655,7 +689,7 @@ rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -673,7 +707,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available); } /** @@ -682,7 +716,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -701,7 +735,7 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available); } /** @@ -714,7 +748,7 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -731,8 +765,28 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.single, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, + n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } /** @@ -744,7 +798,7 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_p - * A pointer to a void * pointer (object) that will be filled. + * A pointer to the object that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -768,7 +822,7 @@ rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, * @param r * A pointer to the ring structure. * @param obj_p - * A pointer to a void * pointer (object) that will be filled. + * A pointer to the object that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -796,7 +850,7 @@ rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, * @param r * A pointer to the ring structure. * @param obj_p - * A pointer to a void * pointer (object) that will be filled. + * A pointer to the object that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -822,7 +876,7 @@ rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -840,7 +894,7 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); } /** @@ -851,7 +905,7 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -869,7 +923,7 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } /** @@ -882,7 +936,7 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects). + * A pointer to a table of objects. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -899,8 +953,28 @@ static __rte_always_inline unsigned rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -914,7 +988,7 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -932,7 +1006,7 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); } /** @@ -943,7 +1017,7 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -961,7 +1035,7 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } /** @@ -974,7 +1048,7 @@ rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, * @param r * A pointer to the ring structure. * @param obj_table - * A pointer to a table of void * pointers (objects) that will be filled. + * A pointer to a table of objects that will be filled. * @param esize * The size of ring element, in bytes. It must be a multiple of 4. * This must be the same value used while creating the ring. Otherwise @@ -991,11 +1065,36 @@ static __rte_always_inline unsigned int rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, + n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#endif + +#include + #ifdef __cplusplus } #endif