lib: fix cache alignment of structures
[dpdk.git] / lib / librte_distributor / rte_distributor.c
index 5eee442..e0fdb4c 100644 (file)
@@ -35,6 +35,7 @@
 #include <sys/queue.h>
 #include <string.h>
 #include <rte_mbuf.h>
+#include <rte_memory.h>
 #include <rte_memzone.h>
 #include <rte_errno.h>
 #include <rte_string_fns.h>
 #define RTE_DISTRIB_MAX_RETURNS 128
 #define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
 
+/**
+ * Maximum number of workers allowed.
+ * Be aware when increasing the limit, because it is limited by how we track
+ * in-flight tags. See @in_flight_bitmask and @rte_distributor_process
+ */
+#define RTE_DISTRIB_MAX_WORKERS        64
+
 /**
  * Buffer structure used to pass the pointer data between cores. This is cache
  * line aligned, but to improve performance and prevent adjacent cache-line
@@ -70,7 +78,7 @@
  */
 union rte_distributor_buffer {
        volatile int64_t bufptr64;
-       char pad[CACHE_LINE_SIZE*3];
+       char pad[RTE_CACHE_LINE_SIZE*3];
 } __rte_cache_aligned;
 
 struct rte_distributor_backlog {
@@ -91,10 +99,17 @@ struct rte_distributor {
        char name[RTE_DISTRIBUTOR_NAMESIZE];  /**< Name of the ring. */
        unsigned num_workers;                 /**< Number of workers polling */
 
-       uint32_t in_flight_tags[RTE_MAX_LCORE];
-       struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+       uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+               /**< Tracks the tag being processed per core */
+       uint64_t in_flight_bitmask;
+               /**< on/off bits for in-flight tags.
+                * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64 then
+                * the bitmask has to expand.
+                */
 
-       union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+       struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
+
+       union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
 
        struct rte_distributor_returned_pkts returns;
 };
@@ -103,8 +118,8 @@ TAILQ_HEAD(rte_distributor_list, rte_distributor);
 
 /**** APIs called by workers ****/
 
-struct rte_mbuf *
-rte_distributor_get_pkt(struct rte_distributor *d,
+void
+rte_distributor_request_pkt(struct rte_distributor *d,
                unsigned worker_id, struct rte_mbuf *oldpkt)
 {
        union rte_distributor_buffer *buf = &d->bufs[worker_id];
@@ -113,13 +128,32 @@ rte_distributor_get_pkt(struct rte_distributor *d,
        while (unlikely(buf->bufptr64 & RTE_DISTRIB_FLAGS_MASK))
                rte_pause();
        buf->bufptr64 = req;
-       while (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
-               rte_pause();
+}
+
+struct rte_mbuf *
+rte_distributor_poll_pkt(struct rte_distributor *d,
+               unsigned worker_id)
+{
+       union rte_distributor_buffer *buf = &d->bufs[worker_id];
+       if (buf->bufptr64 & RTE_DISTRIB_GET_BUF)
+               return NULL;
+
        /* since bufptr64 is signed, this should be an arithmetic shift */
        int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
        return (struct rte_mbuf *)((uintptr_t)ret);
 }
 
+struct rte_mbuf *
+rte_distributor_get_pkt(struct rte_distributor *d,
+               unsigned worker_id, struct rte_mbuf *oldpkt)
+{
+       struct rte_mbuf *ret;
+       rte_distributor_request_pkt(d, worker_id, oldpkt);
+       while ((ret = rte_distributor_poll_pkt(d, worker_id)) == NULL)
+               rte_pause();
+       return ret;
+}
+
 int
 rte_distributor_return_pkt(struct rte_distributor *d,
                unsigned worker_id, struct rte_mbuf *oldpkt)
@@ -169,6 +203,7 @@ static inline void
 handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
 {
        d->in_flight_tags[wkr] = 0;
+       d->in_flight_bitmask &= ~(1UL << wkr);
        d->bufs[wkr].bufptr64 = 0;
        if (unlikely(d->backlog[wkr].count != 0)) {
                /* On return of a packet, we need to move the
@@ -191,7 +226,10 @@ handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
                        pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
                                        RTE_DISTRIB_FLAG_BITS));
                }
-               /* recursive call */
+               /* recursive call.
+                * Note that the tags were set before first level call
+                * to rte_distributor_process.
+                */
                rte_distributor_process(d, pkts, i);
                bl->count = bl->start = 0;
        }
@@ -222,6 +260,7 @@ process_returns(struct rte_distributor *d)
                        else {
                                d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
                                d->in_flight_tags[wkr] = 0;
+                               d->in_flight_bitmask &= ~(1UL << wkr);
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
                } else if (data & RTE_DISTRIB_RETURN_BUF) {
@@ -263,17 +302,36 @@ rte_distributor_process(struct rte_distributor *d,
                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb)
                                        << RTE_DISTRIB_FLAG_BITS);
-                       new_tag = (next_mb->pkt.hash.rss | 1);
-
-                       uint32_t match = 0;
+                       /*
+                        * User is advised to set the tag value for each
+                        * mbuf before calling rte_distributor_process.
+                        * User defined tags are used to identify flows,
+                        * or sessions.
+                        */
+                       new_tag = next_mb->hash.usr;
+
+                       /*
+                        * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
+                        * then the size of match has to be expanded.
+                        */
+                       uint64_t match = 0;
                        unsigned i;
+                       /*
+                        * to scan for a match use "xor" and "not" to get a 0/1
+                        * value, then use shifting to merge to single "match"
+                        * variable, where a one-bit indicates a match for the
+                        * worker given by the bit-position
+                        */
                        for (i = 0; i < d->num_workers; i++)
                                match |= (!(d->in_flight_tags[i] ^ new_tag)
                                        << i);
 
+                       /* Only turned-on bits are considered as match */
+                       match &= d->in_flight_bitmask;
+
                        if (match) {
                                next_mb = NULL;
-                               unsigned worker = __builtin_ctz(match);
+                               unsigned worker = __builtin_ctzl(match);
                                if (add_to_backlog(&d->backlog[worker],
                                                next_value) < 0)
                                        next_idx--;
@@ -290,6 +348,7 @@ rte_distributor_process(struct rte_distributor *d,
                        else {
                                d->bufs[wkr].bufptr64 = next_value;
                                d->in_flight_tags[wkr] = new_tag;
+                               d->in_flight_bitmask |= (1UL << wkr);
                                next_mb = NULL;
                        }
                        oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
@@ -347,11 +406,13 @@ rte_distributor_returned_pkts(struct rte_distributor *d,
 static inline unsigned
 total_outstanding(const struct rte_distributor *d)
 {
-       unsigned wkr, total_outstanding = 0;
+       unsigned wkr, total_outstanding;
+
+       total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
 
        for (wkr = 0; wkr < d->num_workers; wkr++)
-               total_outstanding += d->backlog[wkr].count +
-                               !!(d->in_flight_tags[wkr]);
+               total_outstanding += d->backlog[wkr].count;
+
        return total_outstanding;
 }
 
@@ -390,10 +451,12 @@ rte_distributor_create(const char *name,
        const struct rte_memzone *mz;
 
        /* compilation-time checks */
-       RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
-       RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+       RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
+       RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
+       RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
+                               sizeof(d->in_flight_bitmask) * CHAR_BIT);
 
-       if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+       if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
                rte_errno = EINVAL;
                return NULL;
        }
@@ -406,7 +469,7 @@ rte_distributor_create(const char *name,
                return NULL;
        }
 
-       rte_snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
+       snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
        mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
        if (mz == NULL) {
                rte_errno = ENOMEM;
@@ -414,7 +477,7 @@ rte_distributor_create(const char *name,
        }
 
        d = mz->addr;
-       rte_snprintf(d->name, sizeof(d->name), "%s", name);
+       snprintf(d->name, sizeof(d->name), "%s", name);
        d->num_workers = num_workers;
 
        rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);