- d->in_flight_tags[wkr] = 0;
- d->bufs[wkr].bufptr64 = 0;
- if (unlikely(d->backlog[wkr].count != 0)) {
- /* On return of a packet, we need to move the
- * queued packets for this core elsewhere.
- * Easiest solution is to set things up for
- * a recursive call. That will cause those
- * packets to be queued up for the next free
- * core, i.e. it will return as soon as a
- * core becomes free to accept the first
- * packet, as subsequent ones will be added to
- * the backlog for that core.
- */
- struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
- unsigned i;
- struct rte_distributor_backlog *bl = &d->backlog[wkr];
-
- for (i = 0; i < bl->count; i++) {
- unsigned idx = (bl->start + i) &
- RTE_DISTRIB_BACKLOG_MASK;
- pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
+ struct rte_distributor_backlog *bl;
+ uint16_t i, j, w;
+
+ /*
+ * Function overview:
+ * 1. Loop through all worker ID's
+ * 2. Compare the current inflights to the incoming tags
+ * 3. Compare the current backlog to the incoming tags
+ * 4. Add any matches to the output
+ */
+
+ for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
+ output_ptr[j] = 0;
+
+ for (i = 0; i < d->num_workers; i++) {
+ bl = &d->backlog[i];
+
+ for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
+ for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+ if (d->in_flight_tags[i][j] == data_ptr[w]) {
+ output_ptr[j] = i+1;
+ break;
+ }
+ for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
+ for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
+ if (bl->tags[j] == data_ptr[w]) {
+ output_ptr[j] = i+1;
+ break;
+ }
+ }
+
+ /*
+ * At this stage, the output contains 8 16-bit values, with
+ * each non-zero value containing the worker ID on which the
+ * corresponding flow is pinned to.
+ */
+}
+
+
+/*
+ * When the handshake bits indicate that there are packets coming
+ * back from the worker, this function is called to copy and store
+ * the valid returned pointers (store_return).
+ */
+static unsigned int
+handle_returns(struct rte_distributor *d, unsigned int wkr)
+{
+ struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+ uintptr_t oldbuf;
+ unsigned int ret_start = d->returns.start,
+ ret_count = d->returns.count;
+ unsigned int count = 0;
+ unsigned int i;
+
+ /* Sync on GET_BUF flag. Acquire retptrs. */
+ if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
+ & RTE_DISTRIB_GET_BUF) {
+ for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+ if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+ oldbuf = ((uintptr_t)(buf->retptr64[i] >>