mempool/bucket: implement bucket mempool manager
authorArtem V. Andreev <artem.andreev@oktetlabs.ru>
Thu, 26 Apr 2018 10:59:19 +0000 (11:59 +0100)
committerThomas Monjalon <thomas@monjalon.net>
Thu, 26 Apr 2018 21:34:07 +0000 (23:34 +0200)
The manager provides a way to allocate physically and virtually
contiguous set of objects.

Signed-off-by: Artem V. Andreev <artem.andreev@oktetlabs.ru>
Signed-off-by: Andrew Rybchenko <arybchenko@solarflare.com>
MAINTAINERS
config/common_base
doc/guides/rel_notes/release_18_05.rst
drivers/mempool/Makefile
drivers/mempool/bucket/Makefile [new file with mode: 0644]
drivers/mempool/bucket/meson.build [new file with mode: 0644]
drivers/mempool/bucket/rte_mempool_bucket.c [new file with mode: 0644]
drivers/mempool/bucket/rte_mempool_bucket_version.map [new file with mode: 0644]
mk/rte.app.mk

index 6f02351..b59fa05 100644 (file)
@@ -364,6 +364,15 @@ F: test/test/test_rawdev.c
 F: doc/guides/prog_guide/rawdev.rst
 
 
+Memory Pool Drivers
+-------------------
+
+Bucket memory pool
+M: Artem V. Andreev <artem.andreev@oktetlabs.ru>
+M: Andrew Rybchenko <arybchenko@solarflare.com>
+F: drivers/mempool/bucket/
+
+
 Bus Drivers
 -----------
 
index 7e45412..f24417c 100644 (file)
@@ -632,6 +632,8 @@ CONFIG_RTE_LIBRTE_MEMPOOL_DEBUG=n
 #
 # Compile Mempool drivers
 #
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET=y
+CONFIG_RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB=64
 CONFIG_RTE_DRIVER_MEMPOOL_RING=y
 CONFIG_RTE_DRIVER_MEMPOOL_STACK=y
 
index 5408645..3899c7d 100644 (file)
@@ -41,6 +41,13 @@ New Features
      Also, make sure to start the actual text at the margin.
      =========================================================
 
+* **Added bucket mempool driver.**
+
+  Added bucket mempool driver which provides a way to allocate contiguous
+  block of objects.
+  Number of objects in the block depends on how many objects fit in
+  RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB memory chunk which is build time option.
+
 * **Added PMD-recommended Tx and Rx parameters**
 
   Applications can now query drivers for device-tuned values of
index fc8b73b..28c2e83 100644 (file)
@@ -3,6 +3,7 @@
 
 include $(RTE_SDK)/mk/rte.vars.mk
 
+DIRS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += bucket
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 DIRS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL) += dpaa
 endif
