+ /*
+ * NOTE(review): fragment from the middle of a per-packet distribution
+ * loop (enclosing function not visible in this chunk -- presumably
+ * rte_distributor_process; confirm against the full file). Comments
+ * only; code left byte-identical.
+ */
+ next_mb = mbufs[next_idx++];
+ /*
+ * Pack the mbuf pointer into the upper bits of the value passed to
+ * the worker; the low RTE_DISTRIB_FLAG_BITS bits are reserved.
+ */
+ next_value = (((int64_t)(uintptr_t)next_mb) <<
+ RTE_DISTRIB_FLAG_BITS);
+ /*
+ * The user is expected to set a tag value for each
+ * mbuf before calling rte_distributor_process.
+ * User defined tags are used to identify flows,
+ * or sessions.
+ */
+ /* flows MUST be non-zero */
+ new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+ /*
+ * Uncommenting the next line will cause the find_match
+ * function to be optimized out, making this function
+ * do parallel (non-atomic) distribution
+ */
+ /* matches[j] = 0; */
+
+ /*
+ * matches[j] holds (worker_id + 1) when this flow is already
+ * pinned to a worker, or 0 when it is unpinned (see the
+ * matches[w] = wkr+1 assignment below).
+ */
+ if (matches[j] && d->active[matches[j]-1]) {
+ struct rte_distributor_backlog *bl =
+ &d->backlog[matches[j]-1];
+ if (unlikely(bl->count ==
+ RTE_DIST_BURST_SIZE)) {
+ release(d, matches[j]-1);
+ /*
+ * release() may have deactivated the worker; if so,
+ * step back so this mbuf is reprocessed on the next
+ * iteration, and force the flow-matching pass to run
+ * again so the stale pin is recomputed.
+ */
+ if (!d->active[matches[j]-1]) {
+ j--;
+ next_idx--;
+ matching_required = 1;
+ continue;
+ }
+ }
+
+ /* Add to worker that already has flow */
+ unsigned int idx = bl->count++;
+
+ bl->tags[idx] = new_tag;
+ bl->pkts[idx] = next_value;
+
+ } else {
+ struct rte_distributor_backlog *bl;
+
+ /* Unpinned flow: round-robin to the next active worker. */
+ while (unlikely(!d->active[wkr]))
+ wkr = (wkr + 1) % d->num_workers;
+ bl = &d->backlog[wkr];
+
+ if (unlikely(bl->count ==
+ RTE_DIST_BURST_SIZE)) {
+ release(d, wkr);
+ /* Same retry protocol as the pinned-flow path above. */
+ if (!d->active[wkr]) {
+ j--;
+ next_idx--;
+ matching_required = 1;
+ continue;
+ }
+ }
+
+ /* Add to the current worker's backlog */
+ unsigned int idx = bl->count++;
+
+ bl->tags[idx] = new_tag;
+ bl->pkts[idx] = next_value;
+ /*
+ * Now that we've just added an unpinned flow
+ * to a worker, we need to ensure that all
+ * other packets with that same flow will go
+ * to the same worker in this burst.
+ */
+ for (w = j; w < pkts; w++)
+ if (flows[w] == new_tag)
+ matches[w] = wkr+1;
+ }
+ }
+ /* Advance the round-robin cursor for the next unpinned flow. */
+ wkr = (wkr + 1) % d->num_workers;