From: Konstantin Ananyev Date: Mon, 20 Apr 2020 12:28:26 +0000 (+0100) Subject: ring: introduce HTS ring mode X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=1cc363b8ce06e855305c6f0e9a9c63bb42a8ac28;p=dpdk.git ring: introduce HTS ring mode Introduce head/tail sync mode for MT ring synchronization. In that mode enqueue/dequeue operation is fully serialized: only one thread at a time is allowed to perform given op. Suppose to reduce stall times in case when ring is used on overcommitted cpus (multiple active threads on the same cpu). Signed-off-by: Konstantin Ananyev Acked-by: Honnappa Nagarahalli --- diff --git a/doc/guides/prog_guide/ring_lib.rst b/doc/guides/prog_guide/ring_lib.rst index de4661aafe..26670cad04 100644 --- a/doc/guides/prog_guide/ring_lib.rst +++ b/doc/guides/prog_guide/ring_lib.rst @@ -390,6 +390,21 @@ one for head update, second for tail update. In comparison the original MP/MC algorithm requires one 32-bit CAS for head update and waiting/spinning on tail value. +MP_HTS/MC_HTS +~~~~~~~~~~~~~ + +Multi-producer (/multi-consumer) with Head/Tail Sync (HTS) mode. +In that mode enqueue/dequeue operation is fully serialized: +at any given moment only one enqueue/dequeue operation can proceed. +This is achieved by allowing a thread to proceed with changing ``head.value`` +only when ``head.value == tail.value``. +Both head and tail values are updated atomically (as one 64-bit value). +To achieve that 64-bit CAS is used by head update routine. +That technique also avoids the Lock-Waiter-Preemption (LWP) problem on tail +update and helps to improve ring enqueue/dequeue behavior in overcommitted +scenarios. Another advantage of fully serialized producer/consumer - +it provides the ability to implement MT safe peek API for rte_ring. + References ---------- diff --git a/doc/guides/rel_notes/release_20_05.rst b/doc/guides/rel_notes/release_20_05.rst index 60bb4eb637..5fa0a457f8 100644 --- a/doc/guides/rel_notes/release_20_05.rst +++ b/doc/guides/rel_notes/release_20_05.rst @@ -58,10 +58,10 @@ New Features * **New synchronization modes for rte_ring.** - Introduced new optional MT synchronization mode for rte_ring: - Relaxed Tail Sync (RTS). With this mode selected, rte_ring shows - significant improvements for average enqueue/dequeue times on - overcommitted systems. + Introduced new optional MT synchronization modes for rte_ring: + Relaxed Tail Sync (RTS) mode and Head/Tail Sync (HTS) mode. + With these mode selected, rte_ring shows significant improvements for + average enqueue/dequeue times on overcommitted systems. * **Updated Mellanox mlx5 driver.** diff --git a/lib/librte_ring/Makefile b/lib/librte_ring/Makefile index 04e446e379..f75d8e530b 100644 --- a/lib/librte_ring/Makefile +++ b/lib/librte_ring/Makefile @@ -20,6 +20,8 @@ SYMLINK-$(CONFIG_RTE_LIBRTE_RING)-include := rte_ring.h \ rte_ring_elem.h \ rte_ring_generic.h \ rte_ring_c11_mem.h \ + rte_ring_hts.h \ + rte_ring_hts_c11_mem.h \ rte_ring_rts.h \ rte_ring_rts_c11_mem.h diff --git a/lib/librte_ring/meson.build b/lib/librte_ring/meson.build index 42b98c6818..d22f852616 100644 --- a/lib/librte_ring/meson.build +++ b/lib/librte_ring/meson.build @@ -7,5 +7,7 @@ headers = files('rte_ring.h', 'rte_ring_elem.h', 'rte_ring_c11_mem.h', 'rte_ring_generic.h', + 'rte_ring_hts.h', + 'rte_ring_hts_c11_mem.h', 'rte_ring_rts.h', 'rte_ring_rts_c11_mem.h') diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 222eec0fb6..ebe5ccf0de 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -89,9 +89,11 @@ static void reset_headtail(void *p) { struct rte_ring_headtail *ht; + struct rte_ring_hts_headtail *ht_hts; struct rte_ring_rts_headtail *ht_rts; ht = p; + ht_hts = p; ht_rts = p; switch (ht->sync_type) { @@ -104,6 +106,9 @@ reset_headtail(void *p) ht_rts->head.raw = 0; ht_rts->tail.raw = 0; break; + case RTE_RING_SYNC_MT_HTS: + ht_hts->ht.raw = 0; + break; default: /* unknown sync mode */ RTE_ASSERT(0); @@ -127,9 +132,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, enum rte_ring_sync_type *cons_st) { static const uint32_t prod_st_flags = - (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ); + (RING_F_SP_ENQ | RING_F_MP_RTS_ENQ | RING_F_MP_HTS_ENQ); static const uint32_t cons_st_flags = - (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ); + (RING_F_SC_DEQ | RING_F_MC_RTS_DEQ | RING_F_MC_HTS_DEQ); switch (flags & prod_st_flags) { case 0: @@ -141,6 +146,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MP_RTS_ENQ: *prod_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MP_HTS_ENQ: + *prod_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -155,6 +163,9 @@ get_sync_type(uint32_t flags, enum rte_ring_sync_type *prod_st, case RING_F_MC_RTS_DEQ: *cons_st = RTE_RING_SYNC_MT_RTS; break; + case RING_F_MC_HTS_DEQ: + *cons_st = RTE_RING_SYNC_MT_HTS; + break; default: return -EINVAL; } @@ -176,6 +187,11 @@ rte_ring_init(struct rte_ring *r, const char *name, unsigned count, RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != + offsetof(struct rte_ring_hts_headtail, sync_type)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != + offsetof(struct rte_ring_hts_headtail, ht.pos.tail)); + RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, sync_type) != offsetof(struct rte_ring_rts_headtail, sync_type)); RTE_BUILD_BUG_ON(offsetof(struct rte_ring_headtail, tail) != diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index c42e1cfc46..7cf0465288 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -86,6 +86,9 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -95,6 +98,9 @@ ssize_t rte_ring_get_memsize(unsigned count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. * @return @@ -133,6 +139,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -142,6 +151,9 @@ int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. * @return @@ -422,6 +434,9 @@ rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n, + free_space); #endif } @@ -569,6 +584,8 @@ rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n, #ifdef ALLOW_EXPERIMENTAL_API case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available); #endif } @@ -903,6 +920,9 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst(r, obj_table, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst(r, obj_table, n, + free_space); #endif } @@ -996,6 +1016,9 @@ rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst(r, obj_table, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst(r, obj_table, n, + available); #endif } diff --git a/lib/librte_ring/rte_ring_core.h b/lib/librte_ring/rte_ring_core.h index bd21fa5355..16718ca7f1 100644 --- a/lib/librte_ring/rte_ring_core.h +++ b/lib/librte_ring/rte_ring_core.h @@ -59,6 +59,7 @@ enum rte_ring_sync_type { RTE_RING_SYNC_ST, /**< single thread only */ #ifdef ALLOW_EXPERIMENTAL_API RTE_RING_SYNC_MT_RTS, /**< multi-thread relaxed tail sync */ + RTE_RING_SYNC_MT_HTS, /**< multi-thread head/tail sync */ #endif }; @@ -95,6 +96,20 @@ struct rte_ring_rts_headtail { volatile union __rte_ring_rts_poscnt head; }; +union __rte_ring_hts_pos { + /** raw 8B value to read/write *head* and *tail* as one atomic op */ + uint64_t raw __rte_aligned(8); + struct { + uint32_t head; /**< head position */ + uint32_t tail; /**< tail position */ + } pos; +}; + +struct rte_ring_hts_headtail { + volatile union __rte_ring_hts_pos ht; + enum rte_ring_sync_type sync_type; /**< sync type of prod/cons */ +}; + /** * An RTE ring structure. * @@ -126,6 +141,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail prod; + struct rte_ring_hts_headtail hts_prod; struct rte_ring_rts_headtail rts_prod; } __rte_cache_aligned; @@ -135,6 +151,7 @@ struct rte_ring { RTE_STD_C11 union { struct rte_ring_headtail cons; + struct rte_ring_hts_headtail hts_cons; struct rte_ring_rts_headtail rts_cons; } __rte_cache_aligned; @@ -157,6 +174,9 @@ struct rte_ring { #define RING_F_MP_RTS_ENQ 0x0008 /**< The default enqueue is "MP RTS". */ #define RING_F_MC_RTS_DEQ 0x0010 /**< The default dequeue is "MC RTS". */ +#define RING_F_MP_HTS_ENQ 0x0020 /**< The default enqueue is "MP HTS". */ +#define RING_F_MC_HTS_DEQ 0x0040 /**< The default dequeue is "MC HTS". */ + #ifdef __cplusplus } #endif diff --git a/lib/librte_ring/rte_ring_elem.h b/lib/librte_ring/rte_ring_elem.h index 4030753b60..492eef936d 100644 --- a/lib/librte_ring/rte_ring_elem.h +++ b/lib/librte_ring/rte_ring_elem.h @@ -81,6 +81,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MP_RTS_ENQ: If this flag is set, the default behavior when * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` * is "multi-producer RTS mode". + * - RING_F_MP_HTS_ENQ: If this flag is set, the default behavior when + * using ``rte_ring_enqueue()`` or ``rte_ring_enqueue_bulk()`` + * is "multi-producer HTS mode". * If none of these flags is set, then default "multi-producer" * behavior is selected. * - One of mutually exclusive flags that define consumer behavior: @@ -90,6 +93,9 @@ ssize_t rte_ring_get_memsize_elem(unsigned int esize, unsigned int count); * - RING_F_MC_RTS_DEQ: If this flag is set, the default behavior when * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` * is "multi-consumer RTS mode". + * - RING_F_MC_HTS_DEQ: If this flag is set, the default behavior when + * using ``rte_ring_dequeue()`` or ``rte_ring_dequeue_bulk()`` + * is "multi-consumer HTS mode". * If none of these flags is set, then default "multi-consumer" * behavior is selected. * @return @@ -541,6 +547,7 @@ rte_ring_sp_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, } #ifdef ALLOW_EXPERIMENTAL_API +#include #include #endif @@ -585,6 +592,9 @@ rte_ring_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_bulk_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, esize, n, + free_space); #endif } @@ -766,6 +776,9 @@ rte_ring_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_bulk_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, esize, + n, available); #endif } @@ -951,6 +964,9 @@ rte_ring_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mp_rts_enqueue_burst_elem(r, obj_table, esize, n, free_space); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, esize, + n, free_space); #endif } @@ -1060,6 +1076,9 @@ rte_ring_dequeue_burst_elem(struct rte_ring *r, void *obj_table, case RTE_RING_SYNC_MT_RTS: return rte_ring_mc_rts_dequeue_burst_elem(r, obj_table, esize, n, available); + case RTE_RING_SYNC_MT_HTS: + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, esize, + n, available); #endif } diff --git a/lib/librte_ring/rte_ring_hts.h b/lib/librte_ring/rte_ring_hts.h new file mode 100644 index 0000000000..c7701defcd --- /dev/null +++ b/lib/librte_ring/rte_ring_hts.h @@ -0,0 +1,332 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_H_ +#define _RTE_RING_HTS_H_ + +/** + * @file rte_ring_hts.h + * @b EXPERIMENTAL: this API may change without prior notice + * It is not recommended to include this file directly. + * Please include instead. + * + * Contains functions for serialized, aka Head-Tail Sync (HTS) ring mode. + * In that mode enqueue/dequeue operation is fully serialized: + * at any given moment only one enqueue/dequeue operation can proceed. + * This is achieved by allowing a thread to proceed with changing head.value + * only when head.value == tail.value. + * Both head and tail values are updated atomically (as one 64-bit value). + * To achieve that 64-bit CAS is used by head update routine. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +#include + +/** + * @internal Enqueue several objects on the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible from ring + * @param free_space + * returns the amount of space after the enqueue operation has finished + * @return + * Actual number of objects enqueued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_enqueue_elem(struct rte_ring *r, const void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *free_space) +{ + uint32_t free, head; + + n = __rte_ring_hts_move_prod_head(r, n, behavior, &head, &free); + + if (n != 0) { + __rte_ring_enqueue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_prod, head, n, 1); + } + + if (free_space != NULL) + *free_space = free - n; + return n; +} + +/** + * @internal Dequeue several objects from the HTS ring. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to pull from the ring. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from ring + * @param available + * returns the number of remaining ring entries after the dequeue has finished + * @return + * - Actual number of objects dequeued. + * If behavior == RTE_RING_QUEUE_FIXED, this will be 0 or n only. + */ +static __rte_always_inline unsigned int +__rte_ring_do_hts_dequeue_elem(struct rte_ring *r, void *obj_table, + uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior, + uint32_t *available) +{ + uint32_t entries, head; + + n = __rte_ring_hts_move_cons_head(r, n, behavior, &head, &entries); + + if (n != 0) { + __rte_ring_dequeue_elems(r, head, obj_table, esize, n); + __rte_ring_hts_update_tail(&r->hts_cons, head, n, 0); + } + + if (available != NULL) + *available = entries - n; + return n; +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_FIXED, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst_elem(struct rte_ring *r, const void *obj_table, + unsigned int esize, unsigned int n, unsigned int *free_space) +{ + return __rte_ring_do_hts_enqueue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of objects that will be filled. + * @param esize + * The size of ring element, in bytes. It must be a multiple of 4. + * This must be the same value used while creating the ring. Otherwise + * the results are undefined. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst_elem(struct rte_ring *r, void *obj_table, + unsigned int esize, unsigned int n, unsigned int *available) +{ + return __rte_ring_do_hts_dequeue_elem(r, obj_table, esize, n, + RTE_RING_QUEUE_VARIABLE, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * The number of objects enqueued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mp_hts_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * The number of objects dequeued, either 0 or n + */ +__rte_experimental +static __rte_always_inline unsigned int +rte_ring_mc_hts_dequeue_bulk(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_bulk_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +/** + * Enqueue several objects on the HTS ring (multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @param free_space + * if non-NULL, returns the amount of space in the ring after the + * enqueue operation has finished. + * @return + * - n: Actual number of objects enqueued. + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mp_hts_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned int n, unsigned int *free_space) +{ + return rte_ring_mp_hts_enqueue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, free_space); +} + +/** + * Dequeue several objects from an HTS ring (multi-consumers safe). + * When the requested objects are more than the available objects, + * only dequeue the actual number of objects. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param available + * If non-NULL, returns the number of remaining ring entries after the + * dequeue has finished. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +__rte_experimental +static __rte_always_inline unsigned +rte_ring_mc_hts_dequeue_burst(struct rte_ring *r, void **obj_table, + unsigned int n, unsigned int *available) +{ + return rte_ring_mc_hts_dequeue_burst_elem(r, obj_table, + sizeof(uintptr_t), n, available); +} + +#ifdef __cplusplus +} +#endif + +#endif /* _RTE_RING_HTS_H_ */ diff --git a/lib/librte_ring/rte_ring_hts_c11_mem.h b/lib/librte_ring/rte_ring_hts_c11_mem.h new file mode 100644 index 0000000000..16e54b6ffb --- /dev/null +++ b/lib/librte_ring/rte_ring_hts_c11_mem.h @@ -0,0 +1,164 @@ +/* SPDX-License-Identifier: BSD-3-Clause + * + * Copyright (c) 2010-2020 Intel Corporation + * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org + * All rights reserved. + * Derived from FreeBSD's bufring.h + * Used as BSD-3 Licensed with permission from Kip Macy. + */ + +#ifndef _RTE_RING_HTS_C11_MEM_H_ +#define _RTE_RING_HTS_C11_MEM_H_ + +/** + * @file rte_ring_hts_c11_mem.h + * It is not recommended to include this file directly, + * include instead. + * Contains internal helper functions for head/tail sync (HTS) ring mode. + * For more information please refer to . + */ + +/** + * @internal update tail with new value. + */ +static __rte_always_inline void +__rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail, + uint32_t num, uint32_t enqueue) +{ + uint32_t tail; + + RTE_SET_USED(enqueue); + + tail = old_tail + num; + __atomic_store_n(&ht->ht.pos.tail, tail, __ATOMIC_RELEASE); +} + +/** + * @internal waits till tail will become equal to head. + * Means no writer/reader is active for that ring. + * Suppose to work as serialization point. + */ +static __rte_always_inline void +__rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht, + union __rte_ring_hts_pos *p) +{ + while (p->pos.head != p->pos.tail) { + rte_pause(); + p->raw = __atomic_load_n(&ht->ht.raw, __ATOMIC_ACQUIRE); + } +} + +/** + * @internal This function updates the producer head for enqueue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_prod_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *free_entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + const uint32_t capacity = r->capacity; + + op.raw = __atomic_load_n(&r->hts_prod.ht.raw, __ATOMIC_ACQUIRE); + + do { + /* Reset n to the initial burst count */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read prod head/tail *before* + * reading cons tail. + */ + __rte_ring_hts_head_wait(&r->hts_prod, &op); + + /* + * The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * *old_head > cons_tail). So 'free_entries' is always between 0 + * and capacity (which is < size). + */ + *free_entries = capacity + r->cons.tail - op.pos.head; + + /* check that we have enough room in ring */ + if (unlikely(n > *free_entries)) + n = (behavior == RTE_RING_QUEUE_FIXED) ? + 0 : *free_entries; + + if (n == 0) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of cons tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_prod.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +/** + * @internal This function updates the consumer head for dequeue + */ +static __rte_always_inline unsigned int +__rte_ring_hts_move_cons_head(struct rte_ring *r, unsigned int num, + enum rte_ring_queue_behavior behavior, uint32_t *old_head, + uint32_t *entries) +{ + uint32_t n; + union __rte_ring_hts_pos np, op; + + op.raw = __atomic_load_n(&r->hts_cons.ht.raw, __ATOMIC_ACQUIRE); + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = num; + + /* + * wait for tail to be equal to head, + * make sure that we read cons head/tail *before* + * reading prod tail. + */ + __rte_ring_hts_head_wait(&r->hts_cons, &op); + + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. + */ + *entries = r->prod.tail - op.pos.head; + + /* Set the actual entries for dequeue */ + if (n > *entries) + n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries; + + if (unlikely(n == 0)) + break; + + np.pos.tail = op.pos.tail; + np.pos.head = op.pos.head + n; + + /* + * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent: + * - OOO reads of prod tail value + * - OOO copy of elems from the ring + */ + } while (__atomic_compare_exchange_n(&r->hts_cons.ht.raw, + &op.raw, np.raw, + 0, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE) == 0); + + *old_head = op.pos.head; + return n; +} + +#endif /* _RTE_RING_HTS_C11_MEM_H_ */