distributor: enhance and fix tag matching
author Qinglai Xiao <jigsaw@gmail.com>
Mon, 10 Nov 2014 14:44:02 +0000 (16:44 +0200)
committer Thomas Monjalon <thomas.monjalon@6wind.com>
Thu, 13 Nov 2014 11:26:10 +0000 (12:26 +0100)
With the introduction of in_flight_bitmask, the whole 32 bits of the tag can
be used. Furthermore, this patch fixes the integer overflow hit when finding
matching tags.
The maximum number of workers is now defined as 64, which is the width of a
double-word. The link between the number of workers and RTE_MAX_LCORE is
removed. A compile-time check is added to ensure that RTE_DISTRIB_MAX_WORKERS
does not exceed the width of a double-word.

Signed-off-by: Qinglai Xiao <jigsaw@gmail.com>
Acked-by: Bruce Richardson <bruce.richardson@intel.com>
lib/librte_distributor/rte_distributor.c
lib/librte_distributor/rte_distributor.h
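
For illustration only, the tracking scheme described in the commit message can
be sketched as a small standalone C program. The names MAX_WORKERS,
worker_tags, in_flight and find_matching_worker are invented for this sketch
and are not the library's identifiers; it assumes a GCC-style
__builtin_ctzll.

#include <stdint.h>
#include <stdio.h>

#define MAX_WORKERS 64 /* bounded by the 64 bits of the in-flight bitmask */

static uint32_t worker_tags[MAX_WORKERS]; /* tag currently held per worker */
static uint64_t in_flight;                /* bit i set => worker i holds a packet */

/* Return the worker already processing new_tag, or -1 if there is none. */
static int
find_matching_worker(uint32_t new_tag, unsigned num_workers)
{
	uint64_t match = 0;
	unsigned i;

	for (i = 0; i < num_workers; i++)
		match |= (uint64_t)(worker_tags[i] == new_tag) << i;

	/* only workers that actually hold a packet count as a match,
	 * so tag value 0 no longer needs to be reserved */
	match &= in_flight;

	return match ? __builtin_ctzll(match) : -1;
}

int
main(void)
{
	/* worker 2 is processing tag 0, which is now a perfectly valid tag */
	worker_tags[2] = 0;
	in_flight |= UINT64_C(1) << 2;

	printf("tag 0 -> worker %d\n", find_matching_worker(0, 4));
	printf("tag 7 -> worker %d\n", find_matching_worker(7, 4));
	return 0;
}

Widening the match variable to 64 bits and switching to __builtin_ctzll is
also what avoids the integer overflow mentioned above.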

index 3dfec4a..2c5d61c 100644
 #define RTE_DISTRIB_MAX_RETURNS 128
 #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
 
+/**
+ * Maximum number of workers allowed.
+ * Be aware when increasing this limit: it is bounded by how in-flight
+ * tags are tracked. See @in_flight_bitmask and @rte_distributor_process.
+ */
+#define RTE_DISTRIB_MAX_WORKERS        64
+
 /**
  * Buffer structure used to pass the pointer data between cores. This is cache
  * line aligned, but to improve performance and prevent adjacent cache-line
@@ -91,11 +98,17 @@ struct rte_distributor {
        char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
        unsigned num_workers;                 /**< Number of workers polling */
 
-       uint32_t in_flight_tags[RTE_MAX_LCORE];
-               /**< Tracks the tag being processed per core, 0 == no pkt */
-       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+       uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+               /**< Tracks the tag being processed per core */
+       uint64_t in_flight_bitmask;
+               /**< on/off bits for in-flight tags.
+                * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+                * the bitmask would have to be widened.
+                */
+
+       struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
 
-       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+       union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
 
        struct rte_distributor_returned_pkts returns;
 };
@@ -189,6 +202,7 @@ static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
        d->in_flight_tags[wkr] = 0;
+       d->in_flight_bitmask &= ~(1UL << wkr);
        d->bufs[wkr].bufptr64 = 0;
        if (unlikely(d->backlog[wkr].count != 0)) {
                /* On return of a packet, we need to move the
@@ -211,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
                        pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
                                        RTE_DISTRIB_FLAG_BITS));
                }
-               /* recursive call */
+               /* recursive call.
+                * Note that the tags were already set before the first-level
+                * call to rte_distributor_process.
+                */
                rte_distributor_process(d, pkts, i);
                bl->count = bl->start = 0;
        }
@@ -242,6 +259,7 @@ process_returns(struct rte_distributor *d)
                        else {
                                d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
                                d->in_flight_tags[wkr] = 0;
+                               d->in_flight_bitmask &= ~(1UL << wkr);
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -284,14 +302,18 @@ rte_distributor_process(struct rte_distributor *d,
                        next_value = (((int64_t)(uintptr_t)next_mb)
                                        << RTE_DISTRIB_FLAG_BITS);
                        /*
-                        * Set the low bit on the tag, so we can guarantee that
-                        * we never store a tag value of zero. That means we can
-                        * use the zero-value to indicate that no packet is
-                        * being processed by a worker.
+                        * The user is advised to set the tag value for
+                        * each mbuf before calling rte_distributor_process.
+                        * User-defined tags are used to identify flows
+                        * or sessions.
                         */
-                       new_tag = (next_mb->hash.usr | 1);
+                       new_tag = next_mb->hash.usr;
 
-                       uint32_t match = 0;
+                       /*
+                        * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
+                        * then the type of match would have to be widened.
+                        */
+                       uint64_t match = 0;
                        unsigned i;
                        /*
                         * to scan for a match use "xor" and "not" to get a 0/1
@@ -303,9 +325,12 @@ rte_distributor_process(struct rte_distributor *d,
                                match |= (!(d->in_flight_tags[i] ^ new_tag)
                                        << i);
 
+                       /* only bits set in the in-flight bitmask count as a match */
+                       match &= d->in_flight_bitmask;
+
                        if (match) {
                                next_mb = NULL;
-                               unsigned worker = __builtin_ctz(match);
+                               unsigned worker = __builtin_ctzl(match);
                                if (add_to_backlog(&d->backlog[worker],
                                                next_value) < 0)
                                        next_idx--;
@@ -322,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d,
                        else {
                                d->bufs[wkr].bufptr64 = next_value;
                                d->in_flight_tags[wkr] = new_tag;
+                               d->in_flight_bitmask |= (1UL << wkr);
                                next_mb = NULL;
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -379,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-       unsigned wkr, total_outstanding = 0;
+       unsigned wkr, total_outstanding;
+
+       total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
 
        for (wkr = 0; wkr < d->num_workers; wkr++)
-               total_outstanding += d->backlog[wkr].count +
-                               !!(d->in_flight_tags[wkr]);
+               total_outstanding += d->backlog[wkr].count;
+
        return total_outstanding;
 }
 
@@ -423,9 +451,11 @@ rte_distributor_create(const char *name,
 
        /* compilation-time checks */
        RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
-       RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+       RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+       RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
+                               sizeof(d->in_flight_bitmask) * CHAR_BIT);
 
-       if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+       if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
                rte_errno = EINVAL;
                return NULL;
        }
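
Since tag value 0 is no longer reserved, the caller is now responsible for
tagging mbufs before handing them to the distributor (see the
rte_distributor.h documentation change below). A hedged usage sketch, with
compute_flow_tag() and distribute_burst() as hypothetical application-side
helpers, not part of the patch:

#include <stdint.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>

/* hypothetical per-application flow hash; a real application would
 * typically hash the 5-tuple or reuse the NIC RSS hash */
static uint32_t
compute_flow_tag(const struct rte_mbuf *m)
{
	return m->hash.rss;
}

static void
distribute_burst(struct rte_distributor *d,
		struct rte_mbuf **bufs, unsigned nb_rx)
{
	unsigned i;

	/* any 32-bit value is a valid tag now; 0 is no longer reserved */
	for (i = 0; i < nb_rx; i++)
		bufs[i]->hash.usr = compute_flow_tag(bufs[i]);

	rte_distributor_process(d, bufs, nb_rx);
}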
index ec0d74a..cc1d559 100644
@@ -88,6 +88,10 @@ rte_distributor_create(const char *name, unsigned socket_id,
  * packets. The distributor will ensure that no two packets that have the
  * same flow id, or tag, in the mbuf will be procesed at the same time.
  *
+ * The user is advised to set the tag for each mbuf before calling this
+ * function. If the tag is not set, its value is unspecified and depends on
+ * the driver implementation and configuration.
+ *
  * This is not multi-thread safe and should only be called on a single lcore.
  *
  * @param d