+/*
+ * Drain a worker that has signalled shutdown.
+ *
+ * When worker called rte_distributor_return_pkt()
+ * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,
+ * distributor must retrieve both inflight and backlog packets assigned
+ * to the worker and reprocess them to another worker.
+ *
+ * d:   distributor instance being drained
+ * wkr: index of the shutting-down worker
+ */
+static void
+handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
+{
+ struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
+ /* double BURST size for storing both inflights and backlog */
+ struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
+ unsigned int pkts_count = 0;
+ unsigned int i;
+
+ /* If GET_BUF is cleared there are in-flight packets sent
+ * to worker which does not require new packets.
+ * They must be retrieved and assigned to another worker.
+ */
+ if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
+ & RTE_DISTRIB_GET_BUF))
+ /* Each valid bufptr64 entry holds an mbuf pointer shifted
+ * left by RTE_DISTRIB_FLAG_BITS, with handshake flags kept
+ * in the low bits; shifting right recovers the pointer.
+ */
+ for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
+ if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
+ pkts[pkts_count++] = (void *)((uintptr_t)
+ (buf->bufptr64[i]
+ >> RTE_DISTRIB_FLAG_BITS));
+
+ /* Make following operations on handshake flags on bufptr64:
+ * - set GET_BUF to indicate that distributor can overwrite buffer
+ * with new packets if worker will make a new request.
+ * - clear RETURN_BUF to unlock reads on worker side.
+ * RELEASE ordering ensures the in-flight slots above were read
+ * before the flag word is republished; presumably pairs with an
+ * ACQUIRE load on the worker side — confirm against worker code.
+ */
+ __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
+ __ATOMIC_RELEASE);
+
+ /* Collect backlog packets from worker; entries use the same
+ * shifted-pointer encoding as bufptr64 above.
+ */
+ for (i = 0; i < d->backlog[wkr].count; i++)
+ pkts[pkts_count++] = (void *)((uintptr_t)
+ (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
+
+ d->backlog[wkr].count = 0;
+
+ /* Clear both inflight and backlog tags */
+ for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
+ d->in_flight_tags[wkr][i] = 0;
+ d->backlog[wkr].tags[i] = 0;
+ }
+
+ /* Recursive call: feed the reclaimed packets (at most
+ * 2 * RTE_DIST_BURST_SIZE, matching the pkts[] capacity) back
+ * through rte_distributor_process() so they are reassigned to
+ * the remaining workers.
+ */
+ if (pkts_count > 0)
+ rte_distributor_process(d, pkts, pkts_count);
+}
+