X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_ring%2Frte_ring_elem.h;h=69dc51746c5533a53e3af2626693c1062f841367;hb=c47463272f572a4d78109763399b5e3817b848a9;hp=3976757edc0b3d07047bc64a5daaf82cbf0388b4;hpb=ae122e50799570dd276cd42e5f8aa71e37fe0c36;p=dpdk.git diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 3976757edc..69dc51746c 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -20,26 +20,9 @@ extern "C" { #endif -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rte_ring.h" +#include /** - * @warning - * @b EXPERIMENTAL: this API may change without prior notice - * * Calculate the memory size needed for a ring with given element size * * This function returns the number of bytes needed for a ring, given @@ -57,13 +40,9 @@ extern "C" { * - -EINVAL - esize is not a multiple of 4 or count provided is not a * power of 2. */ -__rte_experimental ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); /** - * @warning - * @b EXPERIMENTAL: this API may change without prior notice - * * Create a new ring named *name* that stores elements with given size. * * This function uses ``memzone_reserve()`` to allocate memory. Then it @@ -88,12 +67,30 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * constraint for the reserved zone. * @param flags * An OR of the following: - * - RING_F_SP_ENQ: If this flag is set, the default behavior when - * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` - * is "single-producer". Otherwise, it is "multi-producers". - * - RING_F_SC_DEQ: If this flag is set, the default behavior when - * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` - * is "single-consumer". Otherwise, it is "multi-consumers". + * - One of mutually exclusive flags that define producer behavior: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". + * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". + * If none of these flags is set, then default "multi-producer" + * behavior is selected. + * - One of mutually exclusive flags that define consumer behavior: + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". + * If none of these flags is set, then default "multi-consumer" + * behavior is selected. * @return * On success, the pointer to the new allocated ring. NULL on error with * rte_errno set appropriately. Possible errno values include: @@ -105,7 +102,6 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - EEXIST - a memzone with the same name already exists * - ENOMEM - no appropriate memory area found in which to create memzone */ -__rte_experimental struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, int socket_id, unsigned int flags); @@ -160,7 +156,7 @@ __rte_ring_enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, const uint32_t size = r->size; uint32_t idx = prod_head & r->mask; uint64_t *ring = (uint64_t *)&r[1]; - const uint64_t *obj = (const uint64_t *)obj_table; + const unaligned_uint64_t *obj = (const unaligned_uint64_t *)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { ring[idx] = obj[i]; @@ -294,7 +290,7 @@ __rte_ring_dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, const uint32_t size = r->size; uint32_t idx = prod_head & r->mask; uint64_t *ring = (uint64_t *)&r[1]; - uint64_t *obj = (uint64_t *)obj_table; + unaligned_uint64_t *obj = (unaligned_uint64_t *)obj_table; if (likely(idx + n < size)) { for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { obj[i] = ring[idx]; @@ -510,7 +506,7 @@ rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space); } /** @@ -539,9 +535,14 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SP, free_space); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space); } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#include +#endif + /** * Enqueue several objects on a ring. * @@ -569,8 +570,28 @@ static __rte_always_inline unsigned int rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->prod.single, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -675,7 +696,7 @@ rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_MC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available); } /** @@ -703,7 +724,7 @@ rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, __IS_SC, available); + RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available); } /** @@ -733,8 +754,28 @@ static __rte_always_inline unsigned int rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_FIXED, r->cons.single, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_bulk_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_bulk_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, + n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } /** @@ -837,12 +878,12 @@ rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +static __rte_always_inline unsigned int rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space); } /** @@ -866,12 +907,12 @@ rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +static __rte_always_inline unsigned int rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space); } /** @@ -897,12 +938,32 @@ rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects enqueued. */ -static __rte_always_inline unsigned +static __rte_always_inline unsigned int rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, unsigned int esize, unsigned int n, unsigned int *free_space) { - return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); + switch (r->prod.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); + case RTE_RING_SYNC_ST: + return rte_ring_sp_enqueue_burst_elem(r, obj_table, esize, n, + free_space); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (free_space != NULL) + *free_space = 0; + return 0; } /** @@ -929,12 +990,12 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +static __rte_always_inline unsigned int rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_MC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available); } /** @@ -958,12 +1019,12 @@ rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static __rte_always_inline unsigned +static __rte_always_inline unsigned int rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, __IS_SC, available); + RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available); } /** @@ -993,11 +1054,36 @@ static __rte_always_inline unsigned int rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, unsigned int esize, unsigned int n, unsigned int *available) { - return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, - RTE_RING_QUEUE_VARIABLE, - r->cons.single, available); + switch (r->cons.sync_type) { + case RTE_RING_SYNC_MT: + return rte_ring_mc_dequeue_burst_elem(r, obj_table, esize, n, + available); + case RTE_RING_SYNC_ST: + return rte_ring_sc_dequeue_burst_elem(r, obj_table, esize, n, + available); +#ifdef ALLOW_EXPERIMENTAL_API + case RTE_RING_SYNC_MT_RTS: + return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, + n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); +#endif + } + + /* valid ring should never reach this point */ + RTE_ASSERT(0); + if (available != NULL) + *available = 0; + return 0; } +#ifdef ALLOW_EXPERIMENTAL_API +#include +#endif + +#include + #ifdef __cplusplus } #endif