#define KEY_ALIGNMENT 16
+#define LCORE_CACHE_SIZE 8
+
typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len);
+struct lcore_cache {
+ unsigned len; /**< Cache len */
+ void *objs[LCORE_CACHE_SIZE]; /**< Cache objects */
+} __rte_cache_aligned;
+
/** A hash table structure. */
struct rte_hash {
char name[RTE_HASH_NAMESIZE]; /**< Name of the hash. */
struct rte_hash_bucket *buckets; /**< Table with buckets storing all the
hash values and key indexes
to the key table*/
+ uint8_t hw_trans_mem_support; /**< Hardware transactional
+ memory support */
+ struct lcore_cache *local_free_slots;
+ /**< Local cache per lcore, storing some indexes of the free slots */
} __rte_cache_aligned;
/* Structure storing both primary and secondary hashes */
void *k = NULL;
void *buckets = NULL;
char ring_name[RTE_RING_NAMESIZE];
+ unsigned num_key_slots;
+ unsigned hw_trans_mem_support = 0;
unsigned i;
hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
return NULL;
}
+ /* Check extra flags field to check extra options. */
+ if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
+ hw_trans_mem_support = 1;
+
snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
/* Guarantee there's no existing */
const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
/* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
- const uint64_t key_tbl_size = (uint64_t) key_entry_size * (params->entries + 1);
+ if (hw_trans_mem_support)
+ /*
+ * Increase number of slots by total number of indices
+ * that can be stored in the lcore caches
+ * except for the first cache
+ */
+ num_key_slots = params->entries + (RTE_MAX_LCORE - 1) *
+ LCORE_CACHE_SIZE + 1;
+ else
+ num_key_slots = params->entries + 1;
+
+ const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
k = rte_zmalloc_socket(NULL, key_tbl_size,
RTE_CACHE_LINE_SIZE, params->socket_id);
#endif
snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name);
- r = rte_ring_create(ring_name, rte_align32pow2(params->entries + 1),
- params->socket_id, 0);
+ r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots),
+ params->socket_id, 0);
if (r == NULL) {
RTE_LOG(ERR, HASH, "memory allocation failed\n");
goto err;
}
+ if (hw_trans_mem_support) {
+ h->local_free_slots = rte_zmalloc_socket(NULL,
+ sizeof(struct lcore_cache) * RTE_MAX_LCORE,
+ RTE_CACHE_LINE_SIZE, params->socket_id);
+ }
+
/* Setup hash context */
snprintf(h->name, sizeof(h->name), "%s", params->name);
h->entries = params->entries;
h->buckets = buckets;
h->hash_func = (params->hash_func == NULL) ?
DEFAULT_HASH_FUNC : params->hash_func;
-
h->key_store = k;
h->free_slots = r;
+ h->hw_trans_mem_support = hw_trans_mem_support;
/* populate the free slots ring. Entry zero is reserved for key misses */
for (i = 1; i < params->entries + 1; i++)
rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
+ if (h->hw_trans_mem_support)
+ rte_free(h->local_free_slots);
+
rte_ring_free(h->free_slots);
rte_free(h->key_store);
rte_free(h->buckets);
/* Repopulate the free slots ring. Entry zero is reserved for key misses */
for (i = 1; i < h->entries + 1; i++)
rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
+
+ if (h->hw_trans_mem_support) {
+ /* Reset local caches per lcore */
+ for (i = 0; i < RTE_MAX_LCORE; i++)
+ h->local_free_slots[i].len = 0;
+ }
}
/* Search for an entry that can be pushed to its alternative location */
}
+/*
+ * Function called to enqueue back an index in the cache/ring,
+ * as slot has not being used and it can be used in the
+ * next addition attempt.
+ */
+static inline void
+enqueue_slot_back(const struct rte_hash *h,
+ struct lcore_cache *cached_free_slots,
+ void *slot_id)
+{
+ if (h->hw_trans_mem_support) {
+ cached_free_slots->objs[cached_free_slots->len] = slot_id;
+ cached_free_slots->len++;
+ } else
+ rte_ring_sp_enqueue(h->free_slots, slot_id);
+}
+
static inline int32_t
__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
hash_sig_t sig, void *data)
unsigned i;
struct rte_hash_bucket *prim_bkt, *sec_bkt;
struct rte_hash_key *new_k, *k, *keys = h->key_store;
- void *slot_id;
+ void *slot_id = NULL;
uint32_t new_idx;
int ret;
+ unsigned n_slots;
+ unsigned lcore_id;
+ struct lcore_cache *cached_free_slots = NULL;
prim_bucket_idx = sig & h->bucket_bitmask;
prim_bkt = &h->buckets[prim_bucket_idx];
rte_prefetch0(sec_bkt);
/* Get a new slot for storing the new key */
- if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
- return -ENOSPC;
+ if (h->hw_trans_mem_support) {
+ lcore_id = rte_lcore_id();
+ cached_free_slots = &h->local_free_slots[lcore_id];
+ /* Try to get a free slot from the local cache */
+ if (cached_free_slots->len == 0) {
+ /* Need to get another burst of free slots from global ring */
+ n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
+ cached_free_slots->objs, LCORE_CACHE_SIZE);
+ if (n_slots == 0)
+ return -ENOSPC;
+
+ cached_free_slots->len += n_slots;
+ }
+
+ /* Get a free slot from the local cache */
+ cached_free_slots->len--;
+ slot_id = cached_free_slots->objs[cached_free_slots->len];
+ } else {
+ if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
+ return -ENOSPC;
+ }
+
new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
rte_prefetch0(new_k);
new_idx = (uint32_t)((uintptr_t) slot_id);
/* Check if key is already inserted in primary location */
for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
if (prim_bkt->signatures[i].current == sig &&
- prim_bkt->signatures[i].alt == alt_hash) {
+ prim_bkt->signatures[i].alt == alt_hash) {
k = (struct rte_hash_key *) ((char *)keys +
prim_bkt->key_idx[i] * h->key_entry_size);
if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) {
- rte_ring_sp_enqueue(h->free_slots, slot_id);
+ /* Enqueue index of free slot back in the ring. */
+ enqueue_slot_back(h, cached_free_slots, slot_id);
/* Update data */
k->pdata = data;
/*
/* Check if key is already inserted in secondary location */
for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
if (sec_bkt->signatures[i].alt == sig &&
- sec_bkt->signatures[i].current == alt_hash) {
+ sec_bkt->signatures[i].current == alt_hash) {
k = (struct rte_hash_key *) ((char *)keys +
sec_bkt->key_idx[i] * h->key_entry_size);
if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) {
- rte_ring_sp_enqueue(h->free_slots, slot_id);
+ /* Enqueue index of free slot back in the ring. */
+ enqueue_slot_back(h, cached_free_slots, slot_id);
/* Update data */
k->pdata = data;
/*
}
/* Error in addition, store new slot back in the ring and return error */
- rte_ring_sp_enqueue(h->free_slots,
- (void *)((uintptr_t) new_idx));
- return ret;
+ enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
+ return ret;
}
int32_t
return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data);
}
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
+{
+ unsigned lcore_id, n_slots;
+ struct lcore_cache *cached_free_slots;
+
+ bkt->signatures[i].sig = NULL_SIGNATURE;
+ if (h->hw_trans_mem_support) {
+ lcore_id = rte_lcore_id();
+ cached_free_slots = &h->local_free_slots[lcore_id];
+ /* Cache full, need to free it. */
+ if (cached_free_slots->len == LCORE_CACHE_SIZE) {
+ /* Need to enqueue the free slots in global ring. */
+ n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
+ cached_free_slots->objs,
+ LCORE_CACHE_SIZE);
+ cached_free_slots->len -= n_slots;
+ }
+ /* Put index of new free slot in cache. */
+ cached_free_slots->objs[cached_free_slots->len] =
+ (void *)((uintptr_t)bkt->key_idx[i]);
+ cached_free_slots->len++;
+ } else {
+ rte_ring_sp_enqueue(h->free_slots,
+ (void *)((uintptr_t)bkt->key_idx[i]));
+ }
+}
+
static inline int32_t
__rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
hash_sig_t sig)
k = (struct rte_hash_key *) ((char *)keys +
bkt->key_idx[i] * h->key_entry_size);
if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) {
- bkt->signatures[i].sig = NULL_SIGNATURE;
- rte_ring_sp_enqueue(h->free_slots,
- (void *)((uintptr_t)bkt->key_idx[i]));
+ remove_entry(h, bkt, i);
+
/*
* Return index where key is stored,
* substracting the first dummy index
k = (struct rte_hash_key *) ((char *)keys +
bkt->key_idx[i] * h->key_entry_size);
if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) {
- bkt->signatures[i].sig = NULL_SIGNATURE;
- rte_ring_sp_enqueue(h->free_slots,
- (void *)((uintptr_t)bkt->key_idx[i]));
+ remove_entry(h, bkt, i);
+
/*
* Return index where key is stored,
* substracting the first dummy index