From: Dharmik Thakkar Date: Wed, 21 Oct 2020 22:50:04 +0000 (-0500) Subject: hash: implement RCU resources reclamation X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=769b2de7fb52;p=dpdk.git hash: implement RCU resources reclamation Currently, users have to use external RCU mechanisms to free resources when using lock free hash algorithm. Integrate RCU QSBR process to make it easier for the applications to use lock free algorithm. Refer to RCU documentation to understand various aspects of integrating RCU library into other libraries. Suggested-by: Honnappa Nagarahalli Signed-off-by: Dharmik Thakkar Reviewed-by: Ruifeng Wang Acked-by: Ray Kinsella Acked-by: Yipeng Wang --- diff --git a/doc/guides/prog_guide/hash_lib.rst b/doc/guides/prog_guide/hash_lib.rst index d06c7de2ea..96bb121682 100644 --- a/doc/guides/prog_guide/hash_lib.rst +++ b/doc/guides/prog_guide/hash_lib.rst @@ -102,6 +102,10 @@ For concurrent writes, and concurrent reads and writes the following flag values * If the 'do not free on delete' (RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL) flag is set, the position of the entry in the hash table is not freed upon calling delete(). This flag is enabled by default when the lock free read/write concurrency flag is set. The application should free the position after all the readers have stopped referencing the position. Where required, the application can make use of RCU mechanisms to determine when the readers have stopped referencing the position. + RCU QSBR process is integrated within the Hash library for safe freeing of the position. Application has certain responsibilities while using this feature. + For example, rte_hash_rcu_qsbr_add() need to be called to use the integrated RCU mechanism. + Please refer to resource reclamation framework of :ref:`RCU library ` for more details. + Extendable Bucket Functionality support ---------------------------------------- @@ -109,8 +113,8 @@ An extra flag is used to enable this functionality (flag is not set by default). in the very unlikely case due to excessive hash collisions that a key has failed to be inserted, the hash table bucket is extended with a linked list to insert these failed keys. This feature is important for the workloads (e.g. telco workloads) that need to insert up to 100% of the hash table size and can't tolerate any key insertion failure (even if very few). -Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API in order to free the empty buckets and -deleted keys, to maintain the 100% capacity guarantee. +Please note that with the 'lock free read/write concurrency' flag enabled, users need to call 'rte_hash_free_key_with_position' API or configure integrated RCU QSBR +(or use external RCU mechanisms) in order to free the empty buckets and deleted keys, to maintain the 100% capacity guarantee. Implementation Details (non Extendable Bucket Case) --------------------------------------------------- @@ -172,7 +176,7 @@ Example of deletion: Similar to lookup, the key is searched in its primary and secondary buckets. If the key is found, the entry is marked as empty. If the hash table was configured with 'no free on delete' or 'lock free read/write concurrency', the position of the key is not freed. It is the responsibility of the user to free the position after -readers are not referencing the position anymore. +readers are not referencing the position anymore. User can configure integrated RCU QSBR or use external RCU mechanisms to safely free the position on delete. Implementation Details (with Extendable Bucket) @@ -286,6 +290,8 @@ The flow table operations on the application side are described below: * Free flow: Free flow key position. If 'no free on delete' or 'lock-free read/write concurrency' flags are set, wait till the readers are not referencing the position returned during add/delete flow and then free the position. RCU mechanisms can be used to find out when the readers are not referencing the position anymore. + RCU QSBR process is integrated within the Hash library for safe freeing of the position. Application has certain responsibilities while using this feature. + Please refer to resource reclamation framework of :ref:`RCU library ` for more details. * Lookup flow: Lookup for the flow key in the hash. If the returned position is valid (flow lookup hit), use the returned position to access the flow entry in the flow table. diff --git a/lib/librte_hash/meson.build b/lib/librte_hash/meson.build index 6ab46ae9d7..0977a63fd2 100644 --- a/lib/librte_hash/meson.build +++ b/lib/librte_hash/meson.build @@ -10,3 +10,4 @@ headers = files('rte_crc_arm64.h', sources = files('rte_cuckoo_hash.c', 'rte_fbk_hash.c') deps += ['ring'] +deps += ['rcu'] diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index aad0c965be..7514e33aa9 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -52,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = { }; EAL_REGISTER_TAILQ(rte_hash_tailq) +struct __rte_hash_rcu_dq_entry { + uint32_t key_idx; + uint32_t ext_bkt_idx; +}; + struct rte_hash * rte_hash_find_existing(const char *name) { @@ -210,7 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params) if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) { readwrite_concur_lf_support = 1; - /* Enable not freeing internal memory/index on delete */ + /* Enable not freeing internal memory/index on delete. + * If internal RCU is enabled, freeing of internal memory/index + * is done on delete + */ no_free_on_del = 1; } @@ -505,6 +513,9 @@ rte_hash_free(struct rte_hash *h) rte_mcfg_tailq_write_unlock(); + if (h->dq) + rte_rcu_qsbr_dq_delete(h->dq); + if (h->use_local_cache) rte_free(h->local_free_slots); if (h->writer_takes_lock) @@ -607,11 +618,20 @@ void rte_hash_reset(struct rte_hash *h) { uint32_t tot_ring_cnt, i; + unsigned int pending; if (h == NULL) return; __hash_rw_writer_lock(h); + + if (h->dq) { + /* Reclaim all the resources */ + rte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL); + if (pending != 0) + RTE_LOG(ERR, HASH, "RCU reclaim all resources failed\n"); + } + memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket)); memset(h->key_store, 0, h->key_entry_size * (h->entries + 1)); *h->tbl_chng_cnt = 0; @@ -952,6 +972,38 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h, return -ENOSPC; } +static inline uint32_t +alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots) +{ + unsigned int n_slots; + uint32_t slot_id; + + if (h->use_local_cache) { + /* Try to get a free slot from the local cache */ + if (cached_free_slots->len == 0) { + /* Need to get another burst of free slots from global ring */ + n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, + cached_free_slots->objs, + sizeof(uint32_t), + LCORE_CACHE_SIZE, NULL); + if (n_slots == 0) + return EMPTY_SLOT; + + cached_free_slots->len += n_slots; + } + + /* Get a free slot from the local cache */ + cached_free_slots->len--; + slot_id = cached_free_slots->objs[cached_free_slots->len]; + } else { + if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)) != 0) + return EMPTY_SLOT; + } + + return slot_id; +} + static inline int32_t __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig, void *data) @@ -963,7 +1015,6 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, uint32_t ext_bkt_id = 0; uint32_t slot_id; int ret; - unsigned n_slots; unsigned lcore_id; unsigned int i; struct lcore_cache *cached_free_slots = NULL; @@ -1001,28 +1052,20 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; - /* Try to get a free slot from the local cache */ - if (cached_free_slots->len == 0) { - /* Need to get another burst of free slots from global ring */ - n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, - cached_free_slots->objs, - sizeof(uint32_t), - LCORE_CACHE_SIZE, NULL); - if (n_slots == 0) { - return -ENOSPC; - } - - cached_free_slots->len += n_slots; + } + slot_id = alloc_slot(h, cached_free_slots); + if (slot_id == EMPTY_SLOT) { + if (h->dq) { + __hash_rw_writer_lock(h); + ret = rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->max_reclaim_size, + NULL, NULL, NULL); + __hash_rw_writer_unlock(h); + if (ret == 0) + slot_id = alloc_slot(h, cached_free_slots); } - - /* Get a free slot from the local cache */ - cached_free_slots->len--; - slot_id = cached_free_slots->objs[cached_free_slots->len]; - } else { - if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, - sizeof(uint32_t)) != 0) { + if (slot_id == EMPTY_SLOT) return -ENOSPC; - } } new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size); @@ -1118,8 +1161,19 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id, sizeof(uint32_t)) != 0 || ext_bkt_id == 0) { - ret = -ENOSPC; - goto failure; + if (h->dq) { + if (rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->max_reclaim_size, + NULL, NULL, NULL) == 0) { + rte_ring_sc_dequeue_elem(h->free_ext_bkts, + &ext_bkt_id, + sizeof(uint32_t)); + } + } + if (ext_bkt_id == 0) { + ret = -ENOSPC; + goto failure; + } } /* Use the first location of the new bucket */ @@ -1395,12 +1449,13 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data) return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data); } -static inline void -remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) +static int +free_slot(const struct rte_hash *h, uint32_t slot_id) { unsigned lcore_id, n_slots; - struct lcore_cache *cached_free_slots; + struct lcore_cache *cached_free_slots = NULL; + /* Return key indexes to free slot ring */ if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; @@ -1411,18 +1466,127 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) cached_free_slots->objs, sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); - ERR_IF_TRUE((n_slots == 0), - "%s: could not enqueue free slots in global ring\n", - __func__); + RETURN_IF_TRUE((n_slots == 0), -EFAULT); cached_free_slots->len -= n_slots; } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = - bkt->key_idx[i]; - cached_free_slots->len++; + } + + enqueue_slot_back(h, cached_free_slots, slot_id); + return 0; +} + +static void +__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n) +{ + void *key_data = NULL; + int ret; + struct rte_hash_key *keys, *k; + struct rte_hash *h = (struct rte_hash *)p; + struct __rte_hash_rcu_dq_entry rcu_dq_entry = + *((struct __rte_hash_rcu_dq_entry *)e); + + RTE_SET_USED(n); + keys = h->key_store; + + k = (struct rte_hash_key *) ((char *)keys + + rcu_dq_entry.key_idx * h->key_entry_size); + key_data = k->pdata; + if (h->hash_rcu_cfg->free_key_data_func) + h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr, + key_data); + + if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT) + /* Recycle empty ext bkt to free list. */ + rte_ring_sp_enqueue_elem(h->free_ext_bkts, + &rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t)); + + /* Return key indexes to free slot ring */ + ret = free_slot(h, rcu_dq_entry.key_idx); + if (ret < 0) { + RTE_LOG(ERR, HASH, + "%s: could not enqueue free slots in global ring\n", + __func__); + } +} + +int +rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg) +{ + struct rte_rcu_qsbr_dq_parameters params = {0}; + char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE]; + struct rte_hash_rcu_config *hash_rcu_cfg = NULL; + const uint32_t total_entries = h->use_local_cache ? + h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 + : h->entries + 1; + + if (h == NULL || cfg == NULL || cfg->v == NULL) { + rte_errno = EINVAL; + return 1; + } + + if (h->hash_rcu_cfg) { + rte_errno = EEXIST; + return 1; + } + + hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0); + if (hash_rcu_cfg == NULL) { + RTE_LOG(ERR, HASH, "memory allocation failed\n"); + return 1; + } + + if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) { + /* No other things to do. */ + } else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) { + /* Init QSBR defer queue. */ + snprintf(rcu_dq_name, sizeof(rcu_dq_name), + "HASH_RCU_%s", h->name); + params.name = rcu_dq_name; + params.size = cfg->dq_size; + if (params.size == 0) + params.size = total_entries; + params.trigger_reclaim_limit = cfg->trigger_reclaim_limit; + if (params.max_reclaim_size == 0) + params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX; + params.esize = sizeof(struct __rte_hash_rcu_dq_entry); + params.free_fn = __hash_rcu_qsbr_free_resource; + params.p = h; + params.v = cfg->v; + h->dq = rte_rcu_qsbr_dq_create(¶ms); + if (h->dq == NULL) { + rte_free(hash_rcu_cfg); + RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n"); + return 1; + } } else { - rte_ring_sp_enqueue_elem(h->free_slots, - &bkt->key_idx[i], sizeof(uint32_t)); + rte_free(hash_rcu_cfg); + rte_errno = EINVAL; + return 1; + } + + hash_rcu_cfg->v = cfg->v; + hash_rcu_cfg->mode = cfg->mode; + hash_rcu_cfg->dq_size = params.size; + hash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit; + hash_rcu_cfg->max_reclaim_size = params.max_reclaim_size; + hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func; + hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr; + + h->hash_rcu_cfg = hash_rcu_cfg; + + return 0; +} + +static inline void +remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, + unsigned int i) +{ + int ret = free_slot(h, bkt->key_idx[i]); + + if (ret < 0) { + RTE_LOG(ERR, HASH, + "%s: could not enqueue free slots in global ring\n", + __func__); } } @@ -1521,6 +1685,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, int pos; int32_t ret, i; uint16_t short_sig; + uint32_t index = EMPTY_SLOT; + struct __rte_hash_rcu_dq_entry rcu_dq_entry; short_sig = get_short_sig(sig); prim_bucket_idx = get_prim_bucket_index(h, sig); @@ -1555,10 +1721,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, /* Search last bucket to see if empty to be recycled */ return_bkt: - if (!last_bkt) { - __hash_rw_writer_unlock(h); - return ret; - } + if (!last_bkt) + goto return_key; + while (last_bkt->next) { prev_bkt = last_bkt; last_bkt = last_bkt->next; @@ -1571,11 +1736,11 @@ return_bkt: /* found empty bucket and recycle */ if (i == RTE_HASH_BUCKET_ENTRIES) { prev_bkt->next = NULL; - uint32_t index = last_bkt - h->buckets_ext + 1; + index = last_bkt - h->buckets_ext + 1; /* Recycle the empty bkt if * no_free_on_del is disabled. */ - if (h->no_free_on_del) + if (h->no_free_on_del) { /* Store index of an empty ext bkt to be recycled * on calling rte_hash_del_xxx APIs. * When lock free read-write concurrency is enabled, @@ -1583,12 +1748,34 @@ return_bkt: * immediately (as readers might be using it still). * Hence freeing of the ext bkt is piggy-backed to * freeing of the key index. + * If using external RCU, store this index in an array. */ - h->ext_bkt_to_free[ret] = index; - else + if (h->hash_rcu_cfg == NULL) + h->ext_bkt_to_free[ret] = index; + } else rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, sizeof(uint32_t)); } + +return_key: + /* Using internal RCU QSBR */ + if (h->hash_rcu_cfg) { + /* Key index where key is stored, adding the first dummy index */ + rcu_dq_entry.key_idx = ret + 1; + rcu_dq_entry.ext_bkt_idx = index; + if (h->dq == NULL) { + /* Wait for quiescent state change if using + * RTE_HASH_QSBR_MODE_SYNC + */ + rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v, + RTE_QSBR_THRID_INVALID); + __hash_rcu_qsbr_free_resource((void *)((uintptr_t)h), + &rcu_dq_entry, 1); + } else if (h->dq) + /* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */ + if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0) + RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n"); + } __hash_rw_writer_unlock(h); return ret; } @@ -1637,8 +1824,6 @@ rte_hash_free_key_with_position(const struct rte_hash *h, RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL); - unsigned int lcore_id, n_slots; - struct lcore_cache *cached_free_slots; const uint32_t total_entries = h->use_local_cache ? h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 : h->entries + 1; @@ -1656,28 +1841,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h, } } - if (h->use_local_cache) { - lcore_id = rte_lcore_id(); - cached_free_slots = &h->local_free_slots[lcore_id]; - /* Cache full, need to free it. */ - if (cached_free_slots->len == LCORE_CACHE_SIZE) { - /* Need to enqueue the free slots in global ring. */ - n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots, - cached_free_slots->objs, - sizeof(uint32_t), - LCORE_CACHE_SIZE, NULL); - RETURN_IF_TRUE((n_slots == 0), -EFAULT); - cached_free_slots->len -= n_slots; - } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = key_idx; - cached_free_slots->len++; - } else { - rte_ring_sp_enqueue_elem(h->free_slots, &key_idx, - sizeof(uint32_t)); - } + /* Enqueue slot to cache/ring of free slots. */ + return free_slot(h, key_idx); - return 0; } static inline void diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h index 345de6bf9c..85be49d3bb 100644 --- a/lib/librte_hash/rte_cuckoo_hash.h +++ b/lib/librte_hash/rte_cuckoo_hash.h @@ -168,6 +168,11 @@ struct rte_hash { struct lcore_cache *local_free_slots; /**< Local cache per lcore, storing some indexes of the free slots */ + /* RCU config */ + struct rte_hash_rcu_config *hash_rcu_cfg; + /**< HASH RCU QSBR configuration structure */ + struct rte_rcu_qsbr_dq *dq; /**< RCU QSBR defer queue. */ + /* Fields used in lookup */ uint32_t key_len __rte_cache_aligned; @@ -230,4 +235,7 @@ struct queue_node { int prev_slot; /* Parent(slot) in search path */ }; +/** @internal Default RCU defer queue entries to reclaim in one go. */ +#define RTE_HASH_RCU_DQ_RECLAIM_MAX 16 + #endif diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h index bff40251bc..73c415ff7e 100644 --- a/lib/librte_hash/rte_hash.h +++ b/lib/librte_hash/rte_hash.h @@ -15,6 +15,7 @@ #include #include +#include #ifdef __cplusplus extern "C" { @@ -45,7 +46,8 @@ extern "C" { /** Flag to disable freeing of key index on hash delete. * Refer to rte_hash_del_xxx APIs for more details. * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF - * is enabled. + * is enabled. However, if internal RCU is enabled, freeing of internal + * memory/index is done on delete */ #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10 @@ -67,6 +69,13 @@ typedef uint32_t (*rte_hash_function)(const void *key, uint32_t key_len, /** Type of function used to compare the hash key. */ typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len); +/** + * Type of function used to free data stored in the key. + * Required when using internal RCU to allow application to free key-data once + * the key is returned to the ring of free key-slots. + */ +typedef void (*rte_hash_free_key_data)(void *p, void *key_data); + /** * Parameters used when creating the hash table. */ @@ -81,6 +90,39 @@ struct rte_hash_parameters { uint8_t extra_flag; /**< Indicate if additional parameters are present. */ }; +/** RCU reclamation modes */ +enum rte_hash_qsbr_mode { + /** Create defer queue for reclaim. */ + RTE_HASH_QSBR_MODE_DQ = 0, + /** Use blocking mode reclaim. No defer queue created. */ + RTE_HASH_QSBR_MODE_SYNC +}; + +/** HASH RCU QSBR configuration structure. */ +struct rte_hash_rcu_config { + struct rte_rcu_qsbr *v; /**< RCU QSBR variable. */ + enum rte_hash_qsbr_mode mode; + /**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx + * '0' for default: create defer queue for reclaim. + */ + uint32_t dq_size; + /**< RCU defer queue size. + * default: total hash table entries. + */ + uint32_t trigger_reclaim_limit; /**< Threshold to trigger auto reclaim. */ + uint32_t max_reclaim_size; + /**< Max entries to reclaim in one go. + * default: RTE_HASH_RCU_DQ_RECLAIM_MAX. + */ + void *key_data_ptr; + /**< Pointer passed to the free function. Typically, this is the + * pointer to the data structure to which the resource to free + * (key-data) belongs. This can be NULL. + */ + rte_hash_free_key_data free_key_data_func; + /**< Function to call to free the resource (key-data). */ +}; + /** @internal A hash table structure. */ struct rte_hash; @@ -287,7 +329,8 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t * Thread safety can be enabled by setting flag during * table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_add_key_xxx APIs will not be * freed by this API. rte_hash_free_key_with_position API must be called * additionally to free the index associated with the key. @@ -316,7 +359,8 @@ rte_hash_del_key(const struct rte_hash *h, const void *key); * Thread safety can be enabled by setting flag during * table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_add_key_xxx APIs will not be * freed by this API. rte_hash_free_key_with_position API must be called * additionally to free the index associated with the key. @@ -370,7 +414,8 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position, * only be called from one thread by default. Thread safety * can be enabled by setting flag during table creation. * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or - * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled, + * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and + * internal RCU is NOT enabled, * the key index returned by rte_hash_del_key_xxx APIs must be freed * using this API. This API should be called after all the readers * have stopped referencing the entry corresponding to this key. @@ -625,6 +670,30 @@ rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, */ int32_t rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next); + +/** + * @warning + * @b EXPERIMENTAL: this API may change without prior notice + * + * Associate RCU QSBR variable with a Hash object. + * This API should be called to enable the integrated RCU QSBR support and + * should be called immediately after creating the Hash object. + * + * @param h + * the hash object to add RCU QSBR + * @param cfg + * RCU QSBR configuration + * @return + * On success - 0 + * On error - 1 with error code set in rte_errno. + * Possible rte_errno codes are: + * - EINVAL - invalid pointer + * - EEXIST - already added QSBR + * - ENOMEM - memory allocation failure + */ +__rte_experimental +int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg); + #ifdef __cplusplus } #endif diff --git a/lib/librte_hash/version.map b/lib/librte_hash/version.map index c0db81014f..c6d73080f4 100644 --- a/lib/librte_hash/version.map +++ b/lib/librte_hash/version.map @@ -36,5 +36,5 @@ EXPERIMENTAL { rte_hash_lookup_with_hash_bulk; rte_hash_lookup_with_hash_bulk_data; rte_hash_max_key_id; - + rte_hash_rcu_qsbr_add; };