From ecb6c4559e1cd5b7cd0583cc01d2140a4c12e0c9 Mon Sep 17 00:00:00 2001 From: Qinglai Xiao Date: Mon, 10 Nov 2014 16:44:02 +0200 Subject: [PATCH] distributor: enhance and fix tag matching With the introduction of in_flight_bitmask, all 32 bits of the tag can be used. Furthermore, this patch fixes the integer overflow when finding the matched tags. The maximum number of workers is now defined as 64, which is the width in bits of a double-word. The link between the number of workers and RTE_MAX_LCORE is now removed. A compile-time check is added to ensure that RTE_DISTRIB_MAX_WORKERS is less than or equal to the size in bits of a double-word. Signed-off-by: Qinglai Xiao Acked-by: Bruce Richardson --- lib/librte_distributor/rte_distributor.c | 64 +++++++++++++++++------- lib/librte_distributor/rte_distributor.h | 4 ++ 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/lib/librte_distributor/rte_distributor.c b/lib/librte_distributor/rte_distributor.c index 3dfec4a0c7..2c5d61cf21 100644 --- a/lib/librte_distributor/rte_distributor.c +++ b/lib/librte_distributor/rte_distributor.c @@ -61,6 +61,13 @@ #define RTE_DISTRIB_MAX_RETURNS 128 #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1) +/** + * Maximum number of workers allowed. + * Be aware of increasing the limit, because it is limited by how we track + * in-flight tags. See @in_flight_bitmask and @rte_distributor_process + */ +#define RTE_DISTRIB_MAX_WORKERS 64 + /** * Buffer structure used to pass the pointer data between cores. This is cache * line aligned, but to improve performance and prevent adjacent cache-line @@ -91,11 +98,17 @@ struct rte_distributor { char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. 
*/ unsigned num_workers; /**< Number of workers polling */ - uint32_t in_flight_tags[RTE_MAX_LCORE]; - /**< Tracks the tag being processed per core, 0 == no pkt */ - struct rte_distributor_backlog backlog[RTE_MAX_LCORE]; + uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS]; + /**< Tracks the tag being processed per core */ + uint64_t in_flight_bitmask; + /**< on/off bits for in-flight tags. + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then + * the bitmask has to expand. + */ + + struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS]; - union rte_distributor_buffer bufs[RTE_MAX_LCORE]; + union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS]; struct rte_distributor_returned_pkts returns; }; @@ -189,6 +202,7 @@ static inline void handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) { d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); d->bufs[wkr].bufptr64 = 0; if (unlikely(d->backlog[wkr].count != 0)) { /* On return of a packet, we need to move the @@ -211,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr) pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >> RTE_DISTRIB_FLAG_BITS)); } - /* recursive call */ + /* recursive call. + * Note that the tags were set before first level call + * to rte_distributor_process. + */ rte_distributor_process(d, pkts, i); bl->count = bl->start = 0; } @@ -242,6 +259,7 @@ process_returns(struct rte_distributor *d) else { d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF; d->in_flight_tags[wkr] = 0; + d->in_flight_bitmask &= ~(1UL << wkr); } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; } else if (data & RTE_DISTRIB_RETURN_BUF) { @@ -284,14 +302,18 @@ rte_distributor_process(struct rte_distributor *d, next_value = (((int64_t)(uintptr_t)next_mb) << RTE_DISTRIB_FLAG_BITS); /* - * Set the low bit on the tag, so we can guarantee that - * we never store a tag value of zero. That means we can - * use the zero-value to indicate that no packet is - * being processed by a worker. 
+ * User is advised to set the tag value for each + * mbuf before calling rte_distributor_process. + * User-defined tags are used to identify flows, + * or sessions. */ - new_tag = (next_mb->hash.usr | 1); + new_tag = next_mb->hash.usr; - uint32_t match = 0; + /* + * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 + * then the size of match has to be expanded. + */ + uint64_t match = 0; unsigned i; /* * to scan for a match use "xor" and "not" to get a 0/1 @@ -303,9 +325,12 @@ rte_distributor_process(struct rte_distributor *d, match |= (!(d->in_flight_tags[i] ^ new_tag) << i); + /* Only turned-on bits are considered as match */ + match &= d->in_flight_bitmask; + if (match) { next_mb = NULL; - unsigned worker = __builtin_ctz(match); + unsigned worker = __builtin_ctzl(match); if (add_to_backlog(&d->backlog[worker], next_value) < 0) next_idx--; @@ -322,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d, else { d->bufs[wkr].bufptr64 = next_value; d->in_flight_tags[wkr] = new_tag; + d->in_flight_bitmask |= (1UL << wkr); next_mb = NULL; } oldbuf = data >> RTE_DISTRIB_FLAG_BITS; @@ -379,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d, static inline unsigned total_outstanding(const struct rte_distributor *d) { - unsigned wkr, total_outstanding = 0; + unsigned wkr, total_outstanding; + + total_outstanding = __builtin_popcountl(d->in_flight_bitmask); for (wkr = 0; wkr < d->num_workers; wkr++) - total_outstanding += d->backlog[wkr].count + - !!(d->in_flight_tags[wkr]); + total_outstanding += d->backlog[wkr].count; + return total_outstanding; } @@ -423,9 +451,11 @@ rte_distributor_create(const char *name, /* compilation-time checks */ RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0); - RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0); + RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0); + RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS > + sizeof(d->in_flight_bitmask) * CHAR_BIT); - if (name 
== NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) { rte_errno = EINVAL; return NULL; } diff --git a/lib/librte_distributor/rte_distributor.h b/lib/librte_distributor/rte_distributor.h index ec0d74ada0..cc1d559071 100644 --- a/lib/librte_distributor/rte_distributor.h +++ b/lib/librte_distributor/rte_distributor.h @@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id, * packets. The distributor will ensure that no two packets that have the * same flow id, or tag, in the mbuf will be procesed at the same time. * + * The user is advised to set the tag for each mbuf before calling this function. + * If the user doesn't set the tag, its value may vary depending on + * driver implementation and configuration. + * * This is not multi-thread safe and should only be called on a single lcore. * * @param d -- 2.20.1