hash: add lock-free r/w concurrency
author		Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
		Fri, 26 Oct 2018 05:37:32 +0000 (00:37 -0500)
committer	Thomas Monjalon <thomas@monjalon.net>
		Fri, 26 Oct 2018 10:50:43 +0000 (12:50 +0200)
Add lock-free read-write concurrency. This is achieved by the
following changes.

1) Add memory ordering to avoid race conditions. The only race
condition that can occur is using the key store element before
the key write is completed. Hence, the release memory order is
used while inserting the element. Any other race condition is
caught by the key comparison. Memory orderings are added only
where needed; for example, reads in the writer's context do not
need memory ordering, as there is a single writer.

key_idx in the bucket entry and pdata in the key store element are
used for synchronisation. key_idx is used to release an inserted
entry in the bucket to the reader. pdata is additionally needed for
synchronisation when an existing entry is updated, in which case
only pdata is written and key_idx is left unchanged.
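
For illustration, a minimal sketch (not part of this patch) of the
release/acquire pairing described above, with the bucket and key
store layouts reduced to scalars and using the same GCC atomic
builtins as the patch:

    #include <stdint.h>
    #include <stddef.h>

    struct kv { uint64_t key; void *pdata; };  /* simplified key store element */
    static struct kv key_store[1024];
    static uint32_t key_idx;                   /* 0 == EMPTY_SLOT */

    /* Writer: complete the key store stores, then publish the index.
     * The release store keeps the key/pdata writes visible before
     * key_idx becomes visible to readers.
     */
    static void writer_insert(uint32_t idx, uint64_t key, void *data)
    {
            key_store[idx].key = key;
            key_store[idx].pdata = data;
            __atomic_store_n(&key_idx, idx, __ATOMIC_RELEASE);
    }

    /* Reader: the acquire loads pair with the writer's release stores,
     * so the element is fully written by the time it is dereferenced.
     */
    static void *reader_lookup(uint64_t key)
    {
            uint32_t idx = __atomic_load_n(&key_idx, __ATOMIC_ACQUIRE);

            if (idx != 0 && key_store[idx].key == key)
                    return __atomic_load_n(&key_store[idx].pdata,
                                           __ATOMIC_ACQUIRE);
            return NULL;
    }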

2) The reader-writer concurrency issue caused by moving keys
to their alternative locations during key insert is solved by
introducing a global counter (tbl_chng_cnt) that indicates a
change in the table.
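
A rough sketch (not from the patch) of the counter-based retry that
__rte_hash_lookup_with_hash below performs; 'search_buckets' is a
hypothetical stand-in for the primary and secondary bucket scans:

    static int lookup_retry(const struct rte_hash *h, const void *key,
                            void **data)
    {
            uint32_t cnt_b, cnt_a;
            int ret;

            do {
                    /* Snapshot the counter before the search starts. */
                    cnt_b = __atomic_load_n(h->tbl_chng_cnt,
                                            __ATOMIC_ACQUIRE);

                    ret = search_buckets(h, key, data); /* may miss a moving key */
                    if (ret != -1)
                            return ret;                 /* hit, no retry needed */

                    /* Keep the bucket loads above the counter re-read. */
                    __atomic_thread_fence(__ATOMIC_ACQUIRE);
                    cnt_a = __atomic_load_n(h->tbl_chng_cnt,
                                            __ATOMIC_ACQUIRE);
            } while (cnt_b != cnt_a);                   /* table changed, re-do */

            return -ENOENT;
    }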

3) Add a flag to enable lock-free read-write concurrency at
run time.
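
As a usage illustration (not part of this patch; the table name and
sizing values are arbitrary), enabling the new mode at creation
looks like:

    #include <rte_hash.h>
    #include <rte_jhash.h>

    struct rte_hash_parameters params = {
            .name = "flow_table",
            .entries = 1024,
            .key_len = sizeof(uint32_t),
            .hash_func = rte_jhash,
            .hash_func_init_val = 0,
            .socket_id = 0,
            .extra_flag = RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
    };
    struct rte_hash *h = rte_hash_create(&params);
    /* rte_hash_create() returns NULL with rte_errno set to EINVAL if
     * this flag is combined with RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY
     * or RTE_HASH_EXTRA_FLAGS_EXT_TABLE, as validated below.
     */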

Signed-off-by: Honnappa Nagarahalli <honnappa.nagarahalli@arm.com>
Reviewed-by: Gavin Hu <gavin.hu@arm.com>
Reviewed-by: Ola Liljedahl <ola.liljedahl@arm.com>
Reviewed-by: Steve Capper <steve.capper@arm.com>
Reviewed-by: Yipeng Wang <yipeng1.wang@intel.com>
Acked-by: Bruce Richardson <bruce.richardson@intel.com>
doc/guides/prog_guide/hash_lib.rst
doc/guides/rel_notes/release_18_11.rst
lib/librte_hash/rte_cuckoo_hash.c
lib/librte_hash/rte_cuckoo_hash.h
lib/librte_hash/rte_hash.h

diff --git a/doc/guides/prog_guide/hash_lib.rst b/doc/guides/prog_guide/hash_lib.rst
index 76a1f32..f5beec1 100644
@@ -1,5 +1,6 @@
 ..  SPDX-License-Identifier: BSD-3-Clause
     Copyright(c) 2010-2015 Intel Corporation.
+    Copyright(c) 2018 Arm Limited.
 
 .. _Hash_Library:
 
@@ -38,7 +39,7 @@ The main methods exported by the hash are:
 *   Lookup for entry with key: The key is provided as input. If an entry with the specified key is found in the hash (lookup hit),
     then the position of the entry is returned, otherwise (lookup miss) a negative value is returned.
 
-Apart from these method explained above, the API allows the user three more options:
+Apart from the methods explained above, the API provides the user with a few more options:
 
 *   Add / lookup / delete with key and precomputed hash: Both the key and its precomputed hash are provided as input. This allows
     the user to perform these operations faster, as hash is already computed.
@@ -48,6 +49,9 @@ Apart from these method explained above, the API allows the user three more opti
 
 *   Combination of the two options above: User can provide key, precomputed hash and data.
 
+*   Ability to not free the position of the entry in the hash upon calling delete. This is useful for multi-threaded scenarios where
+    readers continue to use the position even after the entry is deleted.
+
 Also, the API contains a method to allow the user to look up entries in bursts, achieving higher performance
 than looking up individual entries, as the function prefetches next entries at the time it is operating
 with the first ones, which reduces significantly the impact of the necessary memory accesses.
@@ -83,13 +87,20 @@ For concurrent writes, and concurrent reads and writes the following flag values
    Key add, delete, and table reset are protected from other writer threads. With only this flag set, readers are not protected from ongoing writes.
 
 *  If the read/write concurrency (RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) is set, multithread read/write operation is safe
-   (i.e., no need to stop the readers from accessing the hash table until writers finish their updates. Reads and writes can operate table concurrently).
+   (i.e., the application does not need to stop readers from accessing the hash table until writers finish their updates; readers and writers can operate on the table concurrently).
+   The library uses a reader-writer lock to provide the concurrency.
 
 *  In addition to these two flag values, if the transactional memory flag (RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT) is also set,
-   hardware transactional memory will be used to guarantee the thread safety as long as it is supported by the hardware (for example the Intel® TSX support).
+   the reader-writer lock will use hardware transactional memory to guarantee thread safety as long as it is supported by the hardware (for example the Intel® TSX support).
+   If the platform supports Intel® TSX, it is advised to set the transactional memory flag, as this will speed up concurrent table operations.
+   Otherwise concurrent operations will be slower because of the overhead associated with the software locking mechanisms.
+
+*  If the lock-free read/write concurrency flag (RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) is set, read/write concurrency is provided without using a reader-writer lock.
+   For platforms that do not support transactional memory (for example, current Arm-based platforms), it is advised to set this flag to achieve greater performance scalability.
 
-If the platform supports Intel® TSX, it is advised to set the transactional memory flag, as this will speed up concurrent table operations.
-Otherwise concurrent operations will be slower because of the overhead associated with the software locking mechanisms.
+*  If the 'do not free on delete' (RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL) flag is set, the position of the entry in the hash is not freed upon calling delete. This flag is enabled
+   by default when lock-free read/write concurrency is set. The application should free the position after all readers have stopped referencing it.
+   Where required, the application can make use of RCU mechanisms to determine when the readers have stopped referencing the position.
 
 Implementation Details
 ----------------------
@@ -148,6 +159,14 @@ key is considered not able to be stored.
 With random keys, this method allows the user to get around 90% of the table utilization, without
 having to drop any stored entry (LRU) or allocate more memory (extended buckets).
 
+
+Example of deletion:
+
+Similar to lookup, the key is searched in its primary and secondary buckets. If the key is found, the bucket
+entry is marked as an empty slot. If the hash table was configured with 'no free on delete' or 'lock-free read/write concurrency',
+the position of the key is not freed. It is the responsibility of the user to free the position after making sure that
+readers are no longer referencing the position.
+
 Entry distribution in hash table
 --------------------------------
 
@@ -240,6 +259,10 @@ The flow table operations on the application side are described below:
 *   Delete flow: Delete the flow key from the hash. If the returned position is valid,
     use it to access the flow entry in the flow table to invalidate the information associated with the flow.
 
+*   Free flow: Free the flow key position. If the 'no free on delete' or 'lock-free read/write concurrency' flags are set,
+    wait until readers are no longer referencing the position returned during add/delete flow, and then free the position.
+    RCU mechanisms can be used to determine when readers are no longer referencing the position.
+
 *   Lookup flow: Lookup for the flow key in the hash.
     If the returned position is valid (flow lookup hit), use the returned position to access the flow entry in the flow table.
     Otherwise (flow lookup miss) there is no flow registered for the current packet.
diff --git a/doc/guides/rel_notes/release_18_11.rst b/doc/guides/rel_notes/release_18_11.rst
index 771295d..b354840 100644
@@ -190,6 +190,12 @@ New Features
   collisions.  In addition, the internal hashing algorithm was changed to use
   partial-key hashing to improve memory efficiency and lookup performance.
 
+* **Added lock-free reader/writer concurrency to the hash library (rte_hash).**
+
+  Lock-free reader/writer concurrency prevents readers from being
+  blocked by a pre-empted writer thread. This allows the hash library
+  to be used in scenarios where the writer thread runs on the control plane.
+
 * **Added Traffic Pattern Aware Power Control Library**
 
   Added an experimental library. This extend Power Library and provide
diff --git a/lib/librte_hash/rte_cuckoo_hash.c b/lib/librte_hash/rte_cuckoo_hash.c
index d79ba68..0648a22 100644
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2010-2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 #include <string.h>
@@ -141,6 +142,8 @@ rte_hash_create(const struct rte_hash_parameters *params)
        unsigned int readwrite_concur_support = 0;
        unsigned int writer_takes_lock = 0;
        unsigned int no_free_on_del = 0;
+       uint32_t *tbl_chng_cnt = NULL;
+       unsigned int readwrite_concur_lf_support = 0;
 
        rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
 
@@ -160,6 +163,24 @@ rte_hash_create(const struct rte_hash_parameters *params)
                return NULL;
        }
 
+       /* Validate correct usage of extra options */
+       if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY) &&
+           (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF)) {
+               rte_errno = EINVAL;
+               RTE_LOG(ERR, HASH, "rte_hash_create: choose rw concurrency or "
+                       "rw concurrency lock free\n");
+               return NULL;
+       }
+
+       if ((params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) &&
+           (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)) {
+               rte_errno = EINVAL;
+               RTE_LOG(ERR, HASH, "rte_hash_create: extendable bucket "
+                       "feature not supported with rw concurrency "
+                       "lock free\n");
+               return NULL;
+       }
+
        /* Check extra flags field to check extra options. */
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_TRANS_MEM_SUPPORT)
                hw_trans_mem_support = 1;
@@ -180,6 +201,12 @@ rte_hash_create(const struct rte_hash_parameters *params)
        if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL)
                no_free_on_del = 1;
 
+       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF) {
+               readwrite_concur_lf_support = 1;
+               /* Enable not freeing internal memory/index on delete */
+               no_free_on_del = 1;
+       }
+
        /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
        if (use_local_cache)
                /*
@@ -292,6 +319,14 @@ rte_hash_create(const struct rte_hash_parameters *params)
                goto err_unlock;
        }
 
+       tbl_chng_cnt = rte_zmalloc_socket(NULL, sizeof(uint32_t),
+                       RTE_CACHE_LINE_SIZE, params->socket_id);
+
+       if (tbl_chng_cnt == NULL) {
+               RTE_LOG(ERR, HASH, "memory allocation failed\n");
+               goto err_unlock;
+       }
+
 /*
  * If x86 architecture is used, select appropriate compare function,
  * which may use x86 intrinsics, otherwise use memcmp
@@ -360,12 +395,15 @@ rte_hash_create(const struct rte_hash_parameters *params)
                default_hash_func : params->hash_func;
        h->key_store = k;
        h->free_slots = r;
+       h->tbl_chng_cnt = tbl_chng_cnt;
+       *h->tbl_chng_cnt = 0;
        h->hw_trans_mem_support = hw_trans_mem_support;
        h->use_local_cache = use_local_cache;
        h->readwrite_concur_support = readwrite_concur_support;
        h->ext_table_support = ext_table_support;
        h->writer_takes_lock = writer_takes_lock;
        h->no_free_on_del = no_free_on_del;
+       h->readwrite_concur_lf_support = readwrite_concur_lf_support;
 
 #if defined(RTE_ARCH_X86)
        if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_SSE2))
@@ -406,6 +444,7 @@ err:
        rte_free(buckets);
        rte_free(buckets_ext);
        rte_free(k);
+       rte_free(tbl_chng_cnt);
        return NULL;
 }
 
@@ -446,6 +485,7 @@ rte_hash_free(struct rte_hash *h)
        rte_free(h->key_store);
        rte_free(h->buckets);
        rte_free(h->buckets_ext);
+       rte_free(h->tbl_chng_cnt);
        rte_free(h);
        rte_free(te);
 }
@@ -530,6 +570,7 @@ rte_hash_reset(struct rte_hash *h)
        __hash_rw_writer_lock(h);
        memset(h->buckets, 0, h->num_buckets * sizeof(struct rte_hash_bucket));
        memset(h->key_store, 0, h->key_entry_size * (h->entries + 1));
+       *h->tbl_chng_cnt = 0;
 
        /* clear the free ring */
        while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
@@ -585,7 +626,9 @@ enqueue_slot_back(const struct rte_hash *h,
                rte_ring_sp_enqueue(h->free_slots, slot_id);
 }
 
-/* Search a key from bucket and update its data */
+/* Search a key from bucket and update its data.
+ * Writer holds the lock before calling this.
+ */
 static inline int32_t
 search_and_update(const struct rte_hash *h, void *data, const void *key,
        struct rte_hash_bucket *bkt, uint16_t sig)
@@ -598,8 +641,13 @@ search_and_update(const struct rte_hash *h, void *data, const void *key,
                        k = (struct rte_hash_key *) ((char *)keys +
                                        bkt->key_idx[i] * h->key_entry_size);
                        if (rte_hash_cmp_eq(key, k->key, h) == 0) {
-                               /* Update data */
-                               k->pdata = data;
+                               /* 'pdata' acts as the synchronization point
+                                * when an existing hash entry is updated.
+                                * Key is not updated in this case.
+                                */
+                               __atomic_store_n(&k->pdata,
+                                       data,
+                                       __ATOMIC_RELEASE);
                                /*
                                 * Return index where key is stored,
                                 * subtracting the first dummy index
@@ -655,7 +703,15 @@ rte_hash_cuckoo_insert_mw(const struct rte_hash *h,
                /* Check if slot is available */
                if (likely(prim_bkt->key_idx[i] == EMPTY_SLOT)) {
                        prim_bkt->sig_current[i] = sig;
-                       prim_bkt->key_idx[i] = new_idx;
+                       /* Key can be of arbitrary length, so it is
+                        * not possible to store it atomically.
+                        * Hence the new key element's memory stores
+                        * (key as well as data) should be complete
+                        * before it is referenced.
+                        */
+                       __atomic_store_n(&prim_bkt->key_idx[i],
+                                        new_idx,
+                                        __ATOMIC_RELEASE);
                        break;
                }
        }
@@ -728,27 +784,66 @@ rte_hash_cuckoo_move_insert_mw(const struct rte_hash *h,
                if (unlikely(&h->buckets[prev_alt_bkt_idx]
                                != curr_bkt)) {
                        /* revert it to empty, otherwise duplicated keys */
-                       curr_bkt->key_idx[curr_slot] = EMPTY_SLOT;
+                       __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                               EMPTY_SLOT,
+                               __ATOMIC_RELEASE);
                        __hash_rw_writer_unlock(h);
                        return -1;
                }
 
+               if (h->readwrite_concur_lf_support) {
+                       /* Inform the previous move. The current move need
+                        * not be informed now as the current bucket entry
+                        * is present in both primary and secondary.
+                        * Since there is one writer, load acquires on
+                        * tbl_chng_cnt are not required.
+                        */
+                       __atomic_store_n(h->tbl_chng_cnt,
+                                        *h->tbl_chng_cnt + 1,
+                                        __ATOMIC_RELEASE);
+                       /* The stores to sig_alt and sig_current should not
+                        * move above the store to tbl_chng_cnt.
+                        */
+                       __atomic_thread_fence(__ATOMIC_RELEASE);
+               }
+
                /* Need to swap current/alt sig to allow later
                 * Cuckoo insert to move elements back to its
                 * primary bucket if available
                 */
                curr_bkt->sig_current[curr_slot] =
                        prev_bkt->sig_current[prev_slot];
-               curr_bkt->key_idx[curr_slot] =
-                       prev_bkt->key_idx[prev_slot];
+               /* Release the updated bucket entry */
+               __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                       prev_bkt->key_idx[prev_slot],
+                       __ATOMIC_RELEASE);
 
                curr_slot = prev_slot;
                curr_node = prev_node;
                curr_bkt = curr_node->bkt;
        }
 
+       if (h->readwrite_concur_lf_support) {
+               /* Inform the previous move. The current move need
+                * not be informed now as the current bucket entry
+                * is present in both primary and secondary.
+                * Since there is one writer, load acquires on
+                * tbl_chng_cnt are not required.
+                */
+               __atomic_store_n(h->tbl_chng_cnt,
+                                *h->tbl_chng_cnt + 1,
+                                __ATOMIC_RELEASE);
+               /* The stores to sig_alt and sig_current should not
+                * move above the store to tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_RELEASE);
+       }
+
        curr_bkt->sig_current[curr_slot] = sig;
-       curr_bkt->key_idx[curr_slot] = new_idx;
+       /* Release the new bucket entry */
+       __atomic_store_n(&curr_bkt->key_idx[curr_slot],
+                        new_idx,
+                        __ATOMIC_RELEASE);
 
        __hash_rw_writer_unlock(h);
 
@@ -889,8 +984,15 @@ __rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key,
        new_idx = (uint32_t)((uintptr_t) slot_id);
        /* Copy key */
        rte_memcpy(new_k->key, key, h->key_len);
-       new_k->pdata = data;
-
+       /* Key can be of arbitrary length, so it is not possible to store
+        * it atomically. Hence the new key element's memory stores
+        * (key as well as data) should be complete before it is referenced.
+        * 'pdata' acts as the synchronization point when an existing hash
+        * entry is updated.
+        */
+       __atomic_store_n(&new_k->pdata,
+               data,
+               __ATOMIC_RELEASE);
 
        /* Find an empty slot and insert */
        ret = rte_hash_cuckoo_insert_mw(h, prim_bkt, sec_bkt, key, data,
@@ -1034,21 +1136,27 @@ search_one_bucket(const struct rte_hash *h, const void *key, uint16_t sig,
                        void **data, const struct rte_hash_bucket *bkt)
 {
        int i;
+       uint32_t key_idx;
+       void *pdata;
        struct rte_hash_key *k, *keys = h->key_store;
 
        for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-               if (bkt->sig_current[i] == sig &&
-                               bkt->key_idx[i] != EMPTY_SLOT) {
+               key_idx = __atomic_load_n(&bkt->key_idx[i],
+                                         __ATOMIC_ACQUIRE);
+               if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
                        k = (struct rte_hash_key *) ((char *)keys +
-                                       bkt->key_idx[i] * h->key_entry_size);
+                                       key_idx * h->key_entry_size);
+                       pdata = __atomic_load_n(&k->pdata,
+                                       __ATOMIC_ACQUIRE);
+
                        if (rte_hash_cmp_eq(key, k->key, h) == 0) {
                                if (data != NULL)
-                                       *data = k->pdata;
+                                       *data = pdata;
                                /*
                                 * Return index where key is stored,
                                 * subtracting the first dummy index
                                 */
-                               return bkt->key_idx[i] - 1;
+                               return key_idx - 1;
                        }
                }
        }
@@ -1061,34 +1169,62 @@ __rte_hash_lookup_with_hash(const struct rte_hash *h, const void *key,
 {
        uint32_t prim_bucket_idx, sec_bucket_idx;
        struct rte_hash_bucket *bkt, *cur_bkt;
+       uint32_t cnt_b, cnt_a;
        int ret;
        uint16_t short_sig;
 
        short_sig = get_short_sig(sig);
        prim_bucket_idx = get_prim_bucket_index(h, sig);
        sec_bucket_idx = get_alt_bucket_index(h, prim_bucket_idx, short_sig);
-       bkt = &h->buckets[prim_bucket_idx];
 
        __hash_rw_reader_lock(h);
 
-       /* Check if key is in primary location */
-       ret = search_one_bucket(h, key, short_sig, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
-       }
-       /* Calculate secondary hash */
-       bkt = &h->buckets[sec_bucket_idx];
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in search_one_bucket are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                               __ATOMIC_ACQUIRE);
 
-       /* Check if key is in secondary location */
-       FOR_EACH_BUCKET(cur_bkt, bkt) {
-               ret = search_one_bucket(h, key, short_sig, data, cur_bkt);
+               /* Check if key is in primary location */
+               bkt = &h->buckets[prim_bucket_idx];
+               ret = search_one_bucket(h, key, short_sig, data, bkt);
                if (ret != -1) {
                        __hash_rw_reader_unlock(h);
                        return ret;
                }
-       }
+               /* Calculate secondary hash */
+               bkt = &h->buckets[sec_bucket_idx];
+
+               /* Check if key is in secondary location */
+               FOR_EACH_BUCKET(cur_bkt, bkt) {
+                       ret = search_one_bucket(h, key, short_sig,
+                                               data, cur_bkt);
+                       if (ret != -1) {
+                               __hash_rw_reader_unlock(h);
+                               return ret;
+                       }
+               }
+
+               /* The loads of sig_current in search_one_bucket
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, key index in primary bucket
+                * and key index in secondary bucket will make sure
+                * that it does not get hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
+
        __hash_rw_reader_unlock(h);
+
        return -ENOENT;
 }
 
@@ -1173,21 +1309,25 @@ __rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos) {
        }
 }
 
-/* Search one bucket and remove the matched key */
+/* Search one bucket and remove the matched key.
+ * Writer is expected to hold the lock while calling this
+ * function.
+ */
 static inline int32_t
 search_and_remove(const struct rte_hash *h, const void *key,
                        struct rte_hash_bucket *bkt, uint16_t sig, int *pos)
 {
        struct rte_hash_key *k, *keys = h->key_store;
        unsigned int i;
-       int32_t ret;
+       uint32_t key_idx;
 
        /* Check if key is in bucket */
        for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
-               if (bkt->sig_current[i] == sig &&
-                               bkt->key_idx[i] != EMPTY_SLOT) {
+               key_idx = __atomic_load_n(&bkt->key_idx[i],
+                                         __ATOMIC_ACQUIRE);
+               if (bkt->sig_current[i] == sig && key_idx != EMPTY_SLOT) {
                        k = (struct rte_hash_key *) ((char *)keys +
-                                       bkt->key_idx[i] * h->key_entry_size);
+                                       key_idx * h->key_entry_size);
                        if (rte_hash_cmp_eq(key, k->key, h) == 0) {
                                bkt->sig_current[i] = NULL_SIGNATURE;
                                /* Free the key store index if
@@ -1196,13 +1336,16 @@ search_and_remove(const struct rte_hash *h, const void *key,
                                if (!h->no_free_on_del)
                                        remove_entry(h, bkt, i);
 
-                               /* Return index where key is stored,
+                               __atomic_store_n(&bkt->key_idx[i],
+                                                EMPTY_SLOT,
+                                                __ATOMIC_RELEASE);
+
+                               *pos = i;
+                               /*
+                                * Return index where key is stored,
                                 * subtracting the first dummy index
                                 */
-                               ret = bkt->key_idx[i] - 1;
-                               bkt->key_idx[i] = EMPTY_SLOT;
-                               *pos = i;
-                               return ret;
+                               return key_idx - 1;
                        }
                }
        }
@@ -1402,6 +1545,8 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
        uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        struct rte_hash_bucket *cur_bkt, *next_bkt;
+       void *pdata[RTE_HASH_LOOKUP_BULK_MAX];
+       uint32_t cnt_b, cnt_a;
 
        /* Prefetch first keys */
        for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
@@ -1443,91 +1588,138 @@ __rte_hash_lookup_bulk(const struct rte_hash *h, const void **keys,
        }
 
        __hash_rw_reader_lock(h);
-       /* Compare signatures and prefetch key slot of first hit */
-       for (i = 0; i < num_keys; i++) {
-               compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
+       do {
+               /* Load the table change counter before the lookup
+                * starts. Acquire semantics will make sure that
+                * loads in compare_signatures are not hoisted.
+                */
+               cnt_b = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+
+               /* Compare signatures and prefetch key slot of first hit */
+               for (i = 0; i < num_keys; i++) {
+                       compare_signatures(&prim_hitmask[i], &sec_hitmask[i],
                                primary_bkt[i], secondary_bkt[i],
                                sig[i], h->sig_cmp_fn);
 
-               if (prim_hitmask[i]) {
-                       uint32_t first_hit =
-                                       __builtin_ctzl(prim_hitmask[i]) >> 1;
-                       uint32_t key_idx = primary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
-                       continue;
-               }
-
-               if (sec_hitmask[i]) {
-                       uint32_t first_hit =
-                                       __builtin_ctzl(sec_hitmask[i]) >> 1;
-                       uint32_t key_idx = secondary_bkt[i]->key_idx[first_hit];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       rte_prefetch0(key_slot);
-               }
-       }
-
-       /* Compare keys, first hits in primary first */
-       for (i = 0; i < num_keys; i++) {
-               positions[i] = -ENOENT;
-               while (prim_hitmask[i]) {
-                       uint32_t hit_index =
-                                       __builtin_ctzl(prim_hitmask[i]) >> 1;
-
-                       uint32_t key_idx = primary_bkt[i]->key_idx[hit_index];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = key_slot->pdata;
+                       if (prim_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(prim_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                                       primary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
+                               continue;
+                       }
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                       if (sec_hitmask[i]) {
+                               uint32_t first_hit =
+                                               __builtin_ctzl(sec_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                                       secondary_bkt[i]->key_idx[first_hit];
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+                               rte_prefetch0(key_slot);
                        }
-                       prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
                }
 
-               while (sec_hitmask[i]) {
-                       uint32_t hit_index =
-                                       __builtin_ctzl(sec_hitmask[i]) >> 1;
-
-                       uint32_t key_idx = secondary_bkt[i]->key_idx[hit_index];
-                       const struct rte_hash_key *key_slot =
-                               (const struct rte_hash_key *)(
-                               (const char *)h->key_store +
-                               key_idx * h->key_entry_size);
-                       /*
-                        * If key index is 0, do not compare key,
-                        * as it is checking the dummy slot
-                        */
+               /* Compare keys, first hits in primary first */
+               for (i = 0; i < num_keys; i++) {
+                       positions[i] = -ENOENT;
+                       while (prim_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(prim_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &primary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               prim_hitmask[i] &= ~(3ULL << (hit_index << 1));
+                       }
 
-                       if (!!key_idx & !rte_hash_cmp_eq(key_slot->key, keys[i], h)) {
-                               if (data != NULL)
-                                       data[i] = key_slot->pdata;
+                       while (sec_hitmask[i]) {
+                               uint32_t hit_index =
+                                               __builtin_ctzl(sec_hitmask[i])
+                                               >> 1;
+                               uint32_t key_idx =
+                               __atomic_load_n(
+                                       &secondary_bkt[i]->key_idx[hit_index],
+                                       __ATOMIC_ACQUIRE);
+                               const struct rte_hash_key *key_slot =
+                                       (const struct rte_hash_key *)(
+                                       (const char *)h->key_store +
+                                       key_idx * h->key_entry_size);
+
+                               if (key_idx != EMPTY_SLOT)
+                                       pdata[i] = __atomic_load_n(
+                                                       &key_slot->pdata,
+                                                       __ATOMIC_ACQUIRE);
+                               /*
+                                * If key index is 0, do not compare key,
+                                * as it is checking the dummy slot
+                                */
 
-                               hits |= 1ULL << i;
-                               positions[i] = key_idx - 1;
-                               goto next_key;
+                               if (!!key_idx &
+                                       !rte_hash_cmp_eq(
+                                               key_slot->key, keys[i], h)) {
+                                       if (data != NULL)
+                                               data[i] = pdata[i];
+
+                                       hits |= 1ULL << i;
+                                       positions[i] = key_idx - 1;
+                                       goto next_key;
+                               }
+                               sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
                        }
-                       sec_hitmask[i] &= ~(3ULL << (hit_index << 1));
+next_key:
+                       continue;
                }
 
-next_key:
-               continue;
-       }
+               /* The loads of sig_current in compare_signatures
+                * should not move below the load from tbl_chng_cnt.
+                */
+               __atomic_thread_fence(__ATOMIC_ACQUIRE);
+               /* Re-read the table change counter to check if the
+                * table has changed during search. If yes, re-do
+                * the search.
+                * This load should not get hoisted. The load
+                * acquires on cnt_b, primary key index and secondary
+                * key index will make sure that it does not get
+                * hoisted.
+                */
+               cnt_a = __atomic_load_n(h->tbl_chng_cnt,
+                                       __ATOMIC_ACQUIRE);
+       } while (cnt_b != cnt_a);
 
        /* all found, do not need to go through ext bkt */
        if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
@@ -1612,7 +1804,8 @@ rte_hash_iterate(const struct rte_hash *h, const void **key, void **data, uint32
        idx = *next % RTE_HASH_BUCKET_ENTRIES;
 
        /* If current position is empty, go to the next one */
-       while ((position = h->buckets[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
+       while ((position = __atomic_load_n(&h->buckets[bucket_idx].key_idx[idx],
+                                       __ATOMIC_ACQUIRE)) == EMPTY_SLOT) {
                (*next)++;
                /* End of table */
                if (*next == total_entries_main)
diff --git a/lib/librte_hash/rte_cuckoo_hash.h b/lib/librte_hash/rte_cuckoo_hash.h
index 601b2ce..5dfbbc4 100644
@@ -1,5 +1,6 @@
 /* SPDX-License-Identifier: BSD-3-Clause
  * Copyright(c) 2016 Intel Corporation
+ * Copyright(c) 2018 Arm Limited
  */
 
 /* rte_cuckoo_hash.h
@@ -174,6 +175,8 @@ struct rte_hash {
         * free the key index associated with the deleted entry.
         * This flag is enabled by default.
         */
+       uint8_t readwrite_concur_lf_support;
+       /**< If lock-free read-write concurrency support is enabled */
        uint8_t writer_takes_lock;
        /**< Indicates if the writer threads need to take lock */
        rte_hash_function hash_func;    /**< Function used to calculate hash. */
@@ -196,6 +199,8 @@ struct rte_hash {
        rte_rwlock_t *readwrite_lock; /**< Read-write lock thread-safety. */
        struct rte_hash_bucket *buckets_ext; /**< Extra buckets array */
        struct rte_ring *free_ext_bkts; /**< Ring of indexes of free buckets */
+       uint32_t *tbl_chng_cnt;
+       /**< Indicates if the hash table changed since the last read. */
 } __rte_cache_aligned;
 
 struct queue_node {
diff --git a/lib/librte_hash/rte_hash.h b/lib/librte_hash/rte_hash.h
index dfa542b..c93d1a1 100644
@@ -44,9 +44,18 @@ extern "C" {
 
 /** Flag to disable freeing of key index on hash delete.
  * Refer to rte_hash_del_xxx APIs for more details.
+ * This is enabled by default when RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF
+ * is enabled.
  */
 #define RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL 0x10
 
+/** Flag to support lock-free reader-writer concurrency. Both single-writer
+ * and multi-writer use cases are supported.
+ * Currently, the extendable bucket table feature is not supported with
+ * this flag.
+ */
+#define RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF 0x20
+
 /**
  * The type of hash value of a key.
  * It should be a value of at least 32bit with fully random pattern.
@@ -132,7 +141,12 @@ void
 rte_hash_free(struct rte_hash *h);
 
 /**
- * Reset all hash structure, by zeroing all entries
+ * Reset all hash structure, by zeroing all entries.
+ * When RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * it is the application's responsibility to make sure that
+ * none of the readers are referencing the hash table
+ * while calling this API.
+ *
  * @param h
  *   Hash table to reset
  */
@@ -156,6 +170,11 @@ rte_hash_count(const struct rte_hash *h);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
+ * If the key already exists in the table, this API updates its value
+ * with the 'data' passed in. It is the responsibility of
+ * the application to manage any memory associated with the old value.
+ * The readers might still be using the old value even after this API
+ * has returned.
  *
  * @param h
  *   Hash table to add the key to.
@@ -178,6 +197,11 @@ rte_hash_add_key_data(const struct rte_hash *h, const void *key, void *data);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
+ * If the key already exists in the table, this API updates its value
+ * with the 'data' passed in. It is the responsibility of
+ * the application to manage any memory associated with the old value.
+ * The readers might still be using the old value even after this API
+ * has returned.
  *
  * @param h
  *   Hash table to add the key to.
@@ -243,10 +267,14 @@ rte_hash_add_key_with_hash(const struct rte_hash *h, const void *key, hash_sig_t
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
+ * rte_hash_free_key_with_position API should be called after all
+ * the readers have stopped referencing the entry corresponding to
+ * this key. RCU mechanisms could be used to determine such a state.
  *
  * @param h
  *   Hash table to remove the key from.
@@ -268,10 +296,14 @@ rte_hash_del_key(const struct rte_hash *h, const void *key);
  * and should only be called from one thread by default.
  * Thread safety can be enabled by setting flag during
  * table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
  * the key index returned by rte_hash_add_key_xxx APIs will not be
  * freed by this API. rte_hash_free_key_with_position API must be called
  * additionally to free the index associated with the key.
+ * rte_hash_free_key_with_position API should be called after all
+ * the readers have stopped referencing the entry corresponding to
+ * this key. RCU mechanisms could be used to determine such a state.
  *
  * @param h
  *   Hash table to remove the key from.
@@ -318,9 +350,12 @@ rte_hash_get_key_with_position(const struct rte_hash *h, const int32_t position,
  * of the key. This operation is not multi-thread safe and should
  * only be called from one thread by default. Thread safety
  * can be enabled by setting flag during table creation.
- * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL is enabled,
- * this API must be called, with the key index returned by rte_hash_add_key_xxx
- * APIs, after the key is deleted using rte_hash_del_key_xxx APIs.
+ * If RTE_HASH_EXTRA_FLAGS_NO_FREE_ON_DEL or
+ * RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF is enabled,
+ * the key index returned by rte_hash_del_key_xxx APIs must be freed
+ * using this API. This API should be called after all the readers
+ * have stopped referencing the entry corresponding to this key.
+ * RCU mechanisms could be used to determine such a state.
  * This API does not validate if the key is already freed.
  *
  * @param h