X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_ring%2Frte_ring.h;h=bdf69b7790dee68418970f28069090151349702e;hb=873a61c7526be06f45d8d709a7c56d10cc06ab34;hp=3c34e227a753056bbb1b4feed2264f88e3022593;hpb=c738c6a644e5a07fa98ede668775c08ec5321273;p=dpdk.git diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 3c34e227a7..bdf69b7790 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -1,13 +1,13 @@ /*- * BSD LICENSE - * + * * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. * All rights reserved. - * + * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: - * + * * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above copyright @@ -17,7 +17,7 @@ * * Neither the name of Intel Corporation nor the names of its * contributors may be used to endorse or promote products derived * from this software without specific prior written permission. - * + * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR @@ -91,6 +91,7 @@ extern "C" { #endif +#include #include #include #include @@ -126,6 +127,11 @@ struct rte_ring_debug_stats { #define RTE_RING_NAMESIZE 32 /**< The maximum length of a ring name. */ #define RTE_RING_MZ_PREFIX "RG_" +#ifndef RTE_RING_PAUSE_REP_COUNT +#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield + * if RTE_RING_PAUSE_REP not defined. */ +#endif + /** * An RTE ring structure. * @@ -137,8 +143,6 @@ struct rte_ring_debug_stats { * a problem. */ struct rte_ring { - TAILQ_ENTRY(rte_ring) next; /**< Next in list. */ - char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */ int flags; /**< Flags supplied at creation. */ @@ -170,8 +174,8 @@ struct rte_ring { #endif void * ring[0] __rte_cache_aligned; /**< Memory space of ring starts here. - * not volatile so need to be careful - * about compiler re-ordering */ + * not volatile so need to be careful + * about compiler re-ordering */ }; #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ @@ -189,10 +193,12 @@ struct rte_ring { * The number to add to the object-oriented statistics. */ #ifdef RTE_LIBRTE_RING_DEBUG -#define __RING_STAT_ADD(r, name, n) do { \ - unsigned __lcore_id = rte_lcore_id(); \ - r->stats[__lcore_id].name##_objs += n; \ - r->stats[__lcore_id].name##_bulk += 1; \ +#define __RING_STAT_ADD(r, name, n) do { \ + unsigned __lcore_id = rte_lcore_id(); \ + if (__lcore_id < RTE_MAX_LCORE) { \ + r->stats[__lcore_id].name##_objs += n; \ + r->stats[__lcore_id].name##_bulk += 1; \ + } \ } while(0) #else #define __RING_STAT_ADD(r, name, n) do {} while(0) @@ -317,12 +323,14 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count); /** * Dump the status of the ring to the console. * + * @param f + * A pointer to a file for output * @param r * A pointer to the ring structure. */ -void rte_ring_dump(const struct rte_ring *r); +void rte_ring_dump(FILE *f, const struct rte_ring *r); -/* the actual enqueue of pointers on the ring. +/* the actual enqueue of pointers on the ring. * Placed here since identical code needed in both * single and multi producer enqueue functions */ #define ENQUEUE_PTRS() do { \ @@ -348,7 +356,7 @@ void rte_ring_dump(const struct rte_ring *r); } \ } while(0) -/* the actual copy of pointers on the ring to obj_table. +/* the actual copy of pointers on the ring to obj_table. * Placed here since identical code needed in both * single and multi consumer dequeue functions */ #define DEQUEUE_PTRS() do { \ @@ -407,7 +415,7 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, uint32_t cons_tail, free_entries; const unsigned max = n; int success; - unsigned i; + unsigned i, rep = 0; uint32_t mask = r->prod.mask; int ret; @@ -465,9 +473,18 @@ __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, * If there are other enqueues in progress that preceded us, * we need to wait for them to complete */ - while (unlikely(r->prod.tail != prod_head)) + while (unlikely(r->prod.tail != prod_head)) { rte_pause(); + /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting + * for other thread finish. It gives pre-empted thread a chance + * to proceed and finish with ring dequeue operation. */ + if (RTE_RING_PAUSE_REP_COUNT && + ++rep == RTE_RING_PAUSE_REP_COUNT) { + rep = 0; + sched_yield(); + } + } r->prod.tail = prod_next; return ret; } @@ -586,7 +603,7 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, uint32_t cons_next, entries; const unsigned max = n; int success; - unsigned i; + unsigned i, rep = 0; uint32_t mask = r->prod.mask; /* move cons.head atomically */ @@ -631,9 +648,18 @@ __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, * If there are other dequeues in progress that preceded us, * we need to wait for them to complete */ - while (unlikely(r->cons.tail != cons_head)) + while (unlikely(r->cons.tail != cons_head)) { rte_pause(); + /* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting + * for other thread finish. It gives pre-empted thread a chance + * to proceed and finish with ring dequeue operation. */ + if (RTE_RING_PAUSE_REP_COUNT && + ++rep == RTE_RING_PAUSE_REP_COUNT) { + rep = 0; + sched_yield(); + } + } __RING_STAT_ADD(r, deq_success, n); r->cons.tail = cons_next; @@ -1053,8 +1079,11 @@ rte_ring_free_count(const struct rte_ring *r) /** * Dump the status of all rings on the console + * + * @param f + * A pointer to a file for output */ -void rte_ring_list_dump(void); +void rte_ring_list_dump(FILE *f); /** * Search a ring from its name @@ -1083,7 +1112,7 @@ struct rte_ring *rte_ring_lookup(const char *name); * @return * - n: Actual number of objects enqueued. */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned n) { @@ -1102,7 +1131,7 @@ rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects enqueued. */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned n) { @@ -1125,14 +1154,14 @@ rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects enqueued. */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, unsigned n) { if (r->prod.sp_enqueue) - return rte_ring_sp_enqueue_burst(r, obj_table, n); + return rte_ring_sp_enqueue_burst(r, obj_table, n); else - return rte_ring_mp_enqueue_burst(r, obj_table, n); + return rte_ring_mp_enqueue_burst(r, obj_table, n); } /** @@ -1152,7 +1181,7 @@ rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) { return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); @@ -1172,7 +1201,7 @@ rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) * @return * - n: Actual number of objects dequeued, 0 if ring is empty */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) { return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); @@ -1192,9 +1221,9 @@ rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) * @param n * The number of objects to dequeue from the ring to the obj_table. * @return - * - Number of objects dequeued, or a negative error code on error + * - Number of objects dequeued */ -static inline int __attribute__((always_inline)) +static inline unsigned __attribute__((always_inline)) rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) { if (r->cons.sc_dequeue)