/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */
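
/*
 * Illustrative usage sketch (not part of the driver): an application
 * normally selects these ops by name (registered as "bucket" at the end
 * of this file) on an empty mempool before populating it. The pool name
 * and sizes below are arbitrary and error handling is reduced to
 * rte_exit() for brevity.
 *
 *	struct rte_mempool *mp;
 *
 *	mp = rte_mempool_create_empty("example_pool", 8192, 2048,
 *				      0, 0, SOCKET_ID_ANY, 0);
 *	if (mp == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create mempool\n");
 *	if (rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0 ||
 *	    rte_mempool_populate_default(mp) < 0)
 *		rte_exit(EXIT_FAILURE, "cannot set up bucket ops\n");
 */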

struct bucket_header {
	unsigned int lcore_id;
	unsigned int fill_cnt;
};

struct bucket_stack {
	unsigned int top;
	unsigned int limit;
	void *objects[];
};

struct bucket_data {
	unsigned int header_size;
	unsigned int total_elt_size;
	unsigned int obj_per_bucket;
	uintptr_t bucket_page_mask;
	struct rte_ring *shared_bucket_ring;
	struct bucket_stack *buckets[RTE_MAX_LCORE];
	/*
	 * Multi-producer single-consumer ring to hold objects that are
	 * returned to the mempool at a different lcore than initially
	 * dequeued.
	 */
	struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
	struct rte_ring *shared_orphan_ring;
	struct rte_mempool *pool;
	unsigned int bucket_mem_size;
};
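
/*
 * Worked example (illustrative only): if RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB
 * is 64, bucket_mem_size is 64 KiB and bucket_page_mask becomes
 * ~(uintptr_t)0xffff. Masking any object address with it yields the start
 * of its bucket, where the bucket_header lives: an object at
 * 0x7f0000012345 belongs to the bucket whose header is at 0x7f0000010000.
 */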

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
	struct bucket_stack *stack;

	stack = rte_zmalloc_socket("bucket_stack",
				   sizeof(struct bucket_stack) +
				   n_elts * sizeof(void *),
				   RTE_CACHE_LINE_SIZE,
				   mp->socket_id);
	if (stack == NULL)
		return NULL;
	stack->limit = n_elts;
	stack->top = 0;

	return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
	RTE_ASSERT(stack->top < stack->limit);
	stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
	RTE_ASSERT(stack->top > 0);
	return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
	if (stack->top == 0)
		return NULL;
	return bucket_stack_pop_unsafe(stack);
}

static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
	int rc = 0;
	uintptr_t addr = (uintptr_t)obj;
	struct bucket_header *hdr;
	unsigned int lcore_id = rte_lcore_id();

	addr &= bd->bucket_page_mask;
	hdr = (struct bucket_header *)addr;

	if (likely(hdr->lcore_id == lcore_id)) {
		if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
			hdr->fill_cnt++;
		} else {
			hdr->fill_cnt = 0;
			/* Stack is big enough to put all buckets */
			bucket_stack_push(bd->buckets[lcore_id], hdr);
		}
	} else if (hdr->lcore_id != LCORE_ID_ANY) {
		struct rte_ring *adopt_ring =
			bd->adoption_buffer_rings[hdr->lcore_id];

		rc = rte_ring_enqueue(adopt_ring, obj);
		/* Ring is big enough to put all objects */
		RTE_ASSERT(rc == 0);
	} else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
		hdr->fill_cnt++;
	} else {
		hdr->fill_cnt = 0;
		rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
		/* Ring is big enough to put all buckets */
		RTE_ASSERT(rc == 0);
	}

	return rc;
}

static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
	       unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int i;
	int rc = 0;

	for (i = 0; i < n; i++) {
		rc = bucket_enqueue_single(bd, obj_table[i]);
		RTE_ASSERT(rc == 0);
	}
	return rc;
}

static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
		      void **obj_table, unsigned int n)
{
	unsigned int i;
	uint8_t *objptr = *pstart;

	for (objptr += bd->header_size, i = 0; i < n;
	     i++, objptr += bd->total_elt_size)
		*obj_table++ = objptr;
	*pstart = objptr;
	return obj_table;
}

static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
		       unsigned int n_orphans)
{
	unsigned int i;
	int rc;
	uint8_t *objptr;

	rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
				   n_orphans, NULL);
	if (unlikely(rc != (int)n_orphans)) {
		struct bucket_header *hdr;

		objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
		hdr = (struct bucket_header *)objptr;

		if (objptr == NULL) {
			rc = rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&objptr);
			if (rc != 0) {
				rte_errno = ENOBUFS;
				return -rte_errno;
			}
			hdr = (struct bucket_header *)objptr;
			hdr->lcore_id = rte_lcore_id();
		}

		bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
				      n_orphans);
		for (i = n_orphans; i < bd->obj_per_bucket; i++,
			     objptr += bd->total_elt_size) {
			rc = rte_ring_enqueue(bd->shared_orphan_ring,
					      objptr);
			if (rc != 0) {
				rte_errno = -rc;
				return rc;
			}
		}
	}

	return 0;
}

static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
		       unsigned int n_buckets)
{
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
	void **obj_table_base = obj_table;

	n_buckets -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		void *obj = bucket_stack_pop_unsafe(cur_stack);

		obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
						  bd->obj_per_bucket);
	}
	while (n_buckets-- > 0) {
		struct bucket_header *hdr;

		if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
					      (void **)&hdr) != 0)) {
			/*
			 * Return the already-dequeued buffers
			 * back to the mempool
			 */
			bucket_enqueue(bd->pool, obj_table_base,
				       obj_table - obj_table_base);
			rte_errno = ENOBUFS;
			return -rte_errno;
		}
		hdr->lcore_id = rte_lcore_id();
		obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
						  obj_table,
						  bd->obj_per_bucket);
	}

	return 0;
}

static void
bucket_adopt_orphans(struct bucket_data *bd)
{
	int rc __rte_unused;
	struct rte_ring *adopt_ring =
		bd->adoption_buffer_rings[rte_lcore_id()];

	if (unlikely(!rte_ring_empty(adopt_ring))) {
		void *orphan;

		while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
			rc = bucket_enqueue_single(bd, orphan);
			RTE_ASSERT(rc == 0);
		}
	}
}

static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int n_buckets = n / bd->obj_per_bucket;
	unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
	int rc = 0;

	bucket_adopt_orphans(bd);

	if (unlikely(n_orphans > 0)) {
		rc = bucket_dequeue_orphans(bd, obj_table +
					    (n_buckets * bd->obj_per_bucket),
					    n_orphans);
		if (rc != 0)
			return rc;
	}

	if (likely(n_buckets > 0)) {
		rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
		if (unlikely(rc != 0) && n_orphans > 0) {
			rte_ring_enqueue_bulk(bd->shared_orphan_ring,
					      obj_table + (n_buckets *
							   bd->obj_per_bucket),
					      n_orphans, NULL);
		}
	}

	return rc;
}

static int
bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
			     unsigned int n)
{
	struct bucket_data *bd = mp->pool_data;
	const uint32_t header_size = bd->header_size;
	struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
	unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
	struct bucket_header *hdr;
	void **first_objp = first_obj_table;

	bucket_adopt_orphans(bd);

	n -= n_buckets_from_stack;
	while (n_buckets_from_stack-- > 0) {
		hdr = bucket_stack_pop_unsafe(cur_stack);
		*first_objp++ = (uint8_t *)hdr + header_size;
	}

	if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
					   first_objp, n, NULL) != n)) {
		/* Return the already dequeued buckets */
		while (first_objp-- != first_obj_table) {
			bucket_stack_push(cur_stack,
					  (uint8_t *)*first_objp -
					  header_size);
		}
		rte_errno = ENOBUFS;
		return -rte_errno;
	}
	while (n-- > 0) {
		hdr = (struct bucket_header *)*first_objp;
		hdr->lcore_id = rte_lcore_id();
		*first_objp++ = (uint8_t *)hdr + header_size;
	}

	return 0;
}

static void
count_underfilled_buckets(struct rte_mempool *mp,
			  void *opaque,
			  struct rte_mempool_memhdr *memhdr,
			  __rte_unused unsigned int mem_idx)
{
	unsigned int *pcount = opaque;
	const struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz =
		(unsigned int)(~bd->bucket_page_mask + 1);
	uintptr_t align;
	uint8_t *iter;

	align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
		(uintptr_t)memhdr->addr;

	for (iter = (uint8_t *)memhdr->addr + align;
	     iter < (uint8_t *)memhdr->addr + memhdr->len;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;

		*pcount += hdr->fill_cnt;
	}
}

static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
	const struct bucket_data *bd = mp->pool_data;
	unsigned int count =
		bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
		rte_ring_count(bd->shared_orphan_ring);
	unsigned int i;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		count += bd->obj_per_bucket * bd->buckets[i]->top +
			rte_ring_count(bd->adoption_buffer_rings[i]);
	}

	rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
			     count_underfilled_buckets, &count);

	return count;
}

static int
bucket_alloc(struct rte_mempool *mp)
{
	int rg_flags = 0;
	int rc = 0;
	char rg_name[RTE_RING_NAMESIZE];
	struct bucket_data *bd;
	unsigned int i;
	unsigned int bucket_header_size;

	bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
				RTE_CACHE_LINE_SIZE, mp->socket_id);
	if (bd == NULL) {
		rc = -ENOMEM;
		goto no_mem_for_data;
	}
	bd->pool = mp;
	if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
		bucket_header_size = sizeof(struct bucket_header);
	else
		bucket_header_size = RTE_CACHE_LINE_SIZE;
	RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
	bd->header_size = mp->header_size + bucket_header_size;
	bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
	bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
	bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
		bd->total_elt_size;
	bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);

	if (mp->flags & MEMPOOL_F_SP_PUT)
		rg_flags |= RING_F_SP_ENQ;
	if (mp->flags & MEMPOOL_F_SC_GET)
		rg_flags |= RING_F_SC_DEQ;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		if (!rte_lcore_is_enabled(i))
			continue;
		bd->buckets[i] =
			bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
		if (bd->buckets[i] == NULL) {
			rc = -ENOMEM;
			goto no_mem_for_stacks;
		}
		rc = snprintf(rg_name, sizeof(rg_name),
			      RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
		if (rc < 0 || rc >= (int)sizeof(rg_name)) {
			rc = -ENAMETOOLONG;
			goto no_mem_for_stacks;
		}
		bd->adoption_buffer_rings[i] =
			rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
					mp->socket_id,
					rg_flags | RING_F_SC_DEQ);
		if (bd->adoption_buffer_rings[i] == NULL) {
			rc = -rte_errno;
			goto no_mem_for_stacks;
		}
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_orphan_ring;
	}
	bd->shared_orphan_ring =
		rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
				mp->socket_id, rg_flags);
	if (bd->shared_orphan_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_orphan_ring;
	}

	rc = snprintf(rg_name, sizeof(rg_name),
		      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
	if (rc < 0 || rc >= (int)sizeof(rg_name)) {
		rc = -ENAMETOOLONG;
		goto invalid_shared_bucket_ring;
	}
	bd->shared_bucket_ring =
		rte_ring_create(rg_name,
				rte_align32pow2((mp->size + 1) /
						bd->obj_per_bucket),
				mp->socket_id, rg_flags);
	if (bd->shared_bucket_ring == NULL) {
		rc = -rte_errno;
		goto cannot_create_shared_bucket_ring;
	}

	mp->pool_data = bd;

	return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
	rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}
	rte_free(bd);
no_mem_for_data:
	rte_errno = -rc;
	return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
	unsigned int i;
	struct bucket_data *bd = mp->pool_data;

	if (bd == NULL)
		return;

	for (i = 0; i < RTE_MAX_LCORE; i++) {
		rte_free(bd->buckets[i]);
		rte_ring_free(bd->adoption_buffer_rings[i]);
	}
	rte_ring_free(bd->shared_orphan_ring);
	rte_ring_free(bd->shared_bucket_ring);
	rte_free(bd);
}

static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
		     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
		     size_t *align)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	*align = bucket_page_sz;
	*min_total_elt_size = bucket_page_sz;
	/*
	 * Each bucket occupies its own block aligned to
	 * bucket_page_sz, so the required amount of memory is
	 * a multiple of bucket_page_sz.
	 * We also need extra space for a bucket header.
	 */
	return ((obj_num + bd->obj_per_bucket - 1) /
		bd->obj_per_bucket) * bucket_page_sz;
}
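
/*
 * Illustrative arithmetic (made-up values): with a 64 KiB bucket page and
 * obj_per_bucket == 100, a request for obj_num == 250 is rounded up to
 * ceil(250 / 100) == 3 buckets, i.e. 3 * 64 KiB == 192 KiB of memory.
 */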

static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
		void *vaddr, rte_iova_t iova, size_t len,
		rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
	struct bucket_data *bd = mp->pool_data;
	unsigned int bucket_page_sz;
	unsigned int bucket_header_sz;
	unsigned int n_objs;
	uintptr_t align;
	uint8_t *iter;
	int rc;

	if (bd == NULL)
		return -EINVAL;

	bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
	align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
		(uintptr_t)vaddr;

	bucket_header_sz = bd->header_size - mp->header_size;
	if (iova != RTE_BAD_IOVA)
		iova += align + bucket_header_sz;

	for (iter = (uint8_t *)vaddr + align, n_objs = 0;
	     iter < (uint8_t *)vaddr + len && n_objs < max_objs;
	     iter += bucket_page_sz) {
		struct bucket_header *hdr = (struct bucket_header *)iter;
		unsigned int chunk_len = bd->bucket_mem_size;

		if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
			chunk_len = len - (iter - (uint8_t *)vaddr);
		if (chunk_len <= bucket_header_sz)
			break;
		chunk_len -= bucket_header_sz;

		hdr->fill_cnt = 0;
		hdr->lcore_id = LCORE_ID_ANY;
		rc = rte_mempool_op_populate_default(mp,
						     RTE_MIN(bd->obj_per_bucket,
							     max_objs - n_objs),
						     iter + bucket_header_sz,
						     iova, chunk_len,
						     obj_cb, obj_cb_arg);
		if (rc < 0)
			return rc;
		n_objs += rc;
		if (iova != RTE_BAD_IOVA)
			iova += bucket_page_sz;
	}

	return n_objs;
}

static int
bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
{
	struct bucket_data *bd = mp->pool_data;

	info->contig_block_size = bd->obj_per_bucket;
	return 0;
}

static const struct rte_mempool_ops ops_bucket = {
	.name = "bucket",
	.alloc = bucket_alloc,
	.free = bucket_free,
	.enqueue = bucket_enqueue,
	.dequeue = bucket_dequeue,
	.get_count = bucket_get_count,
	.calc_mem_size = bucket_calc_mem_size,
	.populate = bucket_populate,
	.get_info = bucket_get_info,
	.dequeue_contig_blocks = bucket_dequeue_contig_blocks,
};

MEMPOOL_REGISTER_OPS(ops_bucket);
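
/*
 * Illustrative sketch (not part of the driver): with these ops selected,
 * an application that wants physically contiguous runs of objects can
 * query the block size and dequeue whole buckets at once; each returned
 * pointer is the first of info.contig_block_size contiguous objects.
 * Variable names are arbitrary and error handling is omitted.
 *
 *	struct rte_mempool_info info;
 *	void *block_first_obj[4];
 *
 *	rte_mempool_ops_get_info(mp, &info);
 *	if (rte_mempool_get_contig_blocks(mp, block_first_obj, 4) == 0) {
 *		// use the 4 * info.contig_block_size objects, then return
 *		// them individually, e.g. with rte_mempool_put()
 *	}
 */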