From cc4b218790f6e91336e51a27f1ee9ce3cc73dd85 Mon Sep 17 00:00:00 2001 From: Honnappa Nagarahalli Date: Sat, 18 Jan 2020 13:32:43 -0600 Subject: [PATCH] ring: support configurable element size Current APIs assume ring elements to be pointers. However, in many use cases, the size can be different. Add new APIs to support configurable ring element sizes. Signed-off-by: Honnappa Nagarahalli Reviewed-by: Dharmik Thakkar Reviewed-by: Gavin Hu Reviewed-by: Ruifeng Wang Acked-by: Olivier Matz --- lib/librte_ring/Makefile | 3 +- lib/librte_ring/meson.build | 4 + lib/librte_ring/rte_ring.c | 41 +- lib/librte_ring/rte_ring.h | 1 + lib/librte_ring/rte_ring_elem.h | 1003 ++++++++++++++++++++++++++ lib/librte_ring/rte_ring_version.map | 4 + 6 files changed, 1047 insertions(+), 9 deletions(-) create mode 100644 lib/librte_ring/rte_ring_elem.h diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 22454b0848..917c560ad0 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -6,7 +6,7 @@ include $(RTE_SDK)/mk/rte.vars.mk # library name LIB = librte_ring.a -CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 +CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR) -O3 -DALLOW_EXPERIMENTAL_API LDLIBS += -lrte_eal EXPORT_MAP := rte_ring_version.map @@ -16,6 +16,7 @@ SRCS-$(CONFIG_RTE_LIBRTE_RING) := rte_ring.c # install includes SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ + rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index ca8a435e90..f2f3ccc88b 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -3,5 +3,9 @@ sources = files('rte_ring.c') headers = files('rte_ring.h', + 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h') + +# rte_ring_create_elem and rte_ring_get_memsize_elem are experimental +allow_experimental_apis = true diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index d9b308036c..77e5de099b 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -33,6 +33,7 @@ #include #include "rte_ring.h" +#include "rte_ring_elem.h" TAILQ_HEAD(rte_ring_list, rte_tailq_entry); @@ -46,23 +47,38 @@ EAL_REGISTER_TAILQ(rte_ring_tailq) /* return the size of memory occupied by a ring */ ssize_t -rte_ring_get_memsize(unsigned count) +rte_ring_get_memsize_elem(unsigned int esize, unsigned int count) { ssize_t sz; + /* Check if element size is a multiple of 4B */ + if (esize % 4 != 0) { + RTE_LOG(ERR, RING, "element size is not a multiple of 4\n"); + + return -EINVAL; + } + /* count must be a power of 2 */ if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { RTE_LOG(ERR, RING, - "Requested size is invalid, must be power of 2, and " - "do not exceed the size limit %u\n", RTE_RING_SZ_MASK); + "Requested number of elements is invalid, must be power of 2, and not exceed %u\n", + RTE_RING_SZ_MASK); + return -EINVAL; } - sz = sizeof(struct rte_ring) + count * sizeof(void *); + sz = sizeof(struct rte_ring) + count * esize; sz = RTE_ALIGN(sz, RTE_CACHE_LINE_SIZE); return sz; } +/* return the size of memory occupied by a ring */ +ssize_t +rte_ring_get_memsize(unsigned int count) +{ + return rte_ring_get_memsize_elem(sizeof(void *), count); +} + void rte_ring_reset(struct rte_ring *r) { @@ -114,10 +130,10 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, return 0; } -/* create the ring */ +/* create the ring for a given element size */ struct rte_ring * -rte_ring_create(const char *name, unsigned count, int socket_id, - unsigned flags) +rte_ring_create_elem(const char *name, unsigned int esize, unsigned int count, + int socket_id, unsigned int flags) { char mz_name[RTE_MEMZONE_NAMESIZE]; struct rte_ring *r; @@ -135,7 +151,7 @@ rte_ring_create(const char *name, unsigned count, int socket_id, if (flags & RING_F_EXACT_SZ) count = rte_align32pow2(count + 1); - ring_size = rte_ring_get_memsize(count); + ring_size = rte_ring_get_memsize_elem(esize, count); if (ring_size < 0) { rte_errno = ring_size; return NULL; @@ -182,6 +198,15 @@ rte_ring_create(const char *name, unsigned count, int socket_id, return r; } +/* create the ring */ +struct rte_ring * +rte_ring_create(const char *name, unsigned int count, int socket_id, + unsigned int flags) +{ + return rte_ring_create_elem(name, sizeof(void *), count, socket_id, + flags); +} + /* free the ring */ void rte_ring_free(struct rte_ring *r) diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 2a9f768a1c..18fc5d8451 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -216,6 +216,7 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, */ struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags); + /** * De-allocate all memory used by the ring. * diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h new file mode 100644 index 0000000000..15d79bf2ac --- /dev/null +++ b/lib/librte_ring/rte_ring_elem.h @@ -0,0 +1,1003 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2019 Arm Limited + * Copyright (c) 2010-2017 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_ELEM_H_ +#define _RTE_RING_ELEM_H_ + +/** + * @file + * RTE Ring with user defined element size + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rte_ring.h" + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Calculate the memory size needed for a ring with given element size + * + * This function returns the number of bytes needed for a ring, given + * the number of elements in it and the size of the element. This value + * is the sum of the size of the structure rte_ring and the size of the + * memory needed for storing the elements. The value is aligned to a cache + * line size. + * + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param count + * The number of elements in the ring (must be a power of 2). + * @return + * - The memory size needed for the ring on success. + * - -EINVAL - esize is not a multiple of 4 or count provided is not a + * power of 2. + */ +__rte_experimental +ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Create a new ring named *name* that stores elements with given size. + * + * This function uses ``memzone_reserve()`` to allocate memory. Then it + * calls rte_ring_init() to initialize an empty ring. + * + * The new ring size is set to *count*, which must be a power of + * two. Water marking is disabled by default. The real usable ring size + * is *count-1* instead of *count* to differentiate a free ring from an + * empty ring. + * + * The ring is added in RTE_TAILQ_RING list. + * + * @param name + * The name of the ring. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * @param count + * The number of elements in the ring (must be a power of 2). + * @param socket_id + * The *socket_id* argument is the socket identifier in case of + * NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA + * constraint for the reserved zone. + * @param flags + * An OR of the following: + * - RING_F_SP_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "single-producer". Otherwise, it is "multi-producers". + * - RING_F_SC_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "single-consumer". Otherwise, it is "multi-consumers". + * @return + * On success, the pointer to the new allocated ring. NULL on error with + * rte_errno set appropriately. Possible errno values include: + * - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure + * - E_RTE_SECONDARY - function was called from a secondary process instance + * - EINVAL - esize is not a multiple of 4 or count provided is not a + * power of 2. + * - ENOSPC - the maximum number of memzones has already been allocated + * - EEXIST - a memzone with the same name already exists + * - ENOMEM - no appropriate memory area found in which to create memzone + */ +__rte_experimental +struct rte_ring *rte_ring_create_elem(const char *name, unsigned int esize, + unsigned int count, int socket_id, unsigned int flags); + +static __rte_always_inline void +enqueue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, + const void *obj_table, uint32_t n) +{ + unsigned int i; + uint32_t *ring = (uint32_t *)&r[1]; + const uint32_t *obj = (const uint32_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + ring[idx] = obj[i]; + ring[idx + 1] = obj[i + 1]; + ring[idx + 2] = obj[i + 2]; + ring[idx + 3] = obj[i + 3]; + ring[idx + 4] = obj[i + 4]; + ring[idx + 5] = obj[i + 5]; + ring[idx + 6] = obj[i + 6]; + ring[idx + 7] = obj[i + 7]; + } + switch (n & 0x7) { + case 7: + ring[idx++] = obj[i++]; /* fallthrough */ + case 6: + ring[idx++] = obj[i++]; /* fallthrough */ + case 5: + ring[idx++] = obj[i++]; /* fallthrough */ + case 4: + ring[idx++] = obj[i++]; /* fallthrough */ + case 3: + ring[idx++] = obj[i++]; /* fallthrough */ + case 2: + ring[idx++] = obj[i++]; /* fallthrough */ + case 1: + ring[idx++] = obj[i++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + ring[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + ring[idx] = obj[i]; + } +} + +static __rte_always_inline void +enqueue_elems_64(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + uint64_t *ring = (uint64_t *)&r[1]; + const uint64_t *obj = (const uint64_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + ring[idx] = obj[i]; + ring[idx + 1] = obj[i + 1]; + ring[idx + 2] = obj[i + 2]; + ring[idx + 3] = obj[i + 3]; + } + switch (n & 0x3) { + case 3: + ring[idx++] = obj[i++]; /* fallthrough */ + case 2: + ring[idx++] = obj[i++]; /* fallthrough */ + case 1: + ring[idx++] = obj[i++]; + } + } else { + for (i = 0; idx < size; i++, idx++) + ring[idx] = obj[i]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + ring[idx] = obj[i]; + } +} + +static __rte_always_inline void +enqueue_elems_128(struct rte_ring *r, uint32_t prod_head, + const void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + rte_int128_t *ring = (rte_int128_t *)&r[1]; + const rte_int128_t *obj = (const rte_int128_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(ring + idx), + (const void *)(obj + i), 16); + } +} + +/* the actual enqueue of elements on the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions. + */ +static __rte_always_inline void +enqueue_elems(struct rte_ring *r, uint32_t prod_head, const void *obj_table, + uint32_t esize, uint32_t num) +{ + /* 8B and 16B copies implemented individually to retain + * the current performance. + */ + if (esize == 8) + enqueue_elems_64(r, prod_head, obj_table, num); + else if (esize == 16) + enqueue_elems_128(r, prod_head, obj_table, num); + else { + uint32_t idx, scale, nr_idx, nr_num, nr_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nr_num = num * scale; + idx = prod_head & r->mask; + nr_idx = idx * scale; + nr_size = r->size * scale; + enqueue_elems_32(r, nr_size, nr_idx, obj_table, nr_num); + } +} + +static __rte_always_inline void +dequeue_elems_32(struct rte_ring *r, const uint32_t size, uint32_t idx, + void *obj_table, uint32_t n) +{ + unsigned int i; + uint32_t *ring = (uint32_t *)&r[1]; + uint32_t *obj = (uint32_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x7); i += 8, idx += 8) { + obj[i] = ring[idx]; + obj[i + 1] = ring[idx + 1]; + obj[i + 2] = ring[idx + 2]; + obj[i + 3] = ring[idx + 3]; + obj[i + 4] = ring[idx + 4]; + obj[i + 5] = ring[idx + 5]; + obj[i + 6] = ring[idx + 6]; + obj[i + 7] = ring[idx + 7]; + } + switch (n & 0x7) { + case 7: + obj[i++] = ring[idx++]; /* fallthrough */ + case 6: + obj[i++] = ring[idx++]; /* fallthrough */ + case 5: + obj[i++] = ring[idx++]; /* fallthrough */ + case 4: + obj[i++] = ring[idx++]; /* fallthrough */ + case 3: + obj[i++] = ring[idx++]; /* fallthrough */ + case 2: + obj[i++] = ring[idx++]; /* fallthrough */ + case 1: + obj[i++] = ring[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = ring[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = ring[idx]; + } +} + +static __rte_always_inline void +dequeue_elems_64(struct rte_ring *r, uint32_t prod_head, + void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + uint64_t *ring = (uint64_t *)&r[1]; + uint64_t *obj = (uint64_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x3); i += 4, idx += 4) { + obj[i] = ring[idx]; + obj[i + 1] = ring[idx + 1]; + obj[i + 2] = ring[idx + 2]; + obj[i + 3] = ring[idx + 3]; + } + switch (n & 0x3) { + case 3: + obj[i++] = ring[idx++]; /* fallthrough */ + case 2: + obj[i++] = ring[idx++]; /* fallthrough */ + case 1: + obj[i++] = ring[idx++]; /* fallthrough */ + } + } else { + for (i = 0; idx < size; i++, idx++) + obj[i] = ring[idx]; + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + obj[i] = ring[idx]; + } +} + +static __rte_always_inline void +dequeue_elems_128(struct rte_ring *r, uint32_t prod_head, + void *obj_table, uint32_t n) +{ + unsigned int i; + const uint32_t size = r->size; + uint32_t idx = prod_head & r->mask; + rte_int128_t *ring = (rte_int128_t *)&r[1]; + rte_int128_t *obj = (rte_int128_t *)obj_table; + if (likely(idx + n < size)) { + for (i = 0; i < (n & ~0x1); i += 2, idx += 2) + memcpy((void *)(obj + i), (void *)(ring + idx), 32); + switch (n & 0x1) { + case 1: + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + } + } else { + for (i = 0; idx < size; i++, idx++) + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + /* Start at the beginning */ + for (idx = 0; i < n; i++, idx++) + memcpy((void *)(obj + i), (void *)(ring + idx), 16); + } +} + +/* the actual dequeue of elements from the ring. + * Placed here since identical code needed in both + * single and multi producer enqueue functions. + */ +static __rte_always_inline void +dequeue_elems(struct rte_ring *r, uint32_t cons_head, void *obj_table, + uint32_t esize, uint32_t num) +{ + /* 8B and 16B copies implemented individually to retain + * the current performance. + */ + if (esize == 8) + dequeue_elems_64(r, cons_head, obj_table, num); + else if (esize == 16) + dequeue_elems_128(r, cons_head, obj_table, num); + else { + uint32_t idx, scale, nr_idx, nr_num, nr_size; + + /* Normalize to uint32_t */ + scale = esize / sizeof(uint32_t); + nr_num = num * scale; + idx = cons_head & r->mask; + nr_idx = idx * scale; + nr_size = r->size * scale; + dequeue_elems_32(r, nr_size, nr_idx, obj_table, nr_num); + } +} + +/* Between load and load. there might be cpu reorder in weak model + * (powerpc/arm). + * There are 2 choices for the users + * 1.use rmb() memory barrier + * 2.use one-direction load_acquire/store_release barrier,defined by + * CONFIG_RTE_USE_C11_MEM_MODEL=y + * It depends on performance test results. + * By default, move common functions to rte_ring_generic.h + */ +#ifdef RTE_USE_C11_MEM_MODEL +#include "rte_ring_c11_mem.h" +#else +#include "rte_ring_generic.h" +#endif + +/** + * @internal Enqueue several objects on the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param is_sp + * Indicates whether to use single producer or multi-producer head update + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_enqueue_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, + enum rte_ring_queue_behavior behavior, unsigned int is_sp, + unsigned int *free_space) +{ + uint32_t prod_head, prod_next; + uint32_t free_entries; + + n = __rte_ring_move_prod_head(r, is_sp, n, behavior, + &prod_head, &prod_next, &free_entries); + if (n == 0) + goto end; + + enqueue_elems(r, prod_head, obj_table, esize, n); + + update_tail(&r->prod, prod_head, prod_next, is_sp, 1); +end: + if (free_space != NULL) + *free_space = free_entries - n; + return n; +} + +/** + * @internal Dequeue several objects from the ring + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param is_sc + * Indicates whether to use single consumer or multi-consumer head update + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_dequeue_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, + enum rte_ring_queue_behavior behavior, unsigned int is_sc, + unsigned int *available) +{ + uint32_t cons_head, cons_next; + uint32_t entries; + + n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior, + &cons_head, &cons_next, &entries); + if (n == 0) + goto end; + + dequeue_elems(r, cons_head, obj_table, esize, n); + + update_tail(&r->cons, cons_head, cons_next, is_sc, 0); + +end: + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, r->prod.single, free_space); +} + +/** + * Enqueue one object on a ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_mp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_mp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Enqueue one object on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_sp_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_sp_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Enqueue one object on a ring. + * + * This function calls the multi-producer or the single-producer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj + * A pointer to the object to be added. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static __rte_always_inline int +rte_ring_enqueue_elem(struct rte_ring *r, void *obj, unsigned int esize) +{ + return rte_ring_enqueue_bulk_elem(r, obj, esize, 1, NULL) ? 0 : + -ENOBUFS; +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_mc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table, + * must be strictly positive. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_sc_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, __IS_SC, available); +} + +/** + * Dequeue several objects from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +static __rte_always_inline unsigned int +rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, r->cons.single, available); +} + +/** + * Dequeue one object from a ring (multi-consumers safe). + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_mc_dequeue_elem(struct rte_ring *r, void *obj_p, + unsigned int esize) +{ + return rte_ring_mc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Dequeue one object from a ring (NOT multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_sc_dequeue_elem(struct rte_ring *r, void *obj_p, + unsigned int esize) +{ + return rte_ring_sc_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Dequeue one object from a ring. + * + * This function calls the multi-consumers or the single-consumer + * version depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_p + * A pointer to a void * pointer (object) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @return + * - 0: Success, objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue, no object is + * dequeued. + */ +static __rte_always_inline int +rte_ring_dequeue_elem(struct rte_ring *r, void *obj_p, unsigned int esize) +{ + return rte_ring_dequeue_bulk_elem(r, obj_p, esize, 1, NULL) ? 0 : + -ENOENT; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_mp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space); +} + +/** + * Enqueue several objects on a ring + * + * @warning This API is NOT multi-producers safe + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_sp_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +static __rte_always_inline unsigned +rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned +rte_ring_mc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_MC, available); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static __rte_always_inline unsigned +rte_ring_sc_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, __IS_SC, available); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - Number of objects dequeued + */ +static __rte_always_inline unsigned int +rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, + r->cons.single, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_ELEM_H_ */ diff --git a/lib/librte_ring/rte_ring_version.map b/lib/librte_ring/rte_ring_version.map index 89d84bcf48..e88c143cfb 100644 --- a/lib/librte_ring/rte_ring_version.map +++ b/lib/librte_ring/rte_ring_version.map @@ -15,6 +15,10 @@ DPDK_20.0 { EXPERIMENTAL { global: + # added in 19.08 rte_ring_reset; + # added in 20.02 + rte_ring_create_elem; + rte_ring_get_memsize_elem; }; -- 2.20.1