hash: separate multi-writer from r/w concurrency
authorHonnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Fri, 26 Oct 2018 05:37:29 +0000 (00:37 -0500)
committerThomas Monjalon <thomas@monjalon.net>
Fri, 26 Oct 2018 10:43:52 +0000 (12:43 +0200)
RW concurrency is required with single writer and multiple reader
usecase as well. Hence, multi-writer should not be enabled by default when
RW concurrency is enabled.

Fixes: f2e3001b53ec ("hash: support read/write concurrency")
Cc: stable@dpdk.org
Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Yipeng Wang <yipeng1.wang@intel.com>
Acked-by: Bruce Richardson <bruce.richardson@intel.com>
lib/librte_hash/rte_cuckoo_hash.c
lib/librte_hash/rte_cuckoo_hash.h
test/test/test_hash_readwrite.c

index 9a48934..3539a10 100644 (file)
@@ -136,9 +136,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
        char ext_ring_name[RTE_RING_NAMESIZE];
        unsigned num_key_slots;
        unsigned i;
-       unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
+       unsigned int hw_trans_mem_support = 0, use_local_cache = 0;
        unsigned int ext_table_support = 0;
        unsigned int readwrite_concur_support = 0;
+       unsigned int writer_takes_lock = 0;
 
        rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -162,19 +163,21 @@ rte_hash_create(const struct rte_hash_parameters *params)
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
                hw_trans_mem_support = 1;
 
-       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD)
-               multi_writer_support = 1;
+       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD) {
+               use_local_cache = 1;
+               writer_takes_lock = 1;
+       }
 
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) {
                readwrite_concur_support = 1;
-               multi_writer_support = 1;
+               writer_takes_lock = 1;
        }
 
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)
                ext_table_support = 1;
 
        /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
-       if (multi_writer_support)
+       if (use_local_cache)
                /*
                 * Increase number of slots by total number of indices
                 * that can be stored in the lcore caches
@@ -322,7 +325,7 @@ rte_hash_create(const struct rte_hash_parameters *params)
        h->cmp_jump_table_idx = KEY_OTHER_BYTES;
 #endif
 
-       if (multi_writer_support) {
+       if (use_local_cache) {
                h->local_free_slots = rte_zmalloc_socket(NULL,
                                sizeof(struct lcore_cache) * RTE_MAX_LCORE,
                                RTE_CACHE_LINE_SIZE, params->socket_id);
@@ -352,9 +355,10 @@ rte_hash_create(const struct rte_hash_parameters *params)
        h->key_store = k;
        h->free_slots = r;
        h->hw_trans_mem_support = hw_trans_mem_support;
-       h->multi_writer_support = multi_writer_support;
+       h->use_local_cache = use_local_cache;
        h->readwrite_concur_support = readwrite_concur_support;
        h->ext_table_support = ext_table_support;
+       h->writer_takes_lock = writer_takes_lock;
 
 #if defined(RTE_ARCH_X86)
        if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2))
@@ -363,10 +367,11 @@ rte_hash_create(const struct rte_hash_parameters *params)
 #endif
                h->sig_cmp_fn = RTE_HASH_COMPARE_SCALAR;
 
-       /* Turn on multi-writer only with explicit flag from user and TM
-        * support.
+       /* Writer threads need to take the lock when:
+        * 1) RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY is enabled OR
+        * 2) RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD is enabled
         */
