mempool: prevent objects from being across pages
drivers/mempool/bucket/rte_mempool_bucket.c
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 * All rights reserved.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */
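/*
 * Usage note (illustrative, not part of the driver): an application
 * selects these ops by name before populating its mempool. A minimal
 * sketch, with a made-up pool name and sizes, could look like:
 *
 *        struct rte_mempool *mp;
 *
 *        mp = rte_mempool_create_empty("example_pool", 4096, 2048,
 *                                      0, 0, SOCKET_ID_ANY, 0);
 *        if (mp == NULL ||
 *            rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0 ||
 *            rte_mempool_populate_default(mp) < 0)
 *                rte_exit(EXIT_FAILURE, "cannot create bucket mempool\n");
 */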


struct bucket_header {
        unsigned int lcore_id;
        uint8_t fill_cnt;
};

struct bucket_stack {
        unsigned int top;
        unsigned int limit;
        void *objects[];
};

struct bucket_data {
        unsigned int header_size;
        unsigned int total_elt_size;
        unsigned int obj_per_bucket;
        unsigned int bucket_stack_thresh;
        uintptr_t bucket_page_mask;
        struct rte_ring *shared_bucket_ring;
        struct bucket_stack *buckets[RTE_MAX_LCORE];
        /*
         * Multi-producer single-consumer ring to hold objects that are
         * returned to the mempool on an lcore other than the one that
         * originally dequeued them
         */
        struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
        struct rte_ring *shared_orphan_ring;
        struct rte_mempool *pool;
        unsigned int bucket_mem_size;
};

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
        struct bucket_stack *stack;

        stack = rte_zmalloc_socket("bucket_stack",
                                   sizeof(struct bucket_stack) +
                                   n_elts * sizeof(void *),
                                   RTE_CACHE_LINE_SIZE,
                                   mp->socket_id);
        if (stack == NULL)
                return NULL;
        stack->limit = n_elts;
        stack->top = 0;

        return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
        RTE_ASSERT(stack->top < stack->limit);
        stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
        RTE_ASSERT(stack->top > 0);
        return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
        if (stack->top == 0)
                return NULL;
        return bucket_stack_pop_unsafe(stack);
}

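/*
 * Return a single object to its bucket. If the bucket is owned by the
 * current lcore, bump its fill counter and push the bucket onto the
 * per-lcore stack once it becomes full. Objects belonging to a bucket
 * owned by another lcore are handed over via that lcore's adoption
 * ring; buckets not owned by any lcore go to the shared bucket ring
 * when they fill up.
 */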
static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
        int rc = 0;
        uintptr_t addr = (uintptr_t)obj;
        struct bucket_header *hdr;
        unsigned int lcore_id = rte_lcore_id();

        addr &= bd->bucket_page_mask;
        hdr = (struct bucket_header *)addr;

        if (likely(hdr->lcore_id == lcore_id)) {
                if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                        hdr->fill_cnt++;
                } else {
                        hdr->fill_cnt = 0;
                        /* Stack is big enough to hold all buckets */
                        bucket_stack_push(bd->buckets[lcore_id], hdr);
                }
        } else if (hdr->lcore_id != LCORE_ID_ANY) {
                struct rte_ring *adopt_ring =
                        bd->adoption_buffer_rings[hdr->lcore_id];

                rc = rte_ring_enqueue(adopt_ring, obj);
                /* Ring is big enough to hold all objects */
                RTE_ASSERT(rc == 0);
        } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                hdr->fill_cnt++;
        } else {
                hdr->fill_cnt = 0;
                rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
                /* Ring is big enough to hold all buckets */
                RTE_ASSERT(rc == 0);
        }

        return rc;
}

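/*
 * Mempool enqueue callback. Returns each object to its bucket and, if
 * the per-lcore bucket stack grows beyond the configured threshold,
 * flushes the excess full buckets to the shared bucket ring.
 */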
static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
               unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
        unsigned int i;
        int rc = 0;

        for (i = 0; i < n; i++) {
                rc = bucket_enqueue_single(bd, obj_table[i]);
                RTE_ASSERT(rc == 0);
        }
        if (local_stack->top > bd->bucket_stack_thresh) {
                rte_ring_enqueue_bulk(bd->shared_bucket_ring,
                                      &local_stack->objects
                                      [bd->bucket_stack_thresh],
                                      local_stack->top -
                                      bd->bucket_stack_thresh,
                                      NULL);
                local_stack->top = bd->bucket_stack_thresh;
        }
        return rc;
}

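/*
 * Expand a bucket into individual object pointers. *pstart points to
 * the start of the bucket and is advanced to the object following the
 * last one written, so the caller can keep walking the bucket.
 * Returns the next free slot in obj_table.
 */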
static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
                      void **obj_table, unsigned int n)
{
        unsigned int i;
        uint8_t *objptr = *pstart;

        for (objptr += bd->header_size, i = 0; i < n;
             i++, objptr += bd->total_elt_size)
                *obj_table++ = objptr;
        *pstart = objptr;
        return obj_table;
}

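/*
 * Dequeue objects that do not form a whole bucket ("orphans"). They are
 * taken from the shared orphan ring if possible; otherwise a bucket is
 * broken up: the first n_orphans objects satisfy the request and the
 * remainder is pushed to the shared orphan ring.
 */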
static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
                       unsigned int n_orphans)
{
        unsigned int i;
        int rc;
        uint8_t *objptr;

        rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
                                   n_orphans, NULL);
        if (unlikely(rc != (int)n_orphans)) {
                struct bucket_header *hdr;

                objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
                hdr = (struct bucket_header *)objptr;

                if (objptr == NULL) {
                        rc = rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&objptr);
                        if (rc != 0) {
                                rte_errno = ENOBUFS;
                                return -rte_errno;
                        }
                        hdr = (struct bucket_header *)objptr;
                        hdr->lcore_id = rte_lcore_id();
                }
                hdr->fill_cnt = 0;
                bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
                                      n_orphans);
                for (i = n_orphans; i < bd->obj_per_bucket; i++,
                             objptr += bd->total_elt_size) {
                        rc = rte_ring_enqueue(bd->shared_orphan_ring,
                                              objptr);
                        if (rc != 0) {
                                RTE_ASSERT(0);
                                rte_errno = -rc;
                                return rc;
                        }
                }
        }

        return 0;
}

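/*
 * Dequeue whole buckets worth of objects: first from the per-lcore
 * bucket stack, then from the shared bucket ring. Buckets taken from
 * the shared ring are claimed for the current lcore. On failure the
 * objects dequeued so far are returned to the mempool.
 */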
static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
                       unsigned int n_buckets)
{
        struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
        unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
        void **obj_table_base = obj_table;

        n_buckets -= n_buckets_from_stack;
        while (n_buckets_from_stack-- > 0) {
                void *obj = bucket_stack_pop_unsafe(cur_stack);

                obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
                                                  bd->obj_per_bucket);
        }
        while (n_buckets-- > 0) {
                struct bucket_header *hdr;

                if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&hdr) != 0)) {
                        /*
                         * Return the already-dequeued buffers
                         * back to the mempool
                         */
                        bucket_enqueue(bd->pool, obj_table_base,
                                       obj_table - obj_table_base);
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                hdr->lcore_id = rte_lcore_id();
                obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
                                                  obj_table,
                                                  bd->obj_per_bucket);
        }

        return 0;
}

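/*
 * Reclaim objects that other lcores have returned to this lcore's
 * adoption ring and account them back into their buckets.
 */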
static int
bucket_adopt_orphans(struct bucket_data *bd)
{
        int rc = 0;
        struct rte_ring *adopt_ring =
                bd->adoption_buffer_rings[rte_lcore_id()];

        if (unlikely(!rte_ring_empty(adopt_ring))) {
                void *orphan;

                while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
                        rc = bucket_enqueue_single(bd, orphan);
                        RTE_ASSERT(rc == 0);
                }
        }
        return rc;
}

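/*
 * Mempool dequeue callback. Splits the request into whole buckets and
 * a remainder of orphan objects; if the bucket dequeue fails after the
 * orphans have already been obtained, the orphans are pushed back to
 * the shared orphan ring.
 */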
static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int n_buckets = n / bd->obj_per_bucket;
        unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
        int rc = 0;

        bucket_adopt_orphans(bd);

        if (unlikely(n_orphans > 0)) {
                rc = bucket_dequeue_orphans(bd, obj_table +
                                            (n_buckets * bd->obj_per_bucket),
                                            n_orphans);
                if (rc != 0)
                        return rc;
        }

        if (likely(n_buckets > 0)) {
                rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
                if (unlikely(rc != 0) && n_orphans > 0) {
                        rte_ring_enqueue_bulk(bd->shared_orphan_ring,
                                              obj_table + (n_buckets *
                                                           bd->obj_per_bucket),
                                              n_orphans, NULL);
                }
        }

        return rc;
}

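/*
 * Dequeue physically contiguous blocks (whole buckets). Each returned
 * pointer is the first object of a bucket, i.e. the bucket start
 * advanced past the bucket header and the per-object mempool header.
 */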
static int
bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
                             unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        const uint32_t header_size = bd->header_size;
        struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
        unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
        struct bucket_header *hdr;
        void **first_objp = first_obj_table;

        bucket_adopt_orphans(bd);

        n -= n_buckets_from_stack;
        while (n_buckets_from_stack-- > 0) {
                hdr = bucket_stack_pop_unsafe(cur_stack);
                *first_objp++ = (uint8_t *)hdr + header_size;
        }
        if (n > 0) {
                if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
                                                   first_objp, n, NULL) != n)) {
                        /* Return the already dequeued buckets */
                        while (first_objp-- != first_obj_table) {
                                bucket_stack_push(cur_stack,
                                                  (uint8_t *)*first_objp -
                                                  header_size);
                        }
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                while (n-- > 0) {
                        hdr = (struct bucket_header *)*first_objp;
                        hdr->lcore_id = rte_lcore_id();
                        *first_objp++ = (uint8_t *)hdr + header_size;
                }
        }

        return 0;
}

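/*
 * rte_mempool_mem_iter() callback: add up the fill counters of all
 * buckets in a memory chunk to account for objects sitting in
 * partially filled buckets.
 */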
static void
count_underfilled_buckets(struct rte_mempool *mp,
                          void *opaque,
                          struct rte_mempool_memhdr *memhdr,
                          __rte_unused unsigned int mem_idx)
{
        unsigned int *pcount = opaque;
        const struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz =
                (unsigned int)(~bd->bucket_page_mask + 1);
        uintptr_t align;
        uint8_t *iter;

        align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
                (uintptr_t)memhdr->addr;

        for (iter = (uint8_t *)memhdr->addr + align;
             iter < (uint8_t *)memhdr->addr + memhdr->len;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;

                *pcount += hdr->fill_cnt;
        }
}

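/*
 * Count the objects currently available in the pool: full buckets in
 * the shared ring and per-lcore stacks, loose objects in the orphan
 * and adoption rings, plus objects in partially filled buckets.
 */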
static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
        const struct bucket_data *bd = mp->pool_data;
        unsigned int count =
                bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
                rte_ring_count(bd->shared_orphan_ring);
        unsigned int i;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                count += bd->obj_per_bucket * bd->buckets[i]->top +
                        rte_ring_count(bd->adoption_buffer_rings[i]);
        }

        rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
                             count_underfilled_buckets, &count);

        return count;
}

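/*
 * Mempool ops alloc callback: derive the bucket geometry from the page
 * size and element size, then create the per-lcore bucket stacks and
 * adoption rings as well as the shared orphan and bucket rings.
 */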
static int
bucket_alloc(struct rte_mempool *mp)
{
        int rg_flags = 0;
        int rc = 0;
        char rg_name[RTE_RING_NAMESIZE];
        struct bucket_data *bd;
        unsigned int i;
        unsigned int bucket_header_size;
        size_t pg_sz;

        rc = rte_mempool_get_page_size(mp, &pg_sz);
        if (rc < 0)
                return rc;

        bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
                                RTE_CACHE_LINE_SIZE, mp->socket_id);
        if (bd == NULL) {
                rc = -ENOMEM;
                goto no_mem_for_data;
        }
        bd->pool = mp;
        if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
                bucket_header_size = sizeof(struct bucket_header);
        else
                bucket_header_size = RTE_CACHE_LINE_SIZE;
        RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
        bd->header_size = mp->header_size + bucket_header_size;
        bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
        bd->bucket_mem_size = RTE_MIN(pg_sz,
                        (size_t)(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024));
        bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
                bd->total_elt_size;
        bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
        /* eventually this should be a tunable parameter */
        bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;

        if (mp->flags & MEMPOOL_F_SP_PUT)
                rg_flags |= RING_F_SP_ENQ;
        if (mp->flags & MEMPOOL_F_SC_GET)
                rg_flags |= RING_F_SC_DEQ;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                bd->buckets[i] =
                        bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
                if (bd->buckets[i] == NULL) {
                        rc = -ENOMEM;
                        goto no_mem_for_stacks;
                }
                rc = snprintf(rg_name, sizeof(rg_name),
                              RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
                if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                        rc = -ENAMETOOLONG;
                        goto no_mem_for_stacks;
                }
                bd->adoption_buffer_rings[i] =
                        rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                        mp->socket_id,
                                        rg_flags | RING_F_SC_DEQ);
                if (bd->adoption_buffer_rings[i] == NULL) {
                        rc = -rte_errno;
                        goto no_mem_for_stacks;
                }
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_orphan_ring;
        }
        bd->shared_orphan_ring =
                rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                mp->socket_id, rg_flags);
        if (bd->shared_orphan_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_orphan_ring;
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_bucket_ring;
        }
        bd->shared_bucket_ring =
                rte_ring_create(rg_name,
                                rte_align32pow2((mp->size + 1) /
                                                bd->obj_per_bucket),
                                mp->socket_id, rg_flags);
        if (bd->shared_bucket_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_bucket_ring;
        }

        mp->pool_data = bd;

        return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
        rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }
        rte_free(bd);
no_mem_for_data:
        rte_errno = -rc;
        return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
        unsigned int i;
        struct bucket_data *bd = mp->pool_data;

        if (bd == NULL)
                return;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }

        rte_ring_free(bd->shared_orphan_ring);
        rte_ring_free(bd->shared_bucket_ring);

        rte_free(bd);
}

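/*
 * Report the memory required for obj_num objects: one aligned block of
 * bucket_page_sz bytes per (possibly partially used) bucket.
 */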
static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
                     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
                     size_t *align)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        *align = bucket_page_sz;
        *min_total_elt_size = bucket_page_sz;
        /*
         * Each bucket occupies its own block aligned to
         * bucket_page_sz, so the required amount of memory is
         * a multiple of bucket_page_sz.
         * We also need extra space for a bucket header.
         */
        return ((obj_num + bd->obj_per_bucket - 1) /
                bd->obj_per_bucket) * bucket_page_sz;
}

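/*
 * Populate callback: carve the provided memory into bucket_page_sz
 * blocks, initialize a bucket header at the start of each block and
 * let the generic helper place the objects after the header.
 */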
static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
                void *vaddr, rte_iova_t iova, size_t len,
                rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;
        unsigned int bucket_header_sz;
        unsigned int n_objs;
        uintptr_t align;
        uint8_t *iter;
        int rc;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
                (uintptr_t)vaddr;

        bucket_header_sz = bd->header_size - mp->header_size;
        if (iova != RTE_BAD_IOVA)
                iova += align + bucket_header_sz;

        for (iter = (uint8_t *)vaddr + align, n_objs = 0;
             iter < (uint8_t *)vaddr + len && n_objs < max_objs;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;
                unsigned int chunk_len = bd->bucket_mem_size;

                if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
                        chunk_len = len - (iter - (uint8_t *)vaddr);
                if (chunk_len <= bucket_header_sz)
                        break;
                chunk_len -= bucket_header_sz;

                hdr->fill_cnt = 0;
                hdr->lcore_id = LCORE_ID_ANY;
                rc = rte_mempool_op_populate_helper(mp, 0,
                                                    RTE_MIN(bd->obj_per_bucket,
                                                            max_objs - n_objs),
                                                    iter + bucket_header_sz,
                                                    iova, chunk_len,
                                                    obj_cb, obj_cb_arg);
                if (rc < 0)
                        return rc;
                n_objs += rc;
                if (iova != RTE_BAD_IOVA)
                        iova += bucket_page_sz;
        }

        return n_objs;
}

static int
bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
{
        struct bucket_data *bd = mp->pool_data;

        info->contig_block_size = bd->obj_per_bucket;
        return 0;
}


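/* Ops table registered under the name "bucket" */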
static const struct rte_mempool_ops ops_bucket = {
        .name = "bucket",
        .alloc = bucket_alloc,
        .free = bucket_free,
        .enqueue = bucket_enqueue,
        .dequeue = bucket_dequeue,
        .get_count = bucket_get_count,
        .calc_mem_size = bucket_calc_mem_size,
        .populate = bucket_populate,
        .get_info = bucket_get_info,
        .dequeue_contig_blocks = bucket_dequeue_contig_blocks,
};


MEMPOOL_REGISTER_OPS(ops_bucket);