X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=lib%2Flibrte_distributor%2Frte_distributor.c;h=f3f778c9028971ea1d663893cecd135ea5abf947;hb=a3358d942fa97ddb3b9461543896a55f60b5e782;hp=585ff88ae80dd8de77c8c4c5a02c5eebbf6f730a;hpb=ea672a8b1655bbb44876d2550ff56f384968a43b;p=dpdk.git diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 585ff88ae8..f3f778c902 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -35,10 +35,10 @@ #include #include #include +#include #include #include #include -#include #include #include "rte_distributor.h" @@ -61,6 +61,13 @@ #define RTE_DISTRIB_MAX_RETURNS 128 #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1) +/** + * Maximum number of workers allowed. + * Be aware of increasing the limit, becaus it is limited by how we track + * in-flight tags. See @in_flight_bitmask and @rte_distributor_process + */ +#define RTE_DISTRIB_MAX_WORKERS 64 + /** * Buffer structure used to pass the pointer data between cores. This is cache * line aligned, but to improve performance and prevent adjacent cache-line @@ -70,7 +77,7 @@ */ union rte_distributor_buffer { volatile int64_t bufptr64; - char pad[CACHE_LINE_SIZE*3]; + char pad[RTE_CACHE_LINE_SIZE*3]; } __rte_cache_aligned; struct rte_distributor_backlog { @@ -91,16 +98,28 @@ struct rte_distributor { char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */ unsigned num_workers; /**< Number of workers polling */ - uint32_t in_flight_tags[RTE_MAX_LCORE]; - struct rte_distributor_backlog backlog[RTE_MAX_LCORE]; + uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS]; + /**< Tracks the tag being processed per core */ + uint64_t in_flight_bitmask; + /**< on/off bits for in-flight tags. + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then + * the bitmask has to expand. + */ + + struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]; - union rte_distributor_buffer bufs[RTE_MAX_LCORE]; + union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS]; struct rte_distributor_returned_pkts returns; }; TAILQ_HEAD(rte_distributor_list, rte_distributor); +static struct rte_tailq_elem rte_distributor_tailq = { + .name = "RTE_DISTRIBUTOR", +}; +EAL_REGISTER_TAILQ(rte_distributor_tailq) + /**** APIs called by workers ****/ void @@ -188,6 +207,7 @@ static inline void handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) { d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); d->bufs[wkr].bufptr64 = 0; if (unlikely(d->backlog[wkr].count != 0)) { /* On return of a packet, we need to move the @@ -210,7 +230,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >> RTE_DISTRIB_FLAG_BITS)); } - /* recursive call */ + /* recursive call. + * Note that the tags were set before first level call + * to rte_distributor_process. + */ rte_distributor_process(d, pkts, i); bl->count = bl->start = 0; } @@ -241,6 +264,7 @@ process_returns(struct rte_distributor *d) else { d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF; d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; } else if (data & RTE_DISTRIB_RETURN_BUF) { @@ -282,17 +306,36 @@ rte_distributor_process(struct rte_distributor *d, next_mb = mbufs[next_idx++]; next_value = (((int64_t)(uintptr_t)next_mb) << RTE_DISTRIB_FLAG_BITS); - new_tag = (next_mb->hash.rss | 1); - - uint32_t match = 0; + /* + * User is advocated to set tag vaue for each + * mbuf before calling rte_distributor_process. + * User defined tags are used to identify flows, + * or sessions. + */ + new_tag = next_mb->hash.usr; + + /* + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 + * then the size of match has to be expanded. + */ + uint64_t match = 0; unsigned i; + /* + * to scan for a match use "xor" and "not" to get a 0/1 + * value, then use shifting to merge to single "match" + * variable, where a one-bit indicates a match for the + * worker given by the bit-position + */ for (i = 0; i < d->num_workers; i++) match |= (!(d->in_flight_tags[i] ^ new_tag) << i); + /* Only turned-on bits are considered as match */ + match &= d->in_flight_bitmask; + if (match) { next_mb = NULL; - unsigned worker = __builtin_ctz(match); + unsigned worker = __builtin_ctzl(match); if (add_to_backlog(&d->backlog[worker], next_value) < 0) next_idx--; @@ -309,6 +352,7 @@ rte_distributor_process(struct rte_distributor *d, else { d->bufs[wkr].bufptr64 = next_value; d->in_flight_tags[wkr] = new_tag; + d->in_flight_bitmask |= (1UL << wkr); next_mb = NULL; } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; @@ -366,11 +410,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d, static inline unsigned total_outstanding(const struct rte_distributor *d) { - unsigned wkr, total_outstanding = 0; + unsigned wkr, total_outstanding; + + total_outstanding = __builtin_popcountl(d->in_flight_bitmask); for (wkr = 0; wkr < d->num_workers; wkr++) - total_outstanding += d->backlog[wkr].count + - !!(d->in_flight_tags[wkr]); + total_outstanding += d->backlog[wkr].count; + return total_outstanding; } @@ -409,22 +455,16 @@ rte_distributor_create(const char *name, const struct rte_memzone *mz; /* compilation-time checks */ - RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0); - RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0); + RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0); + RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0); + RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS > + sizeof(d->in_flight_bitmask) * CHAR_BIT); - if (name == NULL || num_workers >= RTE_MAX_LCORE) { + if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) { rte_errno = EINVAL; return NULL; } - /* check that we have an initialised tail queue */ - distributor_list = RTE_TAILQ_LOOKUP_BY_IDX(RTE_TAILQ_DISTRIBUTOR, - rte_distributor_list); - if (distributor_list == NULL) { - rte_errno = E_RTE_NO_TAILQ; - return NULL; - } - snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name); mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS); if (mz == NULL) { @@ -436,6 +476,9 @@ rte_distributor_create(const char *name, snprintf(d->name, sizeof(d->name), "%s", name); d->num_workers = num_workers; + distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head, + rte_distributor_list); + rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK); TAILQ_INSERT_TAIL(distributor_list, d, next); rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);