The stack library provides the following basic operations:
* Create a uniquely named stack of a user-specified size and using a
- user-specified socket.
+ user-specified socket, with either standard (lock-based) or lock-free
+ behavior.
* Push and pop a burst of one or more stack objects (pointers). These
  functions are multi-thread safe.
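
For illustration, a minimal usage sketch of these operations (the stack name,
size, and objects are arbitrary, and the EAL is assumed to be initialized)::

    #include <rte_stack.h>

    static unsigned int
    stack_example(void)
    {
        int values[8];
        void *objs[8];
        struct rte_stack *s;
        unsigned int i, n;

        /* Create a stack named "example" that can hold up to 64 pointers. */
        s = rte_stack_create("example", 64, SOCKET_ID_ANY, 0);
        if (s == NULL)
            return 0; /* rte_errno indicates the cause */

        for (i = 0; i < 8; i++)
            objs[i] = &values[i];

        /* Push a burst of 8 pointers, then pop the same burst back off. */
        n = rte_stack_push(s, objs, 8); /* 8 on success, 0 if the stack is full */
        n = rte_stack_pop(s, objs, n);  /* number of objects actually popped */

        rte_stack_free(s);

        return n;
    }
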
Implementation
~~~~~~~~~~~~~~
-The stack consists of a contiguous array of pointers, a current index, and a
-spinlock. Accesses to the stack are made multi-thread safe by the spinlock.
+The library supports two types of stacks: standard (lock-based) and lock-free.
+Both types use the same set of interfaces, but their implementations differ.
+
+Lock-based Stack
+----------------
+
+The lock-based stack consists of a contiguous array of pointers, a current
+index, and a spinlock. Accesses to the stack are made multi-thread safe by the
+spinlock.
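
A simplified sketch of that layout and of a push under the lock (the struct
and function names here are illustrative only; the library's definitions live
in rte_stack_std.h)::

    #include <stdint.h>
    #include <rte_spinlock.h>

    /* Illustrative layout: lock, current index, then the pointer array. */
    struct std_stack_sketch {
        rte_spinlock_t lock; /* serializes all push and pop operations */
        uint32_t len;        /* current index (number of stored pointers) */
        void *objs[];        /* contiguous array of object pointers */
    };

    static unsigned int
    std_push_sketch(struct std_stack_sketch *s, unsigned int capacity,
                    void * const *obj_table, unsigned int n)
    {
        unsigned int i;

        rte_spinlock_lock(&s->lock);

        if (s->len + n > capacity) {
            /* All-or-nothing: refuse the burst if it does not fit. */
            rte_spinlock_unlock(&s->lock);
            return 0;
        }

        for (i = 0; i < n; i++)
            s->objs[s->len + i] = obj_table[i];
        s->len += n;

        rte_spinlock_unlock(&s->lock);

        return n;
    }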
+
+Lock-free Stack
+---------------
+
+The lock-free stack consists of a linked list of elements, each containing a
+data pointer and a next pointer, and an atomic stack depth counter. The
+lock-free property means that multiple threads can push and pop simultaneously,
+and one thread being preempted/delayed in a push or pop operation will not
+impede the forward progress of any other thread.
+
+The lock-free push operation enqueues a linked list of pointers by pointing the
+list's tail to the current stack head, and using a CAS to swing the stack head
+pointer to the head of the list. The operation retries if it is unsuccessful
+(i.e. the list changed between reading the head and modifying it), else it
+adjusts the stack length and returns.
+
+The lock-free pop operation first reserves one or more list elements by
+adjusting the stack length, to ensure the dequeue operation will succeed
+without blocking. It then dequeues pointers by walking the list -- starting
+from the head -- then swinging the head pointer (using a CAS as well). While
+walking the list, the data pointers are recorded in an object table.
+
+The linked list elements themselves are maintained in a lock-free LIFO, and are
+allocated before stack pushes and freed after stack pops. Since the stack has a
+fixed maximum depth, these elements do not need to be dynamically created.
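
The push and pop loops described above can be pictured with the following
self-contained sketch using C11 atomics. It is illustrative only: it handles a
single element at a time, omits the length counter, and, unlike the library,
does not yet guard against the ABA problem covered in the next section::

    #include <stdatomic.h>
    #include <stddef.h>

    struct node {
        void *data;
        struct node *next;
    };

    struct lf_stack_sketch {
        _Atomic(struct node *) top;
    };

    /* Push one node: point it at the old top, then CAS the top pointer. */
    static void
    lf_push_sketch(struct lf_stack_sketch *s, struct node *n)
    {
        struct node *old_top = atomic_load(&s->top);

        do {
            n->next = old_top;
        } while (!atomic_compare_exchange_weak(&s->top, &old_top, n));
    }

    /* Pop one node, or return NULL if the stack is empty. Nodes are assumed
     * to come from a fixed pool (as in the library), so reading
     * old_top->next is always a read of valid memory.
     */
    static struct node *
    lf_pop_sketch(struct lf_stack_sketch *s)
    {
        struct node *old_top = atomic_load(&s->top);

        do {
            if (old_top == NULL)
                return NULL;
        } while (!atomic_compare_exchange_weak(&s->top, &old_top,
                                               old_top->next));

        return old_top;
    }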
+
+The lock-free behavior is selected by passing the *RTE_STACK_F_LF* flag to
+rte_stack_create().
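
For example, a lock-free stack holding up to 1024 pointers (the name and size
are arbitrary)::

    struct rte_stack *s;

    s = rte_stack_create("lf_example", 1024, SOCKET_ID_ANY, RTE_STACK_F_LF);

As with any stack, the call returns NULL with rte_errno set on failure.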
+
+Preventing the ABA Problem
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To prevent the ABA problem, the lock-free stack algorithm uses a 128-bit
+compare-and-swap instruction to atomically update both the stack top pointer
+and a modification counter. The ABA problem can occur without a modification
+counter if, for example:
+
+1. Thread A reads head pointer X and stores the pointed-to list element.
+2. Other threads modify the list such that the head pointer is once again X,
+ but its pointed-to data is different than what thread A read.
+3. Thread A changes the head pointer with a compare-and-swap and succeeds.
+
+In this case thread A would not detect that the list had changed, and would
+both pop stale data and incorrectly change the head pointer. By adding a
+modification counter that is updated on every push and pop as part of the
+compare-and-swap, the algorithm can detect when the list changes even if the
+head pointer remains the same.
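
In terms of the sketch above, the fix widens the head so that the top pointer
and the counter are compared and swapped as one 16-byte unit (lf_head_sketch
is hypothetical; the library's actual definition is struct rte_stack_lf_head
in rte_stack.h)::

    /* 16-byte head: the CAS compares both fields, so even if 'top' has
     * returned to the same value, a different 'cnt' makes the CAS fail and
     * the operation retries with a fresh snapshot.
     */
    struct lf_head_sketch {
        struct node *top; /* stack top */
        uint64_t cnt;     /* incremented on every push and every pop */
    } __attribute__((aligned(16)));
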
pointers. The API provides MT-safe push and pop operations that can operate
on one or more pointers per operation.
+ The library supports two stack implementations: standard (lock-based) and lock-free.
+ The lock-free implementation is currently limited to x86-64 platforms.
+
* **Updated KNI module and PMD.**
Updated the KNI kernel module to set the max_mtu according to the given
# all source are stored in SRCS-y
SRCS-$(CONFIG_RTE_LIBRTE_STACK) := rte_stack.c \
- rte_stack_std.c
+ rte_stack_std.c \
+ rte_stack_lf.c
# install includes
SYMLINK-$(CONFIG_RTE_LIBRTE_STACK)-include := rte_stack.h \
- rte_stack_std.h
+ rte_stack_std.h \
+ rte_stack_lf.h \
+ rte_stack_lf_generic.h
include $(RTE_SDK)/mk/rte.lib.mk
allow_experimental_apis = true
version = 1
-sources = files('rte_stack.c', 'rte_stack_std.c')
-headers = files('rte_stack.h', 'rte_stack_std.h')
+sources = files('rte_stack.c', 'rte_stack_std.c', 'rte_stack_lf.c')
+headers = files('rte_stack.h',
+ 'rte_stack_std.h',
+ 'rte_stack_lf.h',
+ 'rte_stack_lf_generic.h')
};
EAL_REGISTER_TAILQ(rte_stack_tailq)
+
static void
-rte_stack_init(struct rte_stack *s)
+rte_stack_init(struct rte_stack *s, unsigned int count, uint32_t flags)
{
memset(s, 0, sizeof(*s));
- rte_stack_std_init(s);
+ if (flags & RTE_STACK_F_LF)
+ rte_stack_lf_init(s, count);
+ else
+ rte_stack_std_init(s);
}
static ssize_t
-rte_stack_get_memsize(unsigned int count)
+rte_stack_get_memsize(unsigned int count, uint32_t flags)
{
- return rte_stack_std_get_memsize(count);
+ if (flags & RTE_STACK_F_LF)
+ return rte_stack_lf_get_memsize(count);
+ else
+ return rte_stack_std_get_memsize(count);
}
struct rte_stack *
unsigned int sz;
int ret;
- RTE_SET_USED(flags);
+#ifdef RTE_ARCH_64
+ RTE_BUILD_BUG_ON(sizeof(struct rte_stack_lf_head) != 16);
+#else
+ if (flags & RTE_STACK_F_LF) {
+ STACK_LOG_ERR("Lock-free stack is not supported on your platform\n");
+ return NULL;
+ }
+#endif
- sz = rte_stack_get_memsize(count);
+ sz = rte_stack_get_memsize(count, flags);
ret = snprintf(mz_name, sizeof(mz_name), "%s%s",
RTE_STACK_MZ_PREFIX, name);
s = mz->addr;
- rte_stack_init(s);
+ rte_stack_init(s, count, flags);
/* Store the name for later lookups */
ret = snprintf(s->name, sizeof(s->name), "%s", name);
extern "C" {
#endif
+#include <rte_atomic.h>
#include <rte_compat.h>
#include <rte_debug.h>
#include <rte_errno.h>
#define RTE_STACK_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
sizeof(RTE_STACK_MZ_PREFIX) + 1)
+struct rte_stack_lf_elem {
+ void *data; /**< Data pointer */
+ struct rte_stack_lf_elem *next; /**< Next pointer */
+};
+
+struct rte_stack_lf_head {
+ struct rte_stack_lf_elem *top; /**< Stack top */
+ uint64_t cnt; /**< Modification counter for avoiding ABA problem */
+};
+
+struct rte_stack_lf_list {
+ /** List head */
+ struct rte_stack_lf_head head __rte_aligned(16);
+ /** List len */
+ rte_atomic64_t len;
+};
+
+/* Structure containing two lock-free LIFO lists: the stack itself and a list
+ * of free linked-list elements.
+ */
+struct rte_stack_lf {
+ /** LIFO list of elements */
+ struct rte_stack_lf_list used __rte_cache_aligned;
+ /** LIFO list of free elements */
+ struct rte_stack_lf_list free __rte_cache_aligned;
+ /** LIFO elements */
+ struct rte_stack_lf_elem elems[] __rte_cache_aligned;
+};
+
/* Structure containing the LIFO, its current length, and a lock for mutual
* exclusion.
*/
const struct rte_memzone *memzone;
uint32_t capacity; /**< Usable size of the stack. */
uint32_t flags; /**< Flags supplied at creation. */
- struct rte_stack_std stack_std; /**< LIFO structure. */
+ RTE_STD_C11
+ union {
+ struct rte_stack_lf stack_lf; /**< Lock-free LIFO structure. */
+ struct rte_stack_std stack_std; /**< LIFO structure. */
+ };
} __rte_cache_aligned;
+/**
+ * The stack uses lock-free push and pop functions. This flag is only
+ * supported on x86_64 platforms, currently.
+ */
+#define RTE_STACK_F_LF 0x0001
+
#include "rte_stack_std.h"
+#include "rte_stack_lf.h"
/**
* @warning
RTE_ASSERT(s != NULL);
RTE_ASSERT(obj_table != NULL);
- return __rte_stack_std_push(s, obj_table, n);
+ if (s->flags & RTE_STACK_F_LF)
+ return __rte_stack_lf_push(s, obj_table, n);
+ else
+ return __rte_stack_std_push(s, obj_table, n);
}
/**
RTE_ASSERT(s != NULL);
RTE_ASSERT(obj_table != NULL);
- return __rte_stack_std_pop(s, obj_table, n);
+ if (s->flags & RTE_STACK_F_LF)
+ return __rte_stack_lf_pop(s, obj_table, n);
+ else
+ return __rte_stack_std_pop(s, obj_table, n);
}
/**
{
RTE_ASSERT(s != NULL);
- return __rte_stack_std_count(s);
+ if (s->flags & RTE_STACK_F_LF)
+ return __rte_stack_lf_count(s);
+ else
+ return __rte_stack_std_count(s);
}
/**
* NUMA. The value can be *SOCKET_ID_ANY* if there is no NUMA
* constraint for the reserved zone.
* @param flags
- * Reserved for future use.
+ * An OR of the following:
+ * - RTE_STACK_F_LF: If this flag is set, the stack uses lock-free
+ * variants of the push and pop functions. Otherwise, it achieves
+ * thread-safety using a lock.
* @return
* On success, the pointer to the new allocated stack. NULL on error with
* rte_errno set appropriately. Possible errno values include:
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#include "rte_stack.h"
+
+void
+rte_stack_lf_init(struct rte_stack *s, unsigned int count)
+{
+ struct rte_stack_lf_elem *elems = s->stack_lf.elems;
+ unsigned int i;
+
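+	/* Seed the free list: initially every element is unused. */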
+ for (i = 0; i < count; i++)
+ __rte_stack_lf_push_elems(&s->stack_lf.free,
+ &elems[i], &elems[i], 1);
+}
+
+ssize_t
+rte_stack_lf_get_memsize(unsigned int count)
+{
+ ssize_t sz = sizeof(struct rte_stack);
+
+ sz += RTE_CACHE_LINE_ROUNDUP(count * sizeof(struct rte_stack_lf_elem));
+
+ /* Add padding to avoid false sharing conflicts caused by
+ * next-line hardware prefetchers.
+ */
+ sz += 2 * RTE_CACHE_LINE_SIZE;
+
+ return sz;
+}
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _RTE_STACK_LF_H_
+#define _RTE_STACK_LF_H_
+
+#include "rte_stack_lf_generic.h"
+
+/**
+ * @internal Push several objects on the lock-free stack (MT-safe).
+ *
+ * @param s
+ * A pointer to the stack structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to push on the stack from the obj_table.
+ * @return
+ * Actual number of objects enqueued.
+ */
+static __rte_always_inline unsigned int __rte_experimental
+__rte_stack_lf_push(struct rte_stack *s,
+ void * const *obj_table,
+ unsigned int n)
+{
+ struct rte_stack_lf_elem *tmp, *first, *last = NULL;
+ unsigned int i;
+
+ if (unlikely(n == 0))
+ return 0;
+
+ /* Pop n free elements */
+ first = __rte_stack_lf_pop_elems(&s->stack_lf.free, n, NULL, &last);
+ if (unlikely(first == NULL))
+ return 0;
+
+ /* Construct the list elements */
+ for (tmp = first, i = 0; i < n; i++, tmp = tmp->next)
+ tmp->data = obj_table[n - i - 1];
+
+ /* Push them to the used list */
+ __rte_stack_lf_push_elems(&s->stack_lf.used, first, last, n);
+
+ return n;
+}
+
+/**
+ * @internal Pop several objects from the lock-free stack (MT-safe).
+ *
+ * @param s
+ * A pointer to the stack structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to pull from the stack.
+ * @return
+ * - Actual number of objects popped.
+ */
+static __rte_always_inline unsigned int __rte_experimental
+__rte_stack_lf_pop(struct rte_stack *s, void **obj_table, unsigned int n)
+{
+ struct rte_stack_lf_elem *first, *last = NULL;
+
+ if (unlikely(n == 0))
+ return 0;
+
+ /* Pop n used elements */
+ first = __rte_stack_lf_pop_elems(&s->stack_lf.used,
+ n, obj_table, &last);
+ if (unlikely(first == NULL))
+ return 0;
+
+ /* Push the list elements to the free list */
+ __rte_stack_lf_push_elems(&s->stack_lf.free, first, last, n);
+
+ return n;
+}
+
+/**
+ * @internal Initialize a lock-free stack.
+ *
+ * @param s
+ * A pointer to the stack structure.
+ * @param count
+ * The size of the stack.
+ */
+void
+rte_stack_lf_init(struct rte_stack *s, unsigned int count);
+
+/**
+ * @internal Return the memory required for a lock-free stack.
+ *
+ * @param count
+ * The size of the stack.
+ * @return
+ * The bytes to allocate for a lock-free stack.
+ */
+ssize_t
+rte_stack_lf_get_memsize(unsigned int count);
+
+#endif /* _RTE_STACK_LF_H_ */
--- /dev/null
+/* SPDX-License-Identifier: BSD-3-Clause
+ * Copyright(c) 2019 Intel Corporation
+ */
+
+#ifndef _RTE_STACK_LF_GENERIC_H_
+#define _RTE_STACK_LF_GENERIC_H_
+
+#include <rte_branch_prediction.h>
+#include <rte_prefetch.h>
+
+static __rte_always_inline unsigned int
+__rte_stack_lf_count(struct rte_stack *s)
+{
+ /* stack_lf_push() and stack_lf_pop() do not update the list's contents
+ * and stack_lf->len atomically, which can cause the list to appear
+ * shorter than it actually is if this function is called while other
+ * threads are modifying the list.
+ *
+ * However, given the inherently approximate nature of the get_count
+ * callback -- even if the list and its size were updated atomically,
+ * the size could change between when get_count executes and when the
+ * value is returned to the caller -- this is acceptable.
+ *
+ * The stack_lf->len updates are placed such that the list may appear to
+ * have fewer elements than it does, but will never appear to have more
+	 * elements. If the stack is near-empty to the point that this is a
+	 * concern, the user should consider increasing the stack size.
+ */
+ return (unsigned int)rte_atomic64_read(&s->stack_lf.used.len);
+}
+
+static __rte_always_inline void
+__rte_stack_lf_push_elems(struct rte_stack_lf_list *list,
+ struct rte_stack_lf_elem *first,
+ struct rte_stack_lf_elem *last,
+ unsigned int num)
+{
+#ifndef RTE_ARCH_X86_64
+ RTE_SET_USED(first);
+ RTE_SET_USED(last);
+ RTE_SET_USED(list);
+ RTE_SET_USED(num);
+#else
+ struct rte_stack_lf_head old_head;
+ int success;
+
+ old_head = list->head;
+
+ do {
+ struct rte_stack_lf_head new_head;
+
+ /* An acquire fence (or stronger) is needed for weak memory
+ * models to establish a synchronized-with relationship between
+ * the list->head load and store-release operations (as part of
+ * the rte_atomic128_cmp_exchange()).
+ */
+ rte_smp_mb();
+
+ /* Swing the top pointer to the first element in the list and
+ * make the last element point to the old top.
+ */
+ new_head.top = first;
+ new_head.cnt = old_head.cnt + 1;
+
+ last->next = old_head.top;
+
+ /* old_head is updated on failure */
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)&list->head,
+ (rte_int128_t *)&old_head,
+ (rte_int128_t *)&new_head,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+ } while (success == 0);
+
+ rte_atomic64_add(&list->len, num);
+#endif
+}
+
+static __rte_always_inline struct rte_stack_lf_elem *
+__rte_stack_lf_pop_elems(struct rte_stack_lf_list *list,
+ unsigned int num,
+ void **obj_table,
+ struct rte_stack_lf_elem **last)
+{
+#ifndef RTE_ARCH_X86_64
+ RTE_SET_USED(obj_table);
+ RTE_SET_USED(last);
+ RTE_SET_USED(list);
+ RTE_SET_USED(num);
+
+ return NULL;
+#else
+ struct rte_stack_lf_head old_head;
+ int success;
+
+ /* Reserve num elements, if available */
+ while (1) {
+ uint64_t len = rte_atomic64_read(&list->len);
+
+ /* Does the list contain enough elements? */
+ if (unlikely(len < num))
+ return NULL;
+
+ if (rte_atomic64_cmpset((volatile uint64_t *)&list->len,
+ len, len - num))
+ break;
+ }
+
+ old_head = list->head;
+
+ /* Pop num elements */
+ do {
+ struct rte_stack_lf_head new_head;
+ struct rte_stack_lf_elem *tmp;
+ unsigned int i;
+
+ /* An acquire fence (or stronger) is needed for weak memory
+ * models to ensure the LF LIFO element reads are properly
+ * ordered with respect to the head pointer read.
+ */
+ rte_smp_mb();
+
+ rte_prefetch0(old_head.top);
+
+ tmp = old_head.top;
+
+ /* Traverse the list to find the new head. A next pointer will
+ * either point to another element or NULL; if a thread
+ * encounters a pointer that has already been popped, the CAS
+ * will fail.
+ */
+ for (i = 0; i < num && tmp != NULL; i++) {
+ rte_prefetch0(tmp->next);
+ if (obj_table)
+ obj_table[i] = tmp->data;
+ if (last)
+ *last = tmp;
+ tmp = tmp->next;
+ }
+
+ /* If NULL was encountered, the list was modified while
+ * traversing it. Retry.
+ */
+ if (i != num)
+ continue;
+
+ new_head.top = tmp;
+ new_head.cnt = old_head.cnt + 1;
+
+ /* old_head is updated on failure */
+ success = rte_atomic128_cmp_exchange(
+ (rte_int128_t *)&list->head,
+ (rte_int128_t *)&old_head,
+ (rte_int128_t *)&new_head,
+ 1, __ATOMIC_RELEASE,
+ __ATOMIC_RELAXED);
+ } while (success == 0);
+
+ return old_head.top;
+#endif
+}
+
+#endif /* _RTE_STACK_LF_GENERIC_H_ */