ring: remove split cacheline build setting
[dpdk.git] / lib / librte_ring / rte_ring.h
index 39bacdd..399ae3b 100644 (file)
@@ -100,10 +100,13 @@ extern "C" {
 #include <rte_lcore.h>
 #include <rte_atomic.h>
 #include <rte_branch_prediction.h>
+#include <rte_memzone.h>
+
+#define RTE_TAILQ_RING_NAME "RTE_RING"
 
 enum rte_ring_queue_behavior {
        RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
-       RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items a possible from ring */
+       RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
 };
 
 #ifdef RTE_LIBRTE_RING_DEBUG
@@ -124,8 +127,25 @@ struct rte_ring_debug_stats {
 } __rte_cache_aligned;
 #endif
 
-#define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */
 #define RTE_RING_MZ_PREFIX "RG_"
+/**< The maximum length of a ring name. */
+#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
+                          sizeof(RTE_RING_MZ_PREFIX) + 1)
+
+#ifndef RTE_RING_PAUSE_REP_COUNT
+#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
+                                    *   if RTE_RING_PAUSE_REP not defined. */
+#endif
+
+struct rte_memzone; /* forward declaration, so as not to require memzone.h */
+
+#if RTE_CACHE_LINE_SIZE < 128
+#define PROD_ALIGN (RTE_CACHE_LINE_SIZE * 2)
+#define CONS_ALIGN (RTE_CACHE_LINE_SIZE * 2)
+#else
+#define PROD_ALIGN RTE_CACHE_LINE_SIZE
+#define CONS_ALIGN RTE_CACHE_LINE_SIZE
+#endif
 
 /**
  * An RTE ring structure.
@@ -138,8 +158,15 @@ struct rte_ring_debug_stats {
  * a problem.
  */
 struct rte_ring {
-       char name[RTE_RING_NAMESIZE];    /**< Name of the ring. */
+       /*
+        * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
+        * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
+        * next time the ABI changes
+        */
+       char name[RTE_MEMZONE_NAMESIZE];    /**< Name of the ring. */
        int flags;                       /**< Flags supplied at creation. */
+       const struct rte_memzone *memzone;
+                       /**< Memzone, if any, containing the rte_ring */
 
        /** Ring producer status. */
        struct prod {
@@ -149,7 +176,7 @@ struct rte_ring {
                uint32_t mask;           /**< Mask (size-1) of ring. */
                volatile uint32_t head;  /**< Producer head. */
                volatile uint32_t tail;  /**< Producer tail. */
-       } prod __rte_cache_aligned;
+       } prod __rte_aligned(PROD_ALIGN);
 
        /** Ring consumer status. */
        struct cons {
@@ -158,17 +185,13 @@ struct rte_ring {
                uint32_t mask;           /**< Mask (size-1) of ring. */
                volatile uint32_t head;  /**< Consumer head. */
                volatile uint32_t tail;  /**< Consumer tail. */
-#ifdef RTE_RING_SPLIT_PROD_CONS
-       } cons __rte_cache_aligned;
-#else
-       } cons;
-#endif
+       } cons __rte_aligned(CONS_ALIGN);
 
 #ifdef RTE_LIBRTE_RING_DEBUG
        struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
 #endif
 
-       void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
+       void *ring[] __rte_cache_aligned;   /**< Memory space of ring starts here.
                                             * not volatile so need to be careful
                                             * about compiler re-ordering */
 };
@@ -286,7 +309,6 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  *    rte_errno set appropriately. Possible errno values include:
  *    - E_RTE_NO_CONFIG - function could not get pointer to rte_config structure
  *    - E_RTE_SECONDARY - function was called from a secondary process instance
- *    - E_RTE_NO_TAILQ - no tailq list could be got for the ring list
  *    - EINVAL - count provided is not a power of 2
  *    - ENOSPC - the maximum number of memzones has already been allocated
  *    - EEXIST - a memzone with the same name already exists
@@ -294,6 +316,13 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
  */
 struct rte_ring *rte_ring_create(const char *name, unsigned count,
                                 int socket_id, unsigned flags);
