mempool/bucket: implement bucket mempool manager
drivers/mempool/bucket/rte_mempool_bucket.c (dpdk.git)
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 * All rights reserved.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */

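/*
 * A minimal usage sketch (illustration only, not part of the driver):
 * after rte_eal_init(), an application selects these ops by name before
 * populating the pool. The object count and size below are arbitrary
 * example values.
 *
 *	struct rte_mempool *mp;
 *
 *	mp = rte_mempool_create_empty("example_pool", 4096, 2048,
 *				      0, 0, SOCKET_ID_ANY, 0);
 *	if (mp == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create mempool\n");
 *	if (rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0)
 *		rte_exit(EXIT_FAILURE, "cannot select bucket ops\n");
 *	if (rte_mempool_populate_default(mp) < 0)
 *		rte_exit(EXIT_FAILURE, "cannot populate mempool\n");
 */
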
struct bucket_header {
        unsigned int lcore_id;
        uint8_t fill_cnt;
};

struct bucket_stack {
        unsigned int top;
        unsigned int limit;
        void *objects[];
};

struct bucket_data {
        unsigned int header_size;
        unsigned int total_elt_size;
        unsigned int obj_per_bucket;
        uintptr_t bucket_page_mask;
        struct rte_ring *shared_bucket_ring;
        struct bucket_stack *buckets[RTE_MAX_LCORE];
        /*
         * Per-lcore multi-producer single-consumer rings that hold objects
         * returned to the mempool on an lcore other than the one they were
         * originally dequeued on.
         */
        struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
        struct rte_ring *shared_orphan_ring;
        struct rte_mempool *pool;
        unsigned int bucket_mem_size;
};

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
        struct bucket_stack *stack;

        stack = rte_zmalloc_socket("bucket_stack",
                                   sizeof(struct bucket_stack) +
                                   n_elts * sizeof(void *),
                                   RTE_CACHE_LINE_SIZE,
                                   mp->socket_id);
        if (stack == NULL)
                return NULL;
        stack->limit = n_elts;
        stack->top = 0;

        return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
        RTE_ASSERT(stack->top < stack->limit);
        stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
        RTE_ASSERT(stack->top > 0);
        return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
        if (stack->top == 0)
                return NULL;
        return bucket_stack_pop_unsafe(stack);
}

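/*
 * Return a single object to its bucket. If the bucket is owned by the
 * current lcore, bump the fill counter and, once the bucket is complete,
 * push it onto the local per-lcore stack. Objects belonging to a bucket
 * owned by another lcore go to that lcore's adoption ring; objects from
 * unowned buckets are counted in place and complete buckets are pushed
 * to the shared bucket ring.
 */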
static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
        int rc = 0;
        uintptr_t addr = (uintptr_t)obj;
        struct bucket_header *hdr;
        unsigned int lcore_id = rte_lcore_id();

        addr &= bd->bucket_page_mask;
        hdr = (struct bucket_header *)addr;

        if (likely(hdr->lcore_id == lcore_id)) {
                if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                        hdr->fill_cnt++;
                } else {
                        hdr->fill_cnt = 0;
                        /* Stack is big enough to put all buckets */
                        bucket_stack_push(bd->buckets[lcore_id], hdr);
                }
        } else if (hdr->lcore_id != LCORE_ID_ANY) {
                struct rte_ring *adopt_ring =
                        bd->adoption_buffer_rings[hdr->lcore_id];

                rc = rte_ring_enqueue(adopt_ring, obj);
                /* Ring is big enough to put all objects */
                RTE_ASSERT(rc == 0);
        } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                hdr->fill_cnt++;
        } else {
                hdr->fill_cnt = 0;
                rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
                /* Ring is big enough to put all buckets */
                RTE_ASSERT(rc == 0);
        }

        return rc;
}

static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
               unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int i;
        int rc = 0;

        for (i = 0; i < n; i++) {
                rc = bucket_enqueue_single(bd, obj_table[i]);
                RTE_ASSERT(rc == 0);
        }
        return rc;
}

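/*
 * Expand one bucket into individual object pointers. On entry *pstart
 * points at the bucket header; on return it has been advanced to where
 * the next (unwritten) object would start. The return value is the next
 * free slot in obj_table.
 */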
static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
                      void **obj_table, unsigned int n)
{
        unsigned int i;
        uint8_t *objptr = *pstart;

        for (objptr += bd->header_size, i = 0; i < n;
             i++, objptr += bd->total_elt_size)
                *obj_table++ = objptr;
        *pstart = objptr;
        return obj_table;
}

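/*
 * Satisfy a request for less than a full bucket of objects. First try the
 * shared orphan ring; if it cannot provide all n_orphans objects, take a
 * complete bucket (from the local stack or, failing that, the shared
 * bucket ring), hand out its first n_orphans objects and push the
 * remaining objects onto the shared orphan ring.
 */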
static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
                       unsigned int n_orphans)
{
        unsigned int i;
        int rc;
        uint8_t *objptr;

        rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
                                   n_orphans, NULL);
        if (unlikely(rc != (int)n_orphans)) {
                struct bucket_header *hdr;

                objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
                hdr = (struct bucket_header *)objptr;

                if (objptr == NULL) {
                        rc = rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&objptr);
                        if (rc != 0) {
                                rte_errno = ENOBUFS;
                                return -rte_errno;
                        }
                        hdr = (struct bucket_header *)objptr;
                        hdr->lcore_id = rte_lcore_id();
                }
                hdr->fill_cnt = 0;
                bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
                                      n_orphans);
                for (i = n_orphans; i < bd->obj_per_bucket; i++,
                             objptr += bd->total_elt_size) {
                        rc = rte_ring_enqueue(bd->shared_orphan_ring,
                                              objptr);
                        if (rc != 0) {
                                RTE_ASSERT(0);
                                rte_errno = -rc;
                                return rc;
                        }
                }
        }

        return 0;
}

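/*
 * Satisfy a request for whole buckets. Buckets are taken from the local
 * per-lcore stack first and then from the shared bucket ring; if the ring
 * runs dry, every object dequeued so far is returned to the pool and the
 * call fails with ENOBUFS.
 */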
static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
                       unsigned int n_buckets)
{
        struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
        unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
        void **obj_table_base = obj_table;

        n_buckets -= n_buckets_from_stack;
        while (n_buckets_from_stack-- > 0) {
                void *obj = bucket_stack_pop_unsafe(cur_stack);

                obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
                                                  bd->obj_per_bucket);
        }
        while (n_buckets-- > 0) {
                struct bucket_header *hdr;

                if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&hdr) != 0)) {
                        /*
                         * Return the already-dequeued buffers
                         * back to the mempool
                         */
                        bucket_enqueue(bd->pool, obj_table_base,
                                       obj_table - obj_table_base);
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                hdr->lcore_id = rte_lcore_id();
                obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
                                                  obj_table,
                                                  bd->obj_per_bucket);
        }

        return 0;
}

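/*
 * Drain this lcore's adoption ring, i.e. re-enqueue objects that were
 * freed by other lcores but belong to buckets owned by this lcore.
 */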
static int
bucket_adopt_orphans(struct bucket_data *bd)
{
        int rc = 0;
        struct rte_ring *adopt_ring =
                bd->adoption_buffer_rings[rte_lcore_id()];

        if (unlikely(!rte_ring_empty(adopt_ring))) {
                void *orphan;

                while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
                        rc = bucket_enqueue_single(bd, orphan);
                        RTE_ASSERT(rc == 0);
                }
        }
        return rc;
}

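/*
 * Dequeue entry point: adopt any pending orphans, then split the request
 * into whole buckets plus a remainder of individual objects.
 */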
static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int n_buckets = n / bd->obj_per_bucket;
        unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
        int rc = 0;

        bucket_adopt_orphans(bd);

        if (unlikely(n_orphans > 0)) {
                rc = bucket_dequeue_orphans(bd, obj_table +
                                            (n_buckets * bd->obj_per_bucket),
                                            n_orphans);
                if (rc != 0)
                        return rc;
        }

        if (likely(n_buckets > 0)) {
                rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
                if (unlikely(rc != 0) && n_orphans > 0) {
                        rte_ring_enqueue_bulk(bd->shared_orphan_ring,
                                              obj_table + (n_buckets *
                                                           bd->obj_per_bucket),
                                              n_orphans, NULL);
                }
        }

        return rc;
}

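/*
 * rte_mempool_mem_iter() callback: add the fill counters of all buckets
 * residing in a memory chunk to the running object count.
 */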
static void
count_underfilled_buckets(struct rte_mempool *mp,
                          void *opaque,
                          struct rte_mempool_memhdr *memhdr,
                          __rte_unused unsigned int mem_idx)
{
        unsigned int *pcount = opaque;
        const struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz =
                (unsigned int)(~bd->bucket_page_mask + 1);
        uintptr_t align;
        uint8_t *iter;

        align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
                (uintptr_t)memhdr->addr;

        for (iter = (uint8_t *)memhdr->addr + align;
             iter < (uint8_t *)memhdr->addr + memhdr->len;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;

                *pcount += hdr->fill_cnt;
        }
}

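/*
 * Count the objects currently available in the pool: complete buckets in
 * the shared ring and per-lcore stacks, loose objects in the orphan and
 * adoption rings, plus objects sitting in partially filled buckets.
 */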
static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
        const struct bucket_data *bd = mp->pool_data;
        unsigned int count =
                bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
                rte_ring_count(bd->shared_orphan_ring);
        unsigned int i;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                count += bd->obj_per_bucket * bd->buckets[i]->top +
                        rte_ring_count(bd->adoption_buffer_rings[i]);
        }

        rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
                             count_underfilled_buckets, &count);

        return count;
}

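/*
 * Pool constructor: size the buckets, then create the per-lcore stacks
 * and adoption rings as well as the shared orphan and bucket rings.
 */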
static int
bucket_alloc(struct rte_mempool *mp)
{
        int rg_flags = 0;
        int rc = 0;
        char rg_name[RTE_RING_NAMESIZE];
        struct bucket_data *bd;
        unsigned int i;
        unsigned int bucket_header_size;

        bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
                                RTE_CACHE_LINE_SIZE, mp->socket_id);
        if (bd == NULL) {
                rc = -ENOMEM;
                goto no_mem_for_data;
        }
        bd->pool = mp;
        if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
                bucket_header_size = sizeof(struct bucket_header);
        else
                bucket_header_size = RTE_CACHE_LINE_SIZE;
        RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
        bd->header_size = mp->header_size + bucket_header_size;
        bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
        bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
        bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
                bd->total_elt_size;
        bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);

        if (mp->flags & MEMPOOL_F_SP_PUT)
                rg_flags |= RING_F_SP_ENQ;
        if (mp->flags & MEMPOOL_F_SC_GET)
                rg_flags |= RING_F_SC_DEQ;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                bd->buckets[i] =
                        bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
                if (bd->buckets[i] == NULL) {
                        rc = -ENOMEM;
                        goto no_mem_for_stacks;
                }
                rc = snprintf(rg_name, sizeof(rg_name),
                              RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
                if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                        rc = -ENAMETOOLONG;
                        goto no_mem_for_stacks;
                }
                bd->adoption_buffer_rings[i] =
                        rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                        mp->socket_id,
                                        rg_flags | RING_F_SC_DEQ);
                if (bd->adoption_buffer_rings[i] == NULL) {
                        rc = -rte_errno;
                        goto no_mem_for_stacks;
                }
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_orphan_ring;
        }
        bd->shared_orphan_ring =
                rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                mp->socket_id, rg_flags);
        if (bd->shared_orphan_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_orphan_ring;
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_bucket_ring;
        }
        bd->shared_bucket_ring =
                rte_ring_create(rg_name,
                                rte_align32pow2((mp->size + 1) /
                                                bd->obj_per_bucket),
                                mp->socket_id, rg_flags);
        if (bd->shared_bucket_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_bucket_ring;
        }

        mp->pool_data = bd;

        return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
        rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }
        rte_free(bd);
no_mem_for_data:
        rte_errno = -rc;
        return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
        unsigned int i;
        struct bucket_data *bd = mp->pool_data;

        if (bd == NULL)
                return;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }

        rte_ring_free(bd->shared_orphan_ring);
        rte_ring_free(bd->shared_bucket_ring);

        rte_free(bd);
}

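/*
 * Report the amount of memory required to store obj_num objects; the
 * minimum chunk size and alignment reported back are one bucket page.
 */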
static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
                     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
                     size_t *align)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        *align = bucket_page_sz;
        *min_total_elt_size = bucket_page_sz;
        /*
         * Each bucket occupies its own block aligned to
         * bucket_page_sz, so the required amount of memory is
         * a multiple of bucket_page_sz.
         * We also need extra space for a bucket header
         */
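        /*
         * Worked example (illustrative numbers only, the real values
         * depend on RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB and the element
         * size): with 64 KB bucket pages and obj_per_bucket == 128, a
         * request for obj_num == 1000 objects needs
         * (1000 + 127) / 128 == 8 buckets, i.e. 8 * 64 KB == 512 KB.
         */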
        return ((obj_num + bd->obj_per_bucket - 1) /
                bd->obj_per_bucket) * bucket_page_sz;
}

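/*
 * Populate callback: split the memory chunk into bucket pages, initialise
 * a bucket header at the start of each page and let the default populate
 * helper lay out the objects behind it.
 */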
static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
                void *vaddr, rte_iova_t iova, size_t len,
                rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;
        unsigned int bucket_header_sz;
        unsigned int n_objs;
        uintptr_t align;
        uint8_t *iter;
        int rc;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
                (uintptr_t)vaddr;

        bucket_header_sz = bd->header_size - mp->header_size;
        if (iova != RTE_BAD_IOVA)
                iova += align + bucket_header_sz;

        for (iter = (uint8_t *)vaddr + align, n_objs = 0;
             iter < (uint8_t *)vaddr + len && n_objs < max_objs;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;
                unsigned int chunk_len = bd->bucket_mem_size;

                if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
                        chunk_len = len - (iter - (uint8_t *)vaddr);
                if (chunk_len <= bucket_header_sz)
                        break;
                chunk_len -= bucket_header_sz;

                hdr->fill_cnt = 0;
                hdr->lcore_id = LCORE_ID_ANY;
                rc = rte_mempool_op_populate_default(mp,
                                                     RTE_MIN(bd->obj_per_bucket,
                                                             max_objs - n_objs),
                                                     iter + bucket_header_sz,
                                                     iova, chunk_len,
                                                     obj_cb, obj_cb_arg);
                if (rc < 0)
                        return rc;
                n_objs += rc;
                if (iova != RTE_BAD_IOVA)
                        iova += bucket_page_sz;
        }

        return n_objs;
}

static const struct rte_mempool_ops ops_bucket = {
        .name = "bucket",
        .alloc = bucket_alloc,
        .free = bucket_free,
        .enqueue = bucket_enqueue,
        .dequeue = bucket_dequeue,
        .get_count = bucket_get_count,
        .calc_mem_size = bucket_calc_mem_size,
        .populate = bucket_populate,
};


MEMPOOL_REGISTER_OPS(ops_bucket);