mempool: support handler operations
authorDavid Hunt <david.hunt@intel.com>
Wed, 22 Jun 2016 09:27:27 +0000 (10:27 +0100)
committerThomas Monjalon <thomas.monjalon@6wind.com>
Fri, 24 Jun 2016 09:01:05 +0000 (11:01 +0200)
Until now, the objects stored in a mempool were internally stored in a
ring. This patch introduces the possibility to register external handlers
replacing the ring.

The default behavior remains unchanged, but calling the new function
rte_mempool_set_ops_byname() right after rte_mempool_create_empty() allows
the user to change the handler that will be used when populating
the mempool.

This patch also adds a set of default ops (function callbacks) based
on rte_ring.

Signed-off-by: David Hunt <david.hunt@intel.com>
Signed-off-by: Olivier Matz <olivier.matz@6wind.com>
Acked-by: Shreyansh Jain <shreyansh.jain@nxp.com>
Acked-by: Olivier Matz <olivier.matz@6wind.com>
app/test/test_mempool_perf.c
doc/guides/prog_guide/mempool_lib.rst
doc/guides/rel_notes/deprecation.rst
lib/librte_mempool/Makefile
lib/librte_mempool/rte_mempool.c
lib/librte_mempool/rte_mempool.h
lib/librte_mempool/rte_mempool_ops.c [new file with mode: 0644]
lib/librte_mempool/rte_mempool_ring.c [new file with mode: 0644]
lib/librte_mempool/rte_mempool_version.map

index c5e3576..c5f8455 100644 (file)
@@ -161,7 +161,6 @@ per_lcore_mempool_test(__attribute__((unused)) void *arg)
                                                           n_get_bulk);
                                if (unlikely(ret < 0)) {
                                        rte_mempool_dump(stdout, mp);
-                                       rte_ring_dump(stdout, mp->ring);
                                        /* in this case, objects are lost... */
                                        return -1;
                                }
index c3afc2e..1943fc4 100644 (file)
@@ -34,7 +34,8 @@ Mempool Library
 ===============
 
 A memory pool is an allocator of a fixed-sized object.
-In the DPDK, it is identified by name and uses a ring to store free objects.
+In the DPDK, it is identified by name and uses a mempool handler to store free objects.
+The default mempool handler is ring based.
 It provides some other optional services such as a per-core object cache and
 an alignment helper to ensure that objects are padded to spread them equally on all DRAM or DDR3 channels.
 
@@ -127,6 +128,35 @@ The maximum size of the cache is static and is defined at compilation time (CONF
    A mempool in Memory with its Associated Ring
 
 
+Mempool Handlers
+------------------------
+
+This allows external memory subsystems, such as external hardware memory
+management systems and software based memory allocators, to be used with DPDK.
+
+There are two aspects to a mempool handler.
+
+* Adding the code for your new mempool operations (ops). This is achieved by
+  adding a new mempool ops code, and using the ``REGISTER_MEMPOOL_OPS`` macro.
+
+* Using the new API to call ``rte_mempool_create_empty()`` and
+  ``rte_mempool_set_ops_byname()`` to create a new mempool and specifying which
+  ops to use.
+
+Several different mempool handlers may be used in the same application. A new
+mempool can be created by using the ``rte_mempool_create_empty()`` function,
+then using ``rte_mempool_set_ops_byname()`` to point the mempool to the
+relevant mempool handler callback (ops) structure.
+
+Legacy applications may continue to use the old ``rte_mempool_create()`` API
+call, which uses a ring based mempool handler by default. These applications
+will need to be modified to use a new mempool handler.
+
+For applications that use ``rte_pktmbuf_create()``, there is a config setting
+(``RTE_MBUF_DEFAULT_MEMPOOL_OPS``) that allows the application to make use of
+an alternative mempool handler.
+
+
 Use Cases
 ---------
 
index 9ab8da8..80f873d 100644 (file)
@@ -34,15 +34,6 @@ Deprecation Notices
   compact API. The ones that remain are backwards compatible and use the
   per-lcore default cache if available. This change targets release 16.07.
 
-* The rte_mempool struct will be changed in 16.07 to facilitate the new
-  external mempool manager functionality.
-  The ring element will be replaced with a more generic 'pool' opaque pointer
-  to allow new mempool handlers to use their own user-defined mempool
-  layout. Also newly added to rte_mempool is a handler index.
-  The existing API will be backward compatible, but there will be new API
-  functions added to facilitate the creation of mempools using an external
-  handler. The 16.07 release will contain these changes.
-
 * The mbuf flags PKT_RX_VLAN_PKT and PKT_RX_QINQ_PKT are deprecated and
   are respectively replaced by PKT_RX_VLAN_STRIPPED and
   PKT_RX_QINQ_STRIPPED, that are better described. The old flags and
index 43423e0..a4c089e 100644 (file)
@@ -42,6 +42,8 @@ LIBABIVER := 2
 
 # all source are stored in SRCS-y
 SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool.c
+SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_ops.c
+SRCS-$(CONFIG_RTE_LIBRTE_MEMPOOL) +=  rte_mempool_ring.c
 # install includes
 SYMLINK-$(CONFIG_RTE_LIBRTE_MEMPOOL)-include := rte_mempool.h
 
index af71edd..e6a83d0 100644 (file)
@@ -148,7 +148,7 @@ mempool_add_elem(struct rte_mempool *mp, void *obj, phys_addr_t physaddr)
 #endif
 
        /* enqueue in ring */
-       rte_ring_sp_enqueue(mp->ring, obj);
+       rte_mempool_ops_enqueue_bulk(mp, &obj, 1);
 }
 
 /* call obj_cb() for each mempool element */
