* Sync with worker on GET_BUF flag.
*/
while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
- & RTE_DISTRIB_GET_BUF)) {
+ & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
rte_pause();
uint64_t t = rte_rdtsc()+100;
for (i = count; i < RTE_DIST_BURST_SIZE; i++)
buf->retptr64[i] = 0;
- /* Set Return bit for each packet returned */
+ /* Set VALID_BUF bit for each packet returned */
for (i = count; i-- > 0; )
buf->retptr64[i] =
(((int64_t)(uintptr_t)(oldpkt[i])) <<
- RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+ RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
/*
* Finally, set the GET_BUF to signal to distributor that cache
return (pkts[0]) ? 1 : 0;
}
- /* If bit is set, return
+ /* If any of the bits below is set, return.
+ * GET_BUF is set when the distributor hasn't sent any packets yet.
+ * RETURN_BUF is set when the distributor must retrieve in-flight packets.
* Sync with distributor to acquire bufptrs
*/
if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
- & RTE_DISTRIB_GET_BUF)
+ & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
return -1;
/* since bufptr64 is signed, this should be an arithmetic shift */
}
/*
- * so now we've got the contents of the cacheline into an array of
+ * so now we've got the contents of the cacheline into an array of
* mbuf pointers, so toggle the bit so scheduler can start working
* on the next cacheline while we're working.
* Sync with distributor on GET_BUF flag. Release bufptrs.
* Sync with worker on GET_BUF flag.
*/
while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
- & RTE_DISTRIB_GET_BUF)) {
+ & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
rte_pause();
uint64_t t = rte_rdtsc()+100;
__atomic_thread_fence(__ATOMIC_ACQUIRE);
for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
/* Switch off the return bit first */
- buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+ buf->retptr64[i] = 0;
for (i = num; i-- > 0; )
buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
- RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;
+ RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
+
+ /* Use RETURN_BUF on bufptr64 to notify distributor that
+ * we won't read any mbufs from there even if GET_BUF is set.
+ * This allows the distributor to retrieve in-flight packets it has already sent.
+ */
+ __atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
+ __ATOMIC_ACQ_REL);
- /* set the GET_BUF but even if we got no returns.
- * Sync with distributor on GET_BUF flag. Release retptrs.
+ /* set the RETURN_BUF on retptr64 even if we got no returns.
+ * Sync with distributor on RETURN_BUF flag. Release retptrs.
+ * Notify the distributor that this worker will not request any more packets.
*/
__atomic_store_n(&(buf->retptr64[0]),
- buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
+ buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
return 0;
}
*/
}
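
For orientation, here is a minimal worker-loop sketch (not part of the patch) showing the call sequence the RETURN_BUF handshake serves. rte_distributor_get_pkt() and rte_distributor_return_pkt() are the existing burst-mode API; quit_signal, dist, the worker-id plumbing and the burst size are assumptions of the sketch only.

#include <rte_distributor.h>
#include <rte_mbuf.h>

static volatile int quit_signal;      /* hypothetical shutdown flag */
static struct rte_distributor *dist;  /* created elsewhere with rte_distributor_create() */

static int
worker_thread(void *arg)
{
	const unsigned int id = *(const unsigned int *)arg; /* worker index, < num_workers */
	struct rte_mbuf *bufs[8]; /* large enough for one distributor burst */
	unsigned int count = 0;

	while (!quit_signal) {
		/* Hand back the previous burst (count mbufs) and fetch a new one. */
		count = rte_distributor_get_pkt(dist, id, bufs, bufs, count);
		/* ... process bufs[0..count-1] ... */
	}

	/* On shutdown, return whatever is still held. With this patch the
	 * RETURN_BUF handshake also makes the distributor mark the worker
	 * inactive and reassign its in-flight and backlog packets.
	 */
	rte_distributor_return_pkt(dist, id, bufs, count);
	return 0;
}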
+/*
+ * When a worker calls rte_distributor_return_pkt()
+ * and passes the RTE_DISTRIB_RETURN_BUF handshake through retptr64,
+ * the distributor must retrieve both the in-flight and backlog packets
+ * assigned to that worker and reprocess them on another worker.
+ */
+static void
+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
+{
+ struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+ /* double BURST size for storing both in-flight and backlog packets */
+ struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
+ unsigned int pkts_count = 0;
+ unsigned int i;
+
+ /* If GET_BUF is cleared, there are in-flight packets sent
+ * to a worker that no longer requests new packets.
+ * They must be retrieved and assigned to another worker.
+ */
+ if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+ & RTE_DISTRIB_GET_BUF))
+ for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+ if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
+ pkts[pkts_count++] = (void *)((uintptr_t)
+ (buf->bufptr64[i]
+ >> RTE_DISTRIB_FLAG_BITS));
+
+ /* Update the handshake flags on bufptr64:
+ * - set GET_BUF to indicate that the distributor may overwrite the buffer
+ * with new packets if the worker makes a new request.
+ * - clear RETURN_BUF to unlock reads on the worker side.
+ */
+ __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+ __ATOMIC_RELEASE);
+
+ /* Collect backlog packets from worker */
+ for (i = 0; i < d->backlog[wkr].count; i++)
+ pkts[pkts_count++] = (void *)((uintptr_t)
+ (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
+
+ d->backlog[wkr].count = 0;
+
+ /* Clear both inflight and backlog tags */
+ for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+ d->in_flight_tags[wkr][i] = 0;
+ d->backlog[wkr].tags[i] = 0;
+ }
+
+ /* Reprocess the reclaimed packets; this re-enters rte_distributor_process() */
+ if (pkts_count > 0)
+ rte_distributor_process(d, pkts, pkts_count);
+}
+
/*
* When the handshake bits indicate that there are packets coming
/* Sync on GET_BUF flag. Acquire retptrs. */
if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
- & RTE_DISTRIB_GET_BUF) {
+ & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
- if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
+ if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
oldbuf = ((uintptr_t)(buf->retptr64[i] >>
RTE_DISTRIB_FLAG_BITS));
/* store returns in a circular buffer */
store_return(oldbuf, d, &ret_start, &ret_count);
count++;
- buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
+ buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
}
}
d->returns.start = ret_start;
d->returns.count = ret_count;
+
+ /* If the worker requested packets with GET_BUF, mark it active;
+ * otherwise (RETURN_BUF) mark it inactive.
+ */
+ d->activesum -= d->active[wkr];
+ d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
+ d->activesum += d->active[wkr];
+
+ /* If the worker returned packets without requesting new ones,
+ * handle all in-flight and backlog packets assigned to it.
+ */
+ if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
+ handle_worker_shutdown(d, wkr);
+
/* Clear for the worker to populate with more returns.
* Sync with distributor on GET_BUF flag. Release retptrs.
*/
unsigned int i;
handle_returns(d, wkr);
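+ /* handle_returns() clears d->active[wkr] when the worker signals
+ * shutdown; in that case do not hand it a new burst.
+ */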
+ if (unlikely(!d->active[wkr]))
+ return 0;
/* Sync with worker on GET_BUF flag */
while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
& RTE_DISTRIB_GET_BUF)) {
handle_returns(d, wkr);
+ if (unlikely(!d->active[wkr]))
+ return 0;
rte_pause();
}
int64_t next_value = 0;
uint16_t new_tag = 0;
uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
- unsigned int i, j, w, wid;
+ unsigned int i, j, w, wid, matching_required;
if (d->alg_type == RTE_DIST_ALG_SINGLE) {
/* Call the old API */
mbufs, num_mbufs);
}
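+ /* Pick up returns and shutdown notifications from every worker first,
+ * so active[] and activesum are up to date before distributing.
+ */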
+ for (wid = 0 ; wid < d->num_workers; wid++)
+ handle_returns(d, wid);
+
if (unlikely(num_mbufs == 0)) {
/* Flush out all non-full cache-lines to workers. */
for (wid = 0 ; wid < d->num_workers; wid++) {
/* Sync with worker on GET_BUF flag. */
- handle_returns(d, wid);
if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
release(d, wid);
return 0;
}
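+ /* No active workers left: there is no one to distribute the packets to. */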
+ if (unlikely(!d->activesum))
+ return 0;
+
while (next_idx < num_mbufs) {
uint16_t matches[RTE_DIST_BURST_SIZE];
unsigned int pkts;
for (; i < RTE_DIST_BURST_SIZE; i++)
flows[i] = 0;
- switch (d->dist_match_fn) {
- case RTE_DIST_MATCH_VECTOR:
- find_match_vec(d, &flows[0], &matches[0]);
- break;
- default:
- find_match_scalar(d, &flows[0], &matches[0]);
- }
+ matching_required = 1;
+ for (j = 0; j < pkts; j++) {
+ if (unlikely(!d->activesum))
+ return next_idx;
+
+ if (unlikely(matching_required)) {
+ switch (d->dist_match_fn) {
+ case RTE_DIST_MATCH_VECTOR:
+ find_match_vec(d, &flows[0],
+ &matches[0]);
+ break;
+ default:
+ find_match_scalar(d, &flows[0],
+ &matches[0]);
+ }
+ matching_required = 0;
+ }
/*
* Matches array now contain the intended worker ID (+1) of
* the incoming packets. Any zeroes need to be assigned
* workers.
*/
- for (j = 0; j < pkts; j++) {
-
next_mb = mbufs[next_idx++];
next_value = (((int64_t)(uintptr_t)next_mb) <<
RTE_DISTRIB_FLAG_BITS);
*/
/* matches[j] = 0; */
- if (matches[j]) {
+ if (matches[j] && d->active[matches[j]-1]) {
struct rte_distributor_backlog *bl =
&d->backlog[matches[j]-1];
if (unlikely(bl->count ==
RTE_DIST_BURST_SIZE)) {
release(d, matches[j]-1);
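+ /* The matched worker shut down during release(); step back and
+ * redo the flow matching so this packet goes to an active worker.
+ */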
+ if (!d->active[matches[j]-1]) {
+ j--;
+ next_idx--;
+ matching_required = 1;
+ continue;
+ }
}
/* Add to worker that already has flow */
bl->pkts[idx] = next_value;
} else {
- struct rte_distributor_backlog *bl =
- &d->backlog[wkr];
+ struct rte_distributor_backlog *bl;
+
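+ /* Skip workers that have shut down; the activesum check above
+ * guarantees at least one active worker remains.
+ */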
+ while (unlikely(!d->active[wkr]))
+ wkr = (wkr + 1) % d->num_workers;
+ bl = &d->backlog[wkr];
+
if (unlikely(bl->count ==
RTE_DIST_BURST_SIZE)) {
release(d, wkr);
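+ /* Same recovery as above: the worker went inactive during
+ * release(), so retry this packet after re-matching.
+ */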
+ if (!d->active[wkr]) {
+ j--;
+ next_idx--;
+ matching_required = 1;
+ continue;
+ }
}
/* Add to current worker */
matches[w] = wkr+1;
}
}
- wkr++;
- if (wkr >= d->num_workers)
- wkr = 0;
+ wkr = (wkr + 1) % d->num_workers;
}
/* Flush out all non-full cache-lines to workers. */
for (i = 0 ; i < num_workers ; i++)
d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
+ memset(d->active, 0, sizeof(d->active));
+ d->activesum = 0;
+
dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
rte_dist_burst_list);
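
For reference, the d->active[] / d->activesum bookkeeping used above assumes two members in the private struct rte_distributor. A sketch of the assumed declarations (the exact types and placement live in distributor_private.h and may differ):

	/* In struct rte_distributor (distributor_private.h): */
	uint8_t active[RTE_DISTRIB_MAX_WORKERS]; /* 1 while the worker keeps requesting bursts,
						  * 0 once it signals RETURN_BUF */
	uint8_t activesum;                       /* number of currently active workers */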