net/bnxt: fix mbuf flags for PTP packets
[dpdk.git] / lib / librte_distributor / rte_distributor.c
index 1c047f0..07e385a 100644 (file)
@@ -14,6 +14,7 @@
 #include <rte_eal_memconfig.h>
 #include <rte_pause.h>
 #include <rte_tailq.h>
+#include <rte_vect.h>
 
 #include "rte_distributor.h"
 #include "rte_distributor_single.h"
@@ -42,7 +43,7 @@ rte_distributor_request_pkt(struct rte_distributor *d,
 
        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                rte_distributor_request_pkt_single(d->d_single,
-                       worker_id, oldpkt[0]);
+                       worker_id, count ? oldpkt[0] : NULL);
                return;
        }
 
@@ -51,7 +52,7 @@ rte_distributor_request_pkt(struct rte_distributor *d,
         * Sync with worker on GET_BUF flag.
         */
        while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
-                       & RTE_DISTRIB_GET_BUF)) {
+                       & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
                rte_pause();
                uint64_t t = rte_rdtsc()+100;
 
@@ -67,11 +68,11 @@ rte_distributor_request_pkt(struct rte_distributor *d,
        for (i = count; i < RTE_DIST_BURST_SIZE; i++)
                buf->retptr64[i] = 0;
 
-       /* Set Return bit for each packet returned */
+       /* Set VALID_BUF bit for each packet returned */
        for (i = count; i-- > 0; )
                buf->retptr64[i] =
                        (((int64_t)(uintptr_t)(oldpkt[i])) <<
-                       RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+                       RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
 
        /*
         * Finally, set the GET_BUF to signal to distributor that cache
@@ -97,11 +98,13 @@ rte_distributor_poll_pkt(struct rte_distributor *d,
                return (pkts[0]) ? 1 : 0;
        }
 
-       /* If bit is set, return
+       /* If any of the bits below is set, return.
+        * GET_BUF is set when the distributor hasn't sent any packets yet.
+        * RETURN_BUF is set when the distributor must retrieve in-flight
+        * packets.
         * Sync with distributor to acquire bufptrs
         */
        if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
-               & RTE_DISTRIB_GET_BUF)
+               & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
                return -1;
 
        /* since bufptr64 is signed, this should be an arithmetic shift */
@@ -113,7 +116,7 @@ rte_distributor_poll_pkt(struct rte_distributor *d,
        }
 
        /*
-        * so now we've got the contents of the cacheline into an  array of
+        * so now we've got the contents of the cacheline into an array of
         * mbuf pointers, so toggle the bit so scheduler can start working
         * on the next cacheline while we're working.
         * Sync with distributor on GET_BUF flag. Release bufptrs.
@@ -134,7 +137,7 @@ rte_distributor_get_pkt(struct rte_distributor *d,
        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (return_count <= 1) {
                        pkts[0] = rte_distributor_get_pkt_single(d->d_single,
-                               worker_id, oldpkt[0]);
+                               worker_id, return_count ? oldpkt[0] : NULL);
                        return (pkts[0]) ? 1 : 0;
                } else
                        return -EINVAL;
@@ -165,25 +168,48 @@ rte_distributor_return_pkt(struct rte_distributor *d,
                if (num == 1)
                        return rte_distributor_return_pkt_single(d->d_single,
                                worker_id, oldpkt[0]);
+               else if (num == 0)
+                       return rte_distributor_return_pkt_single(d->d_single,
+                               worker_id, NULL);
                else
                        return -EINVAL;
        }
 
+       /* Spin while handshake bits are set (the scheduler clears them).
+        * Sync with distributor on GET_BUF and RETURN_BUF flags.
+        */
+       while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
+                       & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
+               rte_pause();
+               uint64_t t = rte_rdtsc()+100;
+
+               while (rte_rdtsc() < t)
+                       rte_pause();
+       }
+
        /* Sync with distributor to acquire retptrs */
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                /* Switch off the return bit first */
-               buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+               buf->retptr64[i] = 0;
 
        for (i = num; i-- > 0; )
                buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
-                       RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+                       RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
 
-       /* set the GET_BUF but even if we got no returns.
-        * Sync with distributor on GET_BUF flag. Release retptrs.
+       /* Use RETURN_BUF on bufptr64 to notify the distributor that we
+        * won't read any mbufs from there even if GET_BUF is set.
+        * This lets the distributor retrieve the in-flight packets it has
+        * already sent to us.
+        */
+       __atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
+               __ATOMIC_ACQ_REL);
+
+       /* Set RETURN_BUF on retptr64 even if we got no returns.
+        * Sync with distributor on RETURN_BUF flag. Release retptrs.
+        * This tells the distributor that we will not request more packets.
         */
        __atomic_store_n(&(buf->retptr64[0]),
-               buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+               buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
 
        return 0;
 }
@@ -234,13 +260,13 @@ find_match_scalar(struct rte_distributor *d,
 
                for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
-                               if (d->in_flight_tags[i][j] == data_ptr[w]) {
+                               if (d->in_flight_tags[i][w] == data_ptr[j]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
                for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
-                               if (bl->tags[j] == data_ptr[w]) {
+                               if (bl->tags[w] == data_ptr[j]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
@@ -253,6 +279,59 @@ find_match_scalar(struct rte_distributor *d,
         */
 }
 
+/*
+ * When a worker has called rte_distributor_return_pkt()
+ * and passed the RTE_DISTRIB_RETURN_BUF handshake through retptr64,
+ * the distributor must retrieve both the in-flight and backlog packets
+ * assigned to that worker and redistribute them to the remaining workers.
+ */
+static void
+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
+{
+       struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+       /* double BURST size to hold both in-flight and backlog packets */
+       struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
+       unsigned int pkts_count = 0;
+       unsigned int i;
+
+       /* If GET_BUF is cleared, there are in-flight packets that were sent
+        * to the worker, which no longer requests new packets.
+        * They must be retrieved and assigned to another worker.
+        */
+       if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+               & RTE_DISTRIB_GET_BUF))
+               for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+                       if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
+                               pkts[pkts_count++] = (void *)((uintptr_t)
+                                       (buf->bufptr64[i]
+                                               >> RTE_DISTRIB_FLAG_BITS));
+
+       /* Update the handshake flags on bufptr64 as follows:
+        * - set GET_BUF to indicate that the distributor may overwrite the
+        *     buffer with new packets if the worker makes a new request.
+        * - clear RETURN_BUF to unlock reads on the worker side.
+        */
+       __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+               __ATOMIC_RELEASE);
+
+       /* Collect backlog packets from worker */
+       for (i = 0; i < d->backlog[wkr].count; i++)
+               pkts[pkts_count++] = (void *)((uintptr_t)
+                       (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
+
+       d->backlog[wkr].count = 0;
+
+       /* Clear both in-flight and backlog tags */
+       for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+               d->in_flight_tags[wkr][i] = 0;
+               d->backlog[wkr].tags[i] = 0;
+       }
+
+       /* Recursive call: redistribute the retrieved packets */
+       if (pkts_count > 0)
+               rte_distributor_process(d, pkts, pkts_count);
+}
+
 
 /*
  * When the handshake bits indicate that there are packets coming
@@ -271,19 +350,33 @@ handle_returns(struct rte_distributor *d, unsigned int wkr)
 
        /* Sync on GET_BUF flag. Acquire retptrs. */
        if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
-               & RTE_DISTRIB_GET_BUF) {
+               & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
-                       if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+                       if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
                                oldbuf = ((uintptr_t)(buf->retptr64[i] >>
                                        RTE_DISTRIB_FLAG_BITS));
                                /* store returns in a circular buffer */
                                store_return(oldbuf, d, &ret_start, &ret_count);
                                count++;
-                               buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+                               buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
                        }
                }
                d->returns.start = ret_start;
                d->returns.count = ret_count;
+
+               /* If the worker requested packets with GET_BUF, mark it
+                * active; otherwise (RETURN_BUF), mark it inactive.
+                */
+               d->activesum -= d->active[wkr];
+               d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
+               d->activesum += d->active[wkr];
+
+               /* If the worker returned packets without requesting new ones,
+                * handle all in-flight and backlog packets assigned to it.
+                */
+               if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
+                       handle_worker_shutdown(d, wkr);
+
                /* Clear for the worker to populate with more returns.
                 * Sync with distributor on GET_BUF flag. Release retptrs.
                 */
@@ -307,12 +400,18 @@ release(struct rte_distributor *d, unsigned int wkr)
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        unsigned int i;
 
+       handle_returns(d, wkr);
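+       /* handle_returns() marks the worker inactive if it has signalled
+        * shutdown; in that case there is nothing to hand over.
+        */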
+       if (unlikely(!d->active[wkr]))
+               return 0;
+
        /* Sync with worker on GET_BUF flag */
        while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
-               & RTE_DISTRIB_GET_BUF))
+               & RTE_DISTRIB_GET_BUF)) {
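+               /* Keep servicing returns while waiting, so a worker that
+                * signals shutdown instead of requesting packets does not
+                * leave us spinning here forever.
+                */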
+               handle_returns(d, wkr);
+               if (unlikely(!d->active[wkr]))
+                       return 0;
                rte_pause();
-
-       handle_returns(d, wkr);
+       }
 
        buf->count = 0;
 
@@ -350,7 +449,7 @@ rte_distributor_process(struct rte_distributor *d,
        int64_t next_value = 0;
        uint16_t new_tag = 0;
        uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
-       unsigned int i, j, w, wid;
+       unsigned int i, j, w, wid, matching_required;
 
        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
@@ -358,12 +457,16 @@ rte_distributor_process(struct rte_distributor *d,
                        mbufs, num_mbufs);
        }
 
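+       /* Service pending returns and worker shutdown notifications
+        * before handing out new packets.
+        */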
+       for (wid = 0 ; wid < d->num_workers; wid++)
+               handle_returns(d, wid);
+
        if (unlikely(num_mbufs == 0)) {
                /* Flush out all non-full cache-lines to workers. */
                for (wid = 0 ; wid < d->num_workers; wid++) {
                        /* Sync with worker on GET_BUF flag. */
                        if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
                                __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
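+                               /* Worker has taken the previous burst, so
+                                * this buffer holds no outstanding packets.
+                                */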
+                               d->bufs[wid].count = 0;
                                release(d, wid);
                                handle_returns(d, wid);
                        }
@@ -371,15 +474,13 @@ rte_distributor_process(struct rte_distributor *d,
                return 0;
        }
 
+       if (unlikely(!d->activesum))
+               return 0;
+
        while (next_idx < num_mbufs) {
                uint16_t matches[RTE_DIST_BURST_SIZE];
                unsigned int pkts;
 
-               /* Sync with worker on GET_BUF flag. */
-               if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
-                       __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
-                       d->bufs[wkr].count = 0;
-
                if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
                        pkts = num_mbufs - next_idx;
                else
@@ -395,22 +496,30 @@ rte_distributor_process(struct rte_distributor *d,
                for (; i < RTE_DIST_BURST_SIZE; i++)
                        flows[i] = 0;
 
-               switch (d->dist_match_fn) {
-               case RTE_DIST_MATCH_VECTOR:
-                       find_match_vec(d, &flows[0], &matches[0]);
-                       break;
-               default:
-                       find_match_scalar(d, &flows[0], &matches[0]);
-               }
+               matching_required = 1;
 
+               for (j = 0; j < pkts; j++) {
+                       if (unlikely(!d->activesum))
+                               return next_idx;
+
+                       if (unlikely(matching_required)) {
+                               switch (d->dist_match_fn) {
+                               case RTE_DIST_MATCH_VECTOR:
+                                       find_match_vec(d, &flows[0],
+                                               &matches[0]);
+                                       break;
+                               default:
+                                       find_match_scalar(d, &flows[0],
+                                               &matches[0]);
+                               }
+                               matching_required = 0;
+                       }
                /*
                 * Matches array now contains the intended worker ID (+1) of
                 * the incoming packets. Any zeroes need to be assigned
                 * workers.
                 */
 
-               for (j = 0; j < pkts; j++) {
-
                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb) <<
                                        RTE_DISTRIB_FLAG_BITS);
@@ -430,12 +539,18 @@ rte_distributor_process(struct rte_distributor *d,
                         */
                        /* matches[j] = 0; */
 
-                       if (matches[j]) {
+                       if (matches[j] && d->active[matches[j]-1]) {
                                struct rte_distributor_backlog *bl =
                                                &d->backlog[matches[j]-1];
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, matches[j]-1);
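+                                       /* Worker shut down during release();
+                                        * redo the match for this packet and
+                                        * pick another worker.
+                                        */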
+                                       if (!d->active[matches[j]-1]) {
+                                               j--;
+                                               next_idx--;
+                                               matching_required = 1;
+                                               continue;
+                                       }
                                }
 
                                /* Add to worker that already has flow */
@@ -445,11 +560,21 @@ rte_distributor_process(struct rte_distributor *d,
                                bl->pkts[idx] = next_value;
 
                        } else {
-                               struct rte_distributor_backlog *bl =
-                                               &d->backlog[wkr];
+                               struct rte_distributor_backlog *bl;
+
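+                               /* Pick the next active worker for packets
+                                * without an active matching worker.
+                                */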
+                               while (unlikely(!d->active[wkr]))
+                                       wkr = (wkr + 1) % d->num_workers;
+                               bl = &d->backlog[wkr];
+
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, wkr);
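+                                       /* Current worker shut down during
+                                        * release(); retry this packet with
+                                        * a fresh match.
+                                        */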
+                                       if (!d->active[wkr]) {
+                                               j--;
+                                               next_idx--;
+                                               matching_required = 1;
+                                               continue;
+                                       }
                                }
 
                                /* Add to the current worker */
@@ -468,17 +593,17 @@ rte_distributor_process(struct rte_distributor *d,
                                                matches[w] = wkr+1;
                        }
                }
-               wkr++;
-               if (wkr >= d->num_workers)
-                       wkr = 0;
+               wkr = (wkr + 1) % d->num_workers;
        }
 
        /* Flush out all non-full cache-lines to workers. */
        for (wid = 0 ; wid < d->num_workers; wid++)
                /* Sync with worker on GET_BUF flag. */
                if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
-                       __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
+                       __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
+                       d->bufs[wid].count = 0;
                        release(d, wid);
+               }
 
        return num_mbufs;
 }
@@ -521,7 +646,7 @@ total_outstanding(const struct rte_distributor *d)
        unsigned int wkr, total_outstanding = 0;
 
        for (wkr = 0; wkr < d->num_workers; wkr++)
-               total_outstanding += d->backlog[wkr].count;
+               total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;
 
        return total_outstanding;
 }
@@ -578,6 +703,8 @@ rte_distributor_clear_returns(struct rte_distributor *d)
                /* Sync with worker. Release retptrs. */
                __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
                                __ATOMIC_RELEASE);
+
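+       /* Also reset the distributor's own stash of returned packets. */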
+       d->returns.start = d->returns.count = 0;
 }
 
 /* creates a distributor instance */
@@ -636,7 +763,8 @@ rte_distributor_create(const char *name,
 
        d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
 #if defined(RTE_ARCH_X86)
-       d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
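+       /* Use the vector match function only when the runtime SIMD
+        * bitwidth allows at least 128-bit vectors.
+        */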
+       if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
+               d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
 #endif
 
        /*
@@ -646,6 +774,9 @@ rte_distributor_create(const char *name,
        for (i = 0 ; i < num_workers ; i++)
                d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
 
+       memset(d->active, 0, sizeof(d->active));
+       d->activesum = 0;
+
        dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
                                          rte_dist_burst_list);