mempool/bucket: implement block dequeue operation
drivers/mempool/bucket/rte_mempool_bucket.c
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017-2018 Solarflare Communications Inc.
 * All rights reserved.
 *
 * This software was jointly developed between OKTET Labs (under contract
 * for Solarflare) and Solarflare Communications, Inc.
 */

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#include <rte_errno.h>
#include <rte_ring.h>
#include <rte_mempool.h>
#include <rte_malloc.h>

/*
 * The general idea of the bucket mempool driver is as follows.
 * We keep track of physically contiguous groups (buckets) of objects
 * of a certain size. Every such group has a counter that is
 * incremented every time an object from that group is enqueued.
 * Until the bucket is full, no objects from it are eligible for allocation.
 * If a request is made to dequeue a multiple of the bucket size, it is
 * satisfied by returning whole buckets instead of separate objects.
 */
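
/*
 * A worked example (the numbers are illustrative assumptions, not
 * requirements): with a 64 KB bucket (RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB
 * set to 64), a 64-byte cache line and a total element size of 2176
 * bytes, one bucket holds
 *
 *      obj_per_bucket = (64 * 1024 - 64) / 2176 = 30
 *
 * objects (the first cache line of the bucket is reserved for the
 * bucket header), and the bucket owning any given object is found by
 * masking the object address with ~(rte_align64pow2(bucket_mem_size) - 1).
 */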


struct bucket_header {
        unsigned int lcore_id;
        uint8_t fill_cnt;
};

struct bucket_stack {
        unsigned int top;
        unsigned int limit;
        void *objects[];
};

struct bucket_data {
        unsigned int header_size;
        unsigned int total_elt_size;
        unsigned int obj_per_bucket;
        uintptr_t bucket_page_mask;
        struct rte_ring *shared_bucket_ring;
        struct bucket_stack *buckets[RTE_MAX_LCORE];
        /*
         * Multi-producer single-consumer ring to hold objects that are
         * returned to the mempool from an lcore other than the one they
         * were originally dequeued on
         */
        struct rte_ring *adoption_buffer_rings[RTE_MAX_LCORE];
        struct rte_ring *shared_orphan_ring;
        struct rte_mempool *pool;
        unsigned int bucket_mem_size;
};

static struct bucket_stack *
bucket_stack_create(const struct rte_mempool *mp, unsigned int n_elts)
{
        struct bucket_stack *stack;

        stack = rte_zmalloc_socket("bucket_stack",
                                   sizeof(struct bucket_stack) +
                                   n_elts * sizeof(void *),
                                   RTE_CACHE_LINE_SIZE,
                                   mp->socket_id);
        if (stack == NULL)
                return NULL;
        stack->limit = n_elts;
        stack->top = 0;

        return stack;
}

static void
bucket_stack_push(struct bucket_stack *stack, void *obj)
{
        RTE_ASSERT(stack->top < stack->limit);
        stack->objects[stack->top++] = obj;
}

static void *
bucket_stack_pop_unsafe(struct bucket_stack *stack)
{
        RTE_ASSERT(stack->top > 0);
        return stack->objects[--stack->top];
}

static void *
bucket_stack_pop(struct bucket_stack *stack)
{
        if (stack->top == 0)
                return NULL;
        return bucket_stack_pop_unsafe(stack);
}

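/*
 * Return a single object to its bucket. If the bucket is owned by the
 * caller's lcore, the fill counter is bumped and a completed bucket is
 * pushed onto the lcore-local stack. If it is owned by another lcore,
 * the object is placed on that lcore's adoption ring. Completed buckets
 * that are not owned by any lcore go to the shared bucket ring.
 */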
static int
bucket_enqueue_single(struct bucket_data *bd, void *obj)
{
        int rc = 0;
        uintptr_t addr = (uintptr_t)obj;
        struct bucket_header *hdr;
        unsigned int lcore_id = rte_lcore_id();

        addr &= bd->bucket_page_mask;
        hdr = (struct bucket_header *)addr;

        if (likely(hdr->lcore_id == lcore_id)) {
                if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                        hdr->fill_cnt++;
                } else {
                        hdr->fill_cnt = 0;
                        /* Stack is big enough to put all buckets */
                        bucket_stack_push(bd->buckets[lcore_id], hdr);
                }
        } else if (hdr->lcore_id != LCORE_ID_ANY) {
                struct rte_ring *adopt_ring =
                        bd->adoption_buffer_rings[hdr->lcore_id];

                rc = rte_ring_enqueue(adopt_ring, obj);
                /* Ring is big enough to put all objects */
                RTE_ASSERT(rc == 0);
        } else if (hdr->fill_cnt < bd->obj_per_bucket - 1) {
                hdr->fill_cnt++;
        } else {
                hdr->fill_cnt = 0;
                rc = rte_ring_enqueue(bd->shared_bucket_ring, hdr);
                /* Ring is big enough to put all buckets */
                RTE_ASSERT(rc == 0);
        }

        return rc;
}

static int
bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
               unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int i;
        int rc = 0;

        for (i = 0; i < n; i++) {
                rc = bucket_enqueue_single(bd, obj_table[i]);
                RTE_ASSERT(rc == 0);
        }
        return rc;
}

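/*
 * Expand a bucket into individual object pointers: write n pointers,
 * offset by the bucket header and spaced total_elt_size apart, into
 * obj_table. *pstart is advanced to the next object in the bucket and
 * the updated obj_table position is returned.
 */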
static void **
bucket_fill_obj_table(const struct bucket_data *bd, void **pstart,
                      void **obj_table, unsigned int n)
{
        unsigned int i;
        uint8_t *objptr = *pstart;

        for (objptr += bd->header_size, i = 0; i < n;
             i++, objptr += bd->total_elt_size)
                *obj_table++ = objptr;
        *pstart = objptr;
        return obj_table;
}

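/*
 * Dequeue objects that do not add up to a whole bucket ("orphans").
 * They are taken from the shared orphan ring when possible; otherwise a
 * complete bucket is broken up: the first n_orphans objects satisfy the
 * request and the rest of the bucket is pushed onto the orphan ring.
 */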
static int
bucket_dequeue_orphans(struct bucket_data *bd, void **obj_table,
                       unsigned int n_orphans)
{
        unsigned int i;
        int rc;
        uint8_t *objptr;

        rc = rte_ring_dequeue_bulk(bd->shared_orphan_ring, obj_table,
                                   n_orphans, NULL);
        if (unlikely(rc != (int)n_orphans)) {
                struct bucket_header *hdr;

                objptr = bucket_stack_pop(bd->buckets[rte_lcore_id()]);
                hdr = (struct bucket_header *)objptr;

                if (objptr == NULL) {
                        rc = rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&objptr);
                        if (rc != 0) {
                                rte_errno = ENOBUFS;
                                return -rte_errno;
                        }
                        hdr = (struct bucket_header *)objptr;
                        hdr->lcore_id = rte_lcore_id();
                }
                hdr->fill_cnt = 0;
                bucket_fill_obj_table(bd, (void **)&objptr, obj_table,
                                      n_orphans);
                for (i = n_orphans; i < bd->obj_per_bucket; i++,
                             objptr += bd->total_elt_size) {
                        rc = rte_ring_enqueue(bd->shared_orphan_ring,
                                              objptr);
                        if (rc != 0) {
                                RTE_ASSERT(0);
                                rte_errno = -rc;
                                return rc;
                        }
                }
        }

        return 0;
}

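/*
 * Dequeue n_buckets complete buckets worth of objects, taking buckets
 * from the lcore-local stack first and then from the shared bucket
 * ring, and expanding each bucket into individual object pointers.
 * On shortage, everything dequeued so far is returned to the pool and
 * -ENOBUFS is reported via rte_errno.
 */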
static int
bucket_dequeue_buckets(struct bucket_data *bd, void **obj_table,
                       unsigned int n_buckets)
{
        struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
        unsigned int n_buckets_from_stack = RTE_MIN(n_buckets, cur_stack->top);
        void **obj_table_base = obj_table;

        n_buckets -= n_buckets_from_stack;
        while (n_buckets_from_stack-- > 0) {
                void *obj = bucket_stack_pop_unsafe(cur_stack);

                obj_table = bucket_fill_obj_table(bd, &obj, obj_table,
                                                  bd->obj_per_bucket);
        }
        while (n_buckets-- > 0) {
                struct bucket_header *hdr;

                if (unlikely(rte_ring_dequeue(bd->shared_bucket_ring,
                                              (void **)&hdr) != 0)) {
                        /*
                         * Return the already-dequeued buffers
                         * back to the mempool
                         */
                        bucket_enqueue(bd->pool, obj_table_base,
                                       obj_table - obj_table_base);
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                hdr->lcore_id = rte_lcore_id();
                obj_table = bucket_fill_obj_table(bd, (void **)&hdr,
                                                  obj_table,
                                                  bd->obj_per_bucket);
        }

        return 0;
}

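/*
 * Drain this lcore's adoption ring: objects that were freed on other
 * lcores but belong to buckets owned by this lcore are re-enqueued here
 * so that their buckets can be completed.
 */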
static int
bucket_adopt_orphans(struct bucket_data *bd)
{
        int rc = 0;
        struct rte_ring *adopt_ring =
                bd->adoption_buffer_rings[rte_lcore_id()];

        if (unlikely(!rte_ring_empty(adopt_ring))) {
                void *orphan;

                while (rte_ring_sc_dequeue(adopt_ring, &orphan) == 0) {
                        rc = bucket_enqueue_single(bd, orphan);
                        RTE_ASSERT(rc == 0);
                }
        }
        return rc;
}

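/*
 * Generic dequeue operation: the request is split into whole buckets
 * plus a remainder of orphan objects. If the bucket part fails after
 * the orphans have already been dequeued, the orphans are pushed back
 * onto the shared orphan ring.
 */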
static int
bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int n_buckets = n / bd->obj_per_bucket;
        unsigned int n_orphans = n - n_buckets * bd->obj_per_bucket;
        int rc = 0;

        bucket_adopt_orphans(bd);

        if (unlikely(n_orphans > 0)) {
                rc = bucket_dequeue_orphans(bd, obj_table +
                                            (n_buckets * bd->obj_per_bucket),
                                            n_orphans);
                if (rc != 0)
                        return rc;
        }

        if (likely(n_buckets > 0)) {
                rc = bucket_dequeue_buckets(bd, obj_table, n_buckets);
                if (unlikely(rc != 0) && n_orphans > 0) {
                        rte_ring_enqueue_bulk(bd->shared_orphan_ring,
                                              obj_table + (n_buckets *
                                                           bd->obj_per_bucket),
                                              n_orphans, NULL);
                }
        }

        return rc;
}

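/*
 * Dequeue n physically contiguous blocks, one block per bucket. Each
 * entry of first_obj_table is set to the first object of a bucket taken
 * from the lcore-local stack or the shared bucket ring. On shortage,
 * the buckets dequeued so far are pushed back onto the local stack.
 */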
static int
bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
                             unsigned int n)
{
        struct bucket_data *bd = mp->pool_data;
        const uint32_t header_size = bd->header_size;
        struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
        unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
        struct bucket_header *hdr;
        void **first_objp = first_obj_table;

        bucket_adopt_orphans(bd);

        n -= n_buckets_from_stack;
        while (n_buckets_from_stack-- > 0) {
                hdr = bucket_stack_pop_unsafe(cur_stack);
                *first_objp++ = (uint8_t *)hdr + header_size;
        }
        if (n > 0) {
                if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
                                                   first_objp, n, NULL) != n)) {
                        /* Return the already dequeued buckets */
                        while (first_objp-- != first_obj_table) {
                                bucket_stack_push(cur_stack,
                                                  (uint8_t *)*first_objp -
                                                  header_size);
                        }
                        rte_errno = ENOBUFS;
                        return -rte_errno;
                }
                while (n-- > 0) {
                        hdr = (struct bucket_header *)*first_objp;
                        hdr->lcore_id = rte_lcore_id();
                        *first_objp++ = (uint8_t *)hdr + header_size;
                }
        }

        return 0;
}
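
/*
 * Illustrative usage sketch, not part of the driver: applications reach
 * the operation above through the generic mempool API, which calls the
 * dequeue_contig_blocks op. The pool pointer "mp" and the block count
 * below are assumptions for the example.
 *
 *      struct rte_mempool_info info;
 *      void *blocks[2];
 *
 *      if (rte_mempool_ops_get_info(mp, &info) == 0 &&
 *          rte_mempool_get_contig_blocks(mp, blocks, 2) == 0) {
 *              // blocks[i] points to the first of info.contig_block_size
 *              // objects laid out back to back within one bucket
 *      }
 *
 * Objects taken this way are later released individually (for example
 * with rte_mempool_put()); bucket_enqueue_single() reassembles the
 * bucket once all of its objects have come back.
 */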

static void
count_underfilled_buckets(struct rte_mempool *mp,
                          void *opaque,
                          struct rte_mempool_memhdr *memhdr,
                          __rte_unused unsigned int mem_idx)
{
        unsigned int *pcount = opaque;
        const struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz =
                (unsigned int)(~bd->bucket_page_mask + 1);
        uintptr_t align;
        uint8_t *iter;

        align = (uintptr_t)RTE_PTR_ALIGN_CEIL(memhdr->addr, bucket_page_sz) -
                (uintptr_t)memhdr->addr;

        for (iter = (uint8_t *)memhdr->addr + align;
             iter < (uint8_t *)memhdr->addr + memhdr->len;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;

                *pcount += hdr->fill_cnt;
        }
}

static unsigned int
bucket_get_count(const struct rte_mempool *mp)
{
        const struct bucket_data *bd = mp->pool_data;
        unsigned int count =
                bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
                rte_ring_count(bd->shared_orphan_ring);
        unsigned int i;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                count += bd->obj_per_bucket * bd->buckets[i]->top +
                        rte_ring_count(bd->adoption_buffer_rings[i]);
        }

        rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
                             count_underfilled_buckets, &count);

        return count;
}

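/*
 * Pool creation hook: compute the bucket geometry (objects per bucket
 * and the address mask), allocate a bucket stack and an adoption ring
 * for every enabled lcore, and create the shared orphan and shared
 * bucket rings.
 */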
static int
bucket_alloc(struct rte_mempool *mp)
{
        int rg_flags = 0;
        int rc = 0;
        char rg_name[RTE_RING_NAMESIZE];
        struct bucket_data *bd;
        unsigned int i;
        unsigned int bucket_header_size;

        bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
                                RTE_CACHE_LINE_SIZE, mp->socket_id);
        if (bd == NULL) {
                rc = -ENOMEM;
                goto no_mem_for_data;
        }
        bd->pool = mp;
        if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
                bucket_header_size = sizeof(struct bucket_header);
        else
                bucket_header_size = RTE_CACHE_LINE_SIZE;
        RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
        bd->header_size = mp->header_size + bucket_header_size;
        bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
        bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
        bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
                bd->total_elt_size;
        bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);

        if (mp->flags & MEMPOOL_F_SP_PUT)
                rg_flags |= RING_F_SP_ENQ;
        if (mp->flags & MEMPOOL_F_SC_GET)
                rg_flags |= RING_F_SC_DEQ;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                if (!rte_lcore_is_enabled(i))
                        continue;
                bd->buckets[i] =
                        bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
                if (bd->buckets[i] == NULL) {
                        rc = -ENOMEM;
                        goto no_mem_for_stacks;
                }
                rc = snprintf(rg_name, sizeof(rg_name),
                              RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
                if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                        rc = -ENAMETOOLONG;
                        goto no_mem_for_stacks;
                }
                bd->adoption_buffer_rings[i] =
                        rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                        mp->socket_id,
                                        rg_flags | RING_F_SC_DEQ);
                if (bd->adoption_buffer_rings[i] == NULL) {
                        rc = -rte_errno;
                        goto no_mem_for_stacks;
                }
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_orphan_ring;
        }
        bd->shared_orphan_ring =
                rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
                                mp->socket_id, rg_flags);
        if (bd->shared_orphan_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_orphan_ring;
        }

        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".1", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
                rc = -ENAMETOOLONG;
                goto invalid_shared_bucket_ring;
        }
        bd->shared_bucket_ring =
                rte_ring_create(rg_name,
                                rte_align32pow2((mp->size + 1) /
                                                bd->obj_per_bucket),
                                mp->socket_id, rg_flags);
        if (bd->shared_bucket_ring == NULL) {
                rc = -rte_errno;
                goto cannot_create_shared_bucket_ring;
        }

        mp->pool_data = bd;

        return 0;

cannot_create_shared_bucket_ring:
invalid_shared_bucket_ring:
        rte_ring_free(bd->shared_orphan_ring);
cannot_create_shared_orphan_ring:
invalid_shared_orphan_ring:
no_mem_for_stacks:
        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }
        rte_free(bd);
no_mem_for_data:
        rte_errno = -rc;
        return rc;
}

static void
bucket_free(struct rte_mempool *mp)
{
        unsigned int i;
        struct bucket_data *bd = mp->pool_data;

        if (bd == NULL)
                return;

        for (i = 0; i < RTE_MAX_LCORE; i++) {
                rte_free(bd->buckets[i]);
                rte_ring_free(bd->adoption_buffer_rings[i]);
        }

        rte_ring_free(bd->shared_orphan_ring);
        rte_ring_free(bd->shared_bucket_ring);

        rte_free(bd);
}

static ssize_t
bucket_calc_mem_size(const struct rte_mempool *mp, uint32_t obj_num,
                     __rte_unused uint32_t pg_shift, size_t *min_total_elt_size,
                     size_t *align)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        *align = bucket_page_sz;
        *min_total_elt_size = bucket_page_sz;
        /*
         * Each bucket occupies its own block aligned to
         * bucket_page_sz, so the required amount of memory is
         * a multiple of bucket_page_sz.
         * We also need extra space for a bucket header
         */
        return ((obj_num + bd->obj_per_bucket - 1) /
                bd->obj_per_bucket) * bucket_page_sz;
}

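/*
 * Populate hook: the memory chunk is carved into bucket_page_sz-aligned
 * blocks, a bucket header is initialized at the start of each block,
 * and the remaining space of the block is handed to the default
 * populate helper to create the objects.
 */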
static int
bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
                void *vaddr, rte_iova_t iova, size_t len,
                rte_mempool_populate_obj_cb_t *obj_cb, void *obj_cb_arg)
{
        struct bucket_data *bd = mp->pool_data;
        unsigned int bucket_page_sz;
        unsigned int bucket_header_sz;
        unsigned int n_objs;
        uintptr_t align;
        uint8_t *iter;
        int rc;

        if (bd == NULL)
                return -EINVAL;

        bucket_page_sz = rte_align32pow2(bd->bucket_mem_size);
        align = RTE_PTR_ALIGN_CEIL((uintptr_t)vaddr, bucket_page_sz) -
                (uintptr_t)vaddr;

        bucket_header_sz = bd->header_size - mp->header_size;
        if (iova != RTE_BAD_IOVA)
                iova += align + bucket_header_sz;

        for (iter = (uint8_t *)vaddr + align, n_objs = 0;
             iter < (uint8_t *)vaddr + len && n_objs < max_objs;
             iter += bucket_page_sz) {
                struct bucket_header *hdr = (struct bucket_header *)iter;
                unsigned int chunk_len = bd->bucket_mem_size;

                if ((size_t)(iter - (uint8_t *)vaddr) + chunk_len > len)
                        chunk_len = len - (iter - (uint8_t *)vaddr);
                if (chunk_len <= bucket_header_sz)
                        break;
                chunk_len -= bucket_header_sz;

                hdr->fill_cnt = 0;
                hdr->lcore_id = LCORE_ID_ANY;
                rc = rte_mempool_op_populate_default(mp,
                                                     RTE_MIN(bd->obj_per_bucket,
                                                             max_objs - n_objs),
                                                     iter + bucket_header_sz,
                                                     iova, chunk_len,
                                                     obj_cb, obj_cb_arg);
                if (rc < 0)
                        return rc;
                n_objs += rc;
                if (iova != RTE_BAD_IOVA)
                        iova += bucket_page_sz;
        }

        return n_objs;
}

static int
bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
{
        struct bucket_data *bd = mp->pool_data;

        info->contig_block_size = bd->obj_per_bucket;
        return 0;
}


static const struct rte_mempool_ops ops_bucket = {
        .name = "bucket",
        .alloc = bucket_alloc,
        .free = bucket_free,
        .enqueue = bucket_enqueue,
        .dequeue = bucket_dequeue,
        .get_count = bucket_get_count,
        .calc_mem_size = bucket_calc_mem_size,
        .populate = bucket_populate,
        .get_info = bucket_get_info,
        .dequeue_contig_blocks = bucket_dequeue_contig_blocks,
};


MEMPOOL_REGISTER_OPS(ops_bucket);
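
/*
 * Illustrative sketch, not part of the driver: how an application could
 * select these ops for a pool. The pool name, sizes and error handling
 * below are assumptions for the example.
 *
 *      struct rte_mempool *mp;
 *
 *      mp = rte_mempool_create_empty("example_pool", 8192, 2048, 0, 0,
 *                                    SOCKET_ID_ANY, 0);
 *      if (mp == NULL ||
 *          rte_mempool_set_ops_byname(mp, "bucket", NULL) != 0 ||
 *          rte_mempool_populate_default(mp) < 0)
 *              rte_exit(EXIT_FAILURE, "cannot set up bucket mempool\n");
 *
 * rte_mempool_create_empty() followed by rte_mempool_set_ops_byname()
 * is the generic way to pick a particular mempool driver before the
 * pool memory is populated.
 */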