1 /* SPDX-License-Identifier: BSD-3-Clause
3 * Copyright (c) 2017-2018 Solarflare Communications Inc.
6 * This software was jointly developed between OKTET Labs (under contract
7 * for Solarflare) and Solarflare Communications, Inc.
14 #include <rte_errno.h>
16 #include <rte_mempool.h>
17 #include <rte_malloc.h>
20 * The general idea of the bucket mempool driver is as follows.
21 * We keep track of physically contiguous groups (buckets) of objects
22 * of a certain size. Every such group has a counter that is
23 * incremented every time an object from that group is enqueued.
24 * Until the bucket is full, no objects from it are eligible for allocation.
25 * If a request is made to dequeue a multiple of bucket size, it is
26 * satisfied by returning the whole buckets, instead of separate objects.
/*
 * Per-bucket bookkeeping header placed at the start of each bucket's
 * aligned memory block (found by masking an object address with
 * bucket_page_mask).
 * NOTE(review): the definition is truncated in this excerpt -- the
 * fill_cnt counter referenced elsewhere is not visible; confirm
 * against the full source.
 */
30 struct bucket_header {
/* lcore owning this bucket; LCORE_ID_ANY appears to mean "unowned"
 * (see bucket_enqueue_single() and bucket_populate()) */
31 unsigned int lcore_id;
/*
 * Driver-private pool data, stored in mp->pool_data.
 * NOTE(review): the enclosing "struct bucket_data {" line is not
 * visible in this excerpt.
 */
/* per-object offset into a bucket: mempool header plus bucket header */
42 unsigned int header_size;
/* full per-element footprint: header + element + trailer */
43 unsigned int total_elt_size;
/* number of objects that fit into one bucket */
44 unsigned int obj_per_bucket;
/* local bucket-stack fill level above which full buckets are spilled
 * to the shared bucket ring (see bucket_enqueue()) */
45 unsigned int bucket_stack_thresh;
/* mask rounding an object address down to its bucket's base address */
46 uintptr_t bucket_page_mask;
/* ring of completely filled buckets shared between all lcores */
47 struct rte_ring *shared_bucket_ring;
/* per-lcore stacks of full buckets owned by each lcore */
48 struct bucket_stack *buckets[RTE_MAX_LCORE];
50 * Multi-producer single-consumer ring to hold objects that are
51 * returned to the mempool at a different lcore than initially
54 struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
/* ring of loose objects whose bucket is not currently complete */
55 struct rte_ring *shared_orphan_ring;
/* back-pointer to the owning mempool */
56 struct rte_mempool *pool;
/* bucket size in bytes (before power-of-two rounding for the mask) */
57 unsigned int bucket_mem_size;
/* handle from rte_lcore_callback_register(), released on free */
58 void *lcore_callback_handle;
/*
 * Allocate a bucket stack able to hold up to n_elts bucket pointers,
 * on the mempool's socket.
 * NOTE(review): the NULL-check, the remaining rte_zmalloc_socket()
 * arguments and the return statement are not visible in this excerpt.
 */
61 static struct bucket_stack *
62 bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
64 struct bucket_stack *stack;
/* single zeroed allocation: stack header followed by pointer array */
66 stack = rte_zmalloc_socket("bucket_stack",
67 sizeof(struct bucket_stack) +
68 n_elts * sizeof(void *),
/* capacity of the stack, checked by bucket_stack_push() */
73 stack->limit = n_elts;
/*
 * Push a bucket pointer onto the stack.
 * No overflow handling: callers guarantee the stack was sized to hold
 * every bucket, so the bound is only asserted in debug builds.
 */
80 bucket_stack_push(struct bucket_stack *stack, void *obj)
82 RTE_ASSERT(stack->top < stack->limit);
83 stack->objects[stack->top++] = obj;
/*
 * Pop the top bucket pointer without an emptiness check; the caller
 * must have verified top > 0 (asserted in debug builds only).
 */
87 bucket_stack_pop_unsafe(struct bucket_stack *stack)
89 RTE_ASSERT(stack->top > 0);
90 return stack->objects[--stack->top];
/*
 * Checked pop.
 * NOTE(review): the guard is not visible in this excerpt -- presumably
 * it returns NULL when the stack is empty (bucket_dequeue_orphans()
 * tests the result against NULL); confirm against the full source.
 */
94 bucket_stack_pop(struct bucket_stack *stack)
98 return bucket_stack_pop_unsafe(stack);
/*
 * Return a single object to its bucket.  The bucket header is located
 * by masking the object's address down with bucket_page_mask.  Three
 * ownership cases are handled:
 *  - bucket owned by the current lcore: account the object, and once
 *    the bucket is complete push it onto this lcore's bucket stack;
 *  - bucket owned by another lcore: pass the object to the owner via
 *    its adoption ring;
 *  - unowned bucket (LCORE_ID_ANY): account the object, and once the
 *    bucket is complete enqueue it on the shared bucket ring.
 * NOTE(review): fill_cnt updates and return statements are elided in
 * this excerpt -- confirm against the full source.
 */
102 bucket_enqueue_single(struct bucket_data *bd, void *obj)
105 uintptr_t addr = (uintptr_t)obj;
106 struct bucket_header *hdr;
107 unsigned int lcore_id = rte_lcore_id();
/* locate the bucket header for this object */
109 addr &= bd->bucket_page_mask;
110 hdr = (struct bucket_header *)addr;
/* fast path: bucket belongs to the enqueueing lcore */
112 if (likely(hdr->lcore_id == lcore_id)) {
113 if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
117 /* Stack is big enough to put all buckets */
118 bucket_stack_push(bd->buckets[lcore_id], hdr);
/* bucket owned by some other lcore: defer to its adoption ring */
120 } else if (hdr->lcore_id != LCORE_ID_ANY) {
121 struct rte_ring *adopt_ring =
122 bd->adoption_buffer_rings[hdr->lcore_id];
124 rc = rte_ring_enqueue(adopt_ring, obj);
125 /* Ring is big enough to put all objects */
/* unowned bucket: count it up, publish when complete */
127 } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
131 rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
132 /* Ring is big enough to put all buckets */
/*
 * Mempool ops enqueue callback: return n objects to the pool one by
 * one, then spill any full buckets above bucket_stack_thresh from the
 * local stack to the shared bucket ring so other lcores can use them.
 * NOTE(review): the loop body's error handling and the final return
 * are elided in this excerpt.
 */
140 bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
143 struct bucket_data *bd = mp->pool_data;
144 struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
148 for (i = 0; i < n; i++) {
149 rc = bucket_enqueue_single(bd, obj_table[i]);
/* keep at most bucket_stack_thresh buckets cached locally */
152 if (local_stack->top > bd->bucket_stack_thresh) {
153 rte_ring_enqueue_bulk(bd->shared_bucket_ring,
154 &local_stack->objects
155 [bd->bucket_stack_thresh],
157 bd->bucket_stack_thresh,
159 local_stack->top = bd->bucket_stack_thresh;
/*
 * Expand a bucket into n individual object pointers.
 * Skips the combined header (mempool + bucket header) at the start of
 * the bucket and writes one pointer per total_elt_size stride into
 * obj_table.  Callers use the advanced obj_table position afterwards.
 */
165 bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
166 void **obj_table, unsigned int n)
169 uint8_t *objptr = *pstart;
171 for (objptr += bd->header_size, i = 0; i < n;
172 i++, objptr += bd->total_elt_size)
173 *obj_table++ = objptr;
/*
 * Dequeue n_orphans loose objects (less than a full bucket's worth).
 * First tries the shared orphan ring; on shortfall, breaks a whole
 * bucket apart: takes one from the local stack (or, failing that, the
 * shared bucket ring), hands out n_orphans of its objects, and pushes
 * the remainder back onto the orphan ring.
 * NOTE(review): several error paths and the trailing return are elided
 * in this excerpt.
 */
179 bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
180 unsigned int n_orphans)
/* fast path: enough orphans already available in the shared ring */
186 rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
188 if (unlikely(rc != (int)n_orphans)) {
189 struct bucket_header *hdr;
/* need to break up a bucket; prefer a locally-owned one */
191 objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
192 hdr = (struct bucket_header *)objptr;
194 if (objptr == NULL) {
/* no local bucket: claim one from the shared ring */
195 rc = rte_ring_dequeue(bd->shared_bucket_ring,
201 hdr = (struct bucket_header *)objptr;
202 hdr->lcore_id = rte_lcore_id();
/* hand out the requested objects from the broken bucket */
205 bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
/* the leftover objects of the bucket become orphans */
207 for (i = n_orphans; i < bd->obj_per_bucket; i++,
208 objptr += bd->total_elt_size) {
209 rc = rte_ring_enqueue(bd->shared_orphan_ring,
/*
 * Dequeue n_buckets whole buckets' worth of objects.
 * Buckets come first from the current lcore's stack, then from the
 * shared bucket ring; each bucket is expanded into individual object
 * pointers via bucket_fill_obj_table().  If the shared ring runs dry
 * mid-way, everything dequeued so far is rolled back via
 * bucket_enqueue().
 * NOTE(review): error/return lines are elided in this excerpt.
 */
223 bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
224 unsigned int n_buckets)
226 struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
227 unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
/* remember the start so a failure can roll everything back */
228 void **obj_table_base = obj_table;
230 n_buckets -= n_buckets_from_stack;
/* drain the local stack first -- bounded, so pop is "unsafe" */
231 while (n_buckets_from_stack-- > 0) {
232 void *obj = bucket_stack_pop_unsafe(cur_stack);
234 obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
/* the rest must come from the shared bucket ring */
237 while (n_buckets-- > 0) {
238 struct bucket_header *hdr;
240 if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
241 (void **)&hdr) != 0)) {
243 * Return the already-dequeued buffers
244 * back to the mempool
246 bucket_enqueue(bd->pool, obj_table_base,
247 obj_table - obj_table_base);
/* take ownership of the bucket before handing out objects */
251 hdr->lcore_id = rte_lcore_id();
252 obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
/*
 * Re-absorb objects that other lcores returned on our behalf: drain
 * this lcore's single-consumer adoption ring and enqueue each object
 * back through the normal per-object path.
 * NOTE(review): declarations and the loop's error handling are elided
 * in this excerpt.
 */
261 bucket_adopt_orphans(struct bucket_data *bd)
264 struct rte_ring *adopt_ring =
265 bd->adoption_buffer_rings[rte_lcore_id()];
/* common case is an empty ring; only drain when needed */
267 if (unlikely(!rte_ring_empty(adopt_ring))) {
270 while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
271 rc = bucket_enqueue_single(bd, orphan);
/*
 * Mempool ops dequeue callback: split the request into whole buckets
 * plus an orphan remainder.  Orphans are placed at the tail of
 * obj_table first; if the subsequent whole-bucket dequeue fails, the
 * already-obtained orphans are pushed back to the orphan ring.
 * NOTE(review): several argument/return lines are elided in this
 * excerpt.
 */
279 bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
281 struct bucket_data *bd = mp->pool_data;
282 unsigned int n_buckets = n / bd->obj_per_bucket;
283 unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
/* first reclaim anything other lcores returned on our behalf */
286 bucket_adopt_orphans(bd);
288 if (unlikely(n_orphans > 0)) {
/* orphans fill the tail of the table, after the whole buckets */
289 rc = bucket_dequeue_orphans(bd, obj_table +
290 (n_buckets * bd->obj_per_bucket),
296 if (likely(n_buckets > 0)) {
297 rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
/* roll back the orphans if the bucket dequeue failed */
298 if (unlikely(rc != 0) && n_orphans > 0) {
299 rte_ring_enqueue_bulk(bd->shared_orphan_ring,
300 obj_table + (n_buckets *
/*
 * Mempool ops dequeue_contig_blocks callback: hand out n whole buckets
 * as contiguous blocks.  Each entry of first_obj_table receives a
 * pointer to the first object of a bucket (bucket base + header_size).
 * Buckets come from the local stack first, then in bulk from the
 * shared ring; on ring shortfall the stack-sourced buckets are pushed
 * back and the call fails.
 * NOTE(review): the trailing loop bounds and returns are elided in
 * this excerpt.
 */
310 bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
313 struct bucket_data *bd = mp->pool_data;
314 const uint32_t header_size = bd->header_size;
315 struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
316 unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
317 struct bucket_header *hdr;
318 void **first_objp = first_obj_table;
/* absorb objects returned by other lcores before counting stock */
320 bucket_adopt_orphans(bd);
322 n -= n_buckets_from_stack;
/* local-stack buckets are already owned by this lcore */
323 while (n_buckets_from_stack-- > 0) {
324 hdr = bucket_stack_pop_unsafe(cur_stack);
325 *first_objp++ = (uint8_t *)hdr + header_size;
/* fetch the remainder in one bulk operation from the shared ring */
328 if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
329 first_objp, n, NULL) != n)) {
330 /* Return the already dequeued buckets */
331 while (first_objp-- != first_obj_table) {
332 bucket_stack_push(cur_stack,
333 (uint8_t *)*first_objp -
/* claim ownership of ring-sourced buckets and fix up pointers */
340 hdr = (struct bucket_header *)*first_objp;
341 hdr->lcore_id = rte_lcore_id();
342 *first_objp++ = (uint8_t *)hdr + header_size;
/*
 * Context passed through rte_lcore_iterate() by bucket_get_count().
 * NOTE(review): the accumulator field (count) used by
 * bucket_count_per_lcore() is not visible in this excerpt.
 */
349 struct bucket_count_per_lcore_ctx {
350 const struct bucket_data *bd;
/*
 * Per-lcore callback for rte_lcore_iterate(): add this lcore's stock
 * of objects -- full buckets on its stack (each worth obj_per_bucket
 * objects) plus loose objects waiting in its adoption ring -- to the
 * running total in the context.
 */
355 bucket_count_per_lcore(unsigned int lcore_id, void *arg)
357 struct bucket_count_per_lcore_ctx *bplc = arg;
359 bplc->count += bplc->bd->obj_per_bucket *
360 bplc->bd->buckets[lcore_id]->top;
362 rte_ring_count(bplc->bd->adoption_buffer_rings[lcore_id]);
/*
 * rte_mempool_mem_iter() callback: walk every bucket-sized, aligned
 * block inside a memory chunk and add each bucket's fill_cnt (objects
 * sitting in not-yet-complete buckets) to *pcount.
 */
367 count_underfilled_buckets(struct rte_mempool *mp,
369 struct rte_mempool_memhdr *memhdr,
370 __rte_unused unsigned int mem_idx)
372 unsigned int *pcount = opaque;
373 const struct bucket_data *bd = mp->pool_data;
374 unsigned int bucket_page_sz =
375 (unsigned int)(~bd->bucket_page_mask + 1);
/* skip the unaligned prefix of the chunk, as bucket_populate() did */
379 align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
380 (uintptr_t)memhdr->addr;
382 for (iter = (uint8_t *)memhdr->addr + align;
383 iter < (uint8_t *)memhdr->addr + memhdr->len;
384 iter += bucket_page_sz) {
385 struct bucket_header *hdr = (struct bucket_header *)iter;
387 *pcount += hdr->fill_cnt;
/*
 * Mempool ops get_count callback: total available objects =
 * full buckets in the shared ring (x obj_per_bucket)
 * + orphans in the shared orphan ring
 * + per-lcore stacks and adoption rings (via rte_lcore_iterate())
 * + objects in partially filled buckets (via rte_mempool_mem_iter()).
 * NOTE(review): the return statement is elided in this excerpt.
 */
392 bucket_get_count(const struct rte_mempool *mp)
394 struct bucket_count_per_lcore_ctx bplc;
396 bplc.bd = mp->pool_data;
397 bplc.count = bplc.bd->obj_per_bucket *
398 rte_ring_count(bplc.bd->shared_bucket_ring);
399 bplc.count += rte_ring_count(bplc.bd->shared_orphan_ring);
401 rte_lcore_iterate(bucket_count_per_lcore, &bplc);
/* cast drops const: the iterator API takes a non-const mempool */
402 rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
403 count_underfilled_buckets, &bplc.count);
/*
 * Lcore-init callback (registered via rte_lcore_callback_register()):
 * create the given lcore's bucket stack and its adoption ring.  On
 * ring-creation failure the stack is freed again so the lcore is left
 * fully uninitialized.
 * NOTE(review): mp assignment, some error returns and the success
 * return are elided in this excerpt.
 */
409 bucket_init_per_lcore(unsigned int lcore_id, void *arg)
411 char rg_name[RTE_RING_NAMESIZE];
412 struct bucket_data *bd = arg;
413 struct rte_mempool *mp;
/* stack must be able to hold every bucket of the pool */
418 bd->buckets[lcore_id] = bucket_stack_create(mp,
419 mp->size / bd->obj_per_bucket);
420 if (bd->buckets[lcore_id] == NULL)
/* ring name: "<mempool mz name>.a<lcore>" */
423 rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
425 if (rc < 0 || rc >= (int)sizeof(rg_name))
/* adoption ring: many producers, single consumer (this lcore) */
428 rg_flags = RING_F_SC_DEQ;
429 if (mp->flags & MEMPOOL_F_SP_PUT)
430 rg_flags |= RING_F_SP_ENQ;
431 bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
432 rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
433 if (bd->adoption_buffer_rings[lcore_id] == NULL)
/* error path: undo the stack allocation */
438 rte_free(bd->buckets[lcore_id]);
439 bd->buckets[lcore_id] = NULL;
/*
 * Lcore-uninit callback: release the lcore's adoption ring and bucket
 * stack, clearing the pointers so a later init starts clean.
 */
444 bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
446 struct bucket_data *bd = arg;
448 rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
449 bd->adoption_buffer_rings[lcore_id] = NULL;
450 rte_free(bd->buckets[lcore_id]);
451 bd->buckets[lcore_id] = NULL;
/*
 * Mempool ops alloc callback: build the driver-private bucket_data.
 * Sequence: derive bucket geometry from the page size and the
 * RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB config, register per-lcore
 * init/uninit callbacks (which create the stacks and adoption rings),
 * then create the two shared rings ("<mz>.0" orphans, "<mz>.1"
 * buckets).  Error paths unwind in reverse via the goto labels below.
 * NOTE(review): several lines (error-code assignments, success return,
 * remaining labels) are elided in this excerpt.
 */
455 bucket_alloc(struct rte_mempool *mp)
459 char rg_name[RTE_RING_NAMESIZE];
460 struct bucket_data *bd;
461 unsigned int bucket_header_size;
464 rc = rte_mempool_get_page_size(mp, &pg_sz);
468 bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
469 RTE_CACHE_LINE_SIZE, mp->socket_id);
472 goto no_mem_for_data;
/* bucket header is padded to a cache line unless the pool opts out */
475 if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
476 bucket_header_size = sizeof(struct bucket_header);
478 bucket_header_size = RTE_CACHE_LINE_SIZE;
479 RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
480 bd->header_size = mp->header_size + bucket_header_size;
481 bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
/* a bucket may not span pages, so cap its size at one page */
482 bd->bucket_mem_size = RTE_MIN(pg_sz,
483 (size_t)(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024));
484 bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
/* mask over the power-of-2-rounded bucket size, for addr -> header */
486 bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
487 /* eventually this should be a tunable parameter */
488 bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
/* per-lcore stacks/rings are created lazily by this callback */
490 bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
491 bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
492 if (bd->lcore_callback_handle == NULL) {
494 goto no_mem_for_stacks;
497 if (mp->flags & MEMPOOL_F_SP_PUT)
498 rg_flags |= RING_F_SP_ENQ;
499 if (mp->flags & MEMPOOL_F_SC_GET)
500 rg_flags |= RING_F_SC_DEQ;
/* shared orphan ring: "<mempool mz name>.0" */
501 rc = snprintf(rg_name, sizeof(rg_name),
502 RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
503 if (rc < 0 || rc >= (int)sizeof(rg_name)) {
505 goto invalid_shared_orphan_ring;
507 bd->shared_orphan_ring =
508 rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
509 mp->socket_id, rg_flags);
510 if (bd->shared_orphan_ring == NULL) {
512 goto cannot_create_shared_orphan_ring;
/* shared bucket ring: "<mempool mz name>.1" */
515 rc = snprintf(rg_name, sizeof(rg_name),
516 RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
517 if (rc < 0 || rc >= (int)sizeof(rg_name)) {
519 goto invalid_shared_bucket_ring;
521 bd->shared_bucket_ring =
522 rte_ring_create(rg_name,
523 rte_align32pow2((mp->size + 1) /
525 mp->socket_id, rg_flags);
526 if (bd->shared_bucket_ring == NULL) {
528 goto cannot_create_shared_bucket_ring;
/* unwind in reverse order of acquisition */
535 cannot_create_shared_bucket_ring:
536 invalid_shared_bucket_ring:
537 rte_ring_free(bd->shared_orphan_ring);
538 cannot_create_shared_orphan_ring:
539 invalid_shared_orphan_ring:
540 rte_lcore_callback_unregister(bd->lcore_callback_handle);
/*
 * Mempool ops free callback: tear down bucket_alloc()'s work.
 * Unregistering the lcore callback also runs bucket_uninit_per_lcore()
 * for each lcore, releasing the per-lcore stacks and adoption rings;
 * then the two shared rings are freed.
 * NOTE(review): the NULL guard on bd and the final rte_free(bd) are
 * elided in this excerpt.
 */
549 bucket_free(struct rte_mempool *mp)
551 struct bucket_data *bd = mp->pool_data;
556 rte_lcore_callback_unregister(bd->lcore_callback_handle);
558 rte_ring_free(bd->shared_orphan_ring);
559 rte_ring_free(bd->shared_bucket_ring);
/*
 * Mempool ops calc_mem_size callback: report the memory needed for
 * obj_num objects.  Each bucket lives in its own block of
 * bucket_page_sz (bucket_mem_size rounded up to a power of two) with
 * matching alignment, so the answer is ceil(obj_num / obj_per_bucket)
 * whole blocks.
 * NOTE(review): some parameters and declarations are elided in this
 * excerpt.
 */
565 bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
566 __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
569 struct bucket_data *bd = mp->pool_data;
570 unsigned int bucket_page_sz;
575 bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
576 *align = bucket_page_sz;
577 *min_total_elt_size = bucket_page_sz;
579 * Each bucket occupies its own block aligned to
580 * bucket_page_sz, so the required amount of memory is
581 * a multiple of bucket_page_sz.
582 * We also need extra space for a bucket header
/* ceil(obj_num / obj_per_bucket) blocks of bucket_page_sz each */
584 return ((obj_num + bd->obj_per_bucket - 1) /
585 bd->obj_per_bucket) * bucket_page_sz;
/*
 * Mempool ops populate callback: carve the [vaddr, vaddr+len) chunk
 * into bucket_page_sz-aligned buckets, initialize each bucket header
 * as unowned (LCORE_ID_ANY), and populate the objects inside each
 * bucket via rte_mempool_op_populate_helper().
 * NOTE(review): some declarations, fill_cnt initialization and the
 * final return are elided in this excerpt.
 */
589 bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
590 void *vaddr, rte_iova_t iova, size_t len,
591 rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
593 struct bucket_data *bd = mp->pool_data;
594 unsigned int bucket_page_sz;
595 unsigned int bucket_header_sz;
604 bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
/* skip up to the first bucket-aligned address in the chunk */
605 align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
/* bucket header part only (bd->header_size includes mp header) */
608 bucket_header_sz = bd->header_size - mp->header_size;
609 if (iova != RTE_BAD_IOVA)
610 iova += align + bucket_header_sz;
612 for (iter = (uint8_t *)vaddr + align, n_objs = 0;
613 iter < (uint8_t *)vaddr + len && n_objs < max_objs;
614 iter += bucket_page_sz) {
615 struct bucket_header *hdr = (struct bucket_header *)iter;
616 unsigned int chunk_len = bd->bucket_mem_size;
/* the last bucket may be cut short by the end of the chunk */
618 if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
619 chunk_len = len - (iter - (uint8_t *)vaddr);
620 if (chunk_len <= bucket_header_sz)
622 chunk_len -= bucket_header_sz;
/* freshly populated buckets start unowned */
625 hdr->lcore_id = LCORE_ID_ANY;
626 rc = rte_mempool_op_populate_helper(mp, 0,
627 RTE_MIN(bd->obj_per_bucket,
629 iter + bucket_header_sz,
/* advance IOVA by a full aligned bucket per iteration */
635 if (iova != RTE_BAD_IOVA)
636 iova += bucket_page_sz;
/*
 * Mempool ops get_info callback: report the contiguous-block size
 * (objects per bucket) used by dequeue_contig_blocks.
 */
643 bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
645 struct bucket_data *bd = mp->pool_data;
647 info->contig_block_size = bd->obj_per_bucket;
/*
 * Ops table wiring the driver into the mempool framework; registered
 * below so pools can select it by name.
 * NOTE(review): the .name and .free members are not visible in this
 * excerpt.
 */
652 static const struct rte_mempool_ops ops_bucket = {
654 .alloc = bucket_alloc,
656 .enqueue = bucket_enqueue,
657 .dequeue = bucket_dequeue,
658 .get_count = bucket_get_count,
659 .calc_mem_size = bucket_calc_mem_size,
660 .populate = bucket_populate,
661 .get_info = bucket_get_info,
662 .dequeue_contig_blocks = bucket_dequeue_contig_blocks,
666 MEMPOOL_REGISTER_OPS(ops_bucket);