#include "rte_hash.h"
 #include "rte_cuckoo_hash.h"
 
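+/* Iterate over a bucket and each extendable bucket linked after it. */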
+#define FOR_EACH_BUCKET(CURRENT_BKT, START_BUCKET)                            \
+       for (CURRENT_BKT = START_BUCKET;                                      \
+               CURRENT_BKT != NULL;                                          \
+               CURRENT_BKT = CURRENT_BKT->next)
 
 TAILQ_HEAD(rte_hash_list, rte_tailq_entry);
 
        return h;
 }
 
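+/* Follow a bucket's linked list to its last extendable bucket. */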
+static inline struct rte_hash_bucket *
+rte_hash_get_last_bkt(struct rte_hash_bucket *lst_bkt)
+{
+       while (lst_bkt->next != NULL)
+               lst_bkt = lst_bkt->next;
+       return lst_bkt;
+}
+
 void rte_hash_set_cmp_func(struct rte_hash *h, rte_hash_cmp_eq_t func)
 {
        h->cmp_jump_table_idx = KEY_CUSTOM;
        struct rte_tailq_entry *te = NULL;
        struct rte_hash_list *hash_list;
        struct rte_ring *r = NULL;
+       struct rte_ring *r_ext = NULL;
        char hash_name[RTE_HASH_NAMESIZE];
        void *k = NULL;
        void *buckets = NULL;
+       void *buckets_ext = NULL;
        char ring_name[RTE_RING_NAMESIZE];
+       char ext_ring_name[RTE_RING_NAMESIZE];
        unsigned num_key_slots;
        unsigned i;
        unsigned int hw_trans_mem_support = 0, multi_writer_support = 0;
+       unsigned int ext_table_support = 0;
        unsigned int readwrite_concur_support = 0;
 
        rte_hash_function default_hash_func = (rte_hash_function)rte_jhash;
                multi_writer_support = 1;
        }
 
+       if (params->extra_flag & RTE_HASH_EXTRA_FLAGS_EXT_TABLE)
+               ext_table_support = 1;
+
        /* Store all keys and leave the first entry as a dummy entry for lookup_bulk */
        if (multi_writer_support)
                /*
                goto err;
        }
 
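+       /* The bucket count is the requested number of entries rounded up
+        * to a power of two, divided by the entries per bucket, so that
+        * bucket_bitmask below works as a simple mask.
+        */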
+       const uint32_t num_buckets = rte_align32pow2(params->entries) /
+                                               RTE_HASH_BUCKET_ENTRIES;
+
+       /* Create a ring to hold the free extendable buckets. */
+       if (ext_table_support) {
+               snprintf(ext_ring_name, sizeof(ext_ring_name), "HT_EXT_%s",
+                                                               params->name);
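+               /* A ring of size n holds at most n - 1 entries and the size
+                * must be a power of two, so ask for num_buckets + 1 rounded
+                * up.
+                */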
+               r_ext = rte_ring_create(ext_ring_name,
+                               rte_align32pow2(num_buckets + 1),
+                               params->socket_id, 0);
+
+               if (r_ext == NULL) {
+                       RTE_LOG(ERR, HASH, "ext buckets memory allocation "
+                                                               "failed\n");
+                       goto err;
+               }
+       }
+
        snprintf(hash_name, sizeof(hash_name), "HT_%s", params->name);
 
        rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
                goto err_unlock;
        }
 
-       const uint32_t num_buckets = rte_align32pow2(params->entries)
-                                       / RTE_HASH_BUCKET_ENTRIES;
-
        buckets = rte_zmalloc_socket(NULL,
                                num_buckets * sizeof(struct rte_hash_bucket),
                                RTE_CACHE_LINE_SIZE, params->socket_id);
 
        if (buckets == NULL) {
-               RTE_LOG(ERR, HASH, "memory allocation failed\n");
+               RTE_LOG(ERR, HASH, "buckets memory allocation failed\n");
                goto err_unlock;
        }
 
+       /* Allocate the same number of extendable buckets */
+       if (ext_table_support) {
+               buckets_ext = rte_zmalloc_socket(NULL,
+                               num_buckets * sizeof(struct rte_hash_bucket),
+                               RTE_CACHE_LINE_SIZE, params->socket_id);
+               if (buckets_ext == NULL) {
+                       RTE_LOG(ERR, HASH, "ext buckets memory allocation "
+                                                       "failed\n");
+                       goto err_unlock;
+               }
+               /* Populate the ext bkt ring. Index 0 is reserved, as it is
+                * for the key-data slots, in case we later use bucket
+                * indices for the linked list, where 0 would mean NULL
+                * (no next bucket).
+                */
+               for (i = 1; i <= num_buckets; i++)
+                       rte_ring_sp_enqueue(r_ext, (void *)((uintptr_t) i));
+       }
+
        const uint32_t key_entry_size = sizeof(struct rte_hash_key) + params->key_len;
        const uint64_t key_tbl_size = (uint64_t) key_entry_size * num_key_slots;
 
        h->num_buckets = num_buckets;
        h->bucket_bitmask = h->num_buckets - 1;
        h->buckets = buckets;
+       h->buckets_ext = buckets_ext;
+       h->free_ext_bkts = r_ext;
        h->hash_func = (params->hash_func == NULL) ?
                default_hash_func : params->hash_func;
        h->key_store = k;
        h->hw_trans_mem_support = hw_trans_mem_support;
        h->multi_writer_support = multi_writer_support;
        h->readwrite_concur_support = readwrite_concur_support;
+       h->ext_table_support = ext_table_support;
 
 #if defined(RTE_ARCH_X86)
        if (rte_cpu_get_flag_enabled(RTE_CPUFLAG_AVX2))
        rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 err:
        rte_ring_free(r);
+       rte_ring_free(r_ext);
        rte_free(te);
        rte_free(h);
        rte_free(buckets);
+       rte_free(buckets_ext);
        rte_free(k);
        return NULL;
 }
                rte_free(h->readwrite_lock);
        }
        rte_ring_free(h->free_slots);
+       rte_ring_free(h->free_ext_bkts);
        rte_free(h->key_store);
        rte_free(h->buckets);
+       rte_free(h->buckets_ext);
        rte_free(h);
        rte_free(te);
 }
                rte_rwlock_write_lock(h->readwrite_lock);
 }
 
-
 static inline void
 __hash_rw_reader_lock(const struct rte_hash *h)
 {
        while (rte_ring_dequeue(h->free_slots, &ptr) == 0)
                rte_pause();
 
+       /* Clear the free extendable bucket ring and its memory */
+       if (h->ext_table_support) {
+               memset(h->buckets_ext, 0, h->num_buckets *
+                                               sizeof(struct rte_hash_bucket));
+               while (rte_ring_dequeue(h->free_ext_bkts, &ptr) == 0)
+                       rte_pause();
+       }
+
        /* Repopulate the free slots ring. Entry zero is reserved for key misses */
        if (h->multi_writer_support)
                tot_ring_cnt = h->entries + (RTE_MAX_LCORE - 1) *
        for (i = 1; i < tot_ring_cnt + 1; i++)
                rte_ring_sp_enqueue(h->free_slots, (void *)((uintptr_t) i));
 
+       /* Repopulate the free ext bkt ring. */
+       if (h->ext_table_support) {
+               for (i = 1; i <= h->num_buckets; i++)
+                       rte_ring_sp_enqueue(h->free_ext_bkts,
+                                               (void *)((uintptr_t) i));
+       }
+
        if (h->multi_writer_support) {
                /* Reset local caches per lcore */
                for (i = 0; i < RTE_MAX_LCORE; i++)
                int32_t *ret_val)
 {
        unsigned int i;
-       struct rte_hash_bucket *cur_bkt = prim_bkt;
+       struct rte_hash_bucket *cur_bkt;
        int32_t ret;
 
        __hash_rw_writer_lock(h);
        /* Check if key was inserted after last check but before this
         * protected region in case of inserting duplicated keys.
         */
-       ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+       ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
        if (ret != -1) {
                __hash_rw_writer_unlock(h);
                *ret_val = ret;
                return 1;
        }
-       ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
-       if (ret != -1) {
-               __hash_rw_writer_unlock(h);
-               *ret_val = ret;
-               return 1;
+
+       FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+               ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
+               if (ret != -1) {
+                       __hash_rw_writer_unlock(h);
+                       *ret_val = ret;
+                       return 1;
+               }
        }
 
        /* Insert new entry if there is room in the primary
                        int32_t *ret_val)
 {
        uint32_t prev_alt_bkt_idx;
-       struct rte_hash_bucket *cur_bkt = bkt;
+       struct rte_hash_bucket *cur_bkt;
        struct queue_node *prev_node, *curr_node = leaf;
        struct rte_hash_bucket *prev_bkt, *curr_bkt = leaf->bkt;
        uint32_t prev_slot, curr_slot = leaf_slot;
        /* Check if key was inserted after last check but before this
         * protected region.
         */
-       ret = search_and_update(h, data, key, cur_bkt, sig, alt_hash);
+       ret = search_and_update(h, data, key, bkt, sig, alt_hash);
        if (ret != -1) {
                __hash_rw_writer_unlock(h);
                *ret_val = ret;
                return 1;
        }
 
-       ret = search_and_update(h, data, key, alt_bkt, alt_hash, sig);
-       if (ret != -1) {
-               __hash_rw_writer_unlock(h);
-               *ret_val = ret;
-               return 1;
+       FOR_EACH_BUCKET(cur_bkt, alt_bkt) {
+               ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
+               if (ret != -1) {
+                       __hash_rw_writer_unlock(h);
+                       *ret_val = ret;
+                       return 1;
+               }
        }
 
        while (likely(curr_node->prev != NULL)) {
 {
        hash_sig_t alt_hash;
        uint32_t prim_bucket_idx, sec_bucket_idx;
-       struct rte_hash_bucket *prim_bkt, *sec_bkt;
+       struct rte_hash_bucket *prim_bkt, *sec_bkt, *cur_bkt;
        struct rte_hash_key *new_k, *keys = h->key_store;
        void *slot_id = NULL;
-       uint32_t new_idx;
+       void *ext_bkt_id = NULL;
+       uint32_t new_idx, bkt_id;
        int ret;
        unsigned n_slots;
        unsigned lcore_id;
+       unsigned int i;
        struct lcore_cache *cached_free_slots = NULL;
        int32_t ret_val;
+       struct rte_hash_bucket *last;
 
        prim_bucket_idx = sig & h->bucket_bitmask;
        prim_bkt = &h->buckets[prim_bucket_idx];
        }
 
        /* Check if key is already inserted in secondary location */
-       ret = search_and_update(h, data, key, sec_bkt, alt_hash, sig);
-       if (ret != -1) {
-               __hash_rw_writer_unlock(h);
-               return ret;
+       FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+               ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
+               if (ret != -1) {
+                       __hash_rw_writer_unlock(h);
+                       return ret;
+               }
        }
        __hash_rw_writer_unlock(h);
 
        else if (ret == 1) {
                enqueue_slot_back(h, cached_free_slots, slot_id);
                return ret_val;
-       } else {
+       }
+
+       /* If the ext table is not enabled, the insertion has failed */
+       if (!h->ext_table_support) {
                enqueue_slot_back(h, cached_free_slots, slot_id);
                return ret;
        }
+
+       /* We now go through the extendable buckets. The writer lock is
+        * needed to protect all operations on them.
+        */
+       __hash_rw_writer_lock(h);
+       /* Check for duplicates again; the key may have been inserted
+        * before the lock was taken.
+        */
+       ret = search_and_update(h, data, key, prim_bkt, sig, alt_hash);
+       if (ret != -1) {
+               enqueue_slot_back(h, cached_free_slots, slot_id);
+               goto failure;
+       }
+
+       FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+               ret = search_and_update(h, data, key, cur_bkt, alt_hash, sig);
+               if (ret != -1) {
+                       enqueue_slot_back(h, cached_free_slots, slot_id);
+                       goto failure;
+               }
+       }
+
+       /* Search the sec and ext buckets for an empty entry to insert into. */
+       FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+               for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+                       /* Check if slot is available */
+                       if (likely(cur_bkt->key_idx[i] == EMPTY_SLOT)) {
+                               cur_bkt->sig_current[i] = alt_hash;
+                               cur_bkt->sig_alt[i] = sig;
+                               cur_bkt->key_idx[i] = new_idx;
+                               __hash_rw_writer_unlock(h);
+                               return new_idx - 1;
+                       }
+               }
+       }
+
+       /* No empty entry was found in the extendable buckets; link a new
+        * extendable bucket. First get a free bucket from the ring.
+        */
+       if (rte_ring_sc_dequeue(h->free_ext_bkts, &ext_bkt_id) != 0) {
+               ret = -ENOSPC;
+               goto failure;
+       }
+
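+       /* The ring stores bucket indices offset by one; index 0 is reserved. */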
+       bkt_id = (uint32_t)((uintptr_t)ext_bkt_id) - 1;
+       /* Use the first location of the new bucket */
+       (h->buckets_ext[bkt_id]).sig_current[0] = alt_hash;
+       (h->buckets_ext[bkt_id]).sig_alt[0] = sig;
+       (h->buckets_ext[bkt_id]).key_idx[0] = new_idx;
+       /* Link the new bucket into the sec bucket's linked list */
+       last = rte_hash_get_last_bkt(sec_bkt);
+       last->next = &h->buckets_ext[bkt_id];
+       __hash_rw_writer_unlock(h);
+       return new_idx - 1;
+
+failure:
+       __hash_rw_writer_unlock(h);
+       return ret;
+
 }
 
 int32_t
 {
        uint32_t bucket_idx;
        hash_sig_t alt_hash;
-       struct rte_hash_bucket *bkt;
+       struct rte_hash_bucket *bkt, *cur_bkt;
        int ret;
 
        bucket_idx = sig & h->bucket_bitmask;
        bkt = &h->buckets[bucket_idx];
 
        /* Check if key is in secondary location */
-       ret = search_one_bucket(h, key, alt_hash, data, bkt);
-       if (ret != -1) {
-               __hash_rw_reader_unlock(h);
-               return ret;
+       FOR_EACH_BUCKET(cur_bkt, bkt) {
+               ret = search_one_bucket(h, key, alt_hash, data, cur_bkt);
+               if (ret != -1) {
+                       __hash_rw_reader_unlock(h);
+                       return ret;
+               }
        }
        __hash_rw_reader_unlock(h);
        return -ENOENT;
        }
 }
 
+/* Compact the linked list by moving the key in the last entry of the
+ * linked list into the empty slot.
+ */
+static inline void
+__rte_hash_compact_ll(struct rte_hash_bucket *cur_bkt, int pos)
+{
+       int i;
+       struct rte_hash_bucket *last_bkt;
+
+       if (!cur_bkt->next)
+               return;
+
+       last_bkt = rte_hash_get_last_bkt(cur_bkt);
+
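+       /* Scan the last bucket backwards for its last occupied slot. */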
+       for (i = RTE_HASH_BUCKET_ENTRIES - 1; i >= 0; i--) {
+               if (last_bkt->key_idx[i] != EMPTY_SLOT) {
+                       cur_bkt->key_idx[pos] = last_bkt->key_idx[i];
+                       cur_bkt->sig_current[pos] = last_bkt->sig_current[i];
+                       cur_bkt->sig_alt[pos] = last_bkt->sig_alt[i];
+                       last_bkt->sig_current[i] = NULL_SIGNATURE;
+                       last_bkt->sig_alt[i] = NULL_SIGNATURE;
+                       last_bkt->key_idx[i] = EMPTY_SLOT;
+                       return;
+               }
+       }
+}
+
 /* Search one bucket and remove the matched key */
 static inline int32_t
 search_and_remove(const struct rte_hash *h, const void *key,
-                       struct rte_hash_bucket *bkt, hash_sig_t sig)
+                       struct rte_hash_bucket *bkt, hash_sig_t sig, int *pos)
 {
        struct rte_hash_key *k, *keys = h->key_store;
        unsigned int i;
        int32_t ret;
 
-       /* Check if key is in primary location */
+       /* Check if the key is in the bucket */
        for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
                if (bkt->sig_current[i] == sig &&
                                bkt->key_idx[i] != EMPTY_SLOT) {
                        if (rte_hash_cmp_eq(key, k->key, h) == 0) {
                                remove_entry(h, bkt, i);
 
-                               /*
-                                * Return index where key is stored,
+                               /* Return index where key is stored,
                                 * subtracting the first dummy index
                                 */
                                ret = bkt->key_idx[i] - 1;
                                bkt->key_idx[i] = EMPTY_SLOT;
+                               *pos = i;
                                return ret;
                        }
                }
 {
        uint32_t bucket_idx;
        hash_sig_t alt_hash;
-       struct rte_hash_bucket *bkt;
-       int32_t ret;
+       struct rte_hash_bucket *prim_bkt, *sec_bkt, *prev_bkt, *last_bkt;
+       struct rte_hash_bucket *cur_bkt;
+       int pos;
+       int32_t ret, i;
 
        bucket_idx = sig & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
+       prim_bkt = &h->buckets[bucket_idx];
 
        __hash_rw_writer_lock(h);
        /* look for key in primary bucket */
-       ret = search_and_remove(h, key, bkt, sig);
+       ret = search_and_remove(h, key, prim_bkt, sig, &pos);
        if (ret != -1) {
-               __hash_rw_writer_unlock(h);
-               return ret;
+               __rte_hash_compact_ll(prim_bkt, pos);
+               last_bkt = prim_bkt->next;
+               prev_bkt = prim_bkt;
+               goto return_bkt;
        }
 
        /* Calculate secondary hash */
        alt_hash = rte_hash_secondary_hash(sig);
        bucket_idx = alt_hash & h->bucket_bitmask;
-       bkt = &h->buckets[bucket_idx];
+       sec_bkt = &h->buckets[bucket_idx];
+
+       FOR_EACH_BUCKET(cur_bkt, sec_bkt) {
+               ret = search_and_remove(h, key, cur_bkt, alt_hash, &pos);
+               if (ret != -1) {
+                       __rte_hash_compact_ll(cur_bkt, pos);
+                       last_bkt = sec_bkt->next;
+                       prev_bkt = sec_bkt;
+                       goto return_bkt;
+               }
+       }
 
-       /* look for key in secondary bucket */
-       ret = search_and_remove(h, key, bkt, alt_hash);
-       if (ret != -1) {
+       __hash_rw_writer_unlock(h);
+       return -ENOENT;
+
+/* Check whether the last bucket is empty and can be recycled */
+return_bkt:
+       if (!last_bkt) {
                __hash_rw_writer_unlock(h);
                return ret;
        }
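+       /* Walk to the true last bucket, tracking its predecessor. */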
+       while (last_bkt->next) {
+               prev_bkt = last_bkt;
+               last_bkt = last_bkt->next;
+       }
+
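+       /* Check whether the last bucket is now completely empty. */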
+       for (i = 0; i < RTE_HASH_BUCKET_ENTRIES; i++) {
+               if (last_bkt->key_idx[i] != EMPTY_SLOT)
+                       break;
+       }
+       /* Found an empty last bucket; unlink and recycle it */
+       if (i == RTE_HASH_BUCKET_ENTRIES) {
+               prev_bkt->next = last_bkt->next = NULL;
+               uint32_t index = last_bkt - h->buckets_ext + 1;
+               rte_ring_sp_enqueue(h->free_ext_bkts, (void *)(uintptr_t)index);
+       }
 
        __hash_rw_writer_unlock(h);
-       return -ENOENT;
+       return ret;
 }
 
 int32_t
 {
        uint64_t hits = 0;
        int32_t i;
+       int32_t ret;
        uint32_t prim_hash[RTE_HASH_LOOKUP_BULK_MAX];
        uint32_t sec_hash[RTE_HASH_LOOKUP_BULK_MAX];
        const struct rte_hash_bucket *primary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
        const struct rte_hash_bucket *secondary_bkt[RTE_HASH_LOOKUP_BULK_MAX];
        uint32_t prim_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
        uint32_t sec_hitmask[RTE_HASH_LOOKUP_BULK_MAX] = {0};
+       struct rte_hash_bucket *cur_bkt, *next_bkt;
 
        /* Prefetch first keys */
        for (i = 0; i < PREFETCH_OFFSET && i < num_keys; i++)
                continue;
        }
 
+       /* All keys found; no need to go through the ext buckets */
+       if ((hits == ((1ULL << num_keys) - 1)) || !h->ext_table_support) {
+               if (hit_mask != NULL)
+                       *hit_mask = hits;
+               __hash_rw_reader_unlock(h);
+               return;
+       }
+
+       /* Check the ext buckets for matches on the remaining keys */
+       for (i = 0; i < num_keys; i++) {
+               if ((hits & (1ULL << i)) != 0)
+                       continue;
+               next_bkt = secondary_bkt[i]->next;
+               FOR_EACH_BUCKET(cur_bkt, next_bkt) {
+                       if (data != NULL)
+                               ret = search_one_bucket(h, keys[i],
+                                               sec_hash[i], &data[i], cur_bkt);
+                       else
+                               ret = search_one_bucket(h, keys[i],
+                                               sec_hash[i], NULL, cur_bkt);
+                       if (ret != -1) {
+                               positions[i] = ret;
+                               hits |= 1ULL << i;
+                               break;
+                       }
+               }
+       }
+
        __hash_rw_reader_unlock(h);
 
        if (hit_mask != NULL)
 
        RETURN_IF_TRUE(((h == NULL) || (next == NULL)), -EINVAL);
 
-       const uint32_t total_entries = h->num_buckets * RTE_HASH_BUCKET_ENTRIES;
-       /* Out of bounds */
-       if (*next >= total_entries)
-               return -ENOENT;
+       const uint32_t total_entries_main = h->num_buckets *
+                                                       RTE_HASH_BUCKET_ENTRIES;
+       const uint32_t total_entries = total_entries_main << 1;
+
+       /* Past the end of the main table; continue into the ext table */
+       if (*next >= total_entries_main)
+               goto extend_table;
 
        /* Calculate bucket and index of current iterator */
        bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
        while ((position = h->buckets[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
                (*next)++;
                /* End of table */
-               if (*next == total_entries)
-                       return -ENOENT;
+               if (*next == total_entries_main)
+                       goto extend_table;
                bucket_idx = *next / RTE_HASH_BUCKET_ENTRIES;
                idx = *next % RTE_HASH_BUCKET_ENTRIES;
        }
        (*next)++;
 
        return position - 1;
+
+/* Begin iterating over the extendable buckets */
+extend_table:
+       /* Out of total bounds, or the ext table is not enabled */
+       if (*next >= total_entries || !h->ext_table_support)
+               return -ENOENT;
+
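+       /* Map the global iterator position into the ext bucket array. */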
+       bucket_idx = (*next - total_entries_main) / RTE_HASH_BUCKET_ENTRIES;
+       idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
+
+       while ((position = h->buckets_ext[bucket_idx].key_idx[idx]) == EMPTY_SLOT) {
+               (*next)++;
+               if (*next == total_entries)
+                       return -ENOENT;
+               bucket_idx = (*next - total_entries_main) /
+                                               RTE_HASH_BUCKET_ENTRIES;
+               idx = (*next - total_entries_main) % RTE_HASH_BUCKET_ENTRIES;
+       }
+       __hash_rw_reader_lock(h);
+       next_key = (struct rte_hash_key *) ((char *)h->key_store +
+                               position * h->key_entry_size);
+       /* Return key and data */
+       *key = next_key->key;
+       *data = next_key->pdata;
+
+       __hash_rw_reader_unlock(h);
+
+       /* Increment iterator */
+       (*next)++;
+       return position - 1;
 }