+ for (j = 0; j < pkts; j++) {
+
+ next_mb = mbufs[next_idx++];
+ next_value = (((int64_t)(uintptr_t)next_mb) <<
+ RTE_DISTRIB_FLAG_BITS);
+ /*
+ * User is advocated to set tag value for each
+ * mbuf before calling rte_distributor_process.
+ * User defined tags are used to identify flows,
+ * or sessions.
+ */
+ /* flows MUST be non-zero */
+ new_tag = (uint16_t)(next_mb->hash.usr) | 1;
+
+ /*
+ * Uncommenting the next line will cause the find_match
+ * function to be optimized out, making this function
+ * do parallel (non-atomic) distribution
+ */
+ /* matches[j] = 0; */
+
+ if (matches[j]) {
+ struct rte_distributor_backlog *bl =
+ &d->backlog[matches[j]-1];
+ if (unlikely(bl->count ==
+ RTE_DIST_BURST_SIZE)) {
+ release(d, matches[j]-1);
+ }
+
+ /* Add to worker that already has flow */
+ unsigned int idx = bl->count++;
+
+ bl->tags[idx] = new_tag;
+ bl->pkts[idx] = next_value;
+
+ } else {
+ struct rte_distributor_backlog *bl =
+ &d->backlog[wkr];
+ if (unlikely(bl->count ==
+ RTE_DIST_BURST_SIZE)) {
+ release(d, wkr);
+ }
+
+ /* Add to current worker worker */
+ unsigned int idx = bl->count++;
+
+ bl->tags[idx] = new_tag;
+ bl->pkts[idx] = next_value;
+ /*
+ * Now that we've just added an unpinned flow
+ * to a worker, we need to ensure that all
+ * other packets with that same flow will go
+ * to the same worker in this burst.
+ */
+ for (w = j; w < pkts; w++)
+ if (flows[w] == new_tag)
+ matches[w] = wkr+1;
+ }
+ }
+ wkr++;
+ if (wkr >= d->num_workers)