diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index 1f5808ebaf..1191dfd81a 100644
--- a/lib/librte_hash/rte_cuckoo_hash.c
+++ b/lib/librte_hash/rte_cuckoo_hash.c
@@ -24,13 +24,22 @@
 #include <rte_cpuflags.h>
 #include <rte_rwlock.h>
 #include <rte_spinlock.h>
-#include <rte_ring.h>
+#include <rte_ring_elem.h>
 #include <rte_compat.h>
 #include <rte_vect.h>
+#include <rte_tailq.h>
 
 #include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
 
+/* Mask of all flags supported by this version */
+#define RTE_HASH_EXTRA_FLAGS_MASK (RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT | \
+				RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD | \
+				RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY | \
+				RTE_HASH_EXTRA_FLAGS_EXT_TABLE | \
+				RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL | \
+				RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)
+
 #define FOR_EACH_BUCKET(CURRENT_BKT, START_BUCKET) \
 	for (CURRENT_BKT = START_BUCKET; \
 	     CURRENT_BKT != NULL; \
 	     CURRENT_BKT = CURRENT_BKT->next)
@@ -43,6 +52,11 @@ static struct rte_tailq_elem rte_hash_tailq = {
 };
 EAL_REGISTER_TAILQ(rte_hash_tailq)
 
+struct __rte_hash_rcu_dq_entry {
+	uint32_t key_idx;
+	uint32_t ext_bkt_idx;
+};
+
 struct rte_hash *
 rte_hash_find_existing(const char *name)
 {
@@ -52,13 +66,13 @@ rte_hash_find_existing(const char *name)
 
 	hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list);
 
-	rte_rwlock_read_lock(RTE_EAL_TAILQ_RWLOCK);
+	rte_mcfg_tailq_read_lock();
 	TAILQ_FOREACH(te, hash_list, next) {
 		h = (struct rte_hash *) te->data;
 		if (strncmp(name, h->name, RTE_HASH_NAMESIZE) == 0)
 			break;
 	}
-	rte_rwlock_read_unlock(RTE_EAL_TAILQ_RWLOCK);
+	rte_mcfg_tailq_read_unlock();
 
 	if (te == NULL) {
 		rte_errno = ENOENT;
@@ -135,7 +149,6 @@ rte_hash_create(const struct rte_hash_parameters *params)
 	char ring_name[RTE_RING_NAMESIZE];
 	char ext_ring_name[RTE_RING_NAMESIZE];
 	unsigned num_key_slots;
-	unsigned i;
 	unsigned int hw_trans_mem_support = 0, use_local_cache = 0;
 	unsigned int ext_table_support = 0;
 	unsigned int readwrite_concur_support = 0;
@@ -143,7 +156,9 @@
 	unsigned int no_free_on_del = 0;
 	uint32_t *ext_bkt_to_free = NULL;
 	uint32_t *tbl_chng_cnt = NULL;
+	struct lcore_cache *local_free_slots = NULL;
 	unsigned int readwrite_concur_lf_support = 0;
+	uint32_t i;
 
 	rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
@@ -163,6 +178,12 @@
 		return NULL;
 	}
 
+	if (params->extra_flag & ~RTE_HASH_EXTRA_FLAGS_MASK) {
+		rte_errno = EINVAL;
+		RTE_LOG(ERR, HASH, "rte_hash_create: unsupported extra flags\n");
+		return NULL;
+	}
+
 	/* Validate correct usage of extra options */
 	if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) &&
 	    (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)) {
@@ -194,7 +215,10 @@
 
 	if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
 		readwrite_concur_lf_support = 1;
-		/* Enable not freeing internal memory/index on delete */
+		/* Enable not freeing internal memory/index on delete.
+ * If internal RCU is enabled, freeing of internal memory/index + * is done on delete + */ no_free_on_del = 1; } @@ -212,8 +236,8 @@ rte_hash_create(const struct rte_hash_parameters *params) snprintf(ring_name, sizeof(ring_name), "HT_%s", params->name); /* Create ring (Dummy slot index is not enqueued) */ - r = rte_ring_create(ring_name, rte_align32pow2(num_key_slots), - params->socket_id, 0); + r = rte_ring_create_elem(ring_name, sizeof(uint32_t), + rte_align32pow2(num_key_slots), params->socket_id, 0); if (r == NULL) { RTE_LOG(ERR, HASH, "memory allocation failed\n"); goto err; @@ -226,7 +250,7 @@ rte_hash_create(const struct rte_hash_parameters *params) if (ext_table_support) { snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s", params->name); - r_ext = rte_ring_create(ext_ring_name, + r_ext = rte_ring_create_elem(ext_ring_name, sizeof(uint32_t), rte_align32pow2(num_buckets + 1), params->socket_id, 0); @@ -239,7 +263,7 @@ rte_hash_create(const struct rte_hash_parameters *params) snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name); - rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_lock(); /* guarantee there's no existing: this is normally already checked * by ring creation above */ @@ -294,7 +318,7 @@ rte_hash_create(const struct rte_hash_parameters *params) * for next bucket */ for (i = 1; i <= num_buckets; i++) - rte_ring_sp_enqueue(r_ext, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(r_ext, &i, sizeof(uint32_t)); if (readwrite_concur_lf_support) { ext_bkt_to_free = rte_zmalloc(NULL, sizeof(uint32_t) * @@ -368,9 +392,13 @@ rte_hash_create(const struct rte_hash_parameters *params) #endif if (use_local_cache) { - h->local_free_slots = rte_zmalloc_socket(NULL, + local_free_slots = rte_zmalloc_socket(NULL, sizeof(struct lcore_cache) * RTE_MAX_LCORE, RTE_CACHE_LINE_SIZE, params->socket_id); + if (local_free_slots == NULL) { + RTE_LOG(ERR, HASH, "local free slots memory allocation failed\n"); + goto err_unlock; + } } /* Default hash function */ @@ -401,6 +429,7 @@ rte_hash_create(const struct rte_hash_parameters *params) *h->tbl_chng_cnt = 0; h->hw_trans_mem_support = hw_trans_mem_support; h->use_local_cache = use_local_cache; + h->local_free_slots = local_free_slots; h->readwrite_concur_support = readwrite_concur_support; h->ext_table_support = ext_table_support; h->writer_takes_lock = writer_takes_lock; @@ -433,19 +462,20 @@ rte_hash_create(const struct rte_hash_parameters *params) /* Populate free slots ring. Entry zero is reserved for key misses. 
*/ for (i = 1; i < num_key_slots; i++) - rte_ring_sp_enqueue(r, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(r, &i, sizeof(uint32_t)); te->data = (void *) h; TAILQ_INSERT_TAIL(hash_list, te, next); - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); return h; err_unlock: - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); err: rte_ring_free(r); rte_ring_free(r_ext); rte_free(te); + rte_free(local_free_slots); rte_free(h); rte_free(buckets); rte_free(buckets_ext); @@ -466,7 +496,7 @@ rte_hash_free(struct rte_hash *h) hash_list = RTE_TAILQ_CAST(rte_hash_tailq.head, rte_hash_list); - rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_lock(); /* find out tailq entry */ TAILQ_FOREACH(te, hash_list, next) { @@ -475,13 +505,16 @@ rte_hash_free(struct rte_hash *h) } if (te == NULL) { - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); return; } TAILQ_REMOVE(hash_list, te, next); - rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK); + rte_mcfg_tailq_write_unlock(); + + if (h->dq) + rte_rcu_qsbr_dq_delete(h->dq); if (h->use_local_cache) rte_free(h->local_free_slots); @@ -505,6 +538,21 @@ rte_hash_hash(const struct rte_hash *h, const void *key) return h->hash_func(key, h->key_len, h->hash_func_init_val); } +int32_t +rte_hash_max_key_id(const struct rte_hash *h) +{ + RETURN_IF_TRUE((h == NULL), -EINVAL); + if (h->use_local_cache) + /* + * Increase number of slots by total number of indices + * that can be stored in the lcore caches + */ + return (h->entries + ((RTE_MAX_LCORE - 1) * + (LCORE_CACHE_SIZE - 1))); + else + return h->entries; +} + int32_t rte_hash_count(const struct rte_hash *h) { @@ -569,27 +617,33 @@ __hash_rw_reader_unlock(const struct rte_hash *h) void rte_hash_reset(struct rte_hash *h) { - void *ptr; uint32_t tot_ring_cnt, i; + unsigned int pending; if (h == NULL) return; __hash_rw_writer_lock(h); + + if (h->dq) { + /* Reclaim all the resources */ + rte_rcu_qsbr_dq_reclaim(h->dq, ~0, NULL, &pending, NULL); + if (pending != 0) + RTE_LOG(ERR, HASH, "RCU reclaim all resources failed\n"); + } + memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket)); memset(h->key_store, 0, h->key_entry_size * (h->entries + 1)); *h->tbl_chng_cnt = 0; - /* clear the free ring */ - while (rte_ring_dequeue(h->free_slots, &ptr) == 0) - continue; + /* reset the free ring */ + rte_ring_reset(h->free_slots); - /* clear free extendable bucket ring and memory */ + /* flush free extendable bucket ring and memory */ if (h->ext_table_support) { memset(h->buckets_ext, 0, h->num_buckets * sizeof(struct rte_hash_bucket)); - while (rte_ring_dequeue(h->free_ext_bkts, &ptr) == 0) - continue; + rte_ring_reset(h->free_ext_bkts); } /* Repopulate the free slots ring. Entry zero is reserved for key misses */ @@ -600,13 +654,13 @@ rte_hash_reset(struct rte_hash *h) tot_ring_cnt = h->entries; for (i = 1; i < tot_ring_cnt + 1; i++) - rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(h->free_slots, &i, sizeof(uint32_t)); /* Repopulate the free ext bkt ring. 
*/ if (h->ext_table_support) { for (i = 1; i <= h->num_buckets; i++) - rte_ring_sp_enqueue(h->free_ext_bkts, - (void *)((uintptr_t) i)); + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &i, + sizeof(uint32_t)); } if (h->use_local_cache) { @@ -625,13 +679,14 @@ rte_hash_reset(struct rte_hash *h) static inline void enqueue_slot_back(const struct rte_hash *h, struct lcore_cache *cached_free_slots, - void *slot_id) + uint32_t slot_id) { if (h->use_local_cache) { cached_free_slots->objs[cached_free_slots->len] = slot_id; cached_free_slots->len++; } else - rte_ring_sp_enqueue(h->free_slots, slot_id); + rte_ring_sp_enqueue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)); } /* Search a key from bucket and update its data. @@ -649,9 +704,11 @@ search_and_update(const struct rte_hash *h, void *data, const void *key, k = (struct rte_hash_key *) ((char *)keys + bkt->key_idx[i] * h->key_entry_size); if (rte_hash_cmp_eq(key, k->key, h) == 0) { - /* 'pdata' acts as the synchronization point - * when an existing hash entry is updated. - * Key is not updated in this case. + /* The store to application data at *data + * should not leak after the store to pdata + * in the key store. i.e. pdata is the guard + * variable. Release the application data + * to the readers. */ __atomic_store_n(&k->pdata, data, @@ -711,11 +768,10 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h, /* Check if slot is available */ if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) { prim_bkt->sig_current[i] = sig; - /* Key can be of arbitrary length, so it is - * not possible to store it atomically. - * Hence the new key element's memory stores - * (key as well as data) should be complete - * before it is referenced. + /* Store to signature and key should not + * leak after the store to key_idx. i.e. + * key_idx is the guard variable for signature + * and key. 
*/ __atomic_store_n(&prim_bkt->key_idx[i], new_idx, @@ -916,6 +972,38 @@ rte_hash_cuckoo_make_space_mw(const struct rte_hash *h, return -ENOSPC; } +static inline uint32_t +alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots) +{ + unsigned int n_slots; + uint32_t slot_id; + + if (h->use_local_cache) { + /* Try to get a free slot from the local cache */ + if (cached_free_slots->len == 0) { + /* Need to get another burst of free slots from global ring */ + n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots, + cached_free_slots->objs, + sizeof(uint32_t), + LCORE_CACHE_SIZE, NULL); + if (n_slots == 0) + return EMPTY_SLOT; + + cached_free_slots->len += n_slots; + } + + /* Get a free slot from the local cache */ + cached_free_slots->len--; + slot_id = cached_free_slots->objs[cached_free_slots->len]; + } else { + if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id, + sizeof(uint32_t)) != 0) + return EMPTY_SLOT; + } + + return slot_id; +} + static inline int32_t __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t sig, void *data) @@ -924,11 +1012,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, uint32_t prim_bucket_idx, sec_bucket_idx; struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt; struct rte_hash_key *new_k, *keys = h->key_store; - void *slot_id = NULL; - void *ext_bkt_id = NULL; - uint32_t new_idx, bkt_id; + uint32_t ext_bkt_id = 0; + uint32_t slot_id; int ret; - unsigned n_slots; unsigned lcore_id; unsigned int i; struct lcore_cache *cached_free_slots = NULL; @@ -966,47 +1052,38 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; - /* Try to get a free slot from the local cache */ - if (cached_free_slots->len == 0) { - /* Need to get another burst of free slots from global ring */ - n_slots = rte_ring_mc_dequeue_burst(h->free_slots, - cached_free_slots->objs, - LCORE_CACHE_SIZE, NULL); - if (n_slots == 0) { - return -ENOSPC; - } - - cached_free_slots->len += n_slots; + } + slot_id = alloc_slot(h, cached_free_slots); + if (slot_id == EMPTY_SLOT) { + if (h->dq) { + __hash_rw_writer_lock(h); + ret = rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->max_reclaim_size, + NULL, NULL, NULL); + __hash_rw_writer_unlock(h); + if (ret == 0) + slot_id = alloc_slot(h, cached_free_slots); } - - /* Get a free slot from the local cache */ - cached_free_slots->len--; - slot_id = cached_free_slots->objs[cached_free_slots->len]; - } else { - if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) { + if (slot_id == EMPTY_SLOT) return -ENOSPC; - } } - new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size); - new_idx = (uint32_t)((uintptr_t) slot_id); - /* Copy key */ - memcpy(new_k->key, key, h->key_len); - /* Key can be of arbitrary length, so it is not possible to store - * it atomically. Hence the new key element's memory stores - * (key as well as data) should be complete before it is referenced. - * 'pdata' acts as the synchronization point when an existing hash - * entry is updated. + new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size); + /* The store to application data (by the application) at *data should + * not leak after the store of pdata in the key store. i.e. pdata is + * the guard variable. Release the application data to the readers. 
*/ __atomic_store_n(&new_k->pdata, data, __ATOMIC_RELEASE); + /* Copy key */ + memcpy(new_k->key, key, h->key_len); /* Find an empty slot and insert */ ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data, - short_sig, new_idx, &ret_val); + short_sig, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1014,9 +1091,9 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Primary bucket full, need to make space for new entry */ ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data, - short_sig, prim_bucket_idx, new_idx, &ret_val); + short_sig, prim_bucket_idx, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1024,10 +1101,10 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Also search secondary bucket to get better occupancy */ ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data, - short_sig, sec_bucket_idx, new_idx, &ret_val); + short_sig, sec_bucket_idx, slot_id, &ret_val); if (ret == 0) - return new_idx - 1; + return slot_id - 1; else if (ret == 1) { enqueue_slot_back(h, cached_free_slots, slot_id); return ret_val; @@ -1064,14 +1141,16 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Check if slot is available */ if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) { cur_bkt->sig_current[i] = short_sig; - /* Store to signature should not leak after - * the store to key_idx + /* Store to signature and key should not + * leak after the store to key_idx. i.e. + * key_idx is the guard variable for signature + * and key. */ __atomic_store_n(&cur_bkt->key_idx[i], - new_idx, + slot_id, __ATOMIC_RELEASE); __hash_rw_writer_unlock(h); - return new_idx - 1; + return slot_id - 1; } } } @@ -1079,25 +1158,38 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, /* Failed to get an empty entry from extendable buckets. Link a new * extendable bucket. We first get a free bucket from ring. */ - if (rte_ring_sc_dequeue(h->free_ext_bkts, &ext_bkt_id) != 0) { - ret = -ENOSPC; - goto failure; + if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id, + sizeof(uint32_t)) != 0 || + ext_bkt_id == 0) { + if (h->dq) { + if (rte_rcu_qsbr_dq_reclaim(h->dq, + h->hash_rcu_cfg->max_reclaim_size, + NULL, NULL, NULL) == 0) { + rte_ring_sc_dequeue_elem(h->free_ext_bkts, + &ext_bkt_id, + sizeof(uint32_t)); + } + } + if (ext_bkt_id == 0) { + ret = -ENOSPC; + goto failure; + } } - bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1; /* Use the first location of the new bucket */ - (h->buckets_ext[bkt_id]).sig_current[0] = short_sig; - /* Store to signature should not leak after - * the store to key_idx + (h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig; + /* Store to signature and key should not leak after + * the store to key_idx. i.e. key_idx is the guard variable + * for signature and key. 
*/ - __atomic_store_n(&(h->buckets_ext[bkt_id]).key_idx[0], - new_idx, + __atomic_store_n(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0], + slot_id, __ATOMIC_RELEASE); /* Link the new bucket to sec bucket linked list */ last = rte_hash_get_last_bkt(sec_bkt); - last->next = &h->buckets_ext[bkt_id]; + last->next = &h->buckets_ext[ext_bkt_id - 1]; __hash_rw_writer_unlock(h); - return new_idx - 1; + return slot_id - 1; failure: __hash_rw_writer_unlock(h); @@ -1184,26 +1276,35 @@ search_one_bucket_lf(const struct rte_hash *h, const void *key, uint16_t sig, { int i; uint32_t key_idx; - void *pdata; struct rte_hash_key *k, *keys = h->key_store; for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) { - key_idx = __atomic_load_n(&bkt->key_idx[i], + /* Signature comparison is done before the acquire-load + * of the key index to achieve better performance. + * This can result in the reader loading old signature + * (which matches), while the key_idx is updated to a + * value that belongs to a new key. However, the full + * key comparison will ensure that the lookup fails. + */ + if (bkt->sig_current[i] == sig) { + key_idx = __atomic_load_n(&bkt->key_idx[i], __ATOMIC_ACQUIRE); - if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) { - k = (struct rte_hash_key *) ((char *)keys + - key_idx * h->key_entry_size); - pdata = __atomic_load_n(&k->pdata, - __ATOMIC_ACQUIRE); - - if (rte_hash_cmp_eq(key, k->key, h) == 0) { - if (data != NULL) - *data = pdata; - /* - * Return index where key is stored, - * subtracting the first dummy index - */ - return key_idx - 1; + if (key_idx != EMPTY_SLOT) { + k = (struct rte_hash_key *) ((char *)keys + + key_idx * h->key_entry_size); + + if (rte_hash_cmp_eq(key, k->key, h) == 0) { + if (data != NULL) { + *data = __atomic_load_n( + &k->pdata, + __ATOMIC_ACQUIRE); + } + /* + * Return index where key is stored, + * subtracting the first dummy index + */ + return key_idx - 1; + } } } } @@ -1276,10 +1377,8 @@ __rte_hash_lookup_with_hash_lf(const struct rte_hash *h, const void *key, /* Check if key is in primary location */ bkt = &h->buckets[prim_bucket_idx]; ret = search_one_bucket_lf(h, key, short_sig, data, bkt); - if (ret != -1) { - __hash_rw_reader_unlock(h); + if (ret != -1) return ret; - } /* Calculate secondary hash */ bkt = &h->buckets[sec_bucket_idx]; @@ -1287,10 +1386,8 @@ __rte_hash_lookup_with_hash_lf(const struct rte_hash *h, const void *key, FOR_EACH_BUCKET(cur_bkt, bkt) { ret = search_one_bucket_lf(h, key, short_sig, data, cur_bkt); - if (ret != -1) { - __hash_rw_reader_unlock(h); + if (ret != -1) return ret; - } } /* The loads of sig_current in search_one_bucket @@ -1352,33 +1449,145 @@ rte_hash_lookup_data(const struct rte_hash *h, const void *key, void **data) return __rte_hash_lookup_with_hash(h, key, rte_hash_hash(h, key), data); } -static inline void -remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i) +static int +free_slot(const struct rte_hash *h, uint32_t slot_id) { unsigned lcore_id, n_slots; - struct lcore_cache *cached_free_slots; + struct lcore_cache *cached_free_slots = NULL; + /* Return key indexes to free slot ring */ if (h->use_local_cache) { lcore_id = rte_lcore_id(); cached_free_slots = &h->local_free_slots[lcore_id]; /* Cache full, need to free it. */ if (cached_free_slots->len == LCORE_CACHE_SIZE) { /* Need to enqueue the free slots in global ring. 
*/ - n_slots = rte_ring_mp_enqueue_burst(h->free_slots, + n_slots = rte_ring_mp_enqueue_burst_elem(h->free_slots, cached_free_slots->objs, + sizeof(uint32_t), LCORE_CACHE_SIZE, NULL); - ERR_IF_TRUE((n_slots == 0), - "%s: could not enqueue free slots in global ring\n", - __func__); + RETURN_IF_TRUE((n_slots == 0), -EFAULT); cached_free_slots->len -= n_slots; } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = - (void *)((uintptr_t)bkt->key_idx[i]); - cached_free_slots->len++; + } + + enqueue_slot_back(h, cached_free_slots, slot_id); + return 0; +} + +static void +__hash_rcu_qsbr_free_resource(void *p, void *e, unsigned int n) +{ + void *key_data = NULL; + int ret; + struct rte_hash_key *keys, *k; + struct rte_hash *h = (struct rte_hash *)p; + struct __rte_hash_rcu_dq_entry rcu_dq_entry = + *((struct __rte_hash_rcu_dq_entry *)e); + + RTE_SET_USED(n); + keys = h->key_store; + + k = (struct rte_hash_key *) ((char *)keys + + rcu_dq_entry.key_idx * h->key_entry_size); + key_data = k->pdata; + if (h->hash_rcu_cfg->free_key_data_func) + h->hash_rcu_cfg->free_key_data_func(h->hash_rcu_cfg->key_data_ptr, + key_data); + + if (h->ext_table_support && rcu_dq_entry.ext_bkt_idx != EMPTY_SLOT) + /* Recycle empty ext bkt to free list. */ + rte_ring_sp_enqueue_elem(h->free_ext_bkts, + &rcu_dq_entry.ext_bkt_idx, sizeof(uint32_t)); + + /* Return key indexes to free slot ring */ + ret = free_slot(h, rcu_dq_entry.key_idx); + if (ret < 0) { + RTE_LOG(ERR, HASH, + "%s: could not enqueue free slots in global ring\n", + __func__); + } +} + +int +rte_hash_rcu_qsbr_add(struct rte_hash *h, struct rte_hash_rcu_config *cfg) +{ + struct rte_rcu_qsbr_dq_parameters params = {0}; + char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE]; + struct rte_hash_rcu_config *hash_rcu_cfg = NULL; + + if (h == NULL || cfg == NULL || cfg->v == NULL) { + rte_errno = EINVAL; + return 1; + } + + const uint32_t total_entries = h->use_local_cache ? + h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 + : h->entries + 1; + + if (h->hash_rcu_cfg) { + rte_errno = EEXIST; + return 1; + } + + hash_rcu_cfg = rte_zmalloc(NULL, sizeof(struct rte_hash_rcu_config), 0); + if (hash_rcu_cfg == NULL) { + RTE_LOG(ERR, HASH, "memory allocation failed\n"); + return 1; + } + + if (cfg->mode == RTE_HASH_QSBR_MODE_SYNC) { + /* No other things to do. */ + } else if (cfg->mode == RTE_HASH_QSBR_MODE_DQ) { + /* Init QSBR defer queue. 
+		 */
+		snprintf(rcu_dq_name, sizeof(rcu_dq_name),
+				"HASH_RCU_%s", h->name);
+		params.name = rcu_dq_name;
+		params.size = cfg->dq_size;
+		if (params.size == 0)
+			params.size = total_entries;
+		params.trigger_reclaim_limit = cfg->trigger_reclaim_limit;
+		params.max_reclaim_size = cfg->max_reclaim_size;
+		if (params.max_reclaim_size == 0)
+			params.max_reclaim_size = RTE_HASH_RCU_DQ_RECLAIM_MAX;
+		params.esize = sizeof(struct __rte_hash_rcu_dq_entry);
+		params.free_fn = __hash_rcu_qsbr_free_resource;
+		params.p = h;
+		params.v = cfg->v;
+		h->dq = rte_rcu_qsbr_dq_create(&params);
+		if (h->dq == NULL) {
+			rte_free(hash_rcu_cfg);
+			RTE_LOG(ERR, HASH, "HASH defer queue creation failed\n");
+			return 1;
+		}
 	} else {
-		rte_ring_sp_enqueue(h->free_slots,
-				(void *)((uintptr_t)bkt->key_idx[i]));
+		rte_free(hash_rcu_cfg);
+		rte_errno = EINVAL;
+		return 1;
+	}
+
+	hash_rcu_cfg->v = cfg->v;
+	hash_rcu_cfg->mode = cfg->mode;
+	hash_rcu_cfg->dq_size = params.size;
+	hash_rcu_cfg->trigger_reclaim_limit = params.trigger_reclaim_limit;
+	hash_rcu_cfg->max_reclaim_size = params.max_reclaim_size;
+	hash_rcu_cfg->free_key_data_func = cfg->free_key_data_func;
+	hash_rcu_cfg->key_data_ptr = cfg->key_data_ptr;
+
+	h->hash_rcu_cfg = hash_rcu_cfg;
+
+	return 0;
+}
+
+static inline void
+remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt,
+		unsigned int i)
+{
+	int ret = free_slot(h, bkt->key_idx[i]);
+
+	if (ret < 0) {
+		RTE_LOG(ERR, HASH,
+			"%s: could not enqueue free slots in global ring\n",
+				__func__);
 	}
 }
 
@@ -1477,6 +1686,8 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 	int pos;
 	int32_t ret, i;
 	uint16_t short_sig;
+	uint32_t index = EMPTY_SLOT;
+	struct __rte_hash_rcu_dq_entry rcu_dq_entry;
 
 	short_sig = get_short_sig(sig);
 	prim_bucket_idx = get_prim_bucket_index(h, sig);
@@ -1511,10 +1722,9 @@ __rte_hash_del_key_with_hash(const struct rte_hash *h, const void *key,
 
 	/* Search last bucket to see if empty to be recycled */
 return_bkt:
-	if (!last_bkt) {
-		__hash_rw_writer_unlock(h);
-		return ret;
-	}
+	if (!last_bkt)
+		goto return_key;
+
 	while (last_bkt->next) {
 		prev_bkt = last_bkt;
 		last_bkt = last_bkt->next;
@@ -1527,11 +1737,11 @@ return_bkt:
 	/* found empty bucket and recycle */
 	if (i == RTE_HASH_BUCKET_ENTRIES) {
 		prev_bkt->next = NULL;
-		uint32_t index = last_bkt - h->buckets_ext + 1;
+		index = last_bkt - h->buckets_ext + 1;
 		/* Recycle the empty bkt if
 		 * no_free_on_del is disabled.
 		 */
-		if (h->no_free_on_del)
+		if (h->no_free_on_del) {
 			/* Store index of an empty ext bkt to be recycled
 			 * on calling rte_hash_del_xxx APIs.
 			 * When lock free read-write concurrency is enabled,
 			 * an empty ext bkt cannot be put into free list
 			 * immediately (as readers might be using it still).
 			 * Hence freeing of the ext bkt is piggy-backed to
 			 * freeing of the key index.
+			 * If using external RCU, store this index in an array.
+ */ + if (h->hash_rcu_cfg == NULL) + h->ext_bkt_to_free[ret] = index; + } else + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, + sizeof(uint32_t)); + } + +return_key: + /* Using internal RCU QSBR */ + if (h->hash_rcu_cfg) { + /* Key index where key is stored, adding the first dummy index */ + rcu_dq_entry.key_idx = ret + 1; + rcu_dq_entry.ext_bkt_idx = index; + if (h->dq == NULL) { + /* Wait for quiescent state change if using + * RTE_HASH_QSBR_MODE_SYNC */ - h->ext_bkt_to_free[ret] = index; - else - rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index); + rte_rcu_qsbr_synchronize(h->hash_rcu_cfg->v, + RTE_QSBR_THRID_INVALID); + __hash_rcu_qsbr_free_resource((void *)((uintptr_t)h), + &rcu_dq_entry, 1); + } else if (h->dq) + /* Push into QSBR FIFO if using RTE_HASH_QSBR_MODE_DQ */ + if (rte_rcu_qsbr_dq_enqueue(h->dq, &rcu_dq_entry) != 0) + RTE_LOG(ERR, HASH, "Failed to push QSBR FIFO\n"); } __hash_rw_writer_unlock(h); return ret; @@ -1583,7 +1816,7 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position, return 0; } -int __rte_experimental +int rte_hash_free_key_with_position(const struct rte_hash *h, const int32_t position) { @@ -1592,9 +1825,9 @@ rte_hash_free_key_with_position(const struct rte_hash *h, RETURN_IF_TRUE(((h == NULL) || (key_idx == EMPTY_SLOT)), -EINVAL); - unsigned int lcore_id, n_slots; - struct lcore_cache *cached_free_slots; - const uint32_t total_entries = h->num_buckets * RTE_HASH_BUCKET_ENTRIES; + const uint32_t total_entries = h->use_local_cache ? + h->entries + (RTE_MAX_LCORE - 1) * (LCORE_CACHE_SIZE - 1) + 1 + : h->entries + 1; /* Out of bounds */ if (key_idx >= total_entries) @@ -1603,33 +1836,15 @@ rte_hash_free_key_with_position(const struct rte_hash *h, uint32_t index = h->ext_bkt_to_free[position]; if (index) { /* Recycle empty ext bkt to free list. */ - rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index); + rte_ring_sp_enqueue_elem(h->free_ext_bkts, &index, + sizeof(uint32_t)); h->ext_bkt_to_free[position] = 0; } } - if (h->use_local_cache) { - lcore_id = rte_lcore_id(); - cached_free_slots = &h->local_free_slots[lcore_id]; - /* Cache full, need to free it. */ - if (cached_free_slots->len == LCORE_CACHE_SIZE) { - /* Need to enqueue the free slots in global ring. */ - n_slots = rte_ring_mp_enqueue_burst(h->free_slots, - cached_free_slots->objs, - LCORE_CACHE_SIZE, NULL); - RETURN_IF_TRUE((n_slots == 0), -EFAULT); - cached_free_slots->len -= n_slots; - } - /* Put index of new free slot in cache. */ - cached_free_slots->objs[cached_free_slots->len] = - (void *)((uintptr_t)key_idx); - cached_free_slots->len++; - } else { - rte_ring_sp_enqueue(h->free_slots, - (void *)((uintptr_t)key_idx)); - } + /* Enqueue slot to cache/ring of free slots. 
*/ + return free_slot(h, key_idx); - return 0; } static inline void @@ -1643,7 +1858,7 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, /* For match mask the first bit of every two bits indicates the match */ switch (sig_cmp_fn) { -#if defined(RTE_MACHINE_CPUFLAG_SSE2) +#if defined(__SSE2__) case RTE_HASH_COMPARE_SSE: /* Compare all signatures in the bucket */ *prim_hash_matches = _mm_movemask_epi8(_mm_cmpeq_epi16( @@ -1656,10 +1871,9 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, (__m128i const *)sec_bkt->sig_current), _mm_set1_epi16(sig))); break; -#elif defined(RTE_MACHINE_CPUFLAG_NEON) +#elif defined(__ARM_NEON) case RTE_HASH_COMPARE_NEON: { uint16x8_t vmat, vsig, x; - uint64x2_t x64; int16x8_t shift = {-15, -13, -11, -9, -7, -5, -3, -1}; vsig = vld1q_dup_u16((uint16_t const *)&sig); @@ -1667,16 +1881,13 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, vmat = vceqq_u16(vsig, vld1q_u16((uint16_t const *)prim_bkt->sig_current)); x = vshlq_u16(vandq_u16(vmat, vdupq_n_u16(0x8000)), shift); - x64 = vpaddlq_u32(vpaddlq_u16(x)); - *prim_hash_matches = (uint32_t)(vgetq_lane_u64(x64, 0) + - vgetq_lane_u64(x64, 1)); + *prim_hash_matches = (uint32_t)(vaddvq_u16(x)); /* Compare all signatures in the secondary bucket */ vmat = vceqq_u16(vsig, vld1q_u16((uint16_t const *)sec_bkt->sig_current)); x = vshlq_u16(vandq_u16(vmat, vdupq_n_u16(0x8000)), shift); - x64 = vpaddlq_u32(vpaddlq_u16(x)); - *sec_hash_matches = (uint32_t)(vgetq_lane_u64(x64, 0) + - vgetq_lane_u64(x64, 1)); } + *sec_hash_matches = (uint32_t)(vaddvq_u16(x)); + } break; #endif default: @@ -1689,64 +1900,20 @@ compare_signatures(uint32_t *prim_hash_matches, uint32_t *sec_hash_matches, } } -#define PREFETCH_OFFSET 4 static inline void -__rte_hash_lookup_bulk_l(const struct rte_hash *h, const void **keys, - int32_t num_keys, int32_t *positions, - uint64_t *hit_mask, void *data[]) +__bulk_lookup_l(const struct rte_hash *h, const void **keys, + const struct rte_hash_bucket **primary_bkt, + const struct rte_hash_bucket **secondary_bkt, + uint16_t *sig, int32_t num_keys, int32_t *positions, + uint64_t *hit_mask, void *data[]) { uint64_t hits = 0; int32_t i; int32_t ret; - uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX]; - uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX]; - uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX]; - uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; - const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; - const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; struct rte_hash_bucket *cur_bkt, *next_bkt; - /* Prefetch first keys */ - for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++) - rte_prefetch0(keys[i]); - - /* - * Prefetch rest of the keys, calculate primary and - * secondary bucket and prefetch them - */ - for (i = 0; i < (num_keys - PREFETCH_OFFSET); i++) { - rte_prefetch0(keys[i + PREFETCH_OFFSET]); - - prim_hash[i] = rte_hash_hash(h, keys[i]); - - sig[i] = get_short_sig(prim_hash[i]); - prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); - sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); - - primary_bkt[i] = &h->buckets[prim_index[i]]; - secondary_bkt[i] = &h->buckets[sec_index[i]]; - - rte_prefetch0(primary_bkt[i]); - rte_prefetch0(secondary_bkt[i]); - } - - /* Calculate and prefetch rest of the buckets */ - for (; i < num_keys; i++) { - prim_hash[i] = rte_hash_hash(h, keys[i]); - 
- sig[i] = get_short_sig(prim_hash[i]); - prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); - sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); - - primary_bkt[i] = &h->buckets[prim_index[i]]; - secondary_bkt[i] = &h->buckets[sec_index[i]]; - - rte_prefetch0(primary_bkt[i]); - rte_prefetch0(secondary_bkt[i]); - } - __hash_rw_reader_lock(h); /* Compare signatures and prefetch key slot of first hit */ @@ -1881,64 +2048,20 @@ next_key: } static inline void -__rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, - int32_t num_keys, int32_t *positions, - uint64_t *hit_mask, void *data[]) +__bulk_lookup_lf(const struct rte_hash *h, const void **keys, + const struct rte_hash_bucket **primary_bkt, + const struct rte_hash_bucket **secondary_bkt, + uint16_t *sig, int32_t num_keys, int32_t *positions, + uint64_t *hit_mask, void *data[]) { uint64_t hits = 0; int32_t i; int32_t ret; - uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX]; - uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX]; - uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX]; - uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; - const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; - const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0}; struct rte_hash_bucket *cur_bkt, *next_bkt; - void *pdata[RTE_HASH_LOOKUP_BULK_MAX]; uint32_t cnt_b, cnt_a; - /* Prefetch first keys */ - for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++) - rte_prefetch0(keys[i]); - - /* - * Prefetch rest of the keys, calculate primary and - * secondary bucket and prefetch them - */ - for (i = 0; i < (num_keys - PREFETCH_OFFSET); i++) { - rte_prefetch0(keys[i + PREFETCH_OFFSET]); - - prim_hash[i] = rte_hash_hash(h, keys[i]); - - sig[i] = get_short_sig(prim_hash[i]); - prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); - sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); - - primary_bkt[i] = &h->buckets[prim_index[i]]; - secondary_bkt[i] = &h->buckets[sec_index[i]]; - - rte_prefetch0(primary_bkt[i]); - rte_prefetch0(secondary_bkt[i]); - } - - /* Calculate and prefetch rest of the buckets */ - for (; i < num_keys; i++) { - prim_hash[i] = rte_hash_hash(h, keys[i]); - - sig[i] = get_short_sig(prim_hash[i]); - prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); - sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); - - primary_bkt[i] = &h->buckets[prim_index[i]]; - secondary_bkt[i] = &h->buckets[sec_index[i]]; - - rte_prefetch0(primary_bkt[i]); - rte_prefetch0(secondary_bkt[i]); - } - for (i = 0; i < num_keys; i++) positions[i] = -ENOENT; @@ -1999,10 +2122,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, (const char *)h->key_store + key_idx * h->key_entry_size); - if (key_idx != EMPTY_SLOT) - pdata[i] = __atomic_load_n( - &key_slot->pdata, - __ATOMIC_ACQUIRE); /* * If key index is 0, do not compare key, * as it is checking the dummy slot @@ -2011,7 +2130,9 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, !rte_hash_cmp_eq( key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = pdata[i]; + data[i] = __atomic_load_n( + &key_slot->pdata, + __ATOMIC_ACQUIRE); hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -2033,10 +2154,6 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, (const char *)h->key_store + key_idx * h->key_entry_size); - if (key_idx != EMPTY_SLOT) - pdata[i] = __atomic_load_n( - &key_slot->pdata, - 
__ATOMIC_ACQUIRE); /* * If key index is 0, do not compare key, * as it is checking the dummy slot @@ -2046,7 +2163,9 @@ __rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, !rte_hash_cmp_eq( key_slot->key, keys[i], h)) { if (data != NULL) - data[i] = pdata[i]; + data[i] = __atomic_load_n( + &key_slot->pdata, + __ATOMIC_ACQUIRE); hits |= 1ULL << i; positions[i] = key_idx - 1; @@ -2107,6 +2226,92 @@ next_key: *hit_mask = hits; } +#define PREFETCH_OFFSET 4 +static inline void +__bulk_lookup_prefetching_loop(const struct rte_hash *h, + const void **keys, int32_t num_keys, + uint16_t *sig, + const struct rte_hash_bucket **primary_bkt, + const struct rte_hash_bucket **secondary_bkt) +{ + int32_t i; + uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX]; + uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX]; + uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX]; + + /* Prefetch first keys */ + for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++) + rte_prefetch0(keys[i]); + + /* + * Prefetch rest of the keys, calculate primary and + * secondary bucket and prefetch them + */ + for (i = 0; i < (num_keys - PREFETCH_OFFSET); i++) { + rte_prefetch0(keys[i + PREFETCH_OFFSET]); + + prim_hash[i] = rte_hash_hash(h, keys[i]); + + sig[i] = get_short_sig(prim_hash[i]); + prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); + sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); + + primary_bkt[i] = &h->buckets[prim_index[i]]; + secondary_bkt[i] = &h->buckets[sec_index[i]]; + + rte_prefetch0(primary_bkt[i]); + rte_prefetch0(secondary_bkt[i]); + } + + /* Calculate and prefetch rest of the buckets */ + for (; i < num_keys; i++) { + prim_hash[i] = rte_hash_hash(h, keys[i]); + + sig[i] = get_short_sig(prim_hash[i]); + prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); + sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); + + primary_bkt[i] = &h->buckets[prim_index[i]]; + secondary_bkt[i] = &h->buckets[sec_index[i]]; + + rte_prefetch0(primary_bkt[i]); + rte_prefetch0(secondary_bkt[i]); + } +} + + +static inline void +__rte_hash_lookup_bulk_l(const struct rte_hash *h, const void **keys, + int32_t num_keys, int32_t *positions, + uint64_t *hit_mask, void *data[]) +{ + uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + + __bulk_lookup_prefetching_loop(h, keys, num_keys, sig, + primary_bkt, secondary_bkt); + + __bulk_lookup_l(h, keys, primary_bkt, secondary_bkt, sig, num_keys, + positions, hit_mask, data); +} + +static inline void +__rte_hash_lookup_bulk_lf(const struct rte_hash *h, const void **keys, + int32_t num_keys, int32_t *positions, + uint64_t *hit_mask, void *data[]) +{ + uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + + __bulk_lookup_prefetching_loop(h, keys, num_keys, sig, + primary_bkt, secondary_bkt); + + __bulk_lookup_lf(h, keys, primary_bkt, secondary_bkt, sig, num_keys, + positions, hit_mask, data); +} + static inline void __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys, int32_t num_keys, int32_t *positions, @@ -2148,6 +2353,123 @@ rte_hash_lookup_bulk_data(const struct rte_hash *h, const void **keys, return __builtin_popcountl(*hit_mask); } + +static inline void +__rte_hash_lookup_with_hash_bulk_l(const struct rte_hash *h, + const void **keys, hash_sig_t *prim_hash, + int32_t num_keys, 
int32_t *positions, + uint64_t *hit_mask, void *data[]) +{ + int32_t i; + uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX]; + uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX]; + uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + + /* + * Prefetch keys, calculate primary and + * secondary bucket and prefetch them + */ + for (i = 0; i < num_keys; i++) { + rte_prefetch0(keys[i]); + + sig[i] = get_short_sig(prim_hash[i]); + prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); + sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); + + primary_bkt[i] = &h->buckets[prim_index[i]]; + secondary_bkt[i] = &h->buckets[sec_index[i]]; + + rte_prefetch0(primary_bkt[i]); + rte_prefetch0(secondary_bkt[i]); + } + + __bulk_lookup_l(h, keys, primary_bkt, secondary_bkt, sig, num_keys, + positions, hit_mask, data); +} + +static inline void +__rte_hash_lookup_with_hash_bulk_lf(const struct rte_hash *h, + const void **keys, hash_sig_t *prim_hash, + int32_t num_keys, int32_t *positions, + uint64_t *hit_mask, void *data[]) +{ + int32_t i; + uint32_t prim_index[RTE_HASH_LOOKUP_BULK_MAX]; + uint32_t sec_index[RTE_HASH_LOOKUP_BULK_MAX]; + uint16_t sig[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX]; + + /* + * Prefetch keys, calculate primary and + * secondary bucket and prefetch them + */ + for (i = 0; i < num_keys; i++) { + rte_prefetch0(keys[i]); + + sig[i] = get_short_sig(prim_hash[i]); + prim_index[i] = get_prim_bucket_index(h, prim_hash[i]); + sec_index[i] = get_alt_bucket_index(h, prim_index[i], sig[i]); + + primary_bkt[i] = &h->buckets[prim_index[i]]; + secondary_bkt[i] = &h->buckets[sec_index[i]]; + + rte_prefetch0(primary_bkt[i]); + rte_prefetch0(secondary_bkt[i]); + } + + __bulk_lookup_lf(h, keys, primary_bkt, secondary_bkt, sig, num_keys, + positions, hit_mask, data); +} + +static inline void +__rte_hash_lookup_with_hash_bulk(const struct rte_hash *h, const void **keys, + hash_sig_t *prim_hash, int32_t num_keys, + int32_t *positions, uint64_t *hit_mask, void *data[]) +{ + if (h->readwrite_concur_lf_support) + __rte_hash_lookup_with_hash_bulk_lf(h, keys, prim_hash, + num_keys, positions, hit_mask, data); + else + __rte_hash_lookup_with_hash_bulk_l(h, keys, prim_hash, + num_keys, positions, hit_mask, data); +} + +int +rte_hash_lookup_with_hash_bulk(const struct rte_hash *h, const void **keys, + hash_sig_t *sig, uint32_t num_keys, int32_t *positions) +{ + RETURN_IF_TRUE(((h == NULL) || (keys == NULL) || + (sig == NULL) || (num_keys == 0) || + (num_keys > RTE_HASH_LOOKUP_BULK_MAX) || + (positions == NULL)), -EINVAL); + + __rte_hash_lookup_with_hash_bulk(h, keys, sig, num_keys, + positions, NULL, NULL); + return 0; +} + +int +rte_hash_lookup_with_hash_bulk_data(const struct rte_hash *h, + const void **keys, hash_sig_t *sig, + uint32_t num_keys, uint64_t *hit_mask, void *data[]) +{ + RETURN_IF_TRUE(((h == NULL) || (keys == NULL) || + (sig == NULL) || (num_keys == 0) || + (num_keys > RTE_HASH_LOOKUP_BULK_MAX) || + (hit_mask == NULL)), -EINVAL); + + int32_t positions[num_keys]; + + __rte_hash_lookup_with_hash_bulk(h, keys, sig, num_keys, + positions, hit_mask, data); + + /* Return number of hits */ + return __builtin_popcountl(*hit_mask); +} + int32_t rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32_t *next) {
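
The comments in this diff repeatedly describe key_idx (and pdata) as the "guard
variable": the writer completes all stores to the entry, then publishes it with
a release store on the guard, and readers start with an acquire load of the
guard. As a standalone illustration, here is a minimal sketch of that pattern
using the same __atomic builtins as the patch; struct slot, slot_publish() and
slot_lookup() are simplified stand-ins for this example, not DPDK APIs:

#include <stdint.h>
#include <string.h>

#define SLOT_EMPTY 0

struct slot {
	uint16_t sig;		/* short signature */
	char key[16];		/* key bytes */
	uint32_t key_idx;	/* guard variable: written last, read first */
};

/* Writer: fill in signature and key, then publish the slot with a release
 * store on the guard. A reader that observes the new key_idx is guaranteed
 * to also observe the completed sig/key stores.
 */
static void
slot_publish(struct slot *s, uint16_t sig, const char *key, uint32_t idx)
{
	s->sig = sig;
	memcpy(s->key, key, sizeof(s->key));
	__atomic_store_n(&s->key_idx, idx, __ATOMIC_RELEASE);
}

/* Reader: the acquire load of the guard pairs with the writer's release
 * store; everything the writer stored before the release is visible after it.
 */
static uint32_t
slot_lookup(const struct slot *s, uint16_t sig, const char *key)
{
	uint32_t idx = __atomic_load_n(&s->key_idx, __ATOMIC_ACQUIRE);

	if (idx != SLOT_EMPTY && s->sig == sig &&
			memcmp(s->key, key, sizeof(s->key)) == 0)
		return idx;	/* full key compare catches stale signatures */
	return SLOT_EMPTY;
}

This is also why search_one_bucket_lf() above may compare the signature before
the acquire load: a stale match is harmless because the full key comparison
still has to succeed.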
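
For context, a minimal sketch of how an application might attach RCU to a
lock-free table through the rte_hash_rcu_qsbr_add() API added above. The table
name, sizes, create_hash_with_rcu() and the free_session() callback are made
up for illustration, and error handling is abbreviated:

#include <rte_common.h>
#include <rte_hash.h>
#include <rte_jhash.h>
#include <rte_lcore.h>
#include <rte_malloc.h>
#include <rte_rcu_qsbr.h>

/* Application callback: invoked by the library only once no reader can still
 * hold a reference to the deleted entry's user data.
 */
static void
free_session(void *ptr, void *key_data)
{
	RTE_SET_USED(ptr);
	rte_free(key_data);
}

static struct rte_hash *
create_hash_with_rcu(struct rte_rcu_qsbr **v_out, uint32_t nb_readers)
{
	struct rte_hash_parameters hp = {
		.name = "example_ht",
		.entries = 1024,
		.key_len = 16,
		.hash_func = rte_jhash,
		.socket_id = (int)rte_socket_id(),
		.extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
	};
	struct rte_hash_rcu_config rcu_cfg = { 0 };
	struct rte_rcu_qsbr *v;
	struct rte_hash *h;

	h = rte_hash_create(&hp);
	if (h == NULL)
		return NULL;

	/* One QSBR variable shared by all reader threads. */
	v = rte_zmalloc(NULL, rte_rcu_qsbr_get_memsize(nb_readers),
			RTE_CACHE_LINE_SIZE);
	if (v == NULL || rte_rcu_qsbr_init(v, nb_readers) != 0)
		goto fail;

	rcu_cfg.v = v;
	rcu_cfg.mode = RTE_HASH_QSBR_MODE_DQ;	/* deferred reclamation */
	rcu_cfg.free_key_data_func = free_session;
	/* Leaving dq_size, trigger_reclaim_limit and max_reclaim_size at 0
	 * picks the defaults computed in rte_hash_rcu_qsbr_add() above.
	 */
	if (rte_hash_rcu_qsbr_add(h, &rcu_cfg) != 0)
		goto fail;	/* returns 1 on failure */

	*v_out = v;
	return h;

fail:
	rte_hash_free(h);
	rte_free(v);
	return NULL;
}

Reader threads would then register once with rte_rcu_qsbr_thread_register()
and rte_rcu_qsbr_thread_online(), and report quiescent states between lookups
with rte_rcu_qsbr_quiescent(); that is what allows the defer queue to reclaim
key slots and extendable buckets safely.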
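
Similarly, a hedged usage sketch for the rte_hash_lookup_with_hash_bulk_data()
variant introduced at the end of this diff, which is useful when signatures
are computed once up front (for example in a separate pipeline stage).
lookup_burst() and its buffers are assumptions for this example; only the
rte_hash calls come from the library:

#include <rte_hash.h>

/* Bulk lookup with caller-computed signatures. Assumes n is at most
 * RTE_HASH_LOOKUP_BULK_MAX and keys[] holds n valid pointers.
 */
static int
lookup_burst(const struct rte_hash *h, const void *keys[], uint32_t n,
		void *data[])
{
	hash_sig_t sig[RTE_HASH_LOOKUP_BULK_MAX];
	uint64_t hit_mask = 0;
	uint32_t i;

	/* The signatures could equally be reused from an earlier stage;
	 * computing them here just keeps the sketch self-contained.
	 */
	for (i = 0; i < n; i++)
		sig[i] = rte_hash_hash(h, keys[i]);

	/* Returns the number of hits; per-key results are reported through
	 * hit_mask and data[].
	 */
	return rte_hash_lookup_with_hash_bulk_data(h, keys, sig, n,
						&hit_mask, data);
}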