#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_cpuflags.h>
-#include <rte_log.h>
#include <rte_rwlock.h>
#include <rte_spinlock.h>
#include <rte_ring.h>
#include <rte_compat.h>
+#include <rte_pause.h>
#include "rte_hash.h"
#include "rte_cuckoo_hash.h"
/* Search for an entry that can be pushed to its alternative location */
static inline int
-make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt)
+make_space_bucket(const struct rte_hash *h, struct rte_hash_bucket *bkt,
+ unsigned int *nr_pushes)
{
unsigned i, j;
int ret;
break;
/* All entries have been pushed, so entry cannot be added */
- if (i == RTE_HASH_BUCKET_ENTRIES)
+ if (i == RTE_HASH_BUCKET_ENTRIES || ++(*nr_pushes) > RTE_HASH_MAX_PUSHES)
return -ENOSPC;
/* Set flag to indicate that this entry is going to be pushed */
bkt->flag[i] = 1;
+
/* Need room in alternative bucket to insert the pushed entry */
- ret = make_space_bucket(h, next_bkt[i]);
+ ret = make_space_bucket(h, next_bkt[i], nr_pushes);
/*
* After recursive function.
* Clear flags and insert the pushed entry
unsigned n_slots;
unsigned lcore_id;
struct lcore_cache *cached_free_slots = NULL;
+ unsigned int nr_pushes = 0;
if (h->add_key == ADD_KEY_MULTIWRITER)
rte_spinlock_lock(h->multiwriter_lock);
if (cached_free_slots->len == 0) {
/* Need to get another burst of free slots from global ring */
n_slots = rte_ring_mc_dequeue_burst(h->free_slots,
- cached_free_slots->objs, LCORE_CACHE_SIZE);
- if (n_slots == 0)
- return -ENOSPC;
+ cached_free_slots->objs,
+ LCORE_CACHE_SIZE, NULL);
+ if (n_slots == 0) {
+ ret = -ENOSPC;
+ goto failure;
+ }
cached_free_slots->len += n_slots;
}
cached_free_slots->len--;
slot_id = cached_free_slots->objs[cached_free_slots->len];
} else {
- if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0)
- return -ENOSPC;
+ if (rte_ring_sc_dequeue(h->free_slots, &slot_id) != 0) {
+ ret = -ENOSPC;
+ goto failure;
+ }
}
new_k = RTE_PTR_ADD(keys, (uintptr_t)slot_id * h->key_entry_size);
k->pdata = data;
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
return prim_bkt->key_idx[i] - 1;
}
k->pdata = data;
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
return sec_bkt->key_idx[i] - 1;
}
* if successful or return error and
* store the new slot back in the ring
*/
- ret = make_space_bucket(h, prim_bkt);
+ ret = make_space_bucket(h, prim_bkt, &nr_pushes);
if (ret >= 0) {
prim_bkt->sig_current[ret] = sig;
prim_bkt->sig_alt[ret] = alt_hash;
/* Error in addition, store new slot back in the ring and return error */
enqueue_slot_back(h, cached_free_slots, (void *)((uintptr_t) new_idx));
+failure:
if (h->add_key == ADD_KEY_MULTIWRITER)
rte_spinlock_unlock(h->multiwriter_lock);
return ret;
*data = k->pdata;
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
return bkt->key_idx[i] - 1;
}
*data = k->pdata;
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
return bkt->key_idx[i] - 1;
}
/* Need to enqueue the free slots in global ring. */
n_slots = rte_ring_mp_enqueue_burst(h->free_slots,
cached_free_slots->objs,
- LCORE_CACHE_SIZE);
+ LCORE_CACHE_SIZE, NULL);
cached_free_slots->len -= n_slots;
}
/* Put index of new free slot in cache. */
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
ret = bkt->key_idx[i] - 1;
bkt->key_idx[i] = EMPTY_SLOT;
/*
* Return index where key is stored,
- * substracting the first dummy index
+ * subtracting the first dummy index
*/
ret = bkt->key_idx[i] - 1;
bkt->key_idx[i] = EMPTY_SLOT;