mempool: remove deprecated functions
[dpdk.git] / lib / librte_mempool / rte_mempool.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation.
3  * Copyright(c) 2016 6WIND S.A.
4  */
5
6 #include <stdbool.h>
7 #include <stdio.h>
8 #include <string.h>
9 #include <stdint.h>
10 #include <stdarg.h>
11 #include <unistd.h>
12 #include <inttypes.h>
13 #include <errno.h>
14 #include <sys/queue.h>
15 #include <sys/mman.h>
16
17 #include <rte_common.h>
18 #include <rte_log.h>
19 #include <rte_debug.h>
20 #include <rte_memory.h>
21 #include <rte_memzone.h>
22 #include <rte_malloc.h>
23 #include <rte_atomic.h>
24 #include <rte_launch.h>
25 #include <rte_eal.h>
26 #include <rte_eal_memconfig.h>
27 #include <rte_per_lcore.h>
28 #include <rte_lcore.h>
29 #include <rte_branch_prediction.h>
30 #include <rte_errno.h>
31 #include <rte_string_fns.h>
32 #include <rte_spinlock.h>
33
34 #include "rte_mempool.h"
35
36 TAILQ_HEAD(rte_mempool_list, rte_tailq_entry);
37
38 static struct rte_tailq_elem rte_mempool_tailq = {
39         .name = "RTE_MEMPOOL",
40 };
41 EAL_REGISTER_TAILQ(rte_mempool_tailq)
42
43 #define CACHE_FLUSHTHRESH_MULTIPLIER 1.5
44 #define CALC_CACHE_FLUSHTHRESH(c)       \
45         ((typeof(c))((c) * CACHE_FLUSHTHRESH_MULTIPLIER))
46
47 /*
48  * return the greatest common divisor between a and b (fast algorithm)
49  *
50  */
51 static unsigned get_gcd(unsigned a, unsigned b)
52 {
53         unsigned c;
54
55         if (0 == a)
56                 return b;
57         if (0 == b)
58                 return a;
59
60         if (a < b) {
61                 c = a;
62                 a = b;
63                 b = c;
64         }
65
66         while (b != 0) {
67                 c = a % b;
68                 a = b;
69                 b = c;
70         }
71
72         return a;
73 }
74
75 /*
76  * Depending on memory configuration, objects addresses are spread
77  * between channels and ranks in RAM: the pool allocator will add
78  * padding between objects. This function return the new size of the
79  * object.
80  */
81 static unsigned optimize_object_size(unsigned obj_size)
82 {
83         unsigned nrank, nchan;
84         unsigned new_obj_size;
85
86         /* get number of channels */
87         nchan = rte_memory_get_nchannel();
88         if (nchan == 0)
89                 nchan = 4;
90
91         nrank = rte_memory_get_nrank();
92         if (nrank == 0)
93                 nrank = 1;
94
95         /* process new object size */
96         new_obj_size = (obj_size + RTE_MEMPOOL_ALIGN_MASK) / RTE_MEMPOOL_ALIGN;
97         while (get_gcd(new_obj_size, nrank * nchan) != 1)
98                 new_obj_size++;
99         return new_obj_size * RTE_MEMPOOL_ALIGN;
100 }
101
102 static int
103 find_min_pagesz(const struct rte_memseg_list *msl, void *arg)
104 {
105         size_t *min = arg;
106
107         if (msl->page_sz < *min)
108                 *min = msl->page_sz;
109
110         return 0;
111 }
112
113 static size_t
114 get_min_page_size(void)
115 {
116         size_t min_pagesz = SIZE_MAX;
117
118         rte_memseg_list_walk(find_min_pagesz, &min_pagesz);
119
120         return min_pagesz == SIZE_MAX ? (size_t) getpagesize() : min_pagesz;
121 }
122
123
124 static void
125 mempool_add_elem(struct rte_mempool *mp, __rte_unused void *opaque,
126                  void *obj, rte_iova_t iova)
127 {
128         struct rte_mempool_objhdr *hdr;
129         struct rte_mempool_objtlr *tlr __rte_unused;
130
131         /* set mempool ptr in header */
132         hdr = RTE_PTR_SUB(obj, sizeof(*hdr));
133         hdr->mp = mp;
134         hdr->iova = iova;
135         STAILQ_INSERT_TAIL(&mp->elt_list, hdr, next);
136         mp->populated_size++;
137
138 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
139         hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
140         tlr = __mempool_get_trailer(obj);
141         tlr->cookie = RTE_MEMPOOL_TRAILER_COOKIE;
142 #endif
143 }
144
145 /* call obj_cb() for each mempool element */
146 uint32_t
147 rte_mempool_obj_iter(struct rte_mempool *mp,
148         rte_mempool_obj_cb_t *obj_cb, void *obj_cb_arg)
149 {
150         struct rte_mempool_objhdr *hdr;
151         void *obj;
152         unsigned n = 0;
153
154         STAILQ_FOREACH(hdr, &mp->elt_list, next) {
155                 obj = (char *)hdr + sizeof(*hdr);
156                 obj_cb(mp, obj_cb_arg, obj, n);
157                 n++;
158         }
159
160         return n;
161 }
162
163 /* call mem_cb() for each mempool memory chunk */
164 uint32_t
165 rte_mempool_mem_iter(struct rte_mempool *mp,
166         rte_mempool_mem_cb_t *mem_cb, void *mem_cb_arg)
167 {
168         struct rte_mempool_memhdr *hdr;
169         unsigned n = 0;
170
171         STAILQ_FOREACH(hdr, &mp->mem_list, next) {
172                 mem_cb(mp, mem_cb_arg, hdr, n);
173                 n++;
174         }
175
176         return n;
177 }
178
179 /* get the header, trailer and total size of a mempool element. */
180 uint32_t
181 rte_mempool_calc_obj_size(uint32_t elt_size, uint32_t flags,
182         struct rte_mempool_objsz *sz)
183 {
184         struct rte_mempool_objsz lsz;
185
186         sz = (sz != NULL) ? sz : &lsz;
187
188         sz->header_size = sizeof(struct rte_mempool_objhdr);
189         if ((flags & MEMPOOL_F_NO_CACHE_ALIGN) == 0)
190                 sz->header_size = RTE_ALIGN_CEIL(sz->header_size,
191                         RTE_MEMPOOL_ALIGN);
192
193 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
194         sz->trailer_size = sizeof(struct rte_mempool_objtlr);
195 #else
196         sz->trailer_size = 0;
197 #endif
198
199         /* element size is 8 bytes-aligned at least */
200         sz->elt_size = RTE_ALIGN_CEIL(elt_size, sizeof(uint64_t));
201
202         /* expand trailer to next cache line */
203         if ((flags & MEMPOOL_F_NO_CACHE_ALIGN) == 0) {
204                 sz->total_size = sz->header_size + sz->elt_size +
205                         sz->trailer_size;
206                 sz->trailer_size += ((RTE_MEMPOOL_ALIGN -
207                                   (sz->total_size & RTE_MEMPOOL_ALIGN_MASK)) &
208                                  RTE_MEMPOOL_ALIGN_MASK);
209         }
210
211         /*
212          * increase trailer to add padding between objects in order to
213          * spread them across memory channels/ranks
214          */
215         if ((flags & MEMPOOL_F_NO_SPREAD) == 0) {
216                 unsigned new_size;
217                 new_size = optimize_object_size(sz->header_size + sz->elt_size +
218                         sz->trailer_size);
219                 sz->trailer_size = new_size - sz->header_size - sz->elt_size;
220         }
221
222         /* this is the size of an object, including header and trailer */
223         sz->total_size = sz->header_size + sz->elt_size + sz->trailer_size;
224
225         return sz->total_size;
226 }
227
228
229 /*
230  * Internal function to calculate required memory chunk size.
231  */
232 size_t
233 rte_mempool_calc_mem_size_helper(uint32_t elt_num, size_t total_elt_sz,
234                                  uint32_t pg_shift)
235 {
236         size_t obj_per_page, pg_num, pg_sz;
237
238         if (total_elt_sz == 0)
239                 return 0;
240
241         if (pg_shift == 0)
242                 return total_elt_sz * elt_num;
243
244         pg_sz = (size_t)1 << pg_shift;
245         obj_per_page = pg_sz / total_elt_sz;
246         if (obj_per_page == 0)
247                 return RTE_ALIGN_CEIL(total_elt_sz, pg_sz) * elt_num;
248
249         pg_num = (elt_num + obj_per_page - 1) / obj_per_page;
250         return pg_num << pg_shift;
251 }
252
253 /* free a memchunk allocated with rte_memzone_reserve() */
254 static void
255 rte_mempool_memchunk_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
256         void *opaque)
257 {
258         const struct rte_memzone *mz = opaque;
259         rte_memzone_free(mz);
260 }
261
262 /* Free memory chunks used by a mempool. Objects must be in pool */
263 static void
264 rte_mempool_free_memchunks(struct rte_mempool *mp)
265 {
266         struct rte_mempool_memhdr *memhdr;
267         void *elt;
268
269         while (!STAILQ_EMPTY(&mp->elt_list)) {
270                 rte_mempool_ops_dequeue_bulk(mp, &elt, 1);
271                 (void)elt;
272                 STAILQ_REMOVE_HEAD(&mp->elt_list, next);
273                 mp->populated_size--;
274         }
275
276         while (!STAILQ_EMPTY(&mp->mem_list)) {
277                 memhdr = STAILQ_FIRST(&mp->mem_list);
278                 STAILQ_REMOVE_HEAD(&mp->mem_list, next);
279                 if (memhdr->free_cb != NULL)
280                         memhdr->free_cb(memhdr, memhdr->opaque);
281                 rte_free(memhdr);
282                 mp->nb_mem_chunks--;
283         }
284 }
285
286 static int
287 mempool_ops_alloc_once(struct rte_mempool *mp)
288 {
289         int ret;
290
291         /* create the internal ring if not already done */
292         if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
293                 ret = rte_mempool_ops_alloc(mp);
294                 if (ret != 0)
295                         return ret;
296                 mp->flags |= MEMPOOL_F_POOL_CREATED;
297         }
298         return 0;
299 }
300
301 /* Add objects in the pool, using a physically contiguous memory
302  * zone. Return the number of objects added, or a negative value
303  * on error.
304  */
305 int
306 rte_mempool_populate_iova(struct rte_mempool *mp, char *vaddr,
307         rte_iova_t iova, size_t len, rte_mempool_memchunk_free_cb_t *free_cb,
308         void *opaque)
309 {
310         unsigned i = 0;
311         size_t off;
312         struct rte_mempool_memhdr *memhdr;
313         int ret;
314
315         ret = mempool_ops_alloc_once(mp);
316         if (ret != 0)
317                 return ret;
318
319         /* mempool is already populated */
320         if (mp->populated_size >= mp->size)
321                 return -ENOSPC;
322
323         memhdr = rte_zmalloc("MEMPOOL_MEMHDR", sizeof(*memhdr), 0);
324         if (memhdr == NULL)
325                 return -ENOMEM;
326
327         memhdr->mp = mp;
328         memhdr->addr = vaddr;
329         memhdr->iova = iova;
330         memhdr->len = len;
331         memhdr->free_cb = free_cb;
332         memhdr->opaque = opaque;
333
334         if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
335                 off = RTE_PTR_ALIGN_CEIL(vaddr, 8) - vaddr;
336         else
337                 off = RTE_PTR_ALIGN_CEIL(vaddr, RTE_CACHE_LINE_SIZE) - vaddr;
338
339         if (off > len) {
340                 ret = -EINVAL;
341                 goto fail;
342         }
343
344         i = rte_mempool_ops_populate(mp, mp->size - mp->populated_size,
345                 (char *)vaddr + off,
346                 (iova == RTE_BAD_IOVA) ? RTE_BAD_IOVA : (iova + off),
347                 len - off, mempool_add_elem, NULL);
348
349         /* not enough room to store one object */
350         if (i == 0) {
351                 ret = -EINVAL;
352                 goto fail;
353         }
354
355         STAILQ_INSERT_TAIL(&mp->mem_list, memhdr, next);
356         mp->nb_mem_chunks++;
357         return i;
358
359 fail:
360         rte_free(memhdr);
361         return ret;
362 }
363
364 /* Populate the mempool with a virtual area. Return the number of
365  * objects added, or a negative value on error.
366  */
367 int
368 rte_mempool_populate_virt(struct rte_mempool *mp, char *addr,
369         size_t len, size_t pg_sz, rte_mempool_memchunk_free_cb_t *free_cb,
370         void *opaque)
371 {
372         rte_iova_t iova;
373         size_t off, phys_len;
374         int ret, cnt = 0;
375
376         /* address and len must be page-aligned */
377         if (RTE_PTR_ALIGN_CEIL(addr, pg_sz) != addr)
378                 return -EINVAL;
379         if (RTE_ALIGN_CEIL(len, pg_sz) != len)
380                 return -EINVAL;
381
382         if (mp->flags & MEMPOOL_F_NO_IOVA_CONTIG)
383                 return rte_mempool_populate_iova(mp, addr, RTE_BAD_IOVA,
384                         len, free_cb, opaque);
385
386         for (off = 0; off + pg_sz <= len &&
387                      mp->populated_size < mp->size; off += phys_len) {
388
389                 iova = rte_mem_virt2iova(addr + off);
390
391                 if (iova == RTE_BAD_IOVA && rte_eal_has_hugepages()) {
392                         ret = -EINVAL;
393                         goto fail;
394                 }
395
396                 /* populate with the largest group of contiguous pages */
397                 for (phys_len = pg_sz; off + phys_len < len; phys_len += pg_sz) {
398                         rte_iova_t iova_tmp;
399
400                         iova_tmp = rte_mem_virt2iova(addr + off + phys_len);
401
402                         if (iova_tmp != iova + phys_len)
403                                 break;
404                 }
405
406                 ret = rte_mempool_populate_iova(mp, addr + off, iova,
407                         phys_len, free_cb, opaque);
408                 if (ret < 0)
409                         goto fail;
410                 /* no need to call the free callback for next chunks */
411                 free_cb = NULL;
412                 cnt += ret;
413         }
414
415         return cnt;
416
417  fail:
418         rte_mempool_free_memchunks(mp);
419         return ret;
420 }
421
422 /* Default function to populate the mempool: allocate memory in memzones,
423  * and populate them. Return the number of objects added, or a negative
424  * value on error.
425  */
426 int
427 rte_mempool_populate_default(struct rte_mempool *mp)
428 {
429         unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
430         char mz_name[RTE_MEMZONE_NAMESIZE];
431         const struct rte_memzone *mz;
432         ssize_t mem_size;
433         size_t align, pg_sz, pg_shift;
434         rte_iova_t iova;
435         unsigned mz_id, n;
436         int ret;
437         bool no_contig, try_contig, no_pageshift;
438
439         ret = mempool_ops_alloc_once(mp);
440         if (ret != 0)
441                 return ret;
442
443         /* mempool must not be populated */
444         if (mp->nb_mem_chunks != 0)
445                 return -EEXIST;
446
447         no_contig = mp->flags & MEMPOOL_F_NO_IOVA_CONTIG;
448
449         /*
450          * the following section calculates page shift and page size values.
451          *
452          * these values impact the result of calc_mem_size operation, which
453          * returns the amount of memory that should be allocated to store the
454          * desired number of objects. when not zero, it allocates more memory
455          * for the padding between objects, to ensure that an object does not
456          * cross a page boundary. in other words, page size/shift are to be set
457          * to zero if mempool elements won't care about page boundaries.
458          * there are several considerations for page size and page shift here.
459          *
460          * if we don't need our mempools to have physically contiguous objects,
461          * then just set page shift and page size to 0, because the user has
462          * indicated that there's no need to care about anything.
463          *
464          * if we do need contiguous objects, there is also an option to reserve
465          * the entire mempool memory as one contiguous block of memory, in
466          * which case the page shift and alignment wouldn't matter as well.
467          *
468          * if we require contiguous objects, but not necessarily the entire
469          * mempool reserved space to be contiguous, then there are two options.
470          *
471          * if our IO addresses are virtual, not actual physical (IOVA as VA
472          * case), then no page shift needed - our memory allocation will give us
473          * contiguous IO memory as far as the hardware is concerned, so
474          * act as if we're getting contiguous memory.
475          *
476          * if our IO addresses are physical, we may get memory from bigger
477          * pages, or we might get memory from smaller pages, and how much of it
478          * we require depends on whether we want bigger or smaller pages.
479          * However, requesting each and every memory size is too much work, so
480          * what we'll do instead is walk through the page sizes available, pick
481          * the smallest one and set up page shift to match that one. We will be
482          * wasting some space this way, but it's much nicer than looping around
483          * trying to reserve each and every page size.
484          *
485          * However, since size calculation will produce page-aligned sizes, it
486          * makes sense to first try and see if we can reserve the entire memzone
487          * in one contiguous chunk as well (otherwise we might end up wasting a
488          * 1G page on a 10MB memzone). If we fail to get enough contiguous
489          * memory, then we'll go and reserve space page-by-page.
490          */
491         no_pageshift = no_contig || rte_eal_iova_mode() == RTE_IOVA_VA;
492         try_contig = !no_contig && !no_pageshift && rte_eal_has_hugepages();
493
494         if (no_pageshift) {
495                 pg_sz = 0;
496                 pg_shift = 0;
497         } else if (try_contig) {
498                 pg_sz = get_min_page_size();
499                 pg_shift = rte_bsf32(pg_sz);
500         } else {
501                 pg_sz = getpagesize();
502                 pg_shift = rte_bsf32(pg_sz);
503         }
504
505         for (mz_id = 0, n = mp->size; n > 0; mz_id++, n -= ret) {
506                 size_t min_chunk_size;
507                 unsigned int flags;
508
509                 if (try_contig || no_pageshift)
510                         mem_size = rte_mempool_ops_calc_mem_size(mp, n,
511                                         0, &min_chunk_size, &align);
512                 else
513                         mem_size = rte_mempool_ops_calc_mem_size(mp, n,
514                                         pg_shift, &min_chunk_size, &align);
515
516                 if (mem_size < 0) {
517                         ret = mem_size;
518                         goto fail;
519                 }
520
521                 ret = snprintf(mz_name, sizeof(mz_name),
522                         RTE_MEMPOOL_MZ_FORMAT "_%d", mp->name, mz_id);
523                 if (ret < 0 || ret >= (int)sizeof(mz_name)) {
524                         ret = -ENAMETOOLONG;
525                         goto fail;
526                 }
527
528                 flags = mz_flags;
529
530                 /* if we're trying to reserve contiguous memory, add appropriate
531                  * memzone flag.
532                  */
533                 if (try_contig)
534                         flags |= RTE_MEMZONE_IOVA_CONTIG;
535
536                 mz = rte_memzone_reserve_aligned(mz_name, mem_size,
537                                 mp->socket_id, flags, align);
538
539                 /* if we were trying to allocate contiguous memory, failed and
540                  * minimum required contiguous chunk fits minimum page, adjust
541                  * memzone size to the page size, and try again.
542                  */
543                 if (mz == NULL && try_contig && min_chunk_size <= pg_sz) {
544                         try_contig = false;
545                         flags &= ~RTE_MEMZONE_IOVA_CONTIG;
546
547                         mem_size = rte_mempool_ops_calc_mem_size(mp, n,
548                                         pg_shift, &min_chunk_size, &align);
549                         if (mem_size < 0) {
550                                 ret = mem_size;
551                                 goto fail;
552                         }
553
554                         mz = rte_memzone_reserve_aligned(mz_name, mem_size,
555                                 mp->socket_id, flags, align);
556                 }
557                 /* don't try reserving with 0 size if we were asked to reserve
558                  * IOVA-contiguous memory.
559                  */
560                 if (min_chunk_size < (size_t)mem_size && mz == NULL) {
561                         /* not enough memory, retry with the biggest zone we
562                          * have
563                          */
564                         mz = rte_memzone_reserve_aligned(mz_name, 0,
565                                         mp->socket_id, flags,
566                                         RTE_MAX(pg_sz, align));
567                 }
568                 if (mz == NULL) {
569                         ret = -rte_errno;
570                         goto fail;
571                 }
572
573                 if (mz->len < min_chunk_size) {
574                         rte_memzone_free(mz);
575                         ret = -ENOMEM;
576                         goto fail;
577                 }
578
579                 if (no_contig)
580                         iova = RTE_BAD_IOVA;
581                 else
582                         iova = mz->iova;
583
584                 if (no_pageshift || try_contig)
585                         ret = rte_mempool_populate_iova(mp, mz->addr,
586                                 iova, mz->len,
587                                 rte_mempool_memchunk_mz_free,
588                                 (void *)(uintptr_t)mz);
589                 else
590                         ret = rte_mempool_populate_virt(mp, mz->addr,
591                                 RTE_ALIGN_FLOOR(mz->len, pg_sz), pg_sz,
592                                 rte_mempool_memchunk_mz_free,
593                                 (void *)(uintptr_t)mz);
594                 if (ret < 0) {
595                         rte_memzone_free(mz);
596                         goto fail;
597                 }
598         }
599
600         return mp->size;
601
602  fail:
603         rte_mempool_free_memchunks(mp);
604         return ret;
605 }
606
607 /* return the memory size required for mempool objects in anonymous mem */
608 static ssize_t
609 get_anon_size(const struct rte_mempool *mp)
610 {
611         ssize_t size;
612         size_t pg_sz, pg_shift;
613         size_t min_chunk_size;
614         size_t align;
615
616         pg_sz = getpagesize();
617         pg_shift = rte_bsf32(pg_sz);
618         size = rte_mempool_ops_calc_mem_size(mp, mp->size, pg_shift,
619                                              &min_chunk_size, &align);
620
621         return size;
622 }
623
624 /* unmap a memory zone mapped by rte_mempool_populate_anon() */
625 static void
626 rte_mempool_memchunk_anon_free(struct rte_mempool_memhdr *memhdr,
627         void *opaque)
628 {
629         ssize_t size;
630
631         /*
632          * Calculate size since memhdr->len has contiguous chunk length
633          * which may be smaller if anon map is split into many contiguous
634          * chunks. Result must be the same as we calculated on populate.
635          */
636         size = get_anon_size(memhdr->mp);
637         if (size < 0)
638                 return;
639
640         munmap(opaque, size);
641 }
642
643 /* populate the mempool with an anonymous mapping */
644 int
645 rte_mempool_populate_anon(struct rte_mempool *mp)
646 {
647         ssize_t size;
648         int ret;
649         char *addr;
650
651         /* mempool is already populated, error */
652         if ((!STAILQ_EMPTY(&mp->mem_list)) || mp->nb_mem_chunks != 0) {
653                 rte_errno = EINVAL;
654                 return 0;
655         }
656
657         ret = mempool_ops_alloc_once(mp);
658         if (ret != 0)
659                 return ret;
660
661         size = get_anon_size(mp);
662         if (size < 0) {
663                 rte_errno = -size;
664                 return 0;
665         }
666
667         /* get chunk of virtually continuous memory */
668         addr = mmap(NULL, size, PROT_READ | PROT_WRITE,
669                 MAP_SHARED | MAP_ANONYMOUS, -1, 0);
670         if (addr == MAP_FAILED) {
671                 rte_errno = errno;
672                 return 0;
673         }
674         /* can't use MMAP_LOCKED, it does not exist on BSD */
675         if (mlock(addr, size) < 0) {
676                 rte_errno = errno;
677                 munmap(addr, size);
678                 return 0;
679         }
680
681         ret = rte_mempool_populate_virt(mp, addr, size, getpagesize(),
682                 rte_mempool_memchunk_anon_free, addr);
683         if (ret == 0)
684                 goto fail;
685
686         return mp->populated_size;
687
688  fail:
689         rte_mempool_free_memchunks(mp);
690         return 0;
691 }
692
693 /* free a mempool */
694 void
695 rte_mempool_free(struct rte_mempool *mp)
696 {
697         struct rte_mempool_list *mempool_list = NULL;
698         struct rte_tailq_entry *te;
699
700         if (mp == NULL)
701                 return;
702
703         mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
704         rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
705         /* find out tailq entry */
706         TAILQ_FOREACH(te, mempool_list, next) {
707                 if (te->data == (void *)mp)
708                         break;
709         }
710
711         if (te != NULL) {
712                 TAILQ_REMOVE(mempool_list, te, next);
713                 rte_free(te);
714         }
715         rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
716
717         rte_mempool_free_memchunks(mp);
718         rte_mempool_ops_free(mp);
719         rte_memzone_free(mp->mz);
720 }
721
722 static void
723 mempool_cache_init(struct rte_mempool_cache *cache, uint32_t size)
724 {
725         cache->size = size;
726         cache->flushthresh = CALC_CACHE_FLUSHTHRESH(size);
727         cache->len = 0;
728 }
729
730 /*
731  * Create and initialize a cache for objects that are retrieved from and
732  * returned to an underlying mempool. This structure is identical to the
733  * local_cache[lcore_id] pointed to by the mempool structure.
734  */
735 struct rte_mempool_cache *
736 rte_mempool_cache_create(uint32_t size, int socket_id)
737 {
738         struct rte_mempool_cache *cache;
739
740         if (size == 0 || size > RTE_MEMPOOL_CACHE_MAX_SIZE) {
741                 rte_errno = EINVAL;
742                 return NULL;
743         }
744
745         cache = rte_zmalloc_socket("MEMPOOL_CACHE", sizeof(*cache),
746                                   RTE_CACHE_LINE_SIZE, socket_id);
747         if (cache == NULL) {
748                 RTE_LOG(ERR, MEMPOOL, "Cannot allocate mempool cache.\n");
749                 rte_errno = ENOMEM;
750                 return NULL;
751         }
752
753         mempool_cache_init(cache, size);
754
755         return cache;
756 }
757
758 /*
759  * Free a cache. It's the responsibility of the user to make sure that any
760  * remaining objects in the cache are flushed to the corresponding
761  * mempool.
762  */
763 void
764 rte_mempool_cache_free(struct rte_mempool_cache *cache)
765 {
766         rte_free(cache);
767 }
768
769 /* create an empty mempool */
770 struct rte_mempool *
771 rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
772         unsigned cache_size, unsigned private_data_size,
773         int socket_id, unsigned flags)
774 {
775         char mz_name[RTE_MEMZONE_NAMESIZE];
776         struct rte_mempool_list *mempool_list;
777         struct rte_mempool *mp = NULL;
778         struct rte_tailq_entry *te = NULL;
779         const struct rte_memzone *mz = NULL;
780         size_t mempool_size;
781         unsigned int mz_flags = RTE_MEMZONE_1GB|RTE_MEMZONE_SIZE_HINT_ONLY;
782         struct rte_mempool_objsz objsz;
783         unsigned lcore_id;
784         int ret;
785
786         /* compilation-time checks */
787         RTE_BUILD_BUG_ON((sizeof(struct rte_mempool) &
788                           RTE_CACHE_LINE_MASK) != 0);
789         RTE_BUILD_BUG_ON((sizeof(struct rte_mempool_cache) &
790                           RTE_CACHE_LINE_MASK) != 0);
791 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
792         RTE_BUILD_BUG_ON((sizeof(struct rte_mempool_debug_stats) &
793                           RTE_CACHE_LINE_MASK) != 0);
794         RTE_BUILD_BUG_ON((offsetof(struct rte_mempool, stats) &
795                           RTE_CACHE_LINE_MASK) != 0);
796 #endif
797
798         mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
799
800         /* asked for zero items */
801         if (n == 0) {
802                 rte_errno = EINVAL;
803                 return NULL;
804         }
805
806         /* asked cache too big */
807         if (cache_size > RTE_MEMPOOL_CACHE_MAX_SIZE ||
808             CALC_CACHE_FLUSHTHRESH(cache_size) > n) {
809                 rte_errno = EINVAL;
810                 return NULL;
811         }
812
813         /* "no cache align" imply "no spread" */
814         if (flags & MEMPOOL_F_NO_CACHE_ALIGN)
815                 flags |= MEMPOOL_F_NO_SPREAD;
816
817         /* calculate mempool object sizes. */
818         if (!rte_mempool_calc_obj_size(elt_size, flags, &objsz)) {
819                 rte_errno = EINVAL;
820                 return NULL;
821         }
822
823         rte_rwlock_write_lock(RTE_EAL_MEMPOOL_RWLOCK);
824
825         /*
826          * reserve a memory zone for this mempool: private data is
827          * cache-aligned
828          */
829         private_data_size = (private_data_size +
830                              RTE_MEMPOOL_ALIGN_MASK) & (~RTE_MEMPOOL_ALIGN_MASK);
831
832
833         /* try to allocate tailq entry */
834         te = rte_zmalloc("MEMPOOL_TAILQ_ENTRY", sizeof(*te), 0);
835         if (te == NULL) {
836                 RTE_LOG(ERR, MEMPOOL, "Cannot allocate tailq entry!\n");
837                 goto exit_unlock;
838         }
839
840         mempool_size = MEMPOOL_HEADER_SIZE(mp, cache_size);
841         mempool_size += private_data_size;
842         mempool_size = RTE_ALIGN_CEIL(mempool_size, RTE_MEMPOOL_ALIGN);
843
844         ret = snprintf(mz_name, sizeof(mz_name), RTE_MEMPOOL_MZ_FORMAT, name);
845         if (ret < 0 || ret >= (int)sizeof(mz_name)) {
846                 rte_errno = ENAMETOOLONG;
847                 goto exit_unlock;
848         }
849
850         mz = rte_memzone_reserve(mz_name, mempool_size, socket_id, mz_flags);
851         if (mz == NULL)
852                 goto exit_unlock;
853
854         /* init the mempool structure */
855         mp = mz->addr;
856         memset(mp, 0, MEMPOOL_HEADER_SIZE(mp, cache_size));
857         ret = snprintf(mp->name, sizeof(mp->name), "%s", name);
858         if (ret < 0 || ret >= (int)sizeof(mp->name)) {
859                 rte_errno = ENAMETOOLONG;
860                 goto exit_unlock;
861         }
862         mp->mz = mz;
863         mp->size = n;
864         mp->flags = flags;
865         mp->socket_id = socket_id;
866         mp->elt_size = objsz.elt_size;
867         mp->header_size = objsz.header_size;
868         mp->trailer_size = objsz.trailer_size;
869         /* Size of default caches, zero means disabled. */
870         mp->cache_size = cache_size;
871         mp->private_data_size = private_data_size;
872         STAILQ_INIT(&mp->elt_list);
873         STAILQ_INIT(&mp->mem_list);
874
875         /*
876          * local_cache pointer is set even if cache_size is zero.
877          * The local_cache points to just past the elt_pa[] array.
878          */
879         mp->local_cache = (struct rte_mempool_cache *)
880                 RTE_PTR_ADD(mp, MEMPOOL_HEADER_SIZE(mp, 0));
881
882         /* Init all default caches. */
883         if (cache_size != 0) {
884                 for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
885                         mempool_cache_init(&mp->local_cache[lcore_id],
886                                            cache_size);
887         }
888
889         te->data = mp;
890
891         rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
892         TAILQ_INSERT_TAIL(mempool_list, te, next);
893         rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
894         rte_rwlock_write_unlock(RTE_EAL_MEMPOOL_RWLOCK);
895
896         return mp;
897
898 exit_unlock:
899         rte_rwlock_write_unlock(RTE_EAL_MEMPOOL_RWLOCK);
900         rte_free(te);
901         rte_mempool_free(mp);
902         return NULL;
903 }
904
905 /* create the mempool */
906 struct rte_mempool *
907 rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
908         unsigned cache_size, unsigned private_data_size,
909         rte_mempool_ctor_t *mp_init, void *mp_init_arg,
910         rte_mempool_obj_cb_t *obj_init, void *obj_init_arg,
911         int socket_id, unsigned flags)
912 {
913         int ret;
914         struct rte_mempool *mp;
915
916         mp = rte_mempool_create_empty(name, n, elt_size, cache_size,
917                 private_data_size, socket_id, flags);
918         if (mp == NULL)
919                 return NULL;
920
921         /*
922          * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
923          * set the correct index into the table of ops structs.
924          */
925         if ((flags & MEMPOOL_F_SP_PUT) && (flags & MEMPOOL_F_SC_GET))
926                 ret = rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
927         else if (flags & MEMPOOL_F_SP_PUT)
928                 ret = rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
929         else if (flags & MEMPOOL_F_SC_GET)
930                 ret = rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
931         else
932                 ret = rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);
933
934         if (ret)
935                 goto fail;
936
937         /* call the mempool priv initializer */
938         if (mp_init)
939                 mp_init(mp, mp_init_arg);
940
941         if (rte_mempool_populate_default(mp) < 0)
942                 goto fail;
943
944         /* call the object initializers */
945         if (obj_init)
946                 rte_mempool_obj_iter(mp, obj_init, obj_init_arg);
947
948         return mp;
949
950  fail:
951         rte_mempool_free(mp);
952         return NULL;
953 }
954
955 /* Return the number of entries in the mempool */
956 unsigned int
957 rte_mempool_avail_count(const struct rte_mempool *mp)
958 {
959         unsigned count;
960         unsigned lcore_id;
961
962         count = rte_mempool_ops_get_count(mp);
963
964         if (mp->cache_size == 0)
965                 return count;
966
967         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++)
968                 count += mp->local_cache[lcore_id].len;
969
970         /*
971          * due to race condition (access to len is not locked), the
972          * total can be greater than size... so fix the result
973          */
974         if (count > mp->size)
975                 return mp->size;
976         return count;
977 }
978
979 /* return the number of entries allocated from the mempool */
980 unsigned int
981 rte_mempool_in_use_count(const struct rte_mempool *mp)
982 {
983         return mp->size - rte_mempool_avail_count(mp);
984 }
985
986 /* dump the cache status */
987 static unsigned
988 rte_mempool_dump_cache(FILE *f, const struct rte_mempool *mp)
989 {
990         unsigned lcore_id;
991         unsigned count = 0;
992         unsigned cache_count;
993
994         fprintf(f, "  internal cache infos:\n");
995         fprintf(f, "    cache_size=%"PRIu32"\n", mp->cache_size);
996
997         if (mp->cache_size == 0)
998                 return count;
999
1000         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
1001                 cache_count = mp->local_cache[lcore_id].len;
1002                 fprintf(f, "    cache_count[%u]=%"PRIu32"\n",
1003                         lcore_id, cache_count);
1004                 count += cache_count;
1005         }
1006         fprintf(f, "    total_cache_count=%u\n", count);
1007         return count;
1008 }
1009
1010 #ifndef __INTEL_COMPILER
1011 #pragma GCC diagnostic ignored "-Wcast-qual"
1012 #endif
1013
1014 /* check and update cookies or panic (internal) */
1015 void rte_mempool_check_cookies(const struct rte_mempool *mp,
1016         void * const *obj_table_const, unsigned n, int free)
1017 {
1018 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
1019         struct rte_mempool_objhdr *hdr;
1020         struct rte_mempool_objtlr *tlr;
1021         uint64_t cookie;
1022         void *tmp;
1023         void *obj;
1024         void **obj_table;
1025
1026         /* Force to drop the "const" attribute. This is done only when
1027          * DEBUG is enabled */
1028         tmp = (void *) obj_table_const;
1029         obj_table = tmp;
1030
1031         while (n--) {
1032                 obj = obj_table[n];
1033
1034                 if (rte_mempool_from_obj(obj) != mp)
1035                         rte_panic("MEMPOOL: object is owned by another "
1036                                   "mempool\n");
1037
1038                 hdr = __mempool_get_header(obj);
1039                 cookie = hdr->cookie;
1040
1041                 if (free == 0) {
1042                         if (cookie != RTE_MEMPOOL_HEADER_COOKIE1) {
1043                                 RTE_LOG(CRIT, MEMPOOL,
1044                                         "obj=%p, mempool=%p, cookie=%" PRIx64 "\n",
1045                                         obj, (const void *) mp, cookie);
1046                                 rte_panic("MEMPOOL: bad header cookie (put)\n");
1047                         }
1048                         hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE2;
1049                 } else if (free == 1) {
1050                         if (cookie != RTE_MEMPOOL_HEADER_COOKIE2) {
1051                                 RTE_LOG(CRIT, MEMPOOL,
1052                                         "obj=%p, mempool=%p, cookie=%" PRIx64 "\n",
1053                                         obj, (const void *) mp, cookie);
1054                                 rte_panic("MEMPOOL: bad header cookie (get)\n");
1055                         }
1056                         hdr->cookie = RTE_MEMPOOL_HEADER_COOKIE1;
1057                 } else if (free == 2) {
1058                         if (cookie != RTE_MEMPOOL_HEADER_COOKIE1 &&
1059                             cookie != RTE_MEMPOOL_HEADER_COOKIE2) {
1060                                 RTE_LOG(CRIT, MEMPOOL,
1061                                         "obj=%p, mempool=%p, cookie=%" PRIx64 "\n",
1062                                         obj, (const void *) mp, cookie);
1063                                 rte_panic("MEMPOOL: bad header cookie (audit)\n");
1064                         }
1065                 }
1066                 tlr = __mempool_get_trailer(obj);
1067                 cookie = tlr->cookie;
1068                 if (cookie != RTE_MEMPOOL_TRAILER_COOKIE) {
1069                         RTE_LOG(CRIT, MEMPOOL,
1070                                 "obj=%p, mempool=%p, cookie=%" PRIx64 "\n",
1071                                 obj, (const void *) mp, cookie);
1072                         rte_panic("MEMPOOL: bad trailer cookie\n");
1073                 }
1074         }
1075 #else
1076         RTE_SET_USED(mp);
1077         RTE_SET_USED(obj_table_const);
1078         RTE_SET_USED(n);
1079         RTE_SET_USED(free);
1080 #endif
1081 }
1082
1083 void
1084 rte_mempool_contig_blocks_check_cookies(const struct rte_mempool *mp,
1085         void * const *first_obj_table_const, unsigned int n, int free)
1086 {
1087 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
1088         struct rte_mempool_info info;
1089         const size_t total_elt_sz =
1090                 mp->header_size + mp->elt_size + mp->trailer_size;
1091         unsigned int i, j;
1092
1093         rte_mempool_ops_get_info(mp, &info);
1094
1095         for (i = 0; i < n; ++i) {
1096                 void *first_obj = first_obj_table_const[i];
1097
1098                 for (j = 0; j < info.contig_block_size; ++j) {
1099                         void *obj;
1100
1101                         obj = (void *)((uintptr_t)first_obj + j * total_elt_sz);
1102                         rte_mempool_check_cookies(mp, &obj, 1, free);
1103                 }
1104         }
1105 #else
1106         RTE_SET_USED(mp);
1107         RTE_SET_USED(first_obj_table_const);
1108         RTE_SET_USED(n);
1109         RTE_SET_USED(free);
1110 #endif
1111 }
1112
1113 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
1114 static void
1115 mempool_obj_audit(struct rte_mempool *mp, __rte_unused void *opaque,
1116         void *obj, __rte_unused unsigned idx)
1117 {
1118         __mempool_check_cookies(mp, &obj, 1, 2);
1119 }
1120
1121 static void
1122 mempool_audit_cookies(struct rte_mempool *mp)
1123 {
1124         unsigned num;
1125
1126         num = rte_mempool_obj_iter(mp, mempool_obj_audit, NULL);
1127         if (num != mp->size) {
1128                 rte_panic("rte_mempool_obj_iter(mempool=%p, size=%u) "
1129                         "iterated only over %u elements\n",
1130                         mp, mp->size, num);
1131         }
1132 }
1133 #else
1134 #define mempool_audit_cookies(mp) do {} while(0)
1135 #endif
1136
1137 #ifndef __INTEL_COMPILER
1138 #pragma GCC diagnostic error "-Wcast-qual"
1139 #endif
1140
1141 /* check cookies before and after objects */
1142 static void
1143 mempool_audit_cache(const struct rte_mempool *mp)
1144 {
1145         /* check cache size consistency */
1146         unsigned lcore_id;
1147
1148         if (mp->cache_size == 0)
1149                 return;
1150
1151         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
1152                 const struct rte_mempool_cache *cache;
1153                 cache = &mp->local_cache[lcore_id];
1154                 if (cache->len > cache->flushthresh) {
1155                         RTE_LOG(CRIT, MEMPOOL, "badness on cache[%u]\n",
1156                                 lcore_id);
1157                         rte_panic("MEMPOOL: invalid cache len\n");
1158                 }
1159         }
1160 }
1161
1162 /* check the consistency of mempool (size, cookies, ...) */
1163 void
1164 rte_mempool_audit(struct rte_mempool *mp)
1165 {
1166         mempool_audit_cache(mp);
1167         mempool_audit_cookies(mp);
1168
1169         /* For case where mempool DEBUG is not set, and cache size is 0 */
1170         RTE_SET_USED(mp);
1171 }
1172
1173 /* dump the status of the mempool on the console */
1174 void
1175 rte_mempool_dump(FILE *f, struct rte_mempool *mp)
1176 {
1177 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
1178         struct rte_mempool_info info;
1179         struct rte_mempool_debug_stats sum;
1180         unsigned lcore_id;
1181 #endif
1182         struct rte_mempool_memhdr *memhdr;
1183         unsigned common_count;
1184         unsigned cache_count;
1185         size_t mem_len = 0;
1186
1187         RTE_ASSERT(f != NULL);
1188         RTE_ASSERT(mp != NULL);
1189
1190         fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
1191         fprintf(f, "  flags=%x\n", mp->flags);
1192         fprintf(f, "  pool=%p\n", mp->pool_data);
1193         fprintf(f, "  iova=0x%" PRIx64 "\n", mp->mz->iova);
1194         fprintf(f, "  nb_mem_chunks=%u\n", mp->nb_mem_chunks);
1195         fprintf(f, "  size=%"PRIu32"\n", mp->size);
1196         fprintf(f, "  populated_size=%"PRIu32"\n", mp->populated_size);
1197         fprintf(f, "  header_size=%"PRIu32"\n", mp->header_size);
1198         fprintf(f, "  elt_size=%"PRIu32"\n", mp->elt_size);
1199         fprintf(f, "  trailer_size=%"PRIu32"\n", mp->trailer_size);
1200         fprintf(f, "  total_obj_size=%"PRIu32"\n",
1201                mp->header_size + mp->elt_size + mp->trailer_size);
1202
1203         fprintf(f, "  private_data_size=%"PRIu32"\n", mp->private_data_size);
1204
1205         STAILQ_FOREACH(memhdr, &mp->mem_list, next)
1206                 mem_len += memhdr->len;
1207         if (mem_len != 0) {
1208                 fprintf(f, "  avg bytes/object=%#Lf\n",
1209                         (long double)mem_len / mp->size);
1210         }
1211
1212         cache_count = rte_mempool_dump_cache(f, mp);
1213         common_count = rte_mempool_ops_get_count(mp);
1214         if ((cache_count + common_count) > mp->size)
1215                 common_count = mp->size - cache_count;
1216         fprintf(f, "  common_pool_count=%u\n", common_count);
1217
1218         /* sum and dump statistics */
1219 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
1220         rte_mempool_ops_get_info(mp, &info);
1221         memset(&sum, 0, sizeof(sum));
1222         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
1223                 sum.put_bulk += mp->stats[lcore_id].put_bulk;
1224                 sum.put_objs += mp->stats[lcore_id].put_objs;
1225                 sum.get_success_bulk += mp->stats[lcore_id].get_success_bulk;
1226                 sum.get_success_objs += mp->stats[lcore_id].get_success_objs;
1227                 sum.get_fail_bulk += mp->stats[lcore_id].get_fail_bulk;
1228                 sum.get_fail_objs += mp->stats[lcore_id].get_fail_objs;
1229                 sum.get_success_blks += mp->stats[lcore_id].get_success_blks;
1230                 sum.get_fail_blks += mp->stats[lcore_id].get_fail_blks;
1231         }
1232         fprintf(f, "  stats:\n");
1233         fprintf(f, "    put_bulk=%"PRIu64"\n", sum.put_bulk);
1234         fprintf(f, "    put_objs=%"PRIu64"\n", sum.put_objs);
1235         fprintf(f, "    get_success_bulk=%"PRIu64"\n", sum.get_success_bulk);
1236         fprintf(f, "    get_success_objs=%"PRIu64"\n", sum.get_success_objs);
1237         fprintf(f, "    get_fail_bulk=%"PRIu64"\n", sum.get_fail_bulk);
1238         fprintf(f, "    get_fail_objs=%"PRIu64"\n", sum.get_fail_objs);
1239         if (info.contig_block_size > 0) {
1240                 fprintf(f, "    get_success_blks=%"PRIu64"\n",
1241                         sum.get_success_blks);
1242                 fprintf(f, "    get_fail_blks=%"PRIu64"\n", sum.get_fail_blks);
1243         }
1244 #else
1245         fprintf(f, "  no statistics available\n");
1246 #endif
1247
1248         rte_mempool_audit(mp);
1249 }
1250
1251 /* dump the status of all mempools on the console */
1252 void
1253 rte_mempool_list_dump(FILE *f)
1254 {
1255         struct rte_mempool *mp = NULL;
1256         struct rte_tailq_entry *te;
1257         struct rte_mempool_list *mempool_list;
1258
1259         mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
1260
1261         rte_rwlock_read_lock(RTE_EAL_MEMPOOL_RWLOCK);
1262
1263         TAILQ_FOREACH(te, mempool_list, next) {
1264                 mp = (struct rte_mempool *) te->data;
1265                 rte_mempool_dump(f, mp);
1266         }
1267
1268         rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
1269 }
1270
1271 /* search a mempool from its name */
1272 struct rte_mempool *
1273 rte_mempool_lookup(const char *name)
1274 {
1275         struct rte_mempool *mp = NULL;
1276         struct rte_tailq_entry *te;
1277         struct rte_mempool_list *mempool_list;
1278
1279         mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
1280
1281         rte_rwlock_read_lock(RTE_EAL_MEMPOOL_RWLOCK);
1282
1283         TAILQ_FOREACH(te, mempool_list, next) {
1284                 mp = (struct rte_mempool *) te->data;
1285                 if (strncmp(name, mp->name, RTE_MEMPOOL_NAMESIZE) == 0)
1286                         break;
1287         }
1288
1289         rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
1290
1291         if (te == NULL) {
1292                 rte_errno = ENOENT;
1293                 return NULL;
1294         }
1295
1296         return mp;
1297 }
1298
1299 void rte_mempool_walk(void (*func)(struct rte_mempool *, void *),
1300                       void *arg)
1301 {
1302         struct rte_tailq_entry *te = NULL;
1303         struct rte_mempool_list *mempool_list;
1304         void *tmp_te;
1305
1306         mempool_list = RTE_TAILQ_CAST(rte_mempool_tailq.head, rte_mempool_list);
1307
1308         rte_rwlock_read_lock(RTE_EAL_MEMPOOL_RWLOCK);
1309
1310         TAILQ_FOREACH_SAFE(te, mempool_list, next, tmp_te) {
1311                 (*func)((struct rte_mempool *) te->data, arg);
1312         }
1313
1314         rte_rwlock_read_unlock(RTE_EAL_MEMPOOL_RWLOCK);
1315 }