+/**
+ * De-allocate all memory used by the ring.
+ *
+ * @param r
+ *   Ring to free
+ */
+void rte_ring_free(struct rte_ring *r);
 
 /**
  * Change the high water mark.
@@ -316,7 +345,7 @@ struct rte_ring *rte_ring_create(const char *name, unsigned count,
 int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
 
 /**
- * Dump the status of the ring to the console.
+ * Dump the status of the ring to a file.
  *
  * @param f
  *   A pointer to a file for output
@@ -410,10 +439,15 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
        uint32_t cons_tail, free_entries;
        const unsigned max = n;
        int success;
-       unsigned i;
+       unsigned i, rep = 0;
        uint32_t mask = r->prod.mask;
        int ret;
 
+       /* Avoid the unnecessary cmpset operation below, which is also
+        * potentially harmful when n equals 0. */
+       if (n == 0)
+               return 0;
+
        /* move prod.head atomically */
        do {
                /* Reset n to the initial burst count */
@@ -451,7 +485,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
        /* write entries in ring */
        ENQUEUE_PTRS();
-       rte_compiler_barrier();
+       rte_smp_wmb();
 
        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
@@ -468,9 +502,18 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
         * If there are other enqueues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->prod.tail != prod_head))
+       while (unlikely(r->prod.tail != prod_head)) {
                rte_pause();
 
+               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
+                * for other thread finish. It gives pre-empted thread a chance
+                * to proceed and finish with ring dequeue operation. */
+               if (RTE_RING_PAUSE_REP_COUNT &&
+                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
+                       rep = 0;
+                       sched_yield();
+               }
+       }
        r->prod.tail = prod_next;
        return ret;
 }
@@ -537,7 +580,7 @@ __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
 
        /* write entries in ring */
        ENQUEUE_PTRS();
-       rte_compiler_barrier();
+       rte_smp_wmb();
 
        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
@@ -589,9 +632,14 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
        uint32_t cons_next, entries;
        const unsigned max = n;
        int success;
-       unsigned i;
+       unsigned i, rep = 0;
        uint32_t mask = r->prod.mask;
 
+       /* Avoid the unnecessary cmpset operation below, which is also
+        * potentially harmful when n equals 0. */
+       if (n == 0)
+               return 0;
+
        /* move cons.head atomically */
        do {
                /* Restore n as it may change every loop */
@@ -628,15 +676,24 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
 
        /* copy in table */
        DEQUEUE_PTRS();
-       rte_compiler_barrier();
+       rte_smp_rmb();
 
        /*
         * If there are other dequeues in progress that preceded us,
         * we need to wait for them to complete
         */
-       while (unlikely(r->cons.tail != cons_head))
+       while (unlikely(r->cons.tail != cons_head)) {
                rte_pause();
 
+               /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
+                * for other thread finish. It gives pre-empted thread a chance
+                * to proceed and finish with ring dequeue operation. */
+               if (RTE_RING_PAUSE_REP_COUNT &&
+                   ++rep == RTE_RING_PAUSE_REP_COUNT) {
+                       rep = 0;
+                       sched_yield();
+               }
+       }
        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
 
@@ -703,7 +760,7 @@ __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
 
        /* copy in table */
        DEQUEUE_PTRS();
-       rte_compiler_barrier();
+       rte_smp_rmb();
 
        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
@@ -1002,7 +1059,7 @@ rte_ring_full(const struct rte_ring *r)
 {
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
-       return (((cons_tail - prod_tail - 1) & r->prod.mask) == 0);
+       return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
 }
 
 /**
@@ -1035,7 +1092,7 @@ rte_ring_count(const struct rte_ring *r)
 {
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
-       return ((prod_tail - cons_tail) & r->prod.mask);
+       return (prod_tail - cons_tail) & r->prod.mask;
 }
 
 /**
@@ -1051,7 +1108,21 @@ rte_ring_free_count(const struct rte_ring *r)
 {
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
-       return ((cons_tail - prod_tail - 1) & r->prod.mask);
+       return (cons_tail - prod_tail - 1) & r->prod.mask;
+}
+
+/**
+ * Return the size of the ring.
+ *
+ * @param r
+ *   A pointer to the ring structure.
+ * @return
+ *   The number of elements which can be stored in the ring.
+ */
+static inline unsigned int
+rte_ring_get_size(const struct rte_ring *r)
+{
+       return r->prod.size;
 }
 
 /**