diff --git a/drivers/mempool/bucket/Makefile b/drivers/mempool/bucket/Makefile
new file mode 100644 (file)
index 0000000..7364916
--- /dev/null
@@ -0,0 +1,27 @@
+# SPDX-License-Identifier: BSD-3-Clause
+#
+# Copyright (c) 2017-2018 Solarflare Communications Inc.
+# All rights reserved.
+#
+# This software was jointly developed between OKTET Labs (under contract
+# for Solarflare) and Solarflare Communications, Inc.
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+#
+# library name
+#
+LIB = librte_mempool_bucket.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS)
+
+LDLIBS += -lrte_eal -lrte_mempool -lrte_ring
+
+EXPORT_MAP := rte_mempool_bucket_version.map
+
+LIBABIVER := 1
+
+SRCS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += rte_mempool_bucket.c
+
+include $(RTE_SDK)/mk/rte.lib.mk
diff --git a/drivers/mempool/bucket/meson.build b/drivers/mempool/bucket/meson.build
new file mode 100644 (file)
index 0000000..618d791
--- /dev/null
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: BSD-3-Clause
+#
+# Copyright (c) 2017-2018 Solarflare Communications Inc.
+# All rights reserved.
+#
+# This software was jointly developed between OKTET Labs (under contract
+# for Solarflare) and Solarflare Communications, Inc.
+
+sources = files('rte_mempool_bucket.c')
diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
new file mode 100644 (file)
index 0000000..ef822eb
--- /dev/null
@@ -0,0 +1,563 @@
+/* SPDX-License-Identifier: BSD-3-Clause
+ *
+ * Copyright (c) 2017-2018 Solarflare Communications Inc.
+ * All rights reserved.
+ *
+ * This software was jointly developed between OKTET Labs (under contract
+ * for Solarflare) and Solarflare Communications, Inc.
+ */
+
+#include <stdbool.h>
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_errno.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_malloc.h>
+
+/*
+ * The general idea of the bucket mempool driver is as follows.
+ * We keep track of physically contiguous groups (buckets) of objects
+ * of a certain size. Every such a group has a counter that is
+ * incremented every time an object from that group is enqueued.
+ * Until the bucket is full, no objects from it are eligible for allocation.
+ * If a request is made to dequeue a multiply of bucket size, it is
+ * satisfied by returning the whole buckets, instead of separate objects.
+ */
+
+
+struct bucket_header {
+       unsigned int lcore_id;
+       uint8_t fill_cnt;
+};
+
+struct bucket_stack {
+       unsigned int top;
+       unsigned int limit;
+       void *objects[];
+};
+
+struct bucket_data {
+       unsigned int header_size;
+       unsigned int total_elt_size;
+       unsigned int obj_per_bucket;
+       uintptr_t bucket_page_mask;
+       struct rte_ring *shared_bucket_ring;
+       struct bucket_stack *buckets[RTE_MAX_LCORE];
+       /*
+        * Multi-producer single-consumer ring to hold objects that are
+        * returned to the mempool at a different lcore than initially
+        * dequeued
+        */
+       struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
+       struct rte_ring *shared_orphan_ring;
+       struct rte_mempool *pool;
+       unsigned int bucket_mem_size;
+};
+
+static struct bucket_stack *
+bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
+{
+       struct bucket_stack *stack;
+
+       stack = rte_zmalloc_socket("bucket_stack",
+                                  sizeof(struct bucket_stack) +
+                                  n_elts * sizeof(void *),
+                                  RTE_CACHE_LINE_SIZE,
+                                  mp->socket_id);
+       if (stack == NULL)
+               return NULL;
+       stack->limit = n_elts;
+       stack->top = 0;
+
+       return stack;
+}
+
+static void
+bucket_stack_push(struct bucket_stack *stack, void *obj)
+{
+       RTE_ASSERT(stack->top < stack->limit);
+       stack->objects[stack->top++] = obj;
+}
+
+static void *
+bucket_stack_pop_unsafe(struct bucket_stack *stack)
+{
+       RTE_ASSERT(stack->top > 0);
+       return stack->objects[--stack->top];
+}
+
+static void *
+bucket_stack_pop(struct bucket_stack *stack)
+{
+       if (stack->top == 0)
+               return NULL;
+       return bucket_stack_pop_unsafe(stack);
+}
+
+static int
+bucket_enqueue_single(struct bucket_data *bd, void *obj)
+{
+       int rc = 0;
+       uintptr_t addr = (uintptr_t)obj;
+       struct bucket_header *hdr;
+       unsigned int lcore_id = rte_lcore_id();
+
+       addr &= bd->bucket_page_mask;
+       hdr = (struct bucket_header *)addr;
+
+       if (likely(hdr->lcore_id == lcore_id)) {
+               if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
+                       hdr->fill_cnt++;
+               } else {
+                       hdr->fill_cnt = 0;
+                       /* Stack is big enough to put all buckets */
+                       bucket_stack_push(bd->buckets[lcore_id], hdr);
+               }
+       } else if (hdr->lcore_id != LCORE_ID_ANY) {
+               struct rte_ring *adopt_ring =
+                       bd->adoption_buffer_rings[hdr->lcore_id];
+
+               rc = rte_ring_enqueue(adopt_ring, obj);
+               /* Ring is big enough to put all objects */
+               RTE_ASSERT(rc == 0);
+       } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
+               hdr->fill_cnt++;
+       } else {
+               hdr->fill_cnt = 0;
+               rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
+               /* Ring is big enough to put all buckets */
+               RTE_ASSERT(rc == 0);
+       }
+
+       return rc;
+}
+
+static int
+bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
+              unsigned int n)
+{
+       struct bucket_data *bd = mp->pool_data;
+       unsigned int i;
+       int rc = 0;
+
+       for (i = 0; i < n; i++) {
+               rc = bucket_enqueue_single(bd, obj_table[i]);
+               RTE_ASSERT(rc == 0);
+       }
+       return rc;
+}
+
+static void **
+bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
+                     void **obj_table, unsigned int n)
+{
+       unsigned int i;
+       uint8_t *objptr = *pstart;
+
+       for (objptr += bd->header_size, i = 0; i < n;
+            i++, objptr += bd->total_elt_size)
+               *obj_table++ = objptr;
+       *pstart = objptr;
+       return obj_table;
+}
+
+static int
+bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
+                      unsigned int n_orphans)
+{
+       unsigned int i;
+       int rc;
+       uint8_t *objptr;
+
+       rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
+                                  n_orphans, NULL);
+       if (unlikely(rc != (int)n_orphans)) {
+               struct bucket_header *hdr;
+
+               objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
+               hdr = (struct bucket_header *)objptr;
+
+               if (objptr == NULL) {
+                       rc = rte_ring_dequeue(bd->shared_bucket_ring,
+                                             (void **)&objptr);
+                       if (rc != 0) {
+                               rte_errno = ENOBUFS;
+                               return -rte_errno;
+                       }
+                       hdr = (struct bucket_header *)objptr;
+                       hdr->lcore_id = rte_lcore_id();
+               }
+               hdr->fill_cnt = 0;
+               bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
+                                     n_orphans);
+               for (i = n_orphans; i < bd->obj_per_bucket; i++,
+                            objptr += bd->total_elt_size) {
+                       rc = rte_ring_enqueue(bd->shared_orphan_ring,
+                                             objptr);
+                       if (rc != 0) {
+                               RTE_ASSERT(0);
+                               rte_errno = -rc;
+                               return rc;
+                       }
+               }
+       }
+
+       return 0;
+}
+
+static int
+bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
+                      unsigned int n_buckets)
+{
+       struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
+       unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
+       void **obj_table_base = obj_table;
+
+       n_buckets -= n_buckets_from_stack;
+       while (n_buckets_from_stack-- > 0) {
+               void *obj = bucket_stack_pop_unsafe(cur_stack);
+
+               obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
+                                                 bd->obj_per_bucket);
+       }
+       while (n_buckets-- > 0) {
+               struct bucket_header *hdr;
+
+               if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
+                                             (void **)&hdr) != 0)) {
+                       /*
+                        * Return the already-dequeued buffers
+                        * back to the mempool
+                        */
+                       bucket_enqueue(bd->pool, obj_table_base,
+                                      obj_table - obj_table_base);
+                       rte_errno = ENOBUFS;
+                       return -rte_errno;
+               }
+               hdr->lcore_id = rte_lcore_id();
+               obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
+                                                 obj_table,
+                                                 bd->obj_per_bucket);
+       }
+
+       return 0;
+}
+
+static int
+bucket_adopt_orphans(struct bucket_data *bd)
+{
+       int rc = 0;
+       struct rte_ring *adopt_ring =
+               bd->adoption_buffer_rings[rte_lcore_id()];
+
+       if (unlikely(!rte_ring_empty(adopt_ring))) {
+               void *orphan;
+
+               while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
+                       rc = bucket_enqueue_single(bd, orphan);
+                       RTE_ASSERT(rc == 0);
+               }
+       }
+       return rc;
+}
+
+static int
+bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
+{
+       struct bucket_data *bd = mp->pool_data;
+       unsigned int n_buckets = n / bd->obj_per_bucket;
+       unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
+       int rc = 0;
+
+       bucket_adopt_orphans(bd);
+
+       if (unlikely(n_orphans > 0)) {
+               rc = bucket_dequeue_orphans(bd, obj_table +
+                                           (n_buckets * bd->obj_per_bucket),
+                                           n_orphans);
+               if (rc != 0)
+                       return rc;
+       }
+
+       if (likely(n_buckets > 0)) {
+               rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
+               if (unlikely(rc != 0) && n_orphans > 0) {
+                       rte_ring_enqueue_bulk(bd->shared_orphan_ring,
+                                             obj_table + (n_buckets *
+                                                          bd->obj_per_bucket),
+                                             n_orphans, NULL);
+               }
+       }
+
+       return rc;
+}
+
+static void
+count_underfilled_buckets(struct rte_mempool *mp,
+                         void *opaque,
+                         struct rte_mempool_memhdr *memhdr,
+                         __rte_unused unsigned int mem_idx)
+{
+       unsigned int *pcount = opaque;
+       const struct bucket_data *bd = mp->pool_data;
+       unsigned int bucket_page_sz =
+               (unsigned int)(~bd->bucket_page_mask + 1);
+       uintptr_t align;
+       uint8_t *iter;
+
+       align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
+               (uintptr_t)memhdr->addr;
+
+       for (iter = (uint8_t *)memhdr->addr + align;
+            iter < (uint8_t *)memhdr->addr + memhdr->len;
+            iter += bucket_page_sz) {
+               struct bucket_header *hdr = (struct bucket_header *)iter;
+
+               *pcount += hdr->fill_cnt;
+       }
+}
+
+static unsigned int
+bucket_get_count(const struct rte_mempool *mp)
+{
+       const struct bucket_data *bd = mp->pool_data;
+       unsigned int count =
+               bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
+               rte_ring_count(bd->shared_orphan_ring);
+       unsigned int i;
+
+       for (i = 0; i < RTE_MAX_LCORE; i++) {
+               if (!rte_lcore_is_enabled(i))
+                       continue;
+               count += bd->obj_per_bucket * bd->buckets[i]->top +
+                       rte_ring_count(bd->adoption_buffer_rings[i]);
+       }
+
+       rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
+                            count_underfilled_buckets, &count);
+
+       return count;
+}
+
+static int
+bucket_alloc(struct rte_mempool *mp)
+{
+       int rg_flags = 0;
+       int rc = 0;
+       char rg_name[RTE_RING_NAMESIZE];
+       struct bucket_data *bd;
+       unsigned int i;
+       unsigned int bucket_header_size;
+
+       bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
+                               RTE_CACHE_LINE_SIZE, mp->socket_id);
+       if (bd == NULL) {
+               rc = -ENOMEM;
+               goto no_mem_for_data;
+       }
+       bd->pool = mp;
+       if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
+               bucket_header_size = sizeof(struct bucket_header);
+       else
+               bucket_header_size = RTE_CACHE_LINE_SIZE;
+       RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
+       bd->header_size = mp->header_size + bucket_header_size;
+       bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
+       bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
+       bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
+               bd->total_elt_size;
+       bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
+
+       if (mp->flags & MEMPOOL_F_SP_PUT)
+               rg_flags |= RING_F_SP_ENQ;
+       if (mp->flags & MEMPOOL_F_SC_GET)
+               rg_flags |= RING_F_SC_DEQ;
+
+       for (i = 0; i < RTE_MAX_LCORE; i++) {
+               if (!rte_lcore_is_enabled(i))
+                       continue;
+               bd->buckets[i] =
+                       bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
+               if (bd->buckets[i] == NULL) {
+                       rc = -ENOMEM;
+                       goto no_mem_for_stacks;
+               }
+               rc = snprintf(rg_name, sizeof(rg_name),
+                             RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
+               if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+                       rc = -ENAMETOOLONG;
+                       goto no_mem_for_stacks;
+               }
+               bd->adoption_buffer_rings[i] =
+                       rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+                                       mp->socket_id,
+                                       rg_flags | RING_F_SC_DEQ);
+               if (bd->adoption_buffer_rings[i] == NULL) {
+                       rc = -rte_errno;
+                       goto no_mem_for_stacks;
+               }
+       }
+
+       rc = snprintf(rg_name, sizeof(rg_name),
+                     RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
+       if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+               rc = -ENAMETOOLONG;
+               goto invalid_shared_orphan_ring;
+       }
+       bd->shared_orphan_ring =
+               rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+                               mp->socket_id, rg_flags);
+       if (bd->shared_orphan_ring == NULL) {
+               rc = -rte_errno;
+               goto cannot_create_shared_orphan_ring;
+       }
+
+       rc = snprintf(rg_name, sizeof(rg_name),
+                      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
+       if (rc < 0 || rc >= (int)sizeof(rg_name)) {
+               rc = -ENAMETOOLONG;
+               goto invalid_shared_bucket_ring;
+       }
+       bd->shared_bucket_ring =
+               rte_ring_create(rg_name,
+                               rte_align32pow2((mp->size + 1) /
+                                               bd->obj_per_bucket),
+                               mp->socket_id, rg_flags);
+       if (bd->shared_bucket_ring == NULL) {
+               rc = -rte_errno;
+               goto cannot_create_shared_bucket_ring;
+       }
+
+       mp->pool_data = bd;
+
+       return 0;
+
+cannot_create_shared_bucket_ring:
+invalid_shared_bucket_ring:
+       rte_ring_free(bd->shared_orphan_ring);
+cannot_create_shared_orphan_ring:
+invalid_shared_orphan_ring:
+no_mem_for_stacks:
+       for (i = 0; i < RTE_MAX_LCORE; i++) {
+               rte_free(bd->buckets[i]);
+               rte_ring_free(bd->adoption_buffer_rings[i]);
+       }
+       rte_free(bd);
+no_mem_for_data:
+       rte_errno = -rc;
+       return rc;
+}
+
+static void
+bucket_free(struct rte_mempool *mp)
+{
+       unsigned int i;
+       struct bucket_data *bd = mp->pool_data;
+
+       if (bd == NULL)
+               return;
+
+       for (i = 0; i < RTE_MAX_LCORE; i++) {
+               rte_free(bd->buckets[i]);
+               rte_ring_free(bd->adoption_buffer_rings[i]);
+       }
+
+       rte_ring_free(bd->shared_orphan_ring);
+       rte_ring_free(bd->shared_bucket_ring);
+
+       rte_free(bd);
+}
+
+static ssize_t
+bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
+                    __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
+                    size_t *align)
+{
+       struct bucket_data *bd = mp->pool_data;
+       unsigned int bucket_page_sz;
+
+       if (bd == NULL)
+               return -EINVAL;
+
+       bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
+       *align = bucket_page_sz;
+       *min_total_elt_size = bucket_page_sz;
+       /*
+        * Each bucket occupies its own block aligned to
+        * bucket_page_sz, so the required amount of memory is
+        * a multiple of bucket_page_sz.
+        * We also need extra space for a bucket header
+        */
+       return ((obj_num + bd->obj_per_bucket - 1) /
+               bd->obj_per_bucket) * bucket_page_sz;
+}
+
+static int
+bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
+               void *vaddr, rte_iova_t iova, size_t len,
+               rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
+{
+       struct bucket_data *bd = mp->pool_data;
+       unsigned int bucket_page_sz;
+       unsigned int bucket_header_sz;
+       unsigned int n_objs;
+       uintptr_t align;
+       uint8_t *iter;
+       int rc;
+
+       if (bd == NULL)
+               return -EINVAL;
+
+       bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
+       align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
+               (uintptr_t)vaddr;
+
+       bucket_header_sz = bd->header_size - mp->header_size;
+       if (iova != RTE_BAD_IOVA)
+               iova += align + bucket_header_sz;
+
+       for (iter = (uint8_t *)vaddr + align, n_objs = 0;
+            iter < (uint8_t *)vaddr + len && n_objs < max_objs;
+            iter += bucket_page_sz) {
+               struct bucket_header *hdr = (struct bucket_header *)iter;
+               unsigned int chunk_len = bd->bucket_mem_size;
+
+               if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
+                       chunk_len = len - (iter - (uint8_t *)vaddr);
+               if (chunk_len <= bucket_header_sz)
+                       break;
+               chunk_len -= bucket_header_sz;
+
+               hdr->fill_cnt = 0;
+               hdr->lcore_id = LCORE_ID_ANY;
+               rc = rte_mempool_op_populate_default(mp,
+                                                    RTE_MIN(bd->obj_per_bucket,
+                                                            max_objs - n_objs),
+                                                    iter + bucket_header_sz,
+                                                    iova, chunk_len,
+                                                    obj_cb, obj_cb_arg);
+               if (rc < 0)
+                       return rc;
+               n_objs += rc;
+               if (iova != RTE_BAD_IOVA)
+                       iova += bucket_page_sz;
+       }
+
+       return n_objs;
+}
+
+static const struct rte_mempool_ops ops_bucket = {
+       .name = "bucket",
+       .alloc = bucket_alloc,
+       .free = bucket_free,
+       .enqueue = bucket_enqueue,
+       .dequeue = bucket_dequeue,
+       .get_count = bucket_get_count,
+       .calc_mem_size = bucket_calc_mem_size,
+       .populate = bucket_populate,
+};
+
+
+MEMPOOL_REGISTER_OPS(ops_bucket);
diff --git a/drivers/mempool/bucket/rte_mempool_bucket_version.map b/drivers/mempool/bucket/rte_mempool_bucket_version.map
new file mode 100644 (file)
index 0000000..9b9ab1a
--- /dev/null
@@ -0,0 +1,4 @@
+DPDK_18.05 {
+
+       local: *;
+};
index a145791..1324f19 100644 (file)
@@ -125,6 +125,7 @@ endif
 ifeq ($(CONFIG_RTE_BUILD_SHARED_LIB),n)
 # plugins (link only if static libraries)
 
+_LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_BUCKET) += -lrte_mempool_bucket
 _LDLIBS-$(CONFIG_RTE_DRIVER_MEMPOOL_STACK)  += -lrte_mempool_stack
 ifeq ($(CONFIG_RTE_LIBRTE_DPAA_BUS),y)
 _LDLIBS-$(CONFIG_RTE_LIBRTE_DPAA_MEMPOOL)   += -lrte_mempool_dpaa