if (ret < 0)
return -1;
mp->flags = 0x0000;
- ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(),
- RING_F_SP_ENQ | RING_F_SC_DEQ);
+ ring_client = rte_ring_create("SR0", RING_SIZE, rte_socket_id(), 0);
if (ring_client == NULL) {
printf("rte_ring_create SR0 failed");
return -1;
}
rte_eth_dev_probing_finish(eth_dev);
- ring_client->prod.single = 0;
- ring_client->cons.single = 0;
-
printf("\n***** flags = RTE_PDUMP_FLAG_TX *****\n");
for (itr = 0; itr < NUM_ITR; itr++) {
uint32_t entries = (prod_tail - cons_head);
uint32_t free_entries = (mask + cons_tail -prod_head);
+Producer/consumer synchronization modes
+---------------------------------------
+
+rte_ring supports different synchronization modes for producers and consumers.
+These modes can be specified at ring creation/init time via ``flags``
+parameter.
+That should help users to configure ring in the most suitable way for his
+specific usage scenarios.
+Currently supported modes:
+
+MP/MC (default one)
+~~~~~~~~~~~~~~~~~~~
+
+Multi-producer (/multi-consumer) mode. This is a default enqueue (/dequeue)
+mode for the ring. In this mode multiple threads can enqueue (/dequeue)
+objects to (/from) the ring. For 'classic' DPDK deployments (with one thread
+per core) this is usually the most suitable and fastest synchronization mode.
+As a well known limitation - it can perform quite pure on some overcommitted
+scenarios.
+
+SP/SC
+~~~~~
+Single-producer (/single-consumer) mode. In this mode only one thread at a time
+is allowed to enqueue (/dequeue) objects to (/from) the ring.
+
References
----------
rte_errno = EINVAL;
return -1;
}
- if (ring->prod.single || ring->cons.single) {
+ if (rte_ring_is_prod_single(ring) || rte_ring_is_cons_single(ring)) {
PDUMP_LOG(ERR, "ring with either SP or SC settings"
" is not valid for pdump, should have MP and MC settings\n");
rte_errno = EINVAL;
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->cons.single && is_multi) ||
- (!(conf->ring->cons.single) && !is_multi)) {
+ (rte_ring_is_cons_single(conf->ring) && is_multi) ||
+ (!rte_ring_is_cons_single(conf->ring) && !is_multi)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
}
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
+ (rte_ring_is_prod_single(conf->ring) && is_multi) ||
+ (!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
/* Check input parameters */
if ((conf == NULL) ||
(conf->ring == NULL) ||
- (conf->ring->prod.single && is_multi) ||
- (!(conf->ring->prod.single) && !is_multi) ||
+ (rte_ring_is_prod_single(conf->ring) && is_multi) ||
+ (!rte_ring_is_prod_single(conf->ring) && !is_multi) ||
(conf->tx_burst_sz > RTE_PORT_IN_BURST_SIZE_MAX)) {
RTE_LOG(ERR, PORT, "%s: Invalid Parameters\n", __func__);
return NULL;
# install includes
SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \
+ rte_ring_core.h \
rte_ring_elem.h \
rte_ring_generic.h \
rte_ring_c11_mem.h
sources = files('rte_ring.c')
headers = files('rte_ring.h',
+ 'rte_ring_core.h',
'rte_ring_elem.h',
'rte_ring_c11_mem.h',
'rte_ring_generic.h')
if (ret < 0 || ret >= (int)sizeof(r->name))
return -ENAMETOOLONG;
r->flags = flags;
- r->prod.single = (flags & RING_F_SP_ENQ) ? __IS_SP : __IS_MP;
- r->cons.single = (flags & RING_F_SC_DEQ) ? __IS_SC : __IS_MC;
+ r->prod.sync_type = (flags & RING_F_SP_ENQ) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
+ r->cons.sync_type = (flags & RING_F_SC_DEQ) ?
+ RTE_RING_SYNC_ST : RTE_RING_SYNC_MT;
if (flags & RING_F_EXACT_SZ) {
r->size = rte_align32pow2(count + 1);
extern "C" {
#endif
-#include <stdio.h>
-#include <stdint.h>
-#include <sys/queue.h>
-#include <errno.h>
-#include <rte_common.h>
-#include <rte_config.h>
-#include <rte_memory.h>
-#include <rte_lcore.h>
-#include <rte_atomic.h>
-#include <rte_branch_prediction.h>
-#include <rte_memzone.h>
-#include <rte_pause.h>
-
-#define RTE_TAILQ_RING_NAME "RTE_RING"
-
-enum rte_ring_queue_behavior {
- RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
- RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */
-};
-
-#define RTE_RING_MZ_PREFIX "RG_"
-/** The maximum length of a ring name. */
-#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
- sizeof(RTE_RING_MZ_PREFIX) + 1)
-
-/* structure to hold a pair of head/tail values and other metadata */
-struct rte_ring_headtail {
- volatile uint32_t head; /**< Prod/consumer head. */
- volatile uint32_t tail; /**< Prod/consumer tail. */
- uint32_t single; /**< True if single prod/cons */
-};
-
-/**
- * An RTE ring structure.
- *
- * The producer and the consumer have a head and a tail index. The particularity
- * of these index is that they are not between 0 and size(ring). These indexes
- * are between 0 and 2^32, and we mask their value when we access the ring[]
- * field. Thanks to this assumption, we can do subtractions between 2 index
- * values in a modulo-32bit base: that's why the overflow of the indexes is not
- * a problem.
- */
-struct rte_ring {
- /*
- * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
- * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
- * next time the ABI changes
- */
- char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
- int flags; /**< Flags supplied at creation. */
- const struct rte_memzone *memzone;
- /**< Memzone, if any, containing the rte_ring */
- uint32_t size; /**< Size of ring. */
- uint32_t mask; /**< Mask (size-1) of ring. */
- uint32_t capacity; /**< Usable size of ring */
-
- char pad0 __rte_cache_aligned; /**< empty cache line */
-
- /** Ring producer status. */
- struct rte_ring_headtail prod __rte_cache_aligned;
- char pad1 __rte_cache_aligned; /**< empty cache line */
-
- /** Ring consumer status. */
- struct rte_ring_headtail cons __rte_cache_aligned;
- char pad2 __rte_cache_aligned; /**< empty cache line */
-};
-
-#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
-#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
-/**
- * Ring is to hold exactly requested number of entries.
- * Without this flag set, the ring size requested must be a power of 2, and the
- * usable space will be that size - 1. With the flag, the requested size will
- * be rounded up to the next power of two, but the usable space will be exactly
- * that requested. Worst case, if a power-of-2 size is requested, half the
- * ring space will be wasted.
- */
-#define RING_F_EXACT_SZ 0x0004
-#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
-
-/* @internal defines for passing to the enqueue dequeue worker functions */
-#define __IS_SP 1
-#define __IS_MP 0
-#define __IS_SC 1
-#define __IS_MC 0
+#include <rte_ring_core.h>
/**
* Calculate the memory size needed for a ring
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MP, free_space);
+ RTE_RING_SYNC_MT, free_space);
}
/**
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SP, free_space);
+ RTE_RING_SYNC_ST, free_space);
}
+#ifdef ALLOW_EXPERIMENTAL_API
+#include <rte_ring_elem.h>
+#endif
+
/**
* Enqueue several objects on a ring.
*
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->prod.single, free_space);
+ r->prod.sync_type, free_space);
}
/**
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_MC, available);
+ RTE_RING_SYNC_MT, available);
}
/**
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- __IS_SC, available);
+ RTE_RING_SYNC_ST, available);
}
/**
unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
/**
return r->capacity;
}
+/**
+ * Return sync type used by producer in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * Producer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_prod_sync_type(const struct rte_ring *r)
+{
+ return r->prod.sync_type;
+}
+
+/**
+ * Check is the ring for single producer.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * true if ring is SP, zero otherwise.
+ */
+static inline int
+rte_ring_is_prod_single(const struct rte_ring *r)
+{
+ return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
+/**
+ * Return sync type used by consumer in the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * Consumer sync type value.
+ */
+static inline enum rte_ring_sync_type
+rte_ring_get_cons_sync_type(const struct rte_ring *r)
+{
+ return r->cons.sync_type;
+}
+
+/**
+ * Check is the ring for single consumer.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * true if ring is SC, zero otherwise.
+ */
+static inline int
+rte_ring_is_cons_single(const struct rte_ring *r)
+{
+ return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
+}
+
/**
* Dump the status of all rings on the console
*
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
}
/**
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
}
/**
unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
- r->prod.single, free_space);
+ r->prod.sync_type, free_space);
}
/**
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
}
/**
unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue(r, obj_table, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
}
/**
{
return __rte_ring_do_dequeue(r, obj_table, n,
RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
#ifdef __cplusplus
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2010-2020 Intel Corporation
+ * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
+ * All rights reserved.
+ * Derived from FreeBSD's bufring.h
+ * Used as BSD-3 Licensed with permission from Kip Macy.
+ */
+
+#ifndef _RTE_RING_CORE_H_
+#define _RTE_RING_CORE_H_
+
+/**
+ * @file
+ * This file contains definion of RTE ring structure itself,
+ * init flags and some related macros.
+ * For majority of DPDK entities, it is not recommended to include
+ * this file directly, use include <rte_ring.h> or <rte_ring_elem.h>
+ * instead.
+ */
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdio.h>
+#include <stdint.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <errno.h>
+#include <rte_common.h>
+#include <rte_config.h>
+#include <rte_memory.h>
+#include <rte_lcore.h>
+#include <rte_atomic.h>
+#include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+#include <rte_pause.h>
+#include <rte_debug.h>
+
+#define RTE_TAILQ_RING_NAME "RTE_RING"
+
+/** enqueue/dequeue behavior types */
+enum rte_ring_queue_behavior {
+ /** Enq/Deq a fixed number of items from a ring */
+ RTE_RING_QUEUE_FIXED = 0,
+ /** Enq/Deq as many items as possible from ring */
+ RTE_RING_QUEUE_VARIABLE
+};
+
+#define RTE_RING_MZ_PREFIX "RG_"
+/** The maximum length of a ring name. */
+#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+ sizeof(RTE_RING_MZ_PREFIX) + 1)
+
+/** prod/cons sync types */
+enum rte_ring_sync_type {
+ RTE_RING_SYNC_MT, /**< multi-thread safe (default mode) */
+ RTE_RING_SYNC_ST, /**< single thread only */
+};
+
+/**
+ * structures to hold a pair of head/tail values and other metadata.
+ * Depending on sync_type format of that structure might be different,
+ * but offset for *sync_type* and *tail* values should remain the same.
+ */
+struct rte_ring_headtail {
+ volatile uint32_t head; /**< prod/consumer head. */
+ volatile uint32_t tail; /**< prod/consumer tail. */
+ RTE_STD_C11
+ union {
+ /** sync type of prod/cons */
+ enum rte_ring_sync_type sync_type;
+ /** deprecated - True if single prod/cons */
+ uint32_t single;
+ };
+};
+
+/**
+ * An RTE ring structure.
+ *
+ * The producer and the consumer have a head and a tail index. The particularity
+ * of these index is that they are not between 0 and size(ring). These indexes
+ * are between 0 and 2^32, and we mask their value when we access the ring[]
+ * field. Thanks to this assumption, we can do subtractions between 2 index
+ * values in a modulo-32bit base: that's why the overflow of the indexes is not
+ * a problem.
+ */
+struct rte_ring {
+ /*
+ * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+ * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+ * next time the ABI changes
+ */
+ char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned;
+ /**< Name of the ring. */
+ int flags; /**< Flags supplied at creation. */
+ const struct rte_memzone *memzone;
+ /**< Memzone, if any, containing the rte_ring */
+ uint32_t size; /**< Size of ring. */
+ uint32_t mask; /**< Mask (size-1) of ring. */
+ uint32_t capacity; /**< Usable size of ring */
+
+ char pad0 __rte_cache_aligned; /**< empty cache line */
+
+ /** Ring producer status. */
+ struct rte_ring_headtail prod __rte_cache_aligned;
+ char pad1 __rte_cache_aligned; /**< empty cache line */
+
+ /** Ring consumer status. */
+ struct rte_ring_headtail cons __rte_cache_aligned;
+ char pad2 __rte_cache_aligned; /**< empty cache line */
+};
+
+#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
+#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+/**
+ * Ring is to hold exactly requested number of entries.
+ * Without this flag set, the ring size requested must be a power of 2, and the
+ * usable space will be that size - 1. With the flag, the requested size will
+ * be rounded up to the next power of two, but the usable space will be exactly
+ * that requested. Worst case, if a power-of-2 size is requested, half the
+ * ring space will be wasted.
+ */
+#define RING_F_EXACT_SZ 0x0004
+#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _RTE_RING_CORE_H_ */
extern "C" {
#endif
-#include <stdio.h>
-#include <stdint.h>
-#include <string.h>
-#include <sys/queue.h>
-#include <errno.h>
-#include <rte_common.h>
-#include <rte_config.h>
-#include <rte_memory.h>
-#include <rte_lcore.h>
-#include <rte_atomic.h>
-#include <rte_branch_prediction.h>
-#include <rte_memzone.h>
-#include <rte_pause.h>
-
-#include "rte_ring.h"
+#include <rte_ring_core.h>
/**
* @warning
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_MP, free_space);
+ RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_SP, free_space);
+ RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->prod.single, free_space);
+ RTE_RING_QUEUE_FIXED, r->prod.sync_type, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_MC, available);
+ RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_MT, available);
}
/**
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, __IS_SC, available);
+ RTE_RING_QUEUE_FIXED, RTE_RING_SYNC_ST, available);
}
/**
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_FIXED, r->cons.single, available);
+ RTE_RING_QUEUE_FIXED, r->cons.sync_type, available);
}
/**
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *free_space)
{
return __rte_ring_do_enqueue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, r->prod.single, free_space);
+ RTE_RING_QUEUE_VARIABLE, r->prod.sync_type, free_space);
}
/**
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
}
/**
unsigned int esize, unsigned int n, unsigned int *available)
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
- RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
+ RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
}
/**
{
return __rte_ring_do_dequeue_elem(r, obj_table, esize, n,
RTE_RING_QUEUE_VARIABLE,
- r->cons.single, available);
+ r->cons.sync_type, available);
}
+#include <rte_ring.h>
+
#ifdef __cplusplus
}
#endif