/*-
* BSD LICENSE
*
- * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
+ * Copyright(c) 2010-2017 Intel Corporation. All rights reserved.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
enum rte_ring_queue_behavior {
RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
- RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items a possible from ring */
+ RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from ring */
};
#ifdef RTE_LIBRTE_RING_DEBUG
struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+#if RTE_CACHE_LINE_SIZE < 128
+#define PROD_ALIGN (RTE_CACHE_LINE_SIZE * 2)
+#define CONS_ALIGN (RTE_CACHE_LINE_SIZE * 2)
+#else
+#define PROD_ALIGN RTE_CACHE_LINE_SIZE
+#define CONS_ALIGN RTE_CACHE_LINE_SIZE
+#endif
+
+/* structure to hold a pair of head/tail values and other metadata */
+struct rte_ring_headtail {
+ volatile uint32_t head; /**< Prod/consumer head. */
+ volatile uint32_t tail; /**< Prod/consumer tail. */
+ uint32_t single; /**< True if single prod/cons */
+};
+
/**
* An RTE ring structure.
*
* next time the ABI changes
*/
char name[RTE_MEMZONE_NAMESIZE]; /**< Name of the ring. */
- int flags; /**< Flags supplied at creation. */
+ int flags; /**< Flags supplied at creation. */
const struct rte_memzone *memzone;
/**< Memzone, if any, containing the rte_ring */
+ uint32_t size; /**< Size of ring. */
+ uint32_t mask; /**< Mask (size-1) of ring. */
+ uint32_t watermark; /**< Max items before EDQUOT in producer. */
/** Ring producer status. */
- struct prod {
- uint32_t watermark; /**< Maximum items before EDQUOT. */
- uint32_t sp_enqueue; /**< True, if single producer. */
- uint32_t size; /**< Size of ring. */
- uint32_t mask; /**< Mask (size-1) of ring. */
- volatile uint32_t head; /**< Producer head. */
- volatile uint32_t tail; /**< Producer tail. */
- } prod __rte_cache_aligned;
+ struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);
/** Ring consumer status. */
- struct cons {
- uint32_t sc_dequeue; /**< True, if single consumer. */
- uint32_t size; /**< Size of the ring. */
- uint32_t mask; /**< Mask (size-1) of ring. */
- volatile uint32_t head; /**< Consumer head. */
- volatile uint32_t tail; /**< Consumer tail. */
-#ifdef RTE_RING_SPLIT_PROD_CONS
- } cons __rte_cache_aligned;
-#else
- } cons;
-#endif
+ struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN);
#ifdef RTE_LIBRTE_RING_DEBUG
struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
/**
- * Dump the status of the ring to the console.
+ * Dump the status of the ring to a file.
*
* @param f
* A pointer to a file for output
* Placed here since identical code needed in both
* single and multi producer enqueue functions */
#define ENQUEUE_PTRS() do { \
- const uint32_t size = r->prod.size; \
+ const uint32_t size = r->size; \
uint32_t idx = prod_head & mask; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
* single and multi consumer dequeue functions */
#define DEQUEUE_PTRS() do { \
uint32_t idx = cons_head & mask; \
- const uint32_t size = r->cons.size; \
+ const uint32_t size = r->size; \
if (likely(idx + n < size)) { \
for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
obj_table[i] = r->ring[idx]; \
const unsigned max = n;
int success;
unsigned i, rep = 0;
- uint32_t mask = r->prod.mask;
+ uint32_t mask = r->mask;
int ret;
/* Avoid the unnecessary cmpset operation below, which is also
rte_smp_wmb();
/* if we exceed the watermark */
- if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+ if (unlikely(((mask + 1) - free_entries + n) > r->watermark)) {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
uint32_t prod_head, cons_tail;
uint32_t prod_next, free_entries;
unsigned i;
- uint32_t mask = r->prod.mask;
+ uint32_t mask = r->mask;
int ret;
prod_head = r->prod.head;
rte_smp_wmb();
/* if we exceed the watermark */
- if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
+ if (unlikely(((mask + 1) - free_entries + n) > r->watermark)) {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
const unsigned max = n;
int success;
unsigned i, rep = 0;
- uint32_t mask = r->prod.mask;
+ uint32_t mask = r->mask;
/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
uint32_t cons_head, prod_tail;
uint32_t cons_next, entries;
unsigned i;
- uint32_t mask = r->prod.mask;
+ uint32_t mask = r->mask;
cons_head = r->cons.head;
prod_tail = r->prod.tail;
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- if (r->prod.sp_enqueue)
+ if (r->prod.single)
return rte_ring_sp_enqueue_bulk(r, obj_table, n);
else
return rte_ring_mp_enqueue_bulk(r, obj_table, n);
static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
- if (r->prod.sp_enqueue)
+ if (r->prod.single)
return rte_ring_sp_enqueue(r, obj);
else
return rte_ring_mp_enqueue(r, obj);
static inline int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
- if (r->cons.sc_dequeue)
+ if (r->cons.single)
return rte_ring_sc_dequeue_bulk(r, obj_table, n);
else
return rte_ring_mc_dequeue_bulk(r, obj_table, n);
static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
- if (r->cons.sc_dequeue)
+ if (r->cons.single)
return rte_ring_sc_dequeue(r, obj_p);
else
return rte_ring_mc_dequeue(r, obj_p);
{
uint32_t prod_tail = r->prod.tail;
uint32_t cons_tail = r->cons.tail;
- return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
+ return ((cons_tail - prod_tail - 1) & r->mask) == 0;
}
/**
{
uint32_t prod_tail = r->prod.tail;
uint32_t cons_tail = r->cons.tail;
- return (prod_tail - cons_tail) & r->prod.mask;
+ return (prod_tail - cons_tail) & r->mask;
}
/**
{
uint32_t prod_tail = r->prod.tail;
uint32_t cons_tail = r->cons.tail;
- return (cons_tail - prod_tail - 1) & r->prod.mask;
+ return (cons_tail - prod_tail - 1) & r->mask;
+}
+
+/**
+ * Return the size of the ring.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @return
+ * The number of elements which can be stored in the ring.
+ */
+static inline unsigned int
+rte_ring_get_size(const struct rte_ring *r)
+{
+ return r->size;
}
/**
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
unsigned n)
{
- if (r->prod.sp_enqueue)
+ if (r->prod.single)
return rte_ring_sp_enqueue_burst(r, obj_table, n);
else
return rte_ring_mp_enqueue_burst(r, obj_table, n);
static inline unsigned __attribute__((always_inline))
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
- if (r->cons.sc_dequeue)
+ if (r->cons.single)
return rte_ring_sc_dequeue_burst(r, obj_table, n);
else
return rte_ring_mc_dequeue_burst(r, obj_table, n);