From: Pablo de Lara Date: Fri, 30 Oct 2015 14:37:28 +0000 (+0000) Subject: hash: fix scaling by reducing contention X-Git-Tag: spdx-start~8145 X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=5915699153d7;p=dpdk.git hash: fix scaling by reducing contention If using multiple cores on a system with hardware transactional memory support, thread scaling does not work, as there was a single point in the hash library which is a bottleneck for all threads, which is the "free_slots" ring, which stores all the indices of the free slots in the table. This patch fixes the problem, by creating a local cache per logical core, which stores locally indices of free slots, so most times, writer threads will not interfere each other. Fixes: 48a399119619 ("hash: replace with cuckoo hash implementation") Signed-off-by: Pablo de Lara Acked-by: Bruce Richardson --- diff --git a/app/test/test_hash_scaling.c b/app/test/test_hash_scaling.c index 39602cbc82..744e5e36b7 100644 --- a/app/test/test_hash_scaling.c +++ b/app/test/test_hash_scaling.c @@ -133,6 +133,7 @@ test_hash_scaling(int locking_mode) .hash_func = rte_hash_crc, .hash_func_init_val = 0, .socket_id = rte_socket_id(), + .extra_flag = RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT }; struct rte_hash *handle; char name[RTE_HASH_NAMESIZE]; diff --git a/doc/guides/rel_notes/release_2_2.rst b/doc/guides/rel_notes/release_2_2.rst index 52a382d5c6..fa431714cf 100644 --- a/doc/guides/rel_notes/release_2_2.rst +++ b/doc/guides/rel_notes/release_2_2.rst @@ -146,6 +146,11 @@ Drivers Fixed issue when releasing null control queue. +* **hash: Fixed thread scaling by reducing contention.** + + Fixed issue in hash library where, using multiple cores with + hardware transactional memory support, thread scaling did not work, + due to the global ring that is shared by all cores. Libraries ~~~~~~~~~ diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index 409fc2e27c..f797aeba3f 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -96,8 +96,15 @@ EAL_REGISTER_TAILQ(rte_hash_tailq) #define KEY_ALIGNMENT 16 +#define LCORE_CACHE_SIZE 8 + typedef int (*rte_hash_cmp_eq_t)(const void *key1, const void *key2, size_t key_len); +struct lcore_cache { + unsigned len; /**< Cache len */ + void *objs[LCORE_CACHE_SIZE]; /**< Cache objects */ +} __rte_cache_aligned; + /** A hash table structure. */ struct rte_hash { char name[RTE_HASH_NAMESIZE]; /**< Name of the hash. */ @@ -117,6 +124,10 @@ struct rte_hash { struct rte_hash_bucket *buckets; /**< Table with buckets storing all the hash values and key indexes to the key table*/ + uint8_t hw_trans_mem_support; /**< Hardware transactional + memory support */ + struct lcore_cache *local_free_slots; + /**< Local cache per lcore, storing some indexes of the free slots */ } __rte_cache_aligned; /* Structure storing both primary and secondary hashes */ @@ -183,6 +194,8 @@ rte_hash_create(const struct rte_hash_parameters *params) void *k = NULL; void *buckets = NULL; char ring_name[RTE_RING_NAMESIZE]; + unsigned num_key_slots; + unsigned hw_trans_mem_support = 0; unsigned i; hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list); @@ -202,6 +215,10 @@ rte_hash_create(const struct rte_hash_parameters *params) return NULL; } + /* Check extra flags field to check extra options. */ + if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT) + hw_trans_mem_support = 1; + snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name); /* Guarantee there's no existing */ @@ -238,7 +255,18 @@ rte_hash_create(const struct rte_hash_parameters *params) const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len; /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */ - const uint64_t key_tbl_size = (uint64_t) key_entry_size * (params->entries + 1); + if (hw_trans_mem_support) + /* + * Increase number of slots by total number of indices + * that can be stored in the lcore caches + * except for the first cache + */ + num_key_slots = params->entries + (RTE_MAX_LCORE - 1) * + LCORE_CACHE_SIZE + 1; + else + num_key_slots = params->entries + 1; + + const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots; k = rte_zmalloc_socket(NULL, key_tbl_size, RTE_CACHE_LINE_SIZE, params->socket_id); @@ -288,13 +316,19 @@ rte_hash_create(const struct rte_hash_parameters *params) #endif snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name); - r = rte_ring_create(ring_name, rte_align32pow2(params->entries + 1), - params->socket_id, 0); + r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots), + params->socket_id, 0); if (r == NULL) { RTE_LOG(ERR, HASH, "memory allocation failed\n"); goto err; } + if (hw_trans_mem_support) { + h->local_free_slots = rte_zmalloc_socket(NULL, + sizeof(struct lcore_cache) * RTE_MAX_LCORE, + RTE_CACHE_LINE_SIZE, params->socket_id); + } + /* Setup hash context */ snprintf(h->name, sizeof(h->name), "%s", params->name); h->entries = params->entries; @@ -307,9 +341,9 @@ rte_hash_create(const struct rte_hash_parameters *params) h->buckets = buckets; h->hash_func = (params->hash_func == NULL) ? DEFAULT_HASH_FUNC : params->hash_func; - h->key_store = k; h->free_slots = r; + h->hw_trans_mem_support = hw_trans_mem_support; /* populate the free slots ring. Entry zero is reserved for key misses */ for (i = 1; i < params->entries + 1; i++) @@ -357,6 +391,9 @@ rte_hash_free(struct rte_hash *h) rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + if (h->hw_trans_mem_support) + rte_free(h->local_free_slots); + rte_ring_free(h->free_slots); rte_free(h->key_store); rte_free(h->buckets); @@ -402,6 +439,12 @@ rte_hash_reset(struct rte_hash *h) /* Repopulate the free slots ring. Entry zero is reserved for key misses */ for (i = 1; i < h->entries + 1; i++) rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i)); + + if (h->hw_trans_mem_support) { + /* Reset local caches per lcore */ + for (i = 0; i < RTE_MAX_LCORE; i++) + h->local_free_slots[i].len = 0; + } } /* Search for an entry that can be pushed to its alternative location */ @@ -468,6 +511,23 @@ make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt) } +/* + * Function called to enqueue back an index in the cache/ring, + * as slot has not being used and it can be used in the + * next addition attempt. + */ +static inline void +enqueue_slot_back(const struct rte_hash *h, + struct lcore_cache *cached_free_slots, + void *slot_id) +{ + if (h->hw_trans_mem_support) { + cached_free_slots->objs[cached_free_slots->len] = slot_id; + cached_free_slots->len++; + } else + rte_ring_sp_enqueue(h->free_slots, slot_id); +} + static inline int32_t __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig, void *data) @@ -477,9 +537,12 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, unsigned i; struct rte_hash_bucket *prim_bkt, *sec_bkt; struct rte_hash_key *new_k, *k, *keys = h->key_store; - void *slot_id; + void *slot_id = NULL; uint32_t new_idx; int ret; + unsigned n_slots; + unsigned lcore_id; + struct lcore_cache *cached_free_slots = NULL; prim_bucket_idx = sig & h->bucket_bitmask; prim_bkt = &h->buckets[prim_bucket_idx]; @@ -491,8 +554,28 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, rte_prefetch0(sec_bkt); /* Get a new slot for storing the new key */ - if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) - return -ENOSPC; + if (h->hw_trans_mem_support) { + lcore_id = rte_lcore_id(); + cached_free_slots = &h->local_free_slots[lcore_id]; + /* Try to get a free slot from the local cache */ + if (cached_free_slots->len == 0) { + /* Need to get another burst of free slots from global ring */ + n_slots = rte_ring_mc_dequeue_burst(h->free_slots, + cached_free_slots->objs, LCORE_CACHE_SIZE); + if (n_slots == 0) + return -ENOSPC; + + cached_free_slots->len += n_slots; + } + + /* Get a free slot from the local cache */ + cached_free_slots->len--; + slot_id = cached_free_slots->objs[cached_free_slots->len]; + } else { + if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) + return -ENOSPC; + } + new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size); rte_prefetch0(new_k); new_idx = (uint32_t)((uintptr_t) slot_id); @@ -500,11 +583,12 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Check if key is already inserted in primary location */ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { if (prim_bkt->signatures[i].current == sig && - prim_bkt->signatures[i].alt == alt_hash) { + prim_bkt->signatures[i].alt == alt_hash) { k = (struct rte_hash_key *) ((char *)keys + prim_bkt->key_idx[i] * h->key_entry_size); if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) { - rte_ring_sp_enqueue(h->free_slots, slot_id); + /* Enqueue index of free slot back in the ring. */ + enqueue_slot_back(h, cached_free_slots, slot_id); /* Update data */ k->pdata = data; /* @@ -519,11 +603,12 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Check if key is already inserted in secondary location */ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { if (sec_bkt->signatures[i].alt == sig && - sec_bkt->signatures[i].current == alt_hash) { + sec_bkt->signatures[i].current == alt_hash) { k = (struct rte_hash_key *) ((char *)keys + sec_bkt->key_idx[i] * h->key_entry_size); if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) { - rte_ring_sp_enqueue(h->free_slots, slot_id); + /* Enqueue index of free slot back in the ring. */ + enqueue_slot_back(h, cached_free_slots, slot_id); /* Update data */ k->pdata = data; /* @@ -566,10 +651,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, } /* Error in addition, store new slot back in the ring and return error */ - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t) new_idx)); - return ret; + enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx)); + return ret; } int32_t @@ -701,6 +785,34 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data) return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data); } +static inline void +remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) +{ + unsigned lcore_id, n_slots; + struct lcore_cache *cached_free_slots; + + bkt->signatures[i].sig = NULL_SIGNATURE; + if (h->hw_trans_mem_support) { + lcore_id = rte_lcore_id(); + cached_free_slots = &h->local_free_slots[lcore_id]; + /* Cache full, need to free it. */ + if (cached_free_slots->len == LCORE_CACHE_SIZE) { + /* Need to enqueue the free slots in global ring. */ + n_slots = rte_ring_mp_enqueue_burst(h->free_slots, + cached_free_slots->objs, + LCORE_CACHE_SIZE); + cached_free_slots->len -= n_slots; + } + /* Put index of new free slot in cache. */ + cached_free_slots->objs[cached_free_slots->len] = + (void *)((uintptr_t)bkt->key_idx[i]); + cached_free_slots->len++; + } else { + rte_ring_sp_enqueue(h->free_slots, + (void *)((uintptr_t)bkt->key_idx[i])); + } +} + static inline int32_t __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig) @@ -721,9 +833,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) { - bkt->signatures[i].sig = NULL_SIGNATURE; - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t)bkt->key_idx[i])); + remove_entry(h, bkt, i); + /* * Return index where key is stored, * substracting the first dummy index @@ -745,9 +856,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (h->rte_hash_cmp_eq(key, k->key, h->key_len) == 0) { - bkt->signatures[i].sig = NULL_SIGNATURE; - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t)bkt->key_idx[i])); + remove_entry(h, bkt, i); + /* * Return index where key is stored, * substracting the first dummy index diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h index 175c0bbb89..b678766025 100644 --- a/lib/librte_hash/rte_hash.h +++ b/lib/librte_hash/rte_hash.h @@ -56,6 +56,9 @@ extern "C" { #define RTE_HASH_LOOKUP_BULK_MAX 64 #define RTE_HASH_LOOKUP_MULTI_MAX RTE_HASH_LOOKUP_BULK_MAX +/** Enable Hardware transactional memory support. */ +#define RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT 0x01 + /** Signature of key that is stored internally. */ typedef uint32_t hash_sig_t;