};
EAL_REGISTER_TAILQ(rte_hash_tailq)
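+/* Element stored in the RCU defer queue: the key-store index to free and,
+ * for extendable tables, the ext bucket index recycled together with it
+ * (EMPTY_SLOT when there is none).
+ */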
+struct __rte_hash_rcu_dq_entry {
+ uint32_t key_idx;
+ uint32_t ext_bkt_idx;
+};
+
struct rte_hash *
rte_hash_find_existing(const char *name)
{
if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
readwrite_concur_lf_support = 1;
- /* Enable not freeing internal memory/index on delete */
+ /* Enable not freeing internal memory/index on delete.
+ * If internal RCU is enabled, freeing of internal memory/index
+ * is done on delete.
+ */
no_free_on_del = 1;
}
rte_mcfg_tailq_write_unlock();
+ if (h->dq)
+ rte_rcu_qsbr_dq_delete(h->dq);
+
if (h->use_local_cache)
rte_free(h->local_free_slots);
if (h->writer_takes_lock)
rte_hash_reset(struct rte_hash *h)
{
uint32_t tot_ring_cnt, i;
+ unsigned int pending;
if (h == NULL)
return;
__hash_rw_writer_lock(h);
+
+ if (h->dq) {
+ /* Reclaim all the resources */
+ rte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL);
+ if (pending != 0)
+ RTE_LOG(ERR, HASH, "RCU reclaim all resources failed\n");
+ }
+
memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
*h->tbl_chng_cnt = 0;
return -ENOSPC;
}
+static inline uint32_t
+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
+{
+ unsigned int n_slots;
+ uint32_t slot_id;
+
+ if (h->use_local_cache) {
+ /* Try to get a free slot from the local cache */
+ if (cached_free_slots->len == 0) {
+ /* Need to get another burst of free slots from global ring */
+ n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
+ cached_free_slots->objs,
+ sizeof(uint32_t),
+ LCORE_CACHE_SIZE, NULL);
+ if (n_slots == 0)
+ return EMPTY_SLOT;
+
+ cached_free_slots->len += n_slots;
+ }
+
+ /* Get a free slot from the local cache */
+ cached_free_slots->len--;
+ slot_id = cached_free_slots->objs[cached_free_slots->len];
+ } else {
+ if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+ sizeof(uint32_t)) != 0)
+ return EMPTY_SLOT;
+ }
+
+ return slot_id;
+}
+
static inline int32_t
__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
hash_sig_t sig, void *data)
uint32_t ext_bkt_id = 0;
uint32_t slot_id;
int ret;
- unsigned n_slots;
unsigned lcore_id;
unsigned int i;
struct lcore_cache *cached_free_slots = NULL;
if (h->use_local_cache) {
lcore_id = rte_lcore_id();
cached_free_slots = &h->local_free_slots[lcore_id];
- /* Try to get a free slot from the local cache */
- if (cached_free_slots->len == 0) {
- /* Need to get another burst of free slots from global ring */
- n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
- cached_free_slots->objs,
- sizeof(uint32_t),
- LCORE_CACHE_SIZE, NULL);
- if (n_slots == 0) {
- return -ENOSPC;
- }
-
- cached_free_slots->len += n_slots;
+ }
+ slot_id = alloc_slot(h, cached_free_slots);
+ if (slot_id == EMPTY_SLOT) {
+ if (h->dq) {
+ __hash_rw_writer_lock(h);
+ ret = rte_rcu_qsbr_dq_reclaim(h->dq,
+ h->hash_rcu_cfg->max_reclaim_size,
+ NULL, NULL, NULL);
+ __hash_rw_writer_unlock(h);
+ if (ret == 0)
+ slot_id = alloc_slot(h, cached_free_slots);
}
-
- /* Get a free slot from the local cache */
- cached_free_slots->len--;
- slot_id = cached_free_slots->objs[cached_free_slots->len];
- } else {
- if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
- sizeof(uint32_t)) != 0) {
+ if (slot_id == EMPTY_SLOT)
return -ENOSPC;
- }
}
new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
sizeof(uint32_t)) != 0 ||
ext_bkt_id == 0) {
- ret = -ENOSPC;
- goto failure;
+ if (h->dq) {
+ if (rte_rcu_qsbr_dq_reclaim(h->dq,
+ h->hash_rcu_cfg->max_reclaim_size,
+ NULL, NULL, NULL) == 0) {
+ rte_ring_sc_dequeue_elem(h->free_ext_bkts,
+ &ext_bkt_id,
+ sizeof(uint32_t));
+ }
+ }
+ if (ext_bkt_id == 0) {
+ ret = -ENOSPC;
+ goto failure;
+ }
}
/* Use the first location of the new bucket */
return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
}
-static inline void
-remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+static int
+free_slot(const struct rte_hash *h, uint32_t slot_id)
{
unsigned lcore_id, n_slots;
- struct lcore_cache *cached_free_slots;
+ struct lcore_cache *cached_free_slots = NULL;
+ /* Return key indexes to free slot ring */
if (h->use_local_cache) {
lcore_id = rte_lcore_id();
cached_free_slots = &h->local_free_slots[lcore_id];
cached_free_slots->objs,
sizeof(uint32_t),
LCORE_CACHE_SIZE, NULL);
- ERR_IF_TRUE((n_slots == 0),
- "%s: could not enqueue free slots in global ring\n",
- __func__);
+ RETURN_IF_TRUE((n_slots == 0), -EFAULT);
cached_free_slots->len -= n_slots;
}
- /* Put index of new free slot in cache. */
- cached_free_slots->objs[cached_free_slots->len] =
- bkt->key_idx[i];
- cached_free_slots->len++;
+ }
+
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ return 0;
+}
+
+static void
+__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n)
+{
+ void *key_data = NULL;
+ int ret;
+ struct rte_hash_key *keys, *k;
+ struct rte_hash *h = (struct rte_hash *)p;
+ struct __rte_hash_rcu_dq_entry rcu_dq_entry =
+ *((struct __rte_hash_rcu_dq_entry *)e);
+
+ RTE_SET_USED(n);
+ keys = h->key_store;
+
+ k = (struct rte_hash_key *) ((char *)keys +
+ rcu_dq_entry.key_idx * h->key_entry_size);
+ key_data = k->pdata;
+ if (h->hash_rcu_cfg->free_key_data_func)
+ h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr,
+ key_data);
+
+ if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT)
+ /* Recycle empty ext bkt to free list. */
+ rte_ring_sp_enqueue_elem(h->free_ext_bkts,
+ &rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t));
+
+ /* Return key indexes to free slot ring */
+ ret = free_slot(h, rcu_dq_entry.key_idx);
+ if (ret < 0) {
+ RTE_LOG(ERR, HASH,
+ "%s: could not enqueue free slots in global ring\n",
+ __func__);
+ }
+}
+
+int
+rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg)
+{
+ struct rte_rcu_qsbr_dq_parameters params = {0};
+ char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
+ struct rte_hash_rcu_config *hash_rcu_cfg = NULL;
+ uint32_t total_entries;
+
+ if (h == NULL || cfg == NULL || cfg->v == NULL) {
+ rte_errno = EINVAL;
+ return 1;
+ }
+
+ /* Dereference h only after the NULL check above. */
+ total_entries = h->use_local_cache ?
+ h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
+ : h->entries + 1;
+
+ if (h->hash_rcu_cfg) {
+ rte_errno = EEXIST;
+ return 1;
+ }
+
+ hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0);
+ if (hash_rcu_cfg == NULL) {
+ RTE_LOG(ERR, HASH, "memory allocation failed\n");
+ rte_errno = ENOMEM;
+ return 1;
+ }
+
+ if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) {
+ /* No defer queue needed; reclamation is synchronous. */
+ } else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) {
+ /* Init QSBR defer queue. */
+ snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+ "HASH_RCU_%s", h->name);
+ params.name = rcu_dq_name;
+ params.size = cfg->dq_size;
+ if (params.size == 0)
+ params.size = total_entries;
+ params.trigger_reclaim_limit = cfg->trigger_reclaim_limit;
+ params.max_reclaim_size = cfg->max_reclaim_size;
+ if (params.max_reclaim_size == 0)
+ params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
+ params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
+ params.free_fn = __hash_rcu_qsbr_free_resource;
+ params.p = h;
+ params.v = cfg->v;
+ h->dq = rte_rcu_qsbr_dq_create(&params);
+ if (h->dq == NULL) {
+ rte_free(hash_rcu_cfg);
+ RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
+ return 1;
+ }
} else {
- rte_ring_sp_enqueue_elem(h->free_slots,
- &bkt->key_idx[i], sizeof(uint32_t));
+ rte_free(hash_rcu_cfg);
+ rte_errno = EINVAL;
+ return 1;
+ }
+
+ hash_rcu_cfg->v = cfg->v;
+ hash_rcu_cfg->mode = cfg->mode;
+ hash_rcu_cfg->dq_size = params.size;
+ hash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit;
+ hash_rcu_cfg->max_reclaim_size = params.max_reclaim_size;
+ hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
+ hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
+
+ h->hash_rcu_cfg = hash_rcu_cfg;
+
+ return 0;
+}
+
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt,
+ unsigned int i)
+{
+ int ret = free_slot(h, bkt->key_idx[i]);
+
+ if (ret < 0) {
+ RTE_LOG(ERR, HASH,
+ "%s: could not enqueue free slots in global ring\n",
+ __func__);
}
}
int pos;
int32_t ret, i;
uint16_t short_sig;
+ uint32_t index = EMPTY_SLOT;
+ struct __rte_hash_rcu_dq_entry rcu_dq_entry;
short_sig = get_short_sig(sig);
prim_bucket_idx = get_prim_bucket_index(h, sig);
/* Search last bucket to see if empty to be recycled */
return_bkt:
- if (!last_bkt) {
- __hash_rw_writer_unlock(h);
- return ret;
- }
+ if (!last_bkt)
+ goto return_key;
+
while (last_bkt->next) {
prev_bkt = last_bkt;
last_bkt = last_bkt->next;
/* found empty bucket and recycle */
if (i == RTE_HASH_BUCKET_ENTRIES) {
prev_bkt->next = NULL;
- uint32_t index = last_bkt - h->buckets_ext + 1;
+ index = last_bkt - h->buckets_ext + 1;
/* Recycle the empty bkt if
* no_free_on_del is disabled.
*/
- if (h->no_free_on_del)
+ if (h->no_free_on_del) {
/* Store index of an empty ext bkt to be recycled
* on calling rte_hash_del_xxx APIs.
* When lock free read-write concurrency is enabled,
* an empty ext bkt cannot be put into the free list
* immediately (as readers might be using it still).
* Hence freeing of the ext bkt is piggy-backed to
* freeing of the key index.
+ * If using external RCU, store this index in an array.
*/
- h->ext_bkt_to_free[ret] = index;
- else
+ if (h->hash_rcu_cfg == NULL)
+ h->ext_bkt_to_free[ret] = index;
+ } else
rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index,
sizeof(uint32_t));
}
+
+return_key:
+ /* Using internal RCU QSBR */
+ if (h->hash_rcu_cfg) {
+ /* Key index where the key is stored, offset by the first dummy index */
+ rcu_dq_entry.key_idx = ret + 1;
+ rcu_dq_entry.ext_bkt_idx = index;
+ if (h->dq == NULL) {
+ /* Wait for quiescent state change if using
+ * RTE_HASH_QSBR_MODE_SYNC
+ */
+ rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v,
+ RTE_QSBR_THRID_INVALID);
+ __hash_rcu_qsbr_free_resource((void *)((uintptr_t)h),
+ &rcu_dq_entry, 1);
+ } else {
+ /* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */
+ if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0)
+ RTE_LOG(ERR, HASH, "Failed to push into QSBR FIFO\n");
+ }
+ }
__hash_rw_writer_unlock(h);
return ret;
}
RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL);
- unsigned int lcore_id, n_slots;
- struct lcore_cache *cached_free_slots;
const uint32_t total_entries = h->use_local_cache ?
h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1
: h->entries + 1;
}
}
- if (h->use_local_cache) {
- lcore_id = rte_lcore_id();
- cached_free_slots = &h->local_free_slots[lcore_id];
- /* Cache full, need to free it. */
- if (cached_free_slots->len == LCORE_CACHE_SIZE) {
- /* Need to enqueue the free slots in global ring. */
- n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots,
- cached_free_slots->objs,
- sizeof(uint32_t),
- LCORE_CACHE_SIZE, NULL);
- RETURN_IF_TRUE((n_slots == 0), -EFAULT);
- cached_free_slots->len -= n_slots;
- }
- /* Put index of new free slot in cache. */
- cached_free_slots->objs[cached_free_slots->len] = key_idx;
- cached_free_slots->len++;
- } else {
- rte_ring_sp_enqueue_elem(h->free_slots, &key_idx,
- sizeof(uint32_t));
- }
+ /* Enqueue slot to cache/ring of free slots. */
+ return free_slot(h, key_idx);
- return 0;
}
static inline void
#include <stddef.h>
#include <rte_compat.h>
+#include <rte_rcu_qsbr.h>
#ifdef __cplusplus
extern "C" {
/** Flag to disable freeing of key index on hash delete.
* Refer to rte_hash_del_xxx APIs for more details.
* This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
- * is enabled.
+ * is enabled. However, if internal RCU is enabled, freeing of internal
+ * memory/index is done on delete.
*/
#define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
/** Type of function used to compare the hash key. */
typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
+/**
+ * Type of function used to free data stored in the key.
+ * Required when using internal RCU, so that the application can free the
+ * key-data once the key is returned to the ring of free key-slots.
+ */
+typedef void (*rte_hash_free_key_data)(void *p, void *key_data);
+
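+/* A minimal callback sketch matching this signature (illustrative only;
+ * it assumes the key-data was allocated with rte_malloc and needs no
+ * other cleanup):
+ *
+ * static void
+ * app_free_key_data(void *p, void *key_data)
+ * {
+ * RTE_SET_USED(p);
+ * rte_free(key_data);
+ * }
+ */
+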
/**
* Parameters used when creating the hash table.
*/
uint8_t extra_flag; /**< Indicate if additional parameters are present. */
};
+/** RCU reclamation modes */
+enum rte_hash_qsbr_mode {
+ /** Create defer queue for reclaim. */
+ RTE_HASH_QSBR_MODE_DQ = 0,
+ /** Use blocking mode reclaim. No defer queue created. */
+ RTE_HASH_QSBR_MODE_SYNC
+};
+
+/** HASH RCU QSBR configuration structure. */
+struct rte_hash_rcu_config {
+ struct rte_rcu_qsbr *v; /**< RCU QSBR variable. */
+ enum rte_hash_qsbr_mode mode;
+ /**< Mode of RCU QSBR. RTE_HASH_QSBR_MODE_xxx
+ * '0' for default: create defer queue for reclaim.
+ */
+ uint32_t dq_size;
+ /**< RCU defer queue size.
+ * default: total hash table entries.
+ */
+ uint32_t trigger_reclaim_limit; /**< Threshold to trigger auto reclaim. */
+ uint32_t max_reclaim_size;
+ /**< Max entries to reclaim in one go.
+ * default: RTE_HASH_RCU_DQ_RECLAIM_MAX.
+ */
+ void *key_data_ptr;
+ /**< Pointer passed to the free function. Typically, this is the
+ * pointer to the data structure to which the resource to free
+ * (key-data) belongs. This can be NULL.
+ */
+ rte_hash_free_key_data free_key_data_func;
+ /**< Function to call to free the resource (key-data). */
+};
+
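+/* A configuration sketch (illustrative; 'qsbr_v' is an application-created
+ * RCU QSBR variable and 'app_free_key_data' a callback such as the sketch
+ * above, both assumptions rather than part of this header):
+ *
+ * struct rte_hash_rcu_config rcu_cfg = {
+ * .v = qsbr_v,
+ * .mode = RTE_HASH_QSBR_MODE_SYNC,
+ * .free_key_data_func = app_free_key_data,
+ * };
+ *
+ * With RTE_HASH_QSBR_MODE_DQ, dq_size, trigger_reclaim_limit and
+ * max_reclaim_size may be left 0 to take the documented defaults.
+ */
+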
/** @internal A hash table structure. */
struct rte_hash;
* Thread safety can be enabled by setting flag during
* table creation.
* If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
* the key index returned by rte_hash_add_key_xxx APIs will not be
* freed by this API. rte_hash_free_key_with_position API must be called
* additionally to free the index associated with the key.
* Thread safety can be enabled by setting flag during
* table creation.
* If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
* the key index returned by rte_hash_add_key_xxx APIs will not be
* freed by this API. rte_hash_free_key_with_position API must be called
* additionally to free the index associated with the key.
* only be called from one thread by default. Thread safety
* can be enabled by setting flag during table creation.
* If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
- * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled and
+ * internal RCU is NOT enabled,
* the key index returned by rte_hash_del_key_xxx APIs must be freed
* using this API. This API should be called after all the readers
* have stopped referencing the entry corresponding to this key.
*/
int32_t
rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next);
+
+/**
+ * @warning
+ * @b EXPERIMENTAL: this API may change without prior notice
+ *
+ * Associate an RCU QSBR variable with a hash object.
+ * This API enables the integrated RCU QSBR support; it must be called
+ * immediately after creating the hash object.
+ *
+ * @param h
+ * the hash object to add the RCU QSBR variable to
+ * @param cfg
+ * RCU QSBR configuration
+ * @return
+ * On success - 0
+ * On error - 1 with error code set in rte_errno.
+ * Possible rte_errno codes are:
+ * - EINVAL - invalid pointer
+ * - EEXIST - already added QSBR
+ * - ENOMEM - memory allocation failure
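+ *
+ * A usage sketch (illustrative; sizing the QSBR variable for a single
+ * reader thread and the 'app_free_key_data' callback are assumptions):
+ * @code{.c}
+ * size_t sz = rte_rcu_qsbr_get_memsize(1);
+ * struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
+ * rte_rcu_qsbr_init(v, 1);
+ *
+ * struct rte_hash_rcu_config rcu_cfg = {
+ * .v = v,
+ * .mode = RTE_HASH_QSBR_MODE_DQ,
+ * .free_key_data_func = app_free_key_data,
+ * };
+ * if (rte_hash_rcu_qsbr_add(h, &rcu_cfg) != 0)
+ * printf("RCU add failed: rte_errno=%d\n", rte_errno);
+ * @endcode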
+ */
+__rte_experimental
+int rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg);
+
#ifdef __cplusplus
}
#endif