X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_hash%2Frte_cuckoo_hash.c;h=6af8ca42e94f38ffcf57c126ed6222f631252581;hb=61669ecb921fff821db11d81557fa0449a7eb79c;hp=c55a4f26323302b4fc9c82ff9566b69cca0499fc;hpb=606bd11736a210aa79fe5da4ea0cb3a9cbcde4a0;p=dpdk.git diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c index c55a4f2632..6af8ca42e9 100644 --- a/lib/librte_hash/rte_cuckoo_hash.c +++ b/lib/librte_hash/rte_cuckoo_hash.c @@ -24,8 +24,10 @@ #include #include #include -#include +#include #include +#include +#include #include "rte_hash.h" #include "rte_cuckoo_hash.h" @@ -51,13 +53,13 @@ rte_hash_find_existing(const char *name) hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list); - rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_read_lock(); TAILQ_FOREACH(te, hash_list, next) { h = (struct rte_hash *) te->data; if (strncmp(name, h->name, RTE_HASH_NAMESIZE) == 0) break; } - rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_read_unlock(); if (te == NULL) { rte_errno = ENOENT; @@ -134,14 +136,15 @@ rte_hash_create(const struct rte_hash_parameters *params) char ring_name[RTE_RING_NAMESIZE]; char ext_ring_name[RTE_RING_NAMESIZE]; unsigned num_key_slots; - unsigned i; unsigned int hw_trans_mem_support = 0, use_local_cache = 0; unsigned int ext_table_support = 0; unsigned int readwrite_concur_support = 0; unsigned int writer_takes_lock = 0; unsigned int no_free_on_del = 0; + uint32_t *ext_bkt_to_free = NULL; uint32_t *tbl_chng_cnt = NULL; unsigned int readwrite_concur_lf_support = 0; + uint32_t i; rte_hash_function default_hash_func = (rte_hash_function)rte_jhash; @@ -170,15 +173,6 @@ rte_hash_create(const struct rte_hash_parameters *params) return NULL; } - if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) && - (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)) { - rte_errno = EINVAL; - RTE_LOG(ERR, HASH, "rte_hash_create: extendable bucket " - "feature not supported with rw concurrency " - "lock free\n"); - return NULL; - } - /* Check extra flags field to check extra options. */ if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT) hw_trans_mem_support = 1; @@ -219,8 +213,8 @@ rte_hash_create(const struct rte_hash_parameters *params) snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name); /* Create ring (Dummy slot index is not enqueued) */ - r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots), - params->socket_id, 0); + r = rte_ring_create_elem(ring_name, sizeof(uint32_t), + rte_align32pow2(num_key_slots), params->socket_id, 0); if (r == NULL) { RTE_LOG(ERR, HASH, "memory allocation failed\n"); goto err; @@ -233,7 +227,7 @@ rte_hash_create(const struct rte_hash_parameters *params) if (ext_table_support) { snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s", params->name); - r_ext = rte_ring_create(ext_ring_name, + r_ext = rte_ring_create_elem(ext_ring_name, sizeof(uint32_t), rte_align32pow2(num_buckets + 1), params->socket_id, 0); @@ -246,7 +240,7 @@ rte_hash_create(const struct rte_hash_parameters *params) snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name); - rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_lock(); /* guarantee there's no existing: this is normally already checked * by ring creation above */ @@ -301,7 +295,17 @@ rte_hash_create(const struct rte_hash_parameters *params) * for next bucket */ for (i = 1; i <= num_buckets; i++) - rte_ring_sp_enqueue(r_ext, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(r_ext, &i, sizeof(uint32_t)); + + if (readwrite_concur_lf_support) { + ext_bkt_to_free = rte_zmalloc(NULL, sizeof(uint32_t) * + num_key_slots, 0); + if (ext_bkt_to_free == NULL) { + RTE_LOG(ERR, HASH, "ext bkt to free memory allocation " + "failed\n"); + goto err_unlock; + } + } } const uint32_t key_entry_size = @@ -378,7 +382,7 @@ rte_hash_create(const struct rte_hash_parameters *params) default_hash_func = (rte_hash_function)rte_hash_crc; #endif /* Setup hash context */ - snprintf(h->name, sizeof(h->name), "%s", params->name); + strlcpy(h->name, params->name, sizeof(h->name)); h->entries = params->entries; h->key_len = params->key_len; h->key_entry_size = key_entry_size; @@ -393,6 +397,7 @@ rte_hash_create(const struct rte_hash_parameters *params) default_hash_func : params->hash_func; h->key_store = k; h->free_slots = r; + h->ext_bkt_to_free = ext_bkt_to_free; h->tbl_chng_cnt = tbl_chng_cnt; *h->tbl_chng_cnt = 0; h->hw_trans_mem_support = hw_trans_mem_support; @@ -407,6 +412,10 @@ rte_hash_create(const struct rte_hash_parameters *params) if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2)) h->sig_cmp_fn = RTE_HASH_COMPARE_SSE; else +#elif defined(RTE_ARCH_ARM64) + if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_NEON)) + h->sig_cmp_fn = RTE_HASH_COMPARE_NEON; + else #endif h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR; @@ -425,15 +434,15 @@ rte_hash_create(const struct rte_hash_parameters *params) /* Populate free slots ring. Entry zero is reserved for key misses. */ for (i = 1; i < num_key_slots; i++) - rte_ring_sp_enqueue(r, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(r, &i, sizeof(uint32_t)); te->data = (void *) h; TAILQ_INSERT_TAIL(hash_list, te, next); - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); return h; err_unlock: - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); err: rte_ring_free(r); rte_ring_free(r_ext); @@ -443,6 +452,7 @@ err: rte_free(buckets_ext); rte_free(k); rte_free(tbl_chng_cnt); + rte_free(ext_bkt_to_free); return NULL; } @@ -457,7 +467,7 @@ rte_hash_free(struct rte_hash *h) hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list); - rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_lock(); /* find out tailq entry */ TAILQ_FOREACH(te, hash_list, next) { @@ -466,13 +476,13 @@ rte_hash_free(struct rte_hash *h) } if (te == NULL) { - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); return; } TAILQ_REMOVE(hash_list, te, next); - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); if (h->use_local_cache) rte_free(h->local_free_slots); @@ -484,6 +494,7 @@ rte_hash_free(struct rte_hash *h) rte_free(h->buckets); rte_free(h->buckets_ext); rte_free(h->tbl_chng_cnt); + rte_free(h->ext_bkt_to_free); rte_free(h); rte_free(te); } @@ -495,6 +506,21 @@ rte_hash_hash(const struct rte_hash *h, const void *key) return h->hash_func(key, h->key_len, h->hash_func_init_val); } +int32_t +rte_hash_max_key_id(const struct rte_hash *h) +{ + RETURN_IF_TRUE((h == NULL), -EINVAL); + if (h->use_local_cache) + /* + * Increase number of slots by total number of indices + * that can be stored in the lcore caches + */ + return (h->entries + ((RTE_MAX_LCORE - 1) * + (LCORE_CACHE_SIZE - 1))); + else + return h->entries; +} + int32_t rte_hash_count(const struct rte_hash *h) { @@ -559,7 +585,6 @@ __hash_rw_reader_unlock(const struct rte_hash *h) void rte_hash_reset(struct rte_hash *h) { - void *ptr; uint32_t tot_ring_cnt, i; if (h == NULL) @@ -570,16 +595,14 @@ rte_hash_reset(struct rte_hash *h) memset(h->key_store, 0, h->key_entry_size * (h->entries + 1)); *h->tbl_chng_cnt = 0; - /* clear the free ring */ - while (rte_ring_dequeue(h->free_slots, &ptr) == 0) - continue; + /* reset the free ring */ + rte_ring_reset(h->free_slots); - /* clear free extendable bucket ring and memory */ + /* flush free extendable bucket ring and memory */ if (h->ext_table_support) { memset(h->buckets_ext, 0, h->num_buckets * sizeof(struct rte_hash_bucket)); - while (rte_ring_dequeue(h->free_ext_bkts, &ptr) == 0) - continue; + rte_ring_reset(h->free_ext_bkts); } /* Repopulate the free slots ring. Entry zero is reserved for key misses */ @@ -590,13 +613,13 @@ rte_hash_reset(struct rte_hash *h) tot_ring_cnt = h->entries; for (i = 1; i < tot_ring_cnt + 1; i++) - rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(h->free_slots, &i, sizeof(uint32_t)); /* Repopulate the free ext bkt ring. */ if (h->ext_table_support) { for (i = 1; i <= h->num_buckets; i++) - rte_ring_sp_enqueue(h->free_ext_bkts, - (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &i, + sizeof(uint32_t)); } if (h->use_local_cache) { @@ -615,13 +638,14 @@ rte_hash_reset(struct rte_hash *h) static inline void enqueue_slot_back(const struct rte_hash *h, struct lcore_cache *cached_free_slots, - void *slot_id) + uint32_t slot_id) { if (h->use_local_cache) { cached_free_slots->objs[cached_free_slots->len] = slot_id; cached_free_slots->len++; } else - rte_ring_sp_enqueue(h->free_slots, slot_id); + rte_ring_sp_enqueue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)); } /* Search a key from bucket and update its data. @@ -639,9 +663,11 @@ search_and_update(const struct rte_hash *h, void *data, const void *key, k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (rte_hash_cmp_eq(key, k->key, h) == 0) { - /* 'pdata' acts as the synchronization point - * when an existing hash entry is updated. - * Key is not updated in this case. + /* The store to application data at *data + * should not leak after the store to pdata + * in the key store. i.e. pdata is the guard + * variable. Release the application data + * to the readers. */ __atomic_store_n(&k->pdata, data, @@ -701,11 +727,10 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h, /* Check if slot is available */ if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) { prim_bkt->sig_current[i] = sig; - /* Key can be of arbitrary length, so it is - * not possible to store it atomically. - * Hence the new key element's memory stores - * (key as well as data) should be complete - * before it is referenced. + /* Store to signature and key should not + * leak after the store to key_idx. i.e. + * key_idx is the guard variable for signature + * and key. */ __atomic_store_n(&prim_bkt->key_idx[i], new_idx, @@ -799,7 +824,7 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h, __atomic_store_n(h->tbl_chng_cnt, *h->tbl_chng_cnt + 1, __ATOMIC_RELEASE); - /* The stores to sig_alt and sig_current should not + /* The store to sig_current should not * move above the store to tbl_chng_cnt. */ __atomic_thread_fence(__ATOMIC_RELEASE); @@ -831,7 +856,7 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h, __atomic_store_n(h->tbl_chng_cnt, *h->tbl_chng_cnt + 1, __ATOMIC_RELEASE); - /* The stores to sig_alt and sig_current should not + /* The store to sig_current should not * move above the store to tbl_chng_cnt. */ __atomic_thread_fence(__ATOMIC_RELEASE); @@ -914,9 +939,8 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, uint32_t prim_bucket_idx, sec_bucket_idx; struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt; struct rte_hash_key *new_k, *keys = h->key_store; - void *slot_id = NULL; - void *ext_bkt_id = NULL; - uint32_t new_idx, bkt_id; + uint32_t slot_id; + uint32_t ext_bkt_id; int ret; unsigned n_slots; unsigned lcore_id; @@ -959,8 +983,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Try to get a free slot from the local cache */ if (cached_free_slots->len == 0) { /* Need to get another burst of free slots from global ring */ - n_slots = rte_ring_mc_dequeue_burst(h->free_slots, + n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, cached_free_slots->objs, + sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); if (n_slots == 0) { return -ENOSPC; @@ -973,30 +998,28 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, cached_free_slots->len--; slot_id = cached_free_slots->objs[cached_free_slots->len]; } else { - if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) { + if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)) != 0) { return -ENOSPC; } } - new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size); - new_idx = (uint32_t)((uintptr_t) slot_id); - /* Copy key */ - memcpy(new_k->key, key, h->key_len); - /* Key can be of arbitrary length, so it is not possible to store - * it atomically. Hence the new key element's memory stores - * (key as well as data) should be complete before it is referenced. - * 'pdata' acts as the synchronization point when an existing hash - * entry is updated. + new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size); + /* The store to application data (by the application) at *data should + * not leak after the store of pdata in the key store. i.e. pdata is + * the guard variable. Release the application data to the readers. */ __atomic_store_n(&new_k->pdata, data, __ATOMIC_RELEASE); + /* Copy key */ + memcpy(new_k->key, key, h->key_len); /* Find an empty slot and insert */ ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data, - short_sig, new_idx, &ret_val); + short_sig, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1004,9 +1027,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Primary bucket full, need to make space for new entry */ ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data, - short_sig, prim_bucket_idx, new_idx, &ret_val); + short_sig, prim_bucket_idx, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1014,10 +1037,10 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Also search secondary bucket to get better occupancy */ ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data, - short_sig, sec_bucket_idx, new_idx, &ret_val); + short_sig, sec_bucket_idx, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1054,9 +1077,16 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Check if slot is available */ if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) { cur_bkt->sig_current[i] = short_sig; - cur_bkt->key_idx[i] = new_idx; + /* Store to signature and key should not + * leak after the store to key_idx. i.e. + * key_idx is the guard variable for signature + * and key. + */ + __atomic_store_n(&cur_bkt->key_idx[i], + slot_id, + __ATOMIC_RELEASE); __hash_rw_writer_unlock(h); - return new_idx - 1; + return slot_id - 1; } } } @@ -1064,20 +1094,26 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Failed to get an empty entry from extendable buckets. Link a new * extendable bucket. We first get a free bucket from ring. */ - if (rte_ring_sc_dequeue(h->free_ext_bkts, &ext_bkt_id) != 0) { + if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id, + sizeof(uint32_t)) != 0) { ret = -ENOSPC; goto failure; } - bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1; /* Use the first location of the new bucket */ - (h->buckets_ext[bkt_id]).sig_current[0] = short_sig; - (h->buckets_ext[bkt_id]).key_idx[0] = new_idx; + (h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig; + /* Store to signature and key should not leak after + * the store to key_idx. i.e. key_idx is the guard variable + * for signature and key. + */ + __atomic_store_n(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0], + slot_id, + __ATOMIC_RELEASE); /* Link the new bucket to sec bucket linked list */ last = rte_hash_get_last_bkt(sec_bkt); - last->next = &h->buckets_ext[bkt_id]; + last->next = &h->buckets_ext[ext_bkt_id - 1]; __hash_rw_writer_unlock(h); - return new_idx - 1; + return slot_id - 1; failure: __hash_rw_writer_unlock(h); @@ -1164,26 +1200,35 @@ search_one_bucket_lf(const struct rte_hash *h, const void *key, uint16_t sig, { int i; uint32_t key_idx; - void *pdata; struct rte_hash_key *k, *keys = h->key_store; for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { - key_idx = __atomic_load_n(&bkt->key_idx[i], + /* Signature comparison is done before the acquire-load + * of the key index to achieve better performance. + * This can result in the reader loading old signature + * (which matches), while the key_idx is updated to a + * value that belongs to a new key. However, the full + * key comparison will ensure that the lookup fails. + */ + if (bkt->sig_current[i] == sig) { + key_idx = __atomic_load_n(&bkt->key_idx[i], __ATOMIC_ACQUIRE); - if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) { - k = (struct rte_hash_key *) ((char *)keys + - key_idx * h->key_entry_size); - pdata = __atomic_load_n(&k->pdata, - __ATOMIC_ACQUIRE); - - if (rte_hash_cmp_eq(key, k->key, h) == 0) { - if (data != NULL) - *data = pdata; - /* - * Return index where key is stored, - * subtracting the first dummy index - */ - return key_idx - 1; + if (key_idx != EMPTY_SLOT) { + k = (struct rte_hash_key *) ((char *)keys + + key_idx * h->key_entry_size); + + if (rte_hash_cmp_eq(key, k->key, h) == 0) { + if (data != NULL) { + *data = __atomic_load_n( + &k->pdata, + __ATOMIC_ACQUIRE); + } + /* + * Return index where key is stored, + * subtracting the first dummy index + */ + return key_idx - 1; + } } } } @@ -1256,10 +1301,8 @@ __rte_hash_lookup_with_hash_lf(const struct rte_hash *h, const void *key, /* Check if key is in primary location */ bkt = &h->buckets[prim_bucket_idx]; ret = search_one_bucket_lf(h, key, short_sig, data, bkt); - if (ret != -1) { - __hash_rw_reader_unlock(h); + if (ret != -1) return ret; - } /* Calculate secondary hash */ bkt = &h->buckets[sec_bucket_idx]; @@ -1267,10 +1310,8 @@ __rte_hash_lookup_with_hash_lf(const struct rte_hash *h, const void *key, FOR_EACH_BUCKET(cur_bkt, bkt) { ret = search_one_bucket_lf(h, key, short_sig, data, cur_bkt); - if (ret != -1) { - __hash_rw_reader_unlock(h); + if (ret != -1) return ret; - } } /* The loads of sig_current in search_one_bucket @@ -1344,18 +1385,22 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) /* Cache full, need to free it. */ if (cached_free_slots->len == LCORE_CACHE_SIZE) { /* Need to enqueue the free slots in global ring. */ - n_slots = rte_ring_mp_enqueue_burst(h->free_slots, + n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots, cached_free_slots->objs, + sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); + ERR_IF_TRUE((n_slots == 0), + "%s: could not enqueue free slots in global ring\n", + __func__); cached_free_slots->len -= n_slots; } /* Put index of new free slot in cache. */ cached_free_slots->objs[cached_free_slots->len] = - (void *)((uintptr_t)bkt->key_idx[i]); + bkt->key_idx[i]; cached_free_slots->len++; } else { - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t)bkt->key_idx[i])); + rte_ring_sp_enqueue_elem(h->free_slots, + &bkt->key_idx[i], sizeof(uint32_t)); } } @@ -1363,7 +1408,8 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) * empty slot. */ static inline void -__rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) { +__rte_hash_compact_ll(const struct rte_hash *h, + struct rte_hash_bucket *cur_bkt, int pos) { int i; struct rte_hash_bucket *last_bkt; @@ -1374,10 +1420,27 @@ __rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) { for (i = RTE_HASH_BUCKET_ENTRIES - 1; i >= 0; i--) { if (last_bkt->key_idx[i] != EMPTY_SLOT) { - cur_bkt->key_idx[pos] = last_bkt->key_idx[i]; cur_bkt->sig_current[pos] = last_bkt->sig_current[i]; + __atomic_store_n(&cur_bkt->key_idx[pos], + last_bkt->key_idx[i], + __ATOMIC_RELEASE); + if (h->readwrite_concur_lf_support) { + /* Inform the readers that the table has changed + * Since there is one writer, load acquire on + * tbl_chng_cnt is not required. + */ + __atomic_store_n(h->tbl_chng_cnt, + *h->tbl_chng_cnt + 1, + __ATOMIC_RELEASE); + /* The store to sig_current should + * not move above the store to tbl_chng_cnt. + */ + __atomic_thread_fence(__ATOMIC_RELEASE); + } last_bkt->sig_current[i] = NULL_SIGNATURE; - last_bkt->key_idx[i] = EMPTY_SLOT; + __atomic_store_n(&last_bkt->key_idx[i], + EMPTY_SLOT, + __ATOMIC_RELEASE); return; } } @@ -1446,7 +1509,7 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, /* look for key in primary bucket */ ret = search_and_remove(h, key, prim_bkt, short_sig, &pos); if (ret != -1) { - __rte_hash_compact_ll(prim_bkt, pos); + __rte_hash_compact_ll(h, prim_bkt, pos); last_bkt = prim_bkt->next; prev_bkt = prim_bkt; goto return_bkt; @@ -1458,7 +1521,7 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key, FOR_EACH_BUCKET(cur_bkt, sec_bkt) { ret = search_and_remove(h, key, cur_bkt, short_sig, &pos); if (ret != -1) { - __rte_hash_compact_ll(cur_bkt, pos); + __rte_hash_compact_ll(h, cur_bkt, pos); last_bkt = sec_bkt->next; prev_bkt = sec_bkt; goto return_bkt; @@ -1485,11 +1548,25 @@ return_bkt: } /* found empty bucket and recycle */ if (i == RTE_HASH_BUCKET_ENTRIES) { - prev_bkt->next = last_bkt->next = NULL; + prev_bkt->next = NULL; uint32_t index = last_bkt - h->buckets_ext + 1; - rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index); + /* Recycle the empty bkt if + * no_free_on_del is disabled. + */ + if (h->no_free_on_del) + /* Store index of an empty ext bkt to be recycled + * on calling rte_hash_del_xxx APIs. + * When lock free read-write concurrency is enabled, + * an empty ext bkt cannot be put into free list + * immediately (as readers might be using it still). + * Hence freeing of the ext bkt is piggy-backed to + * freeing of the key index. + */ + h->ext_bkt_to_free[ret] = index; + else + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, + sizeof(uint32_t)); } - __hash_rw_writer_unlock(h); return ret; } @@ -1529,19 +1606,33 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position, return 0; } -int __rte_experimental +int rte_hash_free_key_with_position(const struct rte_hash *h, const int32_t position) { - RETURN_IF_TRUE(((h == NULL) || (position == EMPTY_SLOT)), -EINVAL); + /* Key index where key is stored, adding the first dummy index */ + uint32_t key_idx = position + 1; + + RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL); unsigned int lcore_id, n_slots; struct lcore_cache *cached_free_slots; - const int32_t total_entries = h->num_buckets * RTE_HASH_BUCKET_ENTRIES; + const uint32_t total_entries = h->use_local_cache ? + h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 + : h->entries + 1; /* Out of bounds */ - if (position >= total_entries) + if (key_idx >= total_entries) return -EINVAL; + if (h->ext_table_support && h->readwrite_concur_lf_support) { + uint32_t index = h->ext_bkt_to_free[position]; + if (index) { + /* Recycle empty ext bkt to free list. */ + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, + sizeof(uint32_t)); + h->ext_bkt_to_free[position] = 0; + } + } if (h->use_local_cache) { lcore_id = rte_lcore_id(); @@ -1549,18 +1640,19 @@ rte_hash_free_key_with_position(const struct rte_hash *h, /* Cache full, need to free it. */ if (cached_free_slots->len == LCORE_CACHE_SIZE) { /* Need to enqueue the free slots in global ring. */ - n_slots = rte_ring_mp_enqueue_burst(h->free_slots, + n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots, cached_free_slots->objs, + sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); + RETURN_IF_TRUE((n_slots == 0), -EFAULT); cached_free_slots->len -= n_slots; } /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = - (void *)((uintptr_t)position); + cached_free_slots->objs[cached_free_slots->len] = key_idx; cached_free_slots->len++; } else { - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t)position)); + rte_ring_sp_enqueue_elem(h->free_slots, &key_idx, + sizeof(uint32_t)); } return 0; @@ -1577,7 +1669,7 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, /* For match mask the first bit of every two bits indicates the match */ switch (sig_cmp_fn) { -#ifdef RTE_MACHINE_CPUFLAG_SSE2 +#if defined(RTE_MACHINE_CPUFLAG_SSE2) case RTE_HASH_COMPARE_SSE: /* Compare all signatures in the bucket */ *prim_hash_matches = _mm_movemask_epi8(_mm_cmpeq_epi16( @@ -1590,6 +1682,24 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, (__m128i const *)sec_bkt->sig_current), _mm_set1_epi16(sig))); break; +#elif defined(RTE_MACHINE_CPUFLAG_NEON) + case RTE_HASH_COMPARE_NEON: { + uint16x8_t vmat, vsig, x; + int16x8_t shift = {-15, -13, -11, -9, -7, -5, -3, -1}; + + vsig = vld1q_dup_u16((uint16_t const *)&sig); + /* Compare all signatures in the primary bucket */ + vmat = vceqq_u16(vsig, + vld1q_u16((uint16_t const *)prim_bkt->sig_current)); + x = vshlq_u16(vandq_u16(vmat, vdupq_n_u16(0x8000)), shift); + *prim_hash_matches = (uint32_t)(vaddvq_u16(x)); + /* Compare all signatures in the secondary bucket */ + vmat = vceqq_u16(vsig, + vld1q_u16((uint16_t const *)sec_bkt->sig_current)); + x = vshlq_u16(vandq_u16(vmat, vdupq_n_u16(0x8000)), shift); + *sec_hash_matches = (uint32_t)(vaddvq_u16(x)); + } + break; #endif default: for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { @@ -1809,7 +1919,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; struct rte_hash_bucket *cur_bkt, *next_bkt; - void *pdata[RTE_HASH_LOOKUP_BULK_MAX]; uint32_t cnt_b, cnt_a; /* Prefetch first keys */ @@ -1851,6 +1960,9 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, rte_prefetch0(secondary_bkt[i]); } + for (i = 0; i < num_keys; i++) + positions[i] = -ENOENT; + do { /* Load the table change counter before the lookup * starts. Acquire semantics will make sure that @@ -1895,7 +2007,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, /* Compare keys, first hits in primary first */ for (i = 0; i < num_keys; i++) { - positions[i] = -ENOENT; while (prim_hitmask[i]) { uint32_t hit_index = __builtin_ctzl(prim_hitmask[i]) @@ -1909,10 +2020,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, (const char *)h->key_store + key_idx * h->key_entry_size); - if (key_idx != EMPTY_SLOT) - pdata[i] = __atomic_load_n( - &key_slot->pdata, - __ATOMIC_ACQUIRE); /* * If key index is 0, do not compare key, * as it is checking the dummy slot @@ -1921,7 +2028,9 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, !rte_hash_cmp_eq( key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = pdata[i]; + data[i] = __atomic_load_n( + &key_slot->pdata, + __ATOMIC_ACQUIRE); hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -1943,10 +2052,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, (const char *)h->key_store + key_idx * h->key_entry_size); - if (key_idx != EMPTY_SLOT) - pdata[i] = __atomic_load_n( - &key_slot->pdata, - __ATOMIC_ACQUIRE); /* * If key index is 0, do not compare key, * as it is checking the dummy slot @@ -1956,7 +2061,9 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, !rte_hash_cmp_eq( key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = pdata[i]; + data[i] = __atomic_load_n( + &key_slot->pdata, + __ATOMIC_ACQUIRE); hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -1968,6 +2075,35 @@ next_key: continue; } + /* all found, do not need to go through ext bkt */ + if (hits == ((1ULL << num_keys) - 1)) { + if (hit_mask != NULL) + *hit_mask = hits; + return; + } + /* need to check ext buckets for match */ + if (h->ext_table_support) { + for (i = 0; i < num_keys; i++) { + if ((hits & (1ULL << i)) != 0) + continue; + next_bkt = secondary_bkt[i]->next; + FOR_EACH_BUCKET(cur_bkt, next_bkt) { + if (data != NULL) + ret = search_one_bucket_lf(h, + keys[i], sig[i], + &data[i], cur_bkt); + else + ret = search_one_bucket_lf(h, + keys[i], sig[i], + NULL, cur_bkt); + if (ret != -1) { + positions[i] = ret; + hits |= 1ULL << i; + break; + } + } + } + } /* The loads of sig_current in compare_signatures * should not move below the load from tbl_chng_cnt. */ @@ -1984,34 +2120,6 @@ next_key: __ATOMIC_ACQUIRE); } while (cnt_b != cnt_a); - /* all found, do not need to go through ext bkt */ - if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) { - if (hit_mask != NULL) - *hit_mask = hits; - __hash_rw_reader_unlock(h); - return; - } - - /* need to check ext buckets for match */ - for (i = 0; i < num_keys; i++) { - if ((hits & (1ULL << i)) != 0) - continue; - next_bkt = secondary_bkt[i]->next; - FOR_EACH_BUCKET(cur_bkt, next_bkt) { - if (data != NULL) - ret = search_one_bucket_lf(h, keys[i], - sig[i], &data[i], cur_bkt); - else - ret = search_one_bucket_lf(h, keys[i], - sig[i], NULL, cur_bkt); - if (ret != -1) { - positions[i] = ret; - hits |= 1ULL << i; - break; - } - } - } - if (hit_mask != NULL) *hit_mask = hits; } @@ -2022,11 +2130,11 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, uint64_t *hit_mask, void *data[]) { if (h->readwrite_concur_lf_support) - return __rte_hash_lookup_bulk_lf(h, keys, num_keys, - positions, hit_mask, data); + __rte_hash_lookup_bulk_lf(h, keys, num_keys, positions, + hit_mask, data); else - return __rte_hash_lookup_bulk_l(h, keys, num_keys, - positions, hit_mask, data); + __rte_hash_lookup_bulk_l(h, keys, num_keys, positions, + hit_mask, data); } int