common/sfc_efx/base: add flag to use Rx prefix user flag
diff --git a/drivers/mempool/bucket/rte_mempool_bucket.c b/drivers/mempool/bucket/rte_mempool_bucket.c
index ef822eb..c0b480b 100644
--- a/drivers/mempool/bucket/rte_mempool_bucket.c
+++ b/drivers/mempool/bucket/rte_mempool_bucket.c
@@ -42,6 +42,7 @@ struct bucket_data {
        unsigned int header_size;
        unsigned int total_elt_size;
        unsigned int obj_per_bucket;
+       unsigned int bucket_stack_thresh;
        uintptr_t bucket_page_mask;
        struct rte_ring *shared_bucket_ring;
        struct bucket_stack *buckets[RTE_MAX_LCORE];
@@ -54,6 +55,7 @@ struct bucket_data {
        struct rte_ring *shared_orphan_ring;
        struct rte_mempool *pool;
        unsigned int bucket_mem_size;
+       void *lcore_callback_handle;
 };
 
 static struct bucket_stack *
@@ -139,6 +141,7 @@ bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
               unsigned int n)
 {
        struct bucket_data *bd = mp->pool_data;
+       struct bucket_stack *local_stack = bd->buckets[rte_lcore_id()];
        unsigned int i;
        int rc = 0;
 
@@ -146,6 +149,15 @@ bucket_enqueue(struct rte_mempool *mp, void * const *obj_table,
                rc = bucket_enqueue_single(bd, obj_table[i]);
                RTE_ASSERT(rc == 0);
        }
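+       /* Keep at most bucket_stack_thresh full buckets on the per-lcore
+        * stack; spill any excess to the shared ring for other lcores.
+        */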
+       if (local_stack->top > bd->bucket_stack_thresh) {
+               rte_ring_enqueue_bulk(bd->shared_bucket_ring,
+                                     &local_stack->objects
+                                     [bd->bucket_stack_thresh],
+                                     local_stack->top -
+                                     bd->bucket_stack_thresh,
+                                     NULL);
+               local_stack->top = bd->bucket_stack_thresh;
+       }
        return rc;
 }
 
@@ -294,6 +306,63 @@ bucket_dequeue(struct rte_mempool *mp, void **obj_table, unsigned int n)
        return rc;
 }
 
+static int
+bucket_dequeue_contig_blocks(struct rte_mempool *mp, void **first_obj_table,
+                            unsigned int n)
+{
+       struct bucket_data *bd = mp->pool_data;
+       const uint32_t header_size = bd->header_size;
+       struct bucket_stack *cur_stack = bd->buckets[rte_lcore_id()];
+       unsigned int n_buckets_from_stack = RTE_MIN(n, cur_stack->top);
+       struct bucket_header *hdr;
+       void **first_objp = first_obj_table;
+
+       bucket_adopt_orphans(bd);
+
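+       /* Serve the request from the per-lcore stack first, then fall back
+        * to the shared ring for the remaining blocks.
+        */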
+       n -= n_buckets_from_stack;
+       while (n_buckets_from_stack-- > 0) {
+               hdr = bucket_stack_pop_unsafe(cur_stack);
+               *first_objp++ = (uint8_t *)hdr + header_size;
+       }
+       if (n > 0) {
+               if (unlikely(rte_ring_dequeue_bulk(bd->shared_bucket_ring,
+                                                  first_objp, n, NULL) != n)) {
+                       /* Return the already dequeued buckets */
+                       while (first_objp-- != first_obj_table) {
+                               bucket_stack_push(cur_stack,
+                                                 (uint8_t *)*first_objp -
+                                                 header_size);
+                       }
+                       rte_errno = ENOBUFS;
+                       return -rte_errno;
+               }
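+               /* Buckets from the shared ring may be unowned or belong to
+                * another lcore; take ownership so that objects freed from
+                * them are accounted back to this lcore.
+                */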
+               while (n-- > 0) {
+                       hdr = (struct bucket_header *)*first_objp;
+                       hdr->lcore_id = rte_lcore_id();
+                       *first_objp++ = (uint8_t *)hdr + header_size;
+               }
+       }
+
+       return 0;
+}
+
+struct bucket_count_per_lcore_ctx {
+       const struct bucket_data *bd;
+       unsigned int count;
+};
+
+static int
+bucket_count_per_lcore(unsigned int lcore_id, void *arg)
+{
+       struct bucket_count_per_lcore_ctx *bplc = arg;
+
+       bplc->count += bplc->bd->obj_per_bucket *
+               bplc->bd->buckets[lcore_id]->top;
+       bplc->count +=
+               rte_ring_count(bplc->bd->adoption_buffer_rings[lcore_id]);
+       return 0;
+}
+
 static void
 count_underfilled_buckets(struct rte_mempool *mp,
                          void *opaque,
@@ -322,23 +391,64 @@ count_underfilled_buckets(struct rte_mempool *mp,
 static unsigned int
 bucket_get_count(const struct rte_mempool *mp)
 {
-       const struct bucket_data *bd = mp->pool_data;
-       unsigned int count =
-               bd->obj_per_bucket * rte_ring_count(bd->shared_bucket_ring) +
-               rte_ring_count(bd->shared_orphan_ring);
-       unsigned int i;
+       struct bucket_count_per_lcore_ctx bplc;
 
-       for (i = 0; i < RTE_MAX_LCORE; i++) {
-               if (!rte_lcore_is_enabled(i))
-                       continue;
-               count += bd->obj_per_bucket * bd->buckets[i]->top +
-                       rte_ring_count(bd->adoption_buffer_rings[i]);
-       }
+       bplc.bd = mp->pool_data;
+       bplc.count = bplc.bd->obj_per_bucket *
+               rte_ring_count(bplc.bd->shared_bucket_ring);
+       bplc.count += rte_ring_count(bplc.bd->shared_orphan_ring);
 
+       rte_lcore_iterate(bucket_count_per_lcore, &bplc);
        rte_mempool_mem_iter((struct rte_mempool *)(uintptr_t)mp,
-                            count_underfilled_buckets, &count);
+                            count_underfilled_buckets, &bplc.count);
+
+       return bplc.count;
+}
+
+static int
+bucket_init_per_lcore(unsigned int lcore_id, void *arg)
+{
+       char rg_name[RTE_RING_NAMESIZE];
+       struct bucket_data *bd = arg;
+       struct rte_mempool *mp;
+       int rg_flags;
+       int rc;
+
+       mp = bd->pool;
+       bd->buckets[lcore_id] = bucket_stack_create(mp,
+               mp->size / bd->obj_per_bucket);
+       if (bd->buckets[lcore_id] == NULL)
+               goto error;
+
+       rc = snprintf(rg_name, sizeof(rg_name), RTE_MEMPOOL_MZ_FORMAT ".a%u",
+               mp->name, lcore_id);
+       if (rc < 0 || rc >= (int)sizeof(rg_name))
+               goto error;
+
+       rg_flags = RING_F_SC_DEQ;
+       if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
+               rg_flags |= RING_F_SP_ENQ;
+       bd->adoption_buffer_rings[lcore_id] = rte_ring_create(rg_name,
+               rte_align32pow2(mp->size + 1), mp->socket_id, rg_flags);
+       if (bd->adoption_buffer_rings[lcore_id] == NULL)
+               goto error;
+
+       return 0;
+error:
+       rte_free(bd->buckets[lcore_id]);
+       bd->buckets[lcore_id] = NULL;
+       return -1;
+}
+
+static void
+bucket_uninit_per_lcore(unsigned int lcore_id, void *arg)
+{
+       struct bucket_data *bd = arg;
 
-       return count;
+       rte_ring_free(bd->adoption_buffer_rings[lcore_id]);
+       bd->adoption_buffer_rings[lcore_id] = NULL;
+       rte_free(bd->buckets[lcore_id]);
+       bd->buckets[lcore_id] = NULL;
 }
 
 static int
@@ -348,8 +458,12 @@ bucket_alloc(struct rte_mempool *mp)
        int rc = 0;
        char rg_name[RTE_RING_NAMESIZE];
        struct bucket_data *bd;
-       unsigned int i;
        unsigned int bucket_header_size;
+       size_t pg_sz;
+
+       rc = rte_mempool_get_page_size(mp, &pg_sz);
+       if (rc < 0)
+               return rc;
 
        bd = rte_zmalloc_socket("bucket_pool", sizeof(*bd),
                                RTE_CACHE_LINE_SIZE, mp->socket_id);
@@ -358,48 +472,32 @@ bucket_alloc(struct rte_mempool *mp)
                goto no_mem_for_data;
        }
        bd->pool = mp;
-       if (mp->flags & MEMPOOL_F_NO_CACHE_ALIGN)
+       if (mp->flags & RTE_MEMPOOL_F_NO_CACHE_ALIGN)
                bucket_header_size = sizeof(struct bucket_header);
        else
                bucket_header_size = RTE_CACHE_LINE_SIZE;
        RTE_BUILD_BUG_ON(sizeof(struct bucket_header) > RTE_CACHE_LINE_SIZE);
        bd->header_size = mp->header_size + bucket_header_size;
        bd->total_elt_size = mp->header_size + mp->elt_size + mp->trailer_size;
-       bd->bucket_mem_size = RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024;
+       bd->bucket_mem_size = RTE_MIN(pg_sz,
+                       (size_t)(RTE_DRIVER_MEMPOOL_BUCKET_SIZE_KB * 1024));
        bd->obj_per_bucket = (bd->bucket_mem_size - bucket_header_size) /
                bd->total_elt_size;
        bd->bucket_page_mask = ~(rte_align64pow2(bd->bucket_mem_size) - 1);
+       /* Full-bucket count above which the per-lcore stack is spilled to
+        * the shared ring; eventually this should be a tunable parameter.
+        */
+       bd->bucket_stack_thresh = (mp->size / bd->obj_per_bucket) * 4 / 3;
 
-       if (mp->flags & MEMPOOL_F_SP_PUT)
-               rg_flags |= RING_F_SP_ENQ;
-       if (mp->flags & MEMPOOL_F_SC_GET)
-               rg_flags |= RING_F_SC_DEQ;
-
-       for (i = 0; i < RTE_MAX_LCORE; i++) {
-               if (!rte_lcore_is_enabled(i))
-                       continue;
-               bd->buckets[i] =
-                       bucket_stack_create(mp, mp->size / bd->obj_per_bucket);
-               if (bd->buckets[i] == NULL) {
-                       rc = -ENOMEM;
-                       goto no_mem_for_stacks;
-               }
-               rc = snprintf(rg_name, sizeof(rg_name),
-                             RTE_MEMPOOL_MZ_FORMAT ".a%u", mp->name, i);
-               if (rc < 0 || rc >= (int)sizeof(rg_name)) {
-                       rc = -ENAMETOOLONG;
-                       goto no_mem_for_stacks;
-               }
-               bd->adoption_buffer_rings[i] =
-                       rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
-                                       mp->socket_id,
-                                       rg_flags | RING_F_SC_DEQ);
-               if (bd->adoption_buffer_rings[i] == NULL) {
-                       rc = -rte_errno;
-                       goto no_mem_for_stacks;
-               }
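+       /*
+        * rte_lcore_callback_register() invokes bucket_init_per_lcore()
+        * immediately for every existing lcore and later for any lcore
+        * registered afterwards, replacing the fixed RTE_MAX_LCORE loop.
+        */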
+       bd->lcore_callback_handle = rte_lcore_callback_register("bucket",
+               bucket_init_per_lcore, bucket_uninit_per_lcore, bd);
+       if (bd->lcore_callback_handle == NULL) {
+               rc = -ENOMEM;
+               goto no_mem_for_stacks;
        }
 
+       if (mp->flags & RTE_MEMPOOL_F_SP_PUT)
+               rg_flags |= RING_F_SP_ENQ;
+       if (mp->flags & RTE_MEMPOOL_F_SC_GET)
+               rg_flags |= RING_F_SC_DEQ;
        rc = snprintf(rg_name, sizeof(rg_name),
                      RTE_MEMPOOL_MZ_FORMAT ".0", mp->name);
        if (rc < 0 || rc >= (int)sizeof(rg_name)) {
@@ -439,11 +537,8 @@ invalid_shared_bucket_ring:
        rte_ring_free(bd->shared_orphan_ring);
 cannot_create_shared_orphan_ring:
 invalid_shared_orphan_ring:
+       rte_lcore_callback_unregister(bd->lcore_callback_handle);
 no_mem_for_stacks:
-       for (i = 0; i < RTE_MAX_LCORE; i++) {
-               rte_free(bd->buckets[i]);
-               rte_ring_free(bd->adoption_buffer_rings[i]);
-       }
        rte_free(bd);
 no_mem_for_data:
        rte_errno = -rc;
@@ -453,16 +548,12 @@ no_mem_for_data:
 static void
 bucket_free(struct rte_mempool *mp)
 {
-       unsigned int i;
        struct bucket_data *bd = mp->pool_data;
 
        if (bd == NULL)
                return;
 
-       for (i = 0; i < RTE_MAX_LCORE; i++) {
-               rte_free(bd->buckets[i]);
-               rte_ring_free(bd->adoption_buffer_rings[i]);
-       }
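+       /* Unregistering runs bucket_uninit_per_lcore() for each lcore,
+        * freeing the per-lcore stacks and adoption rings.
+        */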
+       rte_lcore_callback_unregister(bd->lcore_callback_handle);
 
        rte_ring_free(bd->shared_orphan_ring);
        rte_ring_free(bd->shared_bucket_ring);
@@ -532,7 +623,7 @@ bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
 
                hdr->fill_cnt = 0;
                hdr->lcore_id = LCORE_ID_ANY;
-               rc = rte_mempool_op_populate_default(mp,
+               rc = rte_mempool_op_populate_helper(mp, 0,
                                                     RTE_MIN(bd->obj_per_bucket,
                                                             max_objs - n_objs),
                                                     iter + bucket_header_sz,
@@ -548,6 +639,16 @@ bucket_populate(struct rte_mempool *mp, unsigned int max_objs,
        return n_objs;
 }
 
+static int
+bucket_get_info(const struct rte_mempool *mp, struct rte_mempool_info *info)
+{
+       struct bucket_data *bd = mp->pool_data;
+
+       info->contig_block_size = bd->obj_per_bucket;
+       return 0;
+}
+
+
 static const struct rte_mempool_ops ops_bucket = {
        .name = "bucket",
        .alloc = bucket_alloc,
@@ -557,7 +658,9 @@ static const struct rte_mempool_ops ops_bucket = {
        .get_count = bucket_get_count,
        .calc_mem_size = bucket_calc_mem_size,
        .populate = bucket_populate,
+       .get_info = bucket_get_info,
+       .dequeue_contig_blocks = bucket_dequeue_contig_blocks,
 };
 
 
-MEMPOOL_REGISTER_OPS(ops_bucket);
+RTE_MEMPOOL_REGISTER_OPS(ops_bucket);
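
For context, a minimal sketch of how an application could exercise the two new ops through the public mempool API: rte_mempool_ops_get_info() ends up in bucket_get_info() and reports how many objects one contiguous block (bucket) holds, while rte_mempool_get_contig_blocks() ends up in bucket_dequeue_contig_blocks(). The pool handle, block count and error handling below are illustrative assumptions, not part of this patch.

#include <rte_common.h>
#include <rte_mempool.h>

/* Sketch only: "mp" is assumed to be a mempool whose ops were selected with
 * rte_mempool_set_ops_byname(mp, "bucket", NULL).
 */
static int
grab_some_buckets(struct rte_mempool *mp)
{
	struct rte_mempool_info info;
	void *blocks[4];
	int rc;

	/* Filled in by bucket_get_info(): objects per contiguous block */
	rc = rte_mempool_ops_get_info(mp, &info);
	if (rc != 0)
		return rc;

	/* Served by bucket_dequeue_contig_blocks(): each entry points at the
	 * first object of a bucket holding info.contig_block_size objects.
	 */
	rc = rte_mempool_get_contig_blocks(mp, blocks, RTE_DIM(blocks));
	if (rc != 0)
		return rc;

	/* ... use the objects; they are handed back one at a time with
	 * rte_mempool_put(), omitted here ...
	 */
	return 0;
}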