-       if (h->multi_writer_support) {
+       if (h->writer_takes_lock) {
                h->readwrite_lock = rte_malloc(NULL, sizeof(rte_rwlock_t),
                                                RTE_CACHE_LINE_SIZE);
                if (h->readwrite_lock == NULL)
@@ -425,10 +430,10 @@ rte_hash_free(struct rte_hash *h)
 
        rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
-       if (h->multi_writer_support) {
+       if (h->use_local_cache)
                rte_free(h->local_free_slots);
+       if (h->writer_takes_lock)
                rte_free(h->readwrite_lock);
-       }
        rte_ring_free(h->free_slots);
        rte_ring_free(h->free_ext_bkts);
        rte_free(h->key_store);
@@ -454,7 +459,7 @@ rte_hash_count(const struct rte_hash *h)
        if (h == NULL)
                return -EINVAL;
 
-       if (h->multi_writer_support) {
+       if (h->use_local_cache) {
                tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
                                        (LCORE_CACHE_SIZE - 1);
                for (i = 0; i < RTE_MAX_LCORE; i++)
@@ -473,9 +478,9 @@ rte_hash_count(const struct rte_hash *h)
 static inline void
 __hash_rw_writer_lock(const struct rte_hash *h)
 {
-       if (h->multi_writer_support && h->hw_trans_mem_support)
+       if (h->writer_takes_lock && h->hw_trans_mem_support)
                rte_rwlock_write_lock_tm(h->readwrite_lock);
-       else if (h->multi_writer_support)
+       else if (h->writer_takes_lock)
                rte_rwlock_write_lock(h->readwrite_lock);
 }
 
@@ -491,9 +496,9 @@ __hash_rw_reader_lock(const struct rte_hash *h)
 static inline void
 __hash_rw_writer_unlock(const struct rte_hash *h)
 {
-       if (h->multi_writer_support && h->hw_trans_mem_support)
+       if (h->writer_takes_lock && h->hw_trans_mem_support)
                rte_rwlock_write_unlock_tm(h->readwrite_lock);
-       else if (h->multi_writer_support)
+       else if (h->writer_takes_lock)
                rte_rwlock_write_unlock(h->readwrite_lock);
 }
 
@@ -532,7 +537,7 @@ rte_hash_reset(struct rte_hash *h)
        }
 
        /* Repopulate the free slots ring. Entry zero is reserved for key misses */
-       if (h->multi_writer_support)
+       if (h->use_local_cache)
                tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
                                        (LCORE_CACHE_SIZE - 1);
        else
@@ -548,7 +553,7 @@ rte_hash_reset(struct rte_hash *h)
                                                (void *)((uintptr_t) i));
        }
 
-       if (h->multi_writer_support) {
+       if (h->use_local_cache) {
                /* Reset local caches per lcore */
                for (i = 0; i < RTE_MAX_LCORE; i++)
                        h->local_free_slots[i].len = 0;
@@ -566,7 +571,7 @@ enqueue_slot_back(const struct rte_hash *h,
                struct lcore_cache *cached_free_slots,
                void *slot_id)
 {
-       if (h->multi_writer_support) {
+       if (h->use_local_cache) {
                cached_free_slots->objs[cached_free_slots->len] = slot_id;
                cached_free_slots->len++;
        } else
@@ -848,7 +853,7 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
        __hash_rw_writer_unlock(h);
 
        /* Did not find a match, so get a new slot for storing the new key */
-       if (h->multi_writer_support) {
+       if (h->use_local_cache) {
                lcore_id = rte_lcore_id();
                cached_free_slots = &h->local_free_slots[lcore_id];
                /* Try to get a free slot from the local cache */
@@ -1117,7 +1122,7 @@ remove_entry(const struct rte_hash *h, struct rte_hash_bucket *bkt, unsigned i)
        struct lcore_cache *cached_free_slots;
 
        bkt->sig_current[i] = NULL_SIGNATURE;
-       if (h->multi_writer_support) {
+       if (h->use_local_cache) {
                lcore_id = rte_lcore_id();
                cached_free_slots = &h->local_free_slots[lcore_id];
                /* Cache full, need to free it. */
index 7753cd8..8c522ac 100644 (file)
@@ -161,11 +161,15 @@ struct rte_hash {
        /**< Length of hash key. */
        uint8_t hw_trans_mem_support;
        /**< If hardware transactional memory is used. */
-       uint8_t multi_writer_support;
-       /**< If multi-writer support is enabled. */
+       uint8_t use_local_cache;
+       /**< If multi-writer support is enabled, use local cache
+        * to allocate key-store slots.
+        */
        uint8_t readwrite_concur_support;
        /**< If read-write concurrency support is enabled */
        uint8_t ext_table_support;     /**< Enable extendable bucket table */
+       uint8_t writer_takes_lock;
+       /**< Indicates if the writer threads need to take lock */
        rte_hash_function hash_func;    /**< Function used to calculate hash. */
        uint32_t hash_func_init_val;    /**< Init value used by hash_func. */
        rte_hash_cmp_eq_t rte_hash_custom_cmp_eq;
index 2a4f7b9..a8fadd0 100644 (file)
@@ -122,10 +122,12 @@ init_params(int use_htm, int use_jhash)
        if (use_htm)
                hash_params.extra_flag =
                        RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT |
-                       RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY;
+                       RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY |
+                       RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD;
        else
                hash_params.extra_flag =
-                       RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY;
+                       RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY |
+                       RTE_HASH_EXTRA_FLAGS_MULTI_WRITER_ADD;
 
        hash_params.name = "tests";