From: Intel
Date: Wed, 19 Dec 2012 23:00:00 +0000 (+0100)
Subject: mempool: cache optimisations
X-Git-Tag: spdx-start~11386
X-Git-Url: http://git.droids-corp.org/?a=commitdiff_plain;h=ea5dd2744b90b330f07fd10f327ab99ef55c7266;p=dpdk.git

mempool: cache optimisations

Signed-off-by: Intel
---

diff --git a/lib/librte_mempool/rte_mempool.c b/lib/librte_mempool/rte_mempool.c
index 77157801fb..5db7e2e748 100644
--- a/lib/librte_mempool/rte_mempool.c
+++ b/lib/librte_mempool/rte_mempool.c
@@ -61,6 +61,7 @@
 
 TAILQ_HEAD(rte_mempool_list, rte_mempool);
 
+#define CACHE_FLUSHTHRESH_MULTIPLIER 1.5
 
 /*
  * return the greatest common divisor between a and b (fast algorithm)
@@ -252,11 +253,11 @@ rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
 	mp->ring = r;
 	mp->size = n;
 	mp->flags = flags;
-	mp->bulk_default = 1;
 	mp->elt_size = elt_size;
 	mp->header_size = header_size;
 	mp->trailer_size = trailer_size;
 	mp->cache_size = cache_size;
+	mp->cache_flushthresh = (uint32_t)(cache_size * CACHE_FLUSHTHRESH_MULTIPLIER);
 	mp->private_data_size = private_data_size;
 
 	/* call the initializer */
@@ -379,7 +380,7 @@ mempool_audit_cache(const struct rte_mempool *mp)
 	/* check cache size consistency */
 	unsigned lcore_id;
 	for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
-		if (mp->local_cache[lcore_id].len > mp->cache_size) {
+		if (mp->local_cache[lcore_id].len > mp->cache_flushthresh) {
 			RTE_LOG(CRIT, MEMPOOL, "badness on cache[%u]\n",
 				lcore_id);
 			rte_panic("MEMPOOL: invalid cache len\n");
@@ -414,7 +415,6 @@ rte_mempool_dump(const struct rte_mempool *mp)
 	printf(" flags=%x\n", mp->flags);
 	printf(" ring=<%s>@%p\n", mp->ring->name, mp->ring);
 	printf(" size=%"PRIu32"\n", mp->size);
-	printf(" bulk_default=%"PRIu32"\n", mp->bulk_default);
 	printf(" header_size=%"PRIu32"\n", mp->header_size);
 	printf(" elt_size=%"PRIu32"\n", mp->elt_size);
 	printf(" trailer_size=%"PRIu32"\n", mp->trailer_size);
diff --git a/lib/librte_mempool/rte_mempool.h b/lib/librte_mempool/rte_mempool.h
index d4a5c65cf0..a42c215a0a 100644
--- a/lib/librte_mempool/rte_mempool.h
+++ b/lib/librte_mempool/rte_mempool.h
@@ -68,8 +68,8 @@
 #include 
 #include 
-#include 
 #include 
+#include 
 #include 
 #include 
 
@@ -101,7 +101,11 @@ struct rte_mempool_debug_stats {
  */
 struct rte_mempool_cache {
 	unsigned len; /**< Cache len */
-	void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE]; /**< Cache objects */
+	/*
+	 * Cache is allocated to this size to allow it to overflow in certain
+	 * cases to avoid needless emptying of cache.
+	 */
+	void *objs[RTE_MEMPOOL_CACHE_MAX_SIZE * 3]; /**< Cache objects */
 } __rte_cache_aligned;
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
@@ -118,8 +122,8 @@ struct rte_mempool {
 	phys_addr_t phys_addr; /**< Phys. addr. of mempool struct. */
 	int flags; /**< Flags of the mempool. */
 	uint32_t size; /**< Size of the mempool. */
-	uint32_t bulk_default; /**< Default bulk count. */
 	uint32_t cache_size; /**< Size of per-lcore local cache. */
+	uint32_t cache_flushthresh; /**< Threshold before we flush excess elements. */
 
 	uint32_t elt_size; /**< Size of an element. */
 	uint32_t header_size; /**< Size of header (before elt). */
@@ -144,7 +148,7 @@ struct rte_mempool {
 #define MEMPOOL_F_SC_GET 0x0008 /**< Default get is "single-consumer".*/
 
 /**
- * When debug is enabled, store some statistics.
+ * @internal When debug is enabled, store some statistics.
  * @param mp
  *   Pointer to the memory pool.
  * @param name
@@ -163,7 +167,7 @@
 #endif
 
 /**
- * Get a pointer to a mempool pointer in the object header.
+ * @internal Get a pointer to a mempool pointer in the object header.
  * @param obj
  *   Pointer to object.
  * @return
@@ -235,7 +239,7 @@ static inline void __mempool_write_trailer_cookie(void *obj)
 #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
 
 /**
- * Check and update cookies or panic.
+ * @internal Check and update cookies or panic.
  *
  * @param mp
  *   Pointer to the memory pool.
@@ -344,10 +348,7 @@ typedef void (rte_mempool_ctor_t)(struct rte_mempool *, void *);
 * Creates a new mempool named *name* in memory.
 *
 * This function uses ``memzone_reserve()`` to allocate memory. The
- * pool contains n elements of elt_size. Its size is set to n. By
- * default, bulk_default_count (the default number of elements to
- * get/put in the pool) is set to 1. @see rte_mempool_set_bulk_count()
- * to modify this valule.
+ * pool contains n elements of elt_size. Its size is set to n.
 *
 * @param name
 *   The name of the mempool.
@@ -430,45 +431,6 @@ rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
 		rte_mempool_obj_ctor_t *obj_init, void *obj_init_arg,
 		int socket_id, unsigned flags);
 
-/**
- * Set the default bulk count for put/get.
- *
- * The *count* parameter is the default number of bulk elements to
- * get/put when using ``rte_mempool_*_{en,de}queue_bulk()``. It must
- * be greater than 0 and less than half of the mempool size.
- *
- * @param mp
- *   A pointer to the mempool structure.
- * @param count
- *   A new water mark value.
- * @return
- *   - 0: Success; default_bulk_count changed.
- *   - -EINVAL: Invalid count value.
- */
-static inline int
-rte_mempool_set_bulk_count(struct rte_mempool *mp, unsigned count)
-{
-	if (unlikely(count == 0 || count >= mp->size))
-		return -EINVAL;
-
-	mp->bulk_default = count;
-	return 0;
-}
-
-/**
- * Get the default bulk count for put/get.
- *
- * @param mp
- *   A pointer to the mempool structure.
- * @return
- *   The default bulk count for enqueue/dequeue.
- */
-static inline unsigned
-rte_mempool_get_bulk_count(struct rte_mempool *mp)
-{
-	return mp->bulk_default;
-}
-
 /**
 * Dump the status of the mempool to the console.
 *
@@ -495,11 +457,11 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 {
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	struct rte_mempool_cache *cache;
-	uint32_t cache_len;
+	uint32_t index;
 	void **cache_objs;
 	unsigned lcore_id = rte_lcore_id();
 	uint32_t cache_size = mp->cache_size;
-	uint32_t cache_add_count;
+	uint32_t flushthresh = mp->cache_flushthresh;
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
 	/* increment stat now, adding in mempool always success */
@@ -510,52 +472,35 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
 	if (unlikely(cache_size == 0 || is_mp == 0))
 		goto ring_enqueue;
 
-	cache = &mp->local_cache[lcore_id];
-	cache_len = cache->len;
-	cache_objs = cache->objs;
-
-	/* cache is full and we add many objects: enqueue in ring */
-	if (unlikely(cache_len == cache_size && n >= cache_size))
+	/* Go straight to ring if put would overflow mem allocated for cache */
+	if (unlikely(n > RTE_MEMPOOL_CACHE_MAX_SIZE))
 		goto ring_enqueue;
 
+	cache = &mp->local_cache[lcore_id];
+	cache_objs = &cache->objs[cache->len];
+
 	/*
-	 * cache is full and we add few objects: enqueue the content
-	 * of the cache in ring
+	 * The cache follows the following algorithm
+	 *   1. Add the objects to the cache
+	 *   2. Anything greater than the cache min value (if it crosses the
+	 *   cache flush threshold) is flushed to the ring.
 	 */
-	if (unlikely(cache_len == cache_size)) {
-#ifdef RTE_LIBRTE_MEMPOOL_DEBUG
-		if (rte_ring_mp_enqueue_bulk(mp->ring, cache->objs,
-				cache_size) < 0)
-			rte_panic("cannot put objects in mempool\n");
-#else
-		rte_ring_mp_enqueue_bulk(mp->ring, cache->objs,
-				cache_size);
-#endif
-		cache_len = 0;
-	}
-	/* determine how many objects we can add in cache */
-	if (likely(n <= cache_size - cache_len))
-		cache_add_count = n;
-	else
-		cache_add_count = cache_size - cache_len;
-
-	/* add in cache while there is enough room */
-	while (cache_add_count > 0) {
-		cache_objs[cache_len] = *obj_table;
-		obj_table++;
-		cache_len++;
-		n--;
-		cache_add_count--;
-	}
+	/* Add elements back into the cache */
+	for (index = 0; index < n; ++index, obj_table++)
+		cache_objs[index] = *obj_table;
+
+	cache->len += n;
 
-	cache->len = cache_len;
+	if (cache->len >= flushthresh) {
+		rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
+				cache->len - cache_size);
+		cache->len = cache_size;
+	}
 
-	/* no more object to add, return */
-	if (likely(n == 0))
-		return;
+	return;
 
-	ring_enqueue:
+ring_enqueue:
 
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
 	/* push remaining objects in ring */
@@ -705,62 +650,50 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 #endif
 #if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
 	struct rte_mempool_cache *cache;
-	uint32_t cache_len, cache_len_save = 0;
+	uint32_t index, len;
 	void **cache_objs;
 	unsigned lcore_id = rte_lcore_id();
 	uint32_t cache_size = mp->cache_size;
-	uint32_t cache_del_count;
 
 	cache = &mp->local_cache[lcore_id];
 
 	/* cache is not enabled or single consumer */
-	if (unlikely(cache_size == 0 || is_mc == 0))
+	if (unlikely(cache_size == 0 || is_mc == 0 || n >= cache_size))
 		goto ring_dequeue;
 
-	cache_len = cache->len;
 	cache_objs = cache->objs;
 
-	/* cache is empty and we need many objects: dequeue from ring */
-	if (unlikely(cache_len == 0 && n >= cache_size))
-		goto ring_dequeue;
+	/* Can this be satisfied from the cache? */
+	if (cache->len < n) {
+		/* No. Backfill the cache first, and then fill from it */
+		uint32_t req = n + (cache_size - cache->len);
 
-	/* cache is empty and we dequeue few objects: fill the cache first */
-	if (unlikely(cache_len == 0 && n < cache_size)) {
-		ret = rte_ring_mc_dequeue_bulk(mp->ring, cache_objs,
-				cache_size);
+		/* How many do we require i.e. number to fill the cache + the request */
+		ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
 		if (unlikely(ret < 0)) {
-			__MEMPOOL_STAT_ADD(mp, get_fail, n_orig);
-			return ret;
+			/*
+			 * In the offchance that we are buffer constrained,
+			 * where we are not able to allocate cache + n, go to
+			 * the ring directly. If that fails, we are truly out of
+			 * buffers.
+			 */
+			goto ring_dequeue;
 		}
 
-		cache_len = cache_size;
+		cache->len += req;
 	}
 
-	if (likely(n <= cache_len))
-		cache_del_count = n;
-	else
-		cache_del_count = cache_len;
+	/* Now fill in the response ... */
+	for (index = 0, len = cache->len - 1; index < n; ++index, len--, obj_table++)
+		*obj_table = cache_objs[len];
 
-	cache_len_save = cache_len;
+	cache->len -= n;
 
-	/* add in cache only while there is enough room */
-	while (cache_del_count > 0) {
-		cache_len--;
-		*obj_table = cache_objs[cache_len];
-		obj_table++;
-		n--;
-		cache_del_count--;
-	}
+	__MEMPOOL_STAT_ADD(mp, get_success, n_orig);
 
-	cache->len = cache_len;
-
-	/* no more object to get, return */
-	if (likely(n == 0)) {
-		__MEMPOOL_STAT_ADD(mp, get_success, n_orig);
-		return 0;
-	}
+	return 0;
 
-	ring_dequeue:
+ring_dequeue:
 #endif /* RTE_MEMPOOL_CACHE_MAX_SIZE > 0 */
 
 	/* get remaining objects from ring */
@@ -769,15 +702,6 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 	else
 		ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
 
-#if RTE_MEMPOOL_CACHE_MAX_SIZE > 0
-	/*
-	 * bad luck, the ring is empty but we already dequeued some
-	 * entries from cache, we have to restore them
-	 */
-	if (unlikely(ret < 0 && cache_len_save != 0))
-		cache->len = cache_len_save;
-#endif
-
 	if (ret < 0)
 		__MEMPOOL_STAT_ADD(mp, get_fail, n_orig);
 	else
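
For readers skimming the diff, here is a minimal standalone sketch of the caching scheme this patch introduces: puts accumulate in a per-core cache, and once the cache length crosses cache_size * 1.5 everything above cache_size is flushed to the backing ring; gets are served from the cache after backfilling it from the ring when it runs short. This is not DPDK code: the toy_ring/toy_cache types and the toy_put_bulk/toy_get_bulk helpers are invented for illustration, the model is single-threaded, and the ring is a plain LIFO stand-in for rte_ring.

/*
 * Illustrative sketch only -- not part of the patch. All names here
 * (toy_ring, toy_cache, toy_put_bulk, toy_get_bulk) are invented.
 */
#include <stdio.h>
#include <string.h>

#define TOY_CACHE_MAX_SIZE 512              /* stand-in for RTE_MEMPOOL_CACHE_MAX_SIZE */
#define TOY_FLUSHTHRESH_MULTIPLIER 1.5

struct toy_ring {                           /* stand-in for rte_ring (LIFO for simplicity) */
	void *objs[4096];
	unsigned len;
};

struct toy_cache {
	unsigned len;
	void *objs[TOY_CACHE_MAX_SIZE * 3]; /* over-allocated so it may overflow, as in the patch */
};

static int ring_enqueue(struct toy_ring *r, void * const *tbl, unsigned n)
{
	memcpy(&r->objs[r->len], tbl, n * sizeof(*tbl));
	r->len += n;
	return 0;
}

static int ring_dequeue(struct toy_ring *r, void **tbl, unsigned n)
{
	if (r->len < n)
		return -1;                  /* not enough objects in the ring */
	r->len -= n;
	memcpy(tbl, &r->objs[r->len], n * sizeof(*tbl));
	return 0;
}

/* Put path: append to the cache, flush the excess once past the threshold. */
static void toy_put_bulk(struct toy_ring *ring, struct toy_cache *cache,
			 unsigned cache_size, void * const *obj_table, unsigned n)
{
	unsigned flushthresh = (unsigned)(cache_size * TOY_FLUSHTHRESH_MULTIPLIER);
	unsigned i;

	if (n > TOY_CACHE_MAX_SIZE) {       /* would overflow the cache: go straight to the ring */
		ring_enqueue(ring, obj_table, n);
		return;
	}
	for (i = 0; i < n; i++)
		cache->objs[cache->len + i] = obj_table[i];
	cache->len += n;
	if (cache->len >= flushthresh) {    /* flush everything above cache_size */
		ring_enqueue(ring, &cache->objs[cache_size], cache->len - cache_size);
		cache->len = cache_size;
	}
}

/* Get path: backfill the cache from the ring if it runs short, then serve from it. */
static int toy_get_bulk(struct toy_ring *ring, struct toy_cache *cache,
			unsigned cache_size, void **obj_table, unsigned n)
{
	unsigned i;

	if (n >= cache_size)                /* large request: bypass the cache */
		return ring_dequeue(ring, obj_table, n);
	if (cache->len < n) {
		/* request + enough to refill the cache to cache_size */
		unsigned req = n + (cache_size - cache->len);
		if (ring_dequeue(ring, &cache->objs[cache->len], req) < 0)
			return ring_dequeue(ring, obj_table, n); /* last resort: ring directly */
		cache->len += req;
	}
	for (i = 0; i < n; i++)             /* serve from the top of the cache */
		obj_table[i] = cache->objs[cache->len - 1 - i];
	cache->len -= n;
	return 0;
}

int main(void)
{
	static int pool[1024];
	static struct toy_ring ring;
	static struct toy_cache cache;
	void *tbl[64];
	unsigned i, cache_size = 32;

	for (i = 0; i < 1024; i++) {        /* seed the ring with 1024 objects */
		void *obj = &pool[i];
		ring_enqueue(&ring, &obj, 1);
	}
	if (toy_get_bulk(&ring, &cache, cache_size, tbl, 16) == 0)
		printf("got 16, cache len now %u\n", cache.len);
	toy_put_bulk(&ring, &cache, cache_size, tbl, 16);
	printf("after put, cache len %u, ring len %u\n", cache.len, ring.len);
	return 0;
}

With cache_size = 32 the flush threshold is 48, so the first get backfills the cache to 48 and serves 16 from it, and the following put of 16 pushes the length back to 48 and flushes the 16 excess objects to the ring, leaving the cache at exactly cache_size.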