add prefix to cache line macros
[dpdk.git] / lib / librte_distributor / rte_distributor.c
index 585ff88..aa2f740 100644 (file)
 #define RTE_DISTRIB_MAX_RETURNS 128
 #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
 
+/**
+ * Maximum number of workers allowed.
+ * Be aware of increasing the limit, because it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS        64
+
 /**
  * Buffer structure used to pass the pointer data between cores. This is cache
  * line aligned, but to improve performance and prevent adjacent cache-line
@@ -70,7 +77,7 @@
  */
 union rte_distributor_buffer {
        volatile int64_t bufptr64;
-       char pad[CACHE_LINE_SIZE*3];
+       char pad[RTE_CACHE_LINE_SIZE*3];
 } __rte_cache_aligned;
 
 struct rte_distributor_backlog {
@@ -91,10 +98,17 @@ struct rte_distributor {
        char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
        unsigned num_workers;                 /**< Number of workers polling */
 
-       uint32_t in_flight_tags[RTE_MAX_LCORE];
-       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+       uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+               /**< Tracks the tag being processed per core */
+       uint64_t in_flight_bitmask;
+               /**< on/off bits for in-flight tags.
+                * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+                * the bitmask has to expand.
+                */
+
+       struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
 
-       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+       union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
 
        struct rte_distributor_returned_pkts returns;
 };
@@ -188,6 +202,7 @@ static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
        d->in_flight_tags[wkr] = 0;
+       d->in_flight_bitmask &= ~(1UL << wkr);
        d->bufs[wkr].bufptr64 = 0;
        if (unlikely(d->backlog[wkr].count != 0)) {
                /* On return of a packet, we need to move the
@@ -210,7 +225,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
                        pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
                                        RTE_DISTRIB_FLAG_BITS));
                }
-               /* recursive call */
+               /* recursive call.
+                * Note that the tags were set before first level call
+                * to rte_distributor_process.
+                */
                rte_distributor_process(d, pkts, i);
                bl->count = bl->start = 0;
        }
@@ -241,6 +259,7 @@ process_returns(struct rte_distributor *d)
                        else {
                                d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
                                d->in_flight_tags[wkr] = 0;
+                               d->in_flight_bitmask &= ~(1UL << wkr);
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -282,17 +301,36 @@ rte_distributor_process(struct rte_distributor *d,
                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb)
                                        << RTE_DISTRIB_FLAG_BITS);
-                       new_tag = (next_mb->hash.rss | 1);
-
-                       uint32_t match = 0;
+                       /*
+                        * User is advised to set the tag value for each
+                        * mbuf before calling rte_distributor_process.
+                        * User defined tags are used to identify flows,
+                        * or sessions.
+                        */
+                       new_tag = next_mb->hash.usr;
+
+                       /*
+                        * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
+                        * then the size of match has to be expanded.
+                        */
+                       uint64_t match = 0;
                        unsigned i;
+                       /*
+                        * to scan for a match use "xor" and "not" to get a 0/1
+                        * value, then use shifting to merge to single "match"
+                        * variable, where a one-bit indicates a match for the
+                        * worker given by the bit-position
+                        */
                        for (i = 0; i < d->num_workers; i++)
                                match |= (!(d->in_flight_tags[i] ^ new_tag)
                                        << i);
 
+                       /* Only turned-on bits are considered as match */
+                       match &= d->in_flight_bitmask;
+
                        if (match) {
                                next_mb = NULL;
-                               unsigned worker = __builtin_ctz(match);
+                               unsigned worker = __builtin_ctzl(match);
                                if (add_to_backlog(&d->backlog[worker],
                                                next_value) < 0)
                                        next_idx--;
@@ -309,6 +347,7 @@ rte_distributor_process(struct rte_distributor *d,
                        else {
                                d->bufs[wkr].bufptr64 = next_value;
                                d->in_flight_tags[wkr] = new_tag;
+                               d->in_flight_bitmask |= (1UL << wkr);
                                next_mb = NULL;
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -366,11 +405,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-       unsigned wkr, total_outstanding = 0;
+       unsigned wkr, total_outstanding;
+
+       total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
 
        for (wkr = 0; wkr < d->num_workers; wkr++)
-               total_outstanding += d->backlog[wkr].count +
-                               !!(d->in_flight_tags[wkr]);
+               total_outstanding += d->backlog[wkr].count;
+
        return total_outstanding;
 }
 
@@ -409,10 +450,12 @@ rte_distributor_create(const char *name,
        const struct rte_memzone *mz;
 
        /* compilation-time checks */
-       RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
-       RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+       RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+       RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+       RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
+                               sizeof(d->in_flight_bitmask) * CHAR_BIT);
 
-       if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+       if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
                rte_errno = EINVAL;
                return NULL;
        }