@@ -303,40 +303,6 @@ rte_mempool_xmem_usage(__rte_unused void *vaddr, uint32_t elt_num,
        return (size_t)paddr_idx << pg_shift;
 }
 
-/* create the internal ring */
-static int
-rte_mempool_ring_create(struct rte_mempool *mp)
-{
-       int rg_flags = 0, ret;
-       char rg_name[RTE_RING_NAMESIZE];
-       struct rte_ring *r;
-
-       ret = snprintf(rg_name, sizeof(rg_name),
-               RTE_MEMPOOL_MZ_FORMAT, mp->name);
-       if (ret < 0 || ret >= (int)sizeof(rg_name))
-               return -ENAMETOOLONG;
-
-       /* ring flags */
-       if (mp->flags & MEMPOOL_F_SP_PUT)
-               rg_flags |= RING_F_SP_ENQ;
-       if (mp->flags & MEMPOOL_F_SC_GET)
-               rg_flags |= RING_F_SC_DEQ;
-
-       /* Allocate the ring that will be used to store objects.
-        * Ring functions will return appropriate errors if we are
-        * running as a secondary process etc., so no checks made
-        * in this function for that condition.
-        */
-       r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
-               mp->socket_id, rg_flags);
-       if (r == NULL)
-               return -rte_errno;
-
-       mp->ring = r;
-       mp->flags |= MEMPOOL_F_RING_CREATED;
-       return 0;
-}
-
 /* free a memchunk allocated with rte_memzone_reserve() */
 static void
 rte_mempool_memchunk_mz_free(__rte_unused struct rte_mempool_memhdr *memhdr,
@@ -354,7 +320,7 @@ rte_mempool_free_memchunks(struct rte_mempool *mp)
        void *elt;
 
        while (!STAILQ_EMPTY(&mp->elt_list)) {
-               rte_ring_sc_dequeue(mp->ring, &elt);
+               rte_mempool_ops_dequeue_bulk(mp, &elt, 1);
                (void)elt;
                STAILQ_REMOVE_HEAD(&mp->elt_list, next);
                mp->populated_size--;
@@ -386,10 +352,11 @@ rte_mempool_populate_phys(struct rte_mempool *mp, char *vaddr,
        int ret;
 
        /* create the internal ring if not already done */
-       if ((mp->flags & MEMPOOL_F_RING_CREATED) == 0) {
-               ret = rte_mempool_ring_create(mp);
-               if (ret < 0)
+       if ((mp->flags & MEMPOOL_F_POOL_CREATED) == 0) {
+               ret = rte_mempool_ops_alloc(mp);
+               if (ret != 0)
                        return ret;
+               mp->flags |= MEMPOOL_F_POOL_CREATED;
        }
 
        /* mempool is already populated */
@@ -703,7 +670,7 @@ rte_mempool_free(struct rte_mempool *mp)
        rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
 
        rte_mempool_free_memchunks(mp);
-       rte_ring_free(mp->ring);
+       rte_mempool_ops_free(mp);
        rte_memzone_free(mp->mz);
 }
 
@@ -815,6 +782,7 @@ rte_mempool_create_empty(const char *name, unsigned n, unsigned elt_size,
                RTE_PTR_ADD(mp, MEMPOOL_HEADER_SIZE(mp, 0));
 
        te->data = mp;
+
        rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);
        TAILQ_INSERT_TAIL(mempool_list, te, next);
        rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
@@ -844,6 +812,19 @@ rte_mempool_create(const char *name, unsigned n, unsigned elt_size,
        if (mp == NULL)
                return NULL;
 
+       /*
+        * Since we have 4 combinations of the SP/SC/MP/MC examine the flags to
+        * set the correct index into the table of ops structs.
+        */
+       if (flags & (MEMPOOL_F_SP_PUT | MEMPOOL_F_SC_GET))
+               rte_mempool_set_ops_byname(mp, "ring_sp_sc", NULL);
+       else if (flags & MEMPOOL_F_SP_PUT)
+               rte_mempool_set_ops_byname(mp, "ring_sp_mc", NULL);
+       else if (flags & MEMPOOL_F_SC_GET)
+               rte_mempool_set_ops_byname(mp, "ring_mp_sc", NULL);
+       else
+               rte_mempool_set_ops_byname(mp, "ring_mp_mc", NULL);
+
        /* call the mempool priv initializer */
        if (mp_init)
                mp_init(mp, mp_init_arg);
@@ -930,7 +911,7 @@ rte_mempool_count(const struct rte_mempool *mp)
        unsigned count;
        unsigned lcore_id;
 
-       count = rte_ring_count(mp->ring);
+       count = rte_mempool_ops_get_count(mp);
 
        if (mp->cache_size == 0)
                return count;
@@ -1119,7 +1100,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
 
        fprintf(f, "mempool <%s>@%p\n", mp->name, mp);
        fprintf(f, "  flags=%x\n", mp->flags);
-       fprintf(f, "  ring=<%s>@%p\n", mp->ring->name, mp->ring);
+       fprintf(f, "  pool=%p\n", mp->pool_data);
        fprintf(f, "  phys_addr=0x%" PRIx64 "\n", mp->mz->phys_addr);
        fprintf(f, "  nb_mem_chunks=%u\n", mp->nb_mem_chunks);
        fprintf(f, "  size=%"PRIu32"\n", mp->size);
@@ -1140,7 +1121,7 @@ rte_mempool_dump(FILE *f, struct rte_mempool *mp)
        }
 
        cache_count = rte_mempool_dump_cache(f, mp);
-       common_count = rte_ring_count(mp->ring);
+       common_count = rte_mempool_ops_get_count(mp);
        if ((cache_count + common_count) > mp->size)
                common_count = mp->size - cache_count;
        fprintf(f, "  common_pool_count=%u\n", common_count);
index 60339bd..0a1777c 100644 (file)
@@ -67,6 +67,7 @@
 #include <inttypes.h>
 #include <sys/queue.h>
 
+#include <rte_spinlock.h>
 #include <rte_log.h>
 #include <rte_debug.h>
 #include <rte_lcore.h>
@@ -203,10 +204,14 @@ struct rte_mempool_memhdr {
  */
 struct rte_mempool {
        char name[RTE_MEMPOOL_NAMESIZE]; /**< Name of mempool. */
-       struct rte_ring *ring;           /**< Ring to store objects. */
-       const struct rte_memzone *mz;    /**< Memzone where pool is allocated */
+       union {
+               void *pool_data;         /**< Ring or pool to store objects. */
+               uint64_t pool_id;        /**< External mempool identifier. */
+       };
+       void *pool_config;               /**< optional args for ops alloc. */
+       const struct rte_memzone *mz;    /**< Memzone where pool is alloc'd. */
        int flags;                       /**< Flags of the mempool. */
-       int socket_id;                   /**< Socket id passed at mempool creation. */
+       int socket_id;                   /**< Socket id passed at create. */
        uint32_t size;                   /**< Max size of the mempool. */
        uint32_t cache_size;             /**< Size of per-lcore local cache. */
        uint32_t cache_flushthresh;
@@ -217,6 +222,14 @@ struct rte_mempool {
        uint32_t trailer_size;           /**< Size of trailer (after elt). */
 
        unsigned private_data_size;      /**< Size of private data. */
+       /**
+        * Index into rte_mempool_ops_table array of mempool ops
+        * structs, which contain callback function pointers.
+        * We're using an index here rather than pointers to the callbacks
+        * to facilitate any secondary processes that may want to use
+        * this mempool.
+        */
+       int32_t ops_index;
 
        struct rte_mempool_cache *local_cache; /**< Per-lcore local cache */
 
@@ -235,7 +248,7 @@ struct rte_mempool {
 #define MEMPOOL_F_NO_CACHE_ALIGN 0x0002 /**< Do not align objs on cache lines.*/
 #define MEMPOOL_F_SP_PUT         0x0004 /**< Default put is "single-producer".*/
 #define MEMPOOL_F_SC_GET         0x0008 /**< Default get is "single-consumer".*/
-#define MEMPOOL_F_RING_CREATED   0x0010 /**< Internal: ring is created */
+#define MEMPOOL_F_POOL_CREATED   0x0010 /**< Internal: pool is created. */
 #define MEMPOOL_F_NO_PHYS_CONTIG 0x0020 /**< Don't need physically contiguous objs. */
 
 /**
@@ -325,6 +338,215 @@ void rte_mempool_check_cookies(const struct rte_mempool *mp,
 #define __mempool_check_cookies(mp, obj_table_const, n, free) do {} while(0)
 #endif /* RTE_LIBRTE_MEMPOOL_DEBUG */
 
+#define RTE_MEMPOOL_OPS_NAMESIZE 32 /**< Max length of ops struct name. */
+
+/**
+ * Prototype for implementation specific data provisioning function.
+ *
+ * The function should provide the implementation specific memory for
+ * for use by the other mempool ops functions in a given mempool ops struct.
+ * E.g. the default ops provides an instance of the rte_ring for this purpose.
+ * it will most likely point to a different type of data structure, and
+ * will be transparent to the application programmer.
+ * This function should set mp->pool_data.
+ */
+typedef int (*rte_mempool_alloc_t)(struct rte_mempool *mp);
+
+/**
+ * Free the opaque private data pointed to by mp->pool_data pointer.
+ */
+typedef void (*rte_mempool_free_t)(struct rte_mempool *mp);
+
+/**
+ * Enqueue an object into the external pool.
+ */
+typedef int (*rte_mempool_enqueue_t)(struct rte_mempool *mp,
+               void * const *obj_table, unsigned int n);
+
+/**
+ * Dequeue an object from the external pool.
+ */
+typedef int (*rte_mempool_dequeue_t)(struct rte_mempool *mp,
+               void **obj_table, unsigned int n);
+
+/**
+ * Return the number of available objects in the external pool.
+ */
+typedef unsigned (*rte_mempool_get_count)(const struct rte_mempool *mp);
+
+/** Structure defining mempool operations structure */
+struct rte_mempool_ops {
+       char name[RTE_MEMPOOL_OPS_NAMESIZE]; /**< Name of mempool ops struct. */
+       rte_mempool_alloc_t alloc;       /**< Allocate private data. */
+       rte_mempool_free_t free;         /**< Free the external pool. */
+       rte_mempool_enqueue_t enqueue;   /**< Enqueue an object. */
+       rte_mempool_dequeue_t dequeue;   /**< Dequeue an object. */
+       rte_mempool_get_count get_count; /**< Get qty of available objs. */
+} __rte_cache_aligned;
+
+#define RTE_MEMPOOL_MAX_OPS_IDX 16  /**< Max registered ops structs */
+
+/**
+ * Structure storing the table of registered ops structs, each of which contain
+ * the function pointers for the mempool ops functions.
+ * Each process has its own storage for this ops struct array so that
+ * the mempools can be shared across primary and secondary processes.
+ * The indices used to access the array are valid across processes, whereas
+ * any function pointers stored directly in the mempool struct would not be.
+ * This results in us simply having "ops_index" in the mempool struct.
+ */
+struct rte_mempool_ops_table {
+       rte_spinlock_t sl;     /**< Spinlock for add/delete. */
+       uint32_t num_ops;      /**< Number of used ops structs in the table. */
+       /**
+        * Storage for all possible ops structs.
+        */
+       struct rte_mempool_ops ops[RTE_MEMPOOL_MAX_OPS_IDX];
+} __rte_cache_aligned;
+
+/** Array of registered ops structs. */
+extern struct rte_mempool_ops_table rte_mempool_ops_table;
+
+/**
+ * @internal Get the mempool ops struct from its index.
+ *
+ * @param ops_index
+ *   The index of the ops struct in the ops struct table. It must be a valid
+ *   index: (0 <= idx < num_ops).
+ * @return
+ *   The pointer to the ops struct in the table.
+ */
+static inline struct rte_mempool_ops *
+rte_mempool_get_ops(int ops_index)
+{
+       RTE_VERIFY((ops_index >= 0) && (ops_index < RTE_MEMPOOL_MAX_OPS_IDX));
+
+       return &rte_mempool_ops_table.ops[ops_index];
+}
+
+/**
+ * @internal Wrapper for mempool_ops alloc callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @return
+ *   - 0: Success; successfully allocated mempool pool_data.
+ *   - <0: Error; code of alloc function.
+ */
+int
+rte_mempool_ops_alloc(struct rte_mempool *mp);
+
+/**
+ * @internal Wrapper for mempool_ops dequeue callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *   Number of objects to get.
+ * @return
+ *   - 0: Success; got n objects.
+ *   - <0: Error; code of dequeue function.
+ */
+static inline int
+rte_mempool_ops_dequeue_bulk(struct rte_mempool *mp,
+               void **obj_table, unsigned n)
+{
+       struct rte_mempool_ops *ops;
+
+       ops = rte_mempool_get_ops(mp->ops_index);
+       return ops->dequeue(mp, obj_table, n);
+}
+
+/**
+ * @internal wrapper for mempool_ops enqueue callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param obj_table
+ *   Pointer to a table of void * pointers (objects).
+ * @param n
+ *   Number of objects to put.
+ * @return
+ *   - 0: Success; n objects supplied.
+ *   - <0: Error; code of enqueue function.
+ */
+static inline int
+rte_mempool_ops_enqueue_bulk(struct rte_mempool *mp, void * const *obj_table,
+               unsigned n)
+{
+       struct rte_mempool_ops *ops;
+
+       ops = rte_mempool_get_ops(mp->ops_index);
+       return ops->enqueue(mp, obj_table, n);
+}
+
+/**
+ * @internal wrapper for mempool_ops get_count callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @return
+ *   The number of available objects in the external pool.
+ */
+unsigned
+rte_mempool_ops_get_count(const struct rte_mempool *mp);
+
+/**
+ * @internal wrapper for mempool_ops free callback.
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ */
+void
+rte_mempool_ops_free(struct rte_mempool *mp);
+
+/**
+ * Set the ops of a mempool.
+ *
+ * This can only be done on a mempool that is not populated, i.e. just after
+ * a call to rte_mempool_create_empty().
+ *
+ * @param mp
+ *   Pointer to the memory pool.
+ * @param name
+ *   Name of the ops structure to use for this mempool.
+ * @param pool_config
+ *   Opaque data that can be passed by the application to the ops functions.
+ * @return
+ *   - 0: Success; the mempool is now using the requested ops functions.
+ *   - -EINVAL - Invalid ops struct name provided.
+ *   - -EEXIST - mempool already has an ops struct assigned.
+ */
+int
+rte_mempool_set_ops_byname(struct rte_mempool *mp, const char *name,
+               void *pool_config);
+
+/**
+ * Register mempool operations.
+ *
+ * @param ops
+ *   Pointer to an ops structure to register.
+ * @return
+ *   - >=0: Success; return the index of the ops struct in the table.
+ *   - -EINVAL - some missing callbacks while registering ops struct.
+ *   - -ENOSPC - the maximum number of ops structs has been reached.
+ */
+int rte_mempool_register_ops(const struct rte_mempool_ops *ops);
+
+/**
+ * Macro to statically register the ops of a mempool handler.
+ * Note that the rte_mempool_register_ops fails silently here when
+ * more then RTE_MEMPOOL_MAX_OPS_IDX is registered.
+ */
+#define MEMPOOL_REGISTER_OPS(ops)                                      \
+       void mp_hdlr_init_##ops(void);                                  \
+       void __attribute__((constructor, used)) mp_hdlr_init_##ops(void)\
+       {                                                               \
+               rte_mempool_register_ops(&ops);                 \
+       }
+
 /**
  * An object callback function for mempool.
  *
@@ -774,7 +996,7 @@ __mempool_put_bulk(struct rte_mempool *mp, void * const *obj_table,
        cache->len += n;
 
        if (cache->len >= flushthresh) {
-               rte_ring_mp_enqueue_bulk(mp->ring, &cache->objs[cache_size],
+               rte_mempool_ops_enqueue_bulk(mp, &cache->objs[cache_size],
                                cache->len - cache_size);
                cache->len = cache_size;
        }
@@ -785,19 +1007,10 @@ ring_enqueue:
 
        /* push remaining objects in ring */
 #ifdef RTE_LIBRTE_MEMPOOL_DEBUG
-       if (is_mp) {
-               if (rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-                       rte_panic("cannot put objects in mempool\n");
-       }
-       else {
-               if (rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n) < 0)
-                       rte_panic("cannot put objects in mempool\n");
-       }
+       if (rte_mempool_ops_enqueue_bulk(mp, obj_table, n) < 0)
+               rte_panic("cannot put objects in mempool\n");
 #else
-       if (is_mp)
-               rte_ring_mp_enqueue_bulk(mp->ring, obj_table, n);
-       else
-               rte_ring_sp_enqueue_bulk(mp->ring, obj_table, n);
+       rte_mempool_ops_enqueue_bulk(mp, obj_table, n);
 #endif
 }
 
@@ -945,7 +1158,8 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
                uint32_t req = n + (cache_size - cache->len);
 
                /* How many do we require i.e. number to fill the cache + the request */
-               ret = rte_ring_mc_dequeue_bulk(mp->ring, &cache->objs[cache->len], req);
+               ret = rte_mempool_ops_dequeue_bulk(mp,
+                       &cache->objs[cache->len], req);
                if (unlikely(ret < 0)) {
                        /*
                         * In the offchance that we are buffer constrained,
@@ -972,10 +1186,7 @@ __mempool_get_bulk(struct rte_mempool *mp, void **obj_table,
 ring_dequeue:
 
        /* get remaining objects from ring */
-       if (is_mc)
-               ret = rte_ring_mc_dequeue_bulk(mp->ring, obj_table, n);
-       else
-               ret = rte_ring_sc_dequeue_bulk(mp->ring, obj_table, n);
+       ret = rte_mempool_ops_dequeue_bulk(mp, obj_table, n);
 
        if (ret < 0)
                __MEMPOOL_STAT_ADD(mp, get_fail, n);
diff --git a/lib/librte_mempool/rte_mempool_ops.c b/lib/librte_mempool/rte_mempool_ops.c
new file mode 100644 (file)
index 0000000..fd0b64c
--- /dev/null
@@ -0,0 +1,151 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2016 Intel Corporation. All rights reserved.
+ *   Copyright(c) 2016 6WIND S.A.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_mempool.h>
+#include <rte_errno.h>
+
+/* indirect jump table to support external memory pools. */
+struct rte_mempool_ops_table rte_mempool_ops_table = {
+       .sl =  RTE_SPINLOCK_INITIALIZER,
+       .num_ops = 0
+};
+
+/* add a new ops struct in rte_mempool_ops_table, return its index. */
+int
+rte_mempool_register_ops(const struct rte_mempool_ops *h)
+{
+       struct rte_mempool_ops *ops;
+       int16_t ops_index;
+
+       rte_spinlock_lock(&rte_mempool_ops_table.sl);
+
+       if (rte_mempool_ops_table.num_ops >=
+                       RTE_MEMPOOL_MAX_OPS_IDX) {
+               rte_spinlock_unlock(&rte_mempool_ops_table.sl);
+               RTE_LOG(ERR, MEMPOOL,
+                       "Maximum number of mempool ops structs exceeded\n");
+               return -ENOSPC;
+       }
+
+       if (h->alloc == NULL || h->enqueue == NULL ||
+                       h->dequeue == NULL || h->get_count == NULL) {
+               rte_spinlock_unlock(&rte_mempool_ops_table.sl);
+               RTE_LOG(ERR, MEMPOOL,
+                       "Missing callback while registering mempool ops\n");
+               return -EINVAL;
+       }
+
+       if (strlen(h->name) >= sizeof(ops->name) - 1) {
+               rte_spinlock_unlock(&rte_mempool_ops_table.sl);
+               RTE_LOG(DEBUG, EAL, "%s(): mempool_ops <%s>: name too long\n",
+                               __func__, h->name);
+               rte_errno = EEXIST;
+               return -EEXIST;
+       }
+
+       ops_index = rte_mempool_ops_table.num_ops++;
+       ops = &rte_mempool_ops_table.ops[ops_index];
+       snprintf(ops->name, sizeof(ops->name), "%s", h->name);
+       ops->alloc = h->alloc;
+       ops->enqueue = h->enqueue;
+       ops->dequeue = h->dequeue;
+       ops->get_count = h->get_count;
+
+       rte_spinlock_unlock(&rte_mempool_ops_table.sl);
+
+       return ops_index;
+}
+
+/* wrapper to allocate an external mempool's private (pool) data. */
+int
+rte_mempool_ops_alloc(struct rte_mempool *mp)
+{
+       struct rte_mempool_ops *ops;
+
+       ops = rte_mempool_get_ops(mp->ops_index);
+       return ops->alloc(mp);
+}
+
+/* wrapper to free an external pool ops. */
+void
+rte_mempool_ops_free(struct rte_mempool *mp)
+{
+       struct rte_mempool_ops *ops;
+
+       ops = rte_mempool_get_ops(mp->ops_index);
+       if (ops->free == NULL)
+               return;
+       ops->free(mp);
+}
+
+/* wrapper to get available objects in an external mempool. */
+unsigned int
+rte_mempool_ops_get_count(const struct rte_mempool *mp)
+{
+       struct rte_mempool_ops *ops;
+
+       ops = rte_mempool_get_ops(mp->ops_index);
+       return ops->get_count(mp);
+}
+
+/* sets mempool ops previously registered by rte_mempool_register_ops. */
+int
+rte_mempool_set_ops_byname(struct rte_mempool *mp, const char *name,
+       void *pool_config)
+{
+       struct rte_mempool_ops *ops = NULL;
+       unsigned i;
+
+       /* too late, the mempool is already populated. */
+       if (mp->flags & MEMPOOL_F_POOL_CREATED)
+               return -EEXIST;
+
+       for (i = 0; i < rte_mempool_ops_table.num_ops; i++) {
+               if (!strcmp(name,
+                               rte_mempool_ops_table.ops[i].name)) {
+                       ops = &rte_mempool_ops_table.ops[i];
+                       break;
+               }
+       }
+
+       if (ops == NULL)
+               return -EINVAL;
+
+       mp->ops_index = i;
+       mp->pool_config = pool_config;
+       return 0;
+}
diff --git a/lib/librte_mempool/rte_mempool_ring.c b/lib/librte_mempool/rte_mempool_ring.c
new file mode 100644 (file)
index 0000000..b9aa64d
--- /dev/null
@@ -0,0 +1,161 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2016 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <string.h>
+
+#include <rte_errno.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+
+static int
+common_ring_mp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+               unsigned n)
+{
+       return rte_ring_mp_enqueue_bulk(mp->pool_data, obj_table, n);
+}
+
+static int
+common_ring_sp_enqueue(struct rte_mempool *mp, void * const *obj_table,
+               unsigned n)
+{
+       return rte_ring_sp_enqueue_bulk(mp->pool_data, obj_table, n);
+}
+
+static int
+common_ring_mc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
+{
+       return rte_ring_mc_dequeue_bulk(mp->pool_data, obj_table, n);
+}
+
+static int
+common_ring_sc_dequeue(struct rte_mempool *mp, void **obj_table, unsigned n)
+{
+       return rte_ring_sc_dequeue_bulk(mp->pool_data, obj_table, n);
+}
+
+static unsigned
+common_ring_get_count(const struct rte_mempool *mp)
+{
+       return rte_ring_count(mp->pool_data);
+}
+
+
+static int
+common_ring_alloc(struct rte_mempool *mp)
+{
+       int rg_flags = 0, ret;
+       char rg_name[RTE_RING_NAMESIZE];
+       struct rte_ring *r;
+
+       ret = snprintf(rg_name, sizeof(rg_name),
+               RTE_MEMPOOL_MZ_FORMAT, mp->name);
+       if (ret < 0 || ret >= (int)sizeof(rg_name)) {
+               rte_errno = ENAMETOOLONG;
+               return -rte_errno;
+       }
+
+       /* ring flags */
+       if (mp->flags & MEMPOOL_F_SP_PUT)
+               rg_flags |= RING_F_SP_ENQ;
+       if (mp->flags & MEMPOOL_F_SC_GET)
+               rg_flags |= RING_F_SC_DEQ;
+
+       /*
+        * Allocate the ring that will be used to store objects.
+        * Ring functions will return appropriate errors if we are
+        * running as a secondary process etc., so no checks made
+        * in this function for that condition.
+        */
+       r = rte_ring_create(rg_name, rte_align32pow2(mp->size + 1),
+               mp->socket_id, rg_flags);
+       if (r == NULL)
+               return -rte_errno;
+
+       mp->pool_data = r;
+
+       return 0;
+}
+
+static void
+common_ring_free(struct rte_mempool *mp)
+{
+       rte_ring_free(mp->pool_data);
+}
+
+/*
+ * The following 4 declarations of mempool ops structs address
+ * the need for the backward compatible mempool handlers for
+ * single/multi producers and single/multi consumers as dictated by the
+ * flags provided to the rte_mempool_create function
+ */
+static const struct rte_mempool_ops ops_mp_mc = {
+       .name = "ring_mp_mc",
+       .alloc = common_ring_alloc,
+       .free = common_ring_free,
+       .enqueue = common_ring_mp_enqueue,
+       .dequeue = common_ring_mc_dequeue,
+       .get_count = common_ring_get_count,
+};
+
+static const struct rte_mempool_ops ops_sp_sc = {
+       .name = "ring_sp_sc",
+       .alloc = common_ring_alloc,
+       .free = common_ring_free,
+       .enqueue = common_ring_sp_enqueue,
+       .dequeue = common_ring_sc_dequeue,
+       .get_count = common_ring_get_count,
+};
+
+static const struct rte_mempool_ops ops_mp_sc = {
+       .name = "ring_mp_sc",
+       .alloc = common_ring_alloc,
+       .free = common_ring_free,
+       .enqueue = common_ring_mp_enqueue,
+       .dequeue = common_ring_sc_dequeue,
+       .get_count = common_ring_get_count,
+};
+
+static const struct rte_mempool_ops ops_sp_mc = {
+       .name = "ring_sp_mc",
+       .alloc = common_ring_alloc,
+       .free = common_ring_free,
+       .enqueue = common_ring_sp_enqueue,
+       .dequeue = common_ring_mc_dequeue,
+       .get_count = common_ring_get_count,
+};
+
+MEMPOOL_REGISTER_OPS(ops_mp_mc);
+MEMPOOL_REGISTER_OPS(ops_sp_sc);
+MEMPOOL_REGISTER_OPS(ops_mp_sc);
+MEMPOOL_REGISTER_OPS(ops_sp_mc);
index f63461b..a4a6c1f 100644 (file)
@@ -20,15 +20,18 @@ DPDK_16.7 {
        global:
 
        rte_mempool_check_cookies;
-       rte_mempool_obj_iter;
-       rte_mempool_mem_iter;
        rte_mempool_create_empty;
+       rte_mempool_free;
+       rte_mempool_mem_iter;
+       rte_mempool_obj_iter;
+       rte_mempool_ops_table;
+       rte_mempool_populate_anon;
+       rte_mempool_populate_default;
        rte_mempool_populate_phys;
        rte_mempool_populate_phys_tab;
        rte_mempool_populate_virt;
-       rte_mempool_populate_default;
-       rte_mempool_populate_anon;
-       rte_mempool_free;
+       rte_mempool_register_ops;
+       rte_mempool_set_ops_byname;
 
        local: *;
 } DPDK_2.0;