+ /* Check if the key was inserted after the last check but before
+ * this protected region was entered.
+ */
+ ret = search_and_update(h, data, key, bkt, sig);
+ if (ret != -1) {
+ __hash_rw_writer_unlock(h);
+ *ret_val = ret;
+ return 1;
+ }
+
+ FOR_EACH_BUCKET(cur_bkt, alt_bkt) {
+ ret = search_and_update(h, data, key, cur_bkt, sig);
+ if (ret != -1) {
+ __hash_rw_writer_unlock(h);
+ *ret_val = ret;
+ return 1;
+ }
+ }
+
+ while (likely(curr_node->prev != NULL)) {
+ prev_node = curr_node->prev;
+ prev_bkt = prev_node->bkt;
+ prev_slot = curr_node->prev_slot;
+
+ prev_alt_bkt_idx = get_alt_bucket_index(h,
+ prev_node->cur_bkt_idx,
+ prev_bkt->sig_current[prev_slot]);
+
+ if (unlikely(&h->buckets[prev_alt_bkt_idx]
+ != curr_bkt)) {
+ /* Revert the slot to empty, otherwise duplicate keys can result */
+ __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+ EMPTY_SLOT,
+ __ATOMIC_RELEASE);
+ __hash_rw_writer_unlock(h);
+ return -1;
+ }
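+ /* Note (illustrative): get_alt_bucket_index() as used in this file
+ * is assumed to be the XOR-based mapping (idx ^ sig) & bitmask,
+ * which is its own inverse. E.g. with bitmask 0xff, idx 0x12 and
+ * sig 0x34 map to alt 0x26, and (0x26 ^ 0x34) & 0xff == 0x12
+ * again. The check above relies on this: if the previous node's
+ * alternative bucket is no longer curr_bkt, the BFS path is stale.
+ */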
+
+ if (h->readwrite_concur_lf_support) {
+ /* Inform readers of the previous move. The current move need
+ * not be announced yet, as the entry being moved is still
+ * present in both its primary and secondary buckets.
+ * Since there is a single writer, load-acquires on
+ * tbl_chng_cnt are not required.
+ */
+ __atomic_store_n(h->tbl_chng_cnt,
+ *h->tbl_chng_cnt + 1,
+ __ATOMIC_RELEASE);
+ /* The store to sig_current should not
+ * move above the store to tbl_chng_cnt.
+ */
+ __atomic_thread_fence(__ATOMIC_RELEASE);
+ }
+
+ /* Need to swap the current/alt sig so that a later Cuckoo
+ * insert can move elements back to their primary buckets
+ * if space becomes available.
+ */
+ curr_bkt->sig_current[curr_slot] =
+ prev_bkt->sig_current[prev_slot];
+ /* Release the updated bucket entry */
+ __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+ prev_bkt->key_idx[prev_slot],
+ __ATOMIC_RELEASE);
+
+ curr_slot = prev_slot;
+ curr_node = prev_node;
+ curr_bkt = curr_node->bkt;
+ }
+
+ if (h->readwrite_concur_lf_support) {
+ /* Inform readers of the previous move. The current move need
+ * not be announced yet, as the entry being moved is still
+ * present in both its primary and secondary buckets.
+ * Since there is a single writer, load-acquires on
+ * tbl_chng_cnt are not required.
+ */
+ __atomic_store_n(h->tbl_chng_cnt,
+ *h->tbl_chng_cnt + 1,
+ __ATOMIC_RELEASE);
+ /* The store to sig_current should not
+ * move above the store to tbl_chng_cnt.
+ */
+ __atomic_thread_fence(__ATOMIC_RELEASE);
+ }
+
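+ /* The walk-back is complete: curr_bkt/curr_slot now refer to the
+ * root of the BFS path, i.e. a slot in the target bucket that the
+ * moves above have emptied for the new key.
+ */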
+ curr_bkt->sig_current[curr_slot] = sig;
+ /* Release the new bucket entry */
+ __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+ new_idx,
+ __ATOMIC_RELEASE);
+
+ __hash_rw_writer_unlock(h);
+
+ return 0;
+}
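+/*
+ * Illustrative sketch (an assumption, not part of this patch) of the
+ * reader-side pattern that pairs with the __ATOMIC_RELEASE stores to
+ * tbl_chng_cnt above: a lock-free reader brackets its probe with
+ * load-acquires of the counter and retries a miss if the counter moved,
+ * since the key may have been in flight between buckets. search_one_bucket
+ * is a hypothetical helper standing in for the real probe sequence.
+ */
+#if 0
+static inline int
+lookup_lf_retry_sketch(const struct rte_hash *h, const void *key,
+ uint16_t sig, void **data)
+{
+ uint32_t cnt_b, cnt_a;
+ int ret;
+
+ do {
+ /* Load-acquire: probes below cannot be hoisted above it */
+ cnt_b = __atomic_load_n(h->tbl_chng_cnt, __ATOMIC_ACQUIRE);
+ ret = search_one_bucket(h, key, sig, data);
+ /* If a cuckoo move ran concurrently, retry the probe */
+ cnt_a = __atomic_load_n(h->tbl_chng_cnt, __ATOMIC_ACQUIRE);
+ } while (ret < 0 && cnt_b != cnt_a);
+
+ return ret;
+}
+#endif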
+
+/*
+ * Make space for a new key, using a BFS Cuckoo search and a
+ * multi-writer safe Cuckoo move.
+ */
+static inline int
+rte_hash_cuckoo_make_space_mw(const struct rte_hash *h,
+ struct rte_hash_bucket *bkt,
+ struct rte_hash_bucket *sec_bkt,
+ const struct rte_hash_key *key, void *data,
+ uint16_t sig, uint32_t bucket_idx,
+ uint32_t new_idx, int32_t *ret_val)
+{
+ unsigned int i;
+ struct queue_node queue[RTE_HASH_BFS_QUEUE_MAX_LEN];
+ struct queue_node *tail, *head;
+ struct rte_hash_bucket *curr_bkt, *alt_bkt;
+ uint32_t cur_idx, alt_idx;
+
+ tail = queue;
+ head = queue + 1;
+ tail->bkt = bkt;
+ tail->prev = NULL;
+ tail->prev_slot = -1;
+ tail->cur_bkt_idx = bucket_idx;
+
+ /* Cuckoo BFS search. The bound below leaves room in the queue for
+ * the up to RTE_HASH_BUCKET_ENTRIES nodes that the inner loop may
+ * push before tail advances.
+ */
+ while (likely(tail != head && head <
+ queue + RTE_HASH_BFS_QUEUE_MAX_LEN -
+ RTE_HASH_BUCKET_ENTRIES)) {
+ curr_bkt = tail->bkt;
+ cur_idx = tail->cur_bkt_idx;
+ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+ if (curr_bkt->key_idx[i] == EMPTY_SLOT) {
+ int32_t ret = rte_hash_cuckoo_move_insert_mw(h,
+ bkt, sec_bkt, key, data,
+ tail, i, sig,
+ new_idx, ret_val);
+ if (likely(ret != -1))
+ return ret;
+ }
+
+ /* Enqueue new node and keep prev node info */
+ alt_idx = get_alt_bucket_index(h, cur_idx,
+ curr_bkt->sig_current[i]);
+ alt_bkt = &(h->buckets[alt_idx]);
+ head->bkt = alt_bkt;
+ head->cur_bkt_idx = alt_idx;
+ head->prev = tail;
+ head->prev_slot = i;
+ head++;
+ }
+ tail++;
+ }
+
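+ /* BFS queue exhausted without reaching an empty slot: report
+ * failure so the caller can fall back (e.g. to extendable buckets).
+ */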
+ return -ENOSPC;
+}
+
+static inline uint32_t
+alloc_slot(const struct rte_hash *h, struct lcore_cache *cached_free_slots)
+{
+ unsigned int n_slots;
+ uint32_t slot_id;
+
+ if (h->use_local_cache) {
+ /* Try to get a free slot from the local cache */
+ if (cached_free_slots->len == 0) {
+ /* Need to get another burst of free slots from global ring */
+ n_slots = rte_ring_mc_dequeue_burst_elem(h->free_slots,
+ cached_free_slots->objs,
+ sizeof(uint32_t),
+ LCORE_CACHE_SIZE, NULL);
+ if (n_slots == 0)
+ return EMPTY_SLOT;
+
+ cached_free_slots->len += n_slots;
+ }
+
+ /* Get a free slot from the local cache */
+ cached_free_slots->len--;
+ slot_id = cached_free_slots->objs[cached_free_slots->len];
+ } else {
+ if (rte_ring_sc_dequeue_elem(h->free_slots, &slot_id,
+ sizeof(uint32_t)) != 0)
+ return EMPTY_SLOT;
+ }
+
+ return slot_id;
+}
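+/*
+ * Sketch (an assumption based on how alloc_slot is written above, not
+ * part of this patch) of the inverse helper enqueue_slot_back() used
+ * below: a slot goes back to the local cache when one is in use,
+ * otherwise to the global ring.
+ */
+#if 0
+static inline void
+enqueue_slot_back_sketch(const struct rte_hash *h,
+ struct lcore_cache *cached_free_slots,
+ uint32_t slot_id)
+{
+ if (h->use_local_cache) {
+ /* Return the slot to this lcore's cache */
+ cached_free_slots->objs[cached_free_slots->len] = slot_id;
+ cached_free_slots->len++;
+ } else
+ /* Single producer: mirror of the sc dequeue above */
+ rte_ring_sp_enqueue_elem(h->free_slots, &slot_id,
+ sizeof(uint32_t));
+}
+#endif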
+
+static inline int32_t
+__rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
+ hash_sig_t sig, void *data)
+{
+ uint16_t short_sig;
+ uint32_t prim_bucket_idx, sec_bucket_idx;
+ struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt;
+ struct rte_hash_key *new_k, *keys = h->key_store;
+ uint32_t ext_bkt_id = 0;
+ uint32_t slot_id;
+ int ret;
+ unsigned int lcore_id;
+ unsigned int i;
+ struct lcore_cache *cached_free_slots = NULL;
+ int32_t ret_val;
+ struct rte_hash_bucket *last;
+
+ short_sig = get_short_sig(sig);
+ prim_bucket_idx = get_prim_bucket_index(h, sig);
+ sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
+ prim_bkt = &h->buckets[prim_bucket_idx];
+ sec_bkt = &h->buckets[sec_bucket_idx];
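+ /* Both candidate buckets may be probed below; warm them early */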
+ rte_prefetch0(prim_bkt);
+ rte_prefetch0(sec_bkt);
+
+ /* Check if key is already inserted in primary location */
+ __hash_rw_writer_lock(h);
+ ret = search_and_update(h, data, key, prim_bkt, short_sig);
+ if (ret != -1) {
+ __hash_rw_writer_unlock(h);
+ return ret;
+ }
+
+ /* Check if key is already inserted in secondary location */
+ FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+ ret = search_and_update(h, data, key, cur_bkt, short_sig);
+ if (ret != -1) {
+ __hash_rw_writer_unlock(h);
+ return ret;
+ }
+ }
+
+ __hash_rw_writer_unlock(h);
+
+ /* Did not find a match, so get a new slot for storing the new key */
+ if (h->use_local_cache) {
+ lcore_id = rte_lcore_id();
+ cached_free_slots = &h->local_free_slots[lcore_id];
+ }
+ slot_id = alloc_slot(h, cached_free_slots);
+ if (slot_id == EMPTY_SLOT) {
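+ /* With RCU enabled, an empty free list may only mean that
+ * deleted slots are still waiting to be reclaimed; reclaim
+ * under the lock and retry the allocation once.
+ */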
+ if (h->dq) {
+ __hash_rw_writer_lock(h);
+ ret = rte_rcu_qsbr_dq_reclaim(h->dq,
+ h->hash_rcu_cfg->max_reclaim_size,
+ NULL, NULL, NULL);
+ __hash_rw_writer_unlock(h);
+ if (ret == 0)
+ slot_id = alloc_slot(h, cached_free_slots);
+ }
+ if (slot_id == EMPTY_SLOT)
+ return -ENOSPC;
+ }
+
+ new_k = RTE_PTR_ADD(keys, slot_id * h->key_entry_size);
+ /* The store to the application data at *data (done by the
+ * application) must not be reordered after (leak past) the store
+ * of pdata into the key store, i.e. pdata is the guard variable.
+ * Release the application data to the readers.
+ */
+ __atomic_store_n(&new_k->pdata,
+ data,
+ __ATOMIC_RELEASE);
+ /* Copy key */
+ memcpy(new_k->key, key, h->key_len);
+
+ /* Find an empty slot and insert */
+ ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
+ short_sig, slot_id, &ret_val);
+ if (ret == 0)
+ return slot_id - 1;
+ else if (ret == 1) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ return ret_val;
+ }
+
+ /* Primary bucket full, need to make space for new entry */
+ ret = rte_hash_cuckoo_make_space_mw(h, prim_bkt, sec_bkt, key, data,
+ short_sig, prim_bucket_idx, slot_id, &ret_val);
+ if (ret == 0)
+ return slot_id - 1;
+ else if (ret == 1) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ return ret_val;
+ }
+
+ /* Also search secondary bucket to get better occupancy */
+ ret = rte_hash_cuckoo_make_space_mw(h, sec_bkt, prim_bkt, key, data,
+ short_sig, sec_bucket_idx, slot_id, &ret_val);
+
+ if (ret == 0)
+ return slot_id - 1;
+ else if (ret == 1) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ return ret_val;
+ }
+
+ /* If the extendable table is not enabled, the insertion has failed */
+ if (!h->ext_table_support) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ return ret;
+ }
+
+ /* Now we need to go through the extendable buckets. The writer
+ * lock protects all extendable bucket operations.
+ */
+ __hash_rw_writer_lock(h);
+ /* Check for duplicates again: the key could have been inserted
+ * before the lock was taken.
+ */
+ ret = search_and_update(h, data, key, prim_bkt, short_sig);
+ if (ret != -1) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ goto failure;
+ }
+
+ FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+ ret = search_and_update(h, data, key, cur_bkt, short_sig);
+ if (ret != -1) {
+ enqueue_slot_back(h, cached_free_slots, slot_id);
+ goto failure;
+ }
+ }
+
+ /* Search sec and ext buckets to find an empty entry to insert. */
+ FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+ for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+ /* Check if slot is available */
+ if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) {
+ cur_bkt->sig_current[i] = short_sig;
+ /* Store to signature and key should not
+ * leak after the store to key_idx. i.e.
+ * key_idx is the guard variable for signature
+ * and key.
+ */
+ __atomic_store_n(&cur_bkt->key_idx[i],
+ slot_id,
+ __ATOMIC_RELEASE);
+ __hash_rw_writer_unlock(h);
+ return slot_id - 1;
+ }
+ }
+ }
+
+ /* Failed to get an empty entry from the extendable buckets. Link a
+ * new extendable bucket: first get a free bucket from the ring.
+ */
+ if (rte_ring_sc_dequeue_elem(h->free_ext_bkts, &ext_bkt_id,
+ sizeof(uint32_t)) != 0 ||
+ ext_bkt_id == 0) {
+ if (h->dq) {
+ if (rte_rcu_qsbr_dq_reclaim(h->dq,
+ h->hash_rcu_cfg->max_reclaim_size,
+ NULL, NULL, NULL) == 0) {
+ rte_ring_sc_dequeue_elem(h->free_ext_bkts,
+ &ext_bkt_id,
+ sizeof(uint32_t));
+ }
+ }
+ if (ext_bkt_id == 0) {
+ ret = -ENOSPC;
+ goto failure;
+ }
+ }
+
+ /* Use the first location of the new bucket */
+ (h->buckets_ext[ext_bkt_id - 1]).sig_current[0] = short_sig;
+ /* Store to signature and key should not leak after
+ * the store to key_idx. i.e. key_idx is the guard variable
+ * for signature and key.
+ */
+ __atomic_store_n(&(h->buckets_ext[ext_bkt_id - 1]).key_idx[0],
+ slot_id,
+ __ATOMIC_RELEASE);
+ /* Link the new bucket into the secondary bucket's linked list */
+ last = rte_hash_get_last_bkt(sec_bkt);
+ last->next = &h->buckets_ext[ext_bkt_id - 1];
+ __hash_rw_writer_unlock(h);
+ return slot_id - 1;
+
+failure:
+ __hash_rw_writer_unlock(h);
+ return ret;
+}
+
+int32_t
+rte_hash_add_key_with_hash(const struct rte_hash *h,
+ const void *key, hash_sig_t sig)
+{
+ RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
+ return __rte_hash_add_key_with_hash(h, key, sig, NULL);
+}
+
+int32_t
+rte_hash_add_key(const struct rte_hash *h, const void *key)
+{
+ RETURN_IF_TRUE(((h == NULL) || (key == NULL)), -EINVAL);
+ return __rte_hash_add_key_with_hash(h, key, rte_hash_hash(h, key), NULL);
+}
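+
+/*
+ * Usage sketch (illustrative; struct flow_key and the caller are
+ * assumptions, not from this patch): how an application typically
+ * consumes the return values of rte_hash_add_key().
+ */
+#if 0
+static int
+flow_table_add(struct rte_hash *h, const struct flow_key *key)
+{
+ int32_t pos = rte_hash_add_key(h, key);
+
+ if (pos >= 0)
+ return 0; /* pos: stable index of the key */
+ if (pos == -ENOSPC)
+ return -1; /* table (and ext buckets) full */
+ return -1; /* -EINVAL: bad parameters */
+}
+#endif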