unsigned int header_size;
unsigned int total_elt_size;
unsigned int obj_per_bucket;
+ unsigned int bucket_stack_thresh;
uintptr_t bucket_page_mask;
struct rte_ring *shared_bucket_ring;
struct bucket_stack *buckets[RTE_MAX_LCORE];
struct rte_ring *shared_orphan_ring;
struct rte_mempool *pool;
unsigned int bucket_mem_size;
+ void *lcore_callback_handle;
};
static struct bucket_stack *
unsigned int n)
{
struct bucket_data *bd = mp->pool_data;
+ struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
unsigned int i;
int rc = 0;
rc = bucket_enqueue_single(bd, obj_table[i]);
RTE_ASSERT(rc == 0);
}
+ if (local_stack->top > bd->bucket_stack_thresh) {
+ rte_ring_enqueue_bulk(bd->shared_bucket_ring,
+ &local_stack->objects
+ [bd->bucket_stack_thresh],
+ local_stack->top -
+ bd->bucket_stack_thresh,
+ NULL);
+ local_stack->top = bd->bucket_stack_thresh;
+ }
return rc;
}
return 0;
}
+/*
+ * Context passed through rte_lcore_iterate() to bucket_count_per_lcore():
+ * accumulates the per-lcore cached object counts for bucket_get_count().
+ */
+struct bucket_count_per_lcore_ctx {
+	const struct bucket_data *bd;
+	/* running total of objects counted so far */
+	unsigned int count;
+};
+
+/*
+ * Per-lcore hook for rte_lcore_iterate(): add the objects cached on this
+ * lcore (full buckets on its stack plus entries in its adoption ring) to
+ * the running total in the context.  Always returns 0 so iteration
+ * continues over all lcores.
+ */
+static int
+bucket_count_per_lcore(unsigned int lcore_id, void *arg)
+{
+	struct bucket_count_per_lcore_ctx *ctx = arg;
+	const struct bucket_data *bd = ctx->bd;
+
+	ctx->count += bd->obj_per_bucket * bd->buckets[lcore_id]->top;
+	ctx->count += rte_ring_count(bd->adoption_buffer_rings[lcore_id]);
+	return 0;
+}
+
static void
count_underfilled_buckets(struct rte_mempool *mp,
void *opaque,
+/*
+ * Total object count for the pool: full buckets in the shared ring,
+ * loose objects in the orphan ring, per-lcore caches, and objects in
+ * partially filled buckets still attached to memory chunks.
+ */
static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
-	const struct bucket_data *bd = mp->pool_data;
-	unsigned int count =
-		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
-		rte_ring_count(bd->shared_orphan_ring);
-	unsigned int i;
+	struct bucket_count_per_lcore_ctx bplc;
-	for (i = 0; i < RTE_MAX_LCORE; i++) {
-		if (!rte_lcore_is_enabled(i))
-			continue;
-		count += bd->obj_per_bucket * bd->buckets[i]->top +
-			rte_ring_count(bd->adoption_buffer_rings[i]);
-	}
+	bplc.bd = mp->pool_data;
+	bplc.count = bplc.bd->obj_per_bucket *
+		rte_ring_count(bplc.bd->shared_bucket_ring);
+	bplc.count += rte_ring_count(bplc.bd->shared_orphan_ring);
+	/* Visit only lcores registered with EAL instead of scanning the
+	 * whole RTE_MAX_LCORE range; see bucket_count_per_lcore().
+	 */
+	rte_lcore_iterate(bucket_count_per_lcore, &bplc);
	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
-			     count_underfilled_buckets, &count);
+			     count_underfilled_buckets, &bplc.count);
-	return count;
+	return bplc.count;
+}
+
+/*
+ * rte_lcore_callback init hook: allocate the per-lcore bucket stack and
+ * adoption-buffer ring for @lcore_id.  Returns 0 on success; on any
+ * failure releases whatever was created for this lcore and returns -1.
+ */
+static int
+bucket_init_per_lcore(unsigned int lcore_id, void *arg)
+{
+	struct bucket_data *bd = arg;
+	struct rte_mempool *mp = bd->pool;
+	char rg_name[RTE_RING_NAMESIZE];
+	int rg_flags;
+	int len;
+
+	bd->buckets[lcore_id] =
+		bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
+	if (bd->buckets[lcore_id] == NULL)
+		goto error;
+
+	len = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
+		       mp->name, lcore_id);
+	if (len < 0 || len >= (int)sizeof(rg_name))
+		goto error;
+
+	/* Only this lcore dequeues from its own adoption ring. */
+	rg_flags = RING_F_SC_DEQ;
+	if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
+		rg_flags |= RING_F_SP_ENQ;
+	bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
+		rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
+	if (bd->adoption_buffer_rings[lcore_id] == NULL)
+		goto error;
+
+	return 0;
+error:
+	rte_free(bd->buckets[lcore_id]);
+	bd->buckets[lcore_id] = NULL;
+	return -1;
+}
+
+/*
+ * rte_lcore_callback uninit hook: release the per-lcore adoption ring
+ * and bucket stack for @lcore_id.  Pointers are reset to NULL so a
+ * later re-init (or a double uninit) is safe.
+ */
+static void
+bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
+{
+	struct bucket_data *bd = arg;
+
+	rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
+	bd->adoption_buffer_rings[lcore_id] = NULL;
+	rte_free(bd->buckets[lcore_id]);
+	bd->buckets[lcore_id] = NULL;
}
static int
int rc = 0;
char rg_name[RTE_RING_NAMESIZE];
struct bucket_data *bd;
- unsigned int i;
unsigned int bucket_header_size;
+ size_t pg_sz;
+
+ rc = rte_mempool_get_page_size(mp, &pg_sz);
+ if (rc < 0)
+ return rc;
bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
RTE_CACHE_LINE_SIZE, mp->socket_id);
goto no_mem_for_data;
}
bd->pool = mp;
- if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
+ if (mp->flags & RTE_MEMPOOL_F_NO_CACHE_ALIGN)
bucket_header_size = sizeof(struct bucket_header);
else
bucket_header_size = RTE_CACHE_LINE_SIZE;
RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
bd->header_size = mp->header_size + bucket_header_size;
bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
- bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
+ bd->bucket_mem_size = RTE_MIN(pg_sz,
+ (size_t)(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024));
bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
bd->total_elt_size;
bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
+ /* eventually this should be a tunable parameter */
+ bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
- if (mp->flags & MEMPOOL_F_SP_PUT)
- rg_flags |= RING_F_SP_ENQ;
- if (mp->flags & MEMPOOL_F_SC_GET)
- rg_flags |= RING_F_SC_DEQ;
-
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- if (!rte_lcore_is_enabled(i))
- continue;
- bd->buckets[i] =
- bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
- if (bd->buckets[i] == NULL) {
- rc = -ENOMEM;
- goto no_mem_for_stacks;
- }
- rc = snprintf(rg_name, sizeof(rg_name),
- RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
- if (rc < 0 || rc >= (int)sizeof(rg_name)) {
- rc = -ENAMETOOLONG;
- goto no_mem_for_stacks;
- }
- bd->adoption_buffer_rings[i] =
- rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
- mp->socket_id,
- rg_flags | RING_F_SC_DEQ);
- if (bd->adoption_buffer_rings[i] == NULL) {
- rc = -rte_errno;
- goto no_mem_for_stacks;
- }
+ bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
+ bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
+ if (bd->lcore_callback_handle == NULL) {
+ rc = -ENOMEM;
+ goto no_mem_for_stacks;
}
+ if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
+ rg_flags |= RING_F_SP_ENQ;
+ if (mp->flags & RTE_MEMPOOL_F_SC_GET)
+ rg_flags |= RING_F_SC_DEQ;
rc = snprintf(rg_name, sizeof(rg_name),
RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
if (rc < 0 || rc >= (int)sizeof(rg_name)) {
rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
+ rte_lcore_callback_unregister(bd->lcore_callback_handle);
no_mem_for_stacks:
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- rte_free(bd->buckets[i]);
- rte_ring_free(bd->adoption_buffer_rings[i]);
- }
rte_free(bd);
no_mem_for_data:
rte_errno = -rc;
static void
bucket_free(struct rte_mempool *mp)
{
- unsigned int i;
struct bucket_data *bd = mp->pool_data;
if (bd == NULL)
return;
- for (i = 0; i < RTE_MAX_LCORE; i++) {
- rte_free(bd->buckets[i]);
- rte_ring_free(bd->adoption_buffer_rings[i]);
- }
+ rte_lcore_callback_unregister(bd->lcore_callback_handle);
rte_ring_free(bd->shared_orphan_ring);
rte_ring_free(bd->shared_bucket_ring);
hdr->fill_cnt = 0;
hdr->lcore_id = LCORE_ID_ANY;
- rc = rte_mempool_op_populate_default(mp,
+ rc = rte_mempool_op_populate_helper(mp, 0,
RTE_MIN(bd->obj_per_bucket,
max_objs - n_objs),
iter + bucket_header_sz,
};
-MEMPOOL_REGISTER_OPS(ops_bucket);
+RTE_MEMPOOL_REGISTER_OPS(ops_bucket);