#define RTE_DISTRIB_MAX_RETURNS 128
#define RTE_DISTRIB_RETURNS_MASK (RTE_DISTRIB_MAX_RETURNS - 1)
+/**
+ * Maximum number of workers allowed.
+ * Be aware when increasing this limit: it is constrained by how we track
+ * in-flight tags. See in_flight_bitmask and rte_distributor_process().
+ */
+#define RTE_DISTRIB_MAX_WORKERS 64
+
/**
* Buffer structure used to pass the pointer data between cores. This is cache
* line aligned, but to improve performance and prevent adjacent cache-line
char name[RTE_DISTRIBUTOR_NAMESIZE]; /**< Name of the ring. */
unsigned num_workers; /**< Number of workers polling */
- uint32_t in_flight_tags[RTE_MAX_LCORE];
- /**< Tracks the tag being processed per core, 0 == no pkt */
- struct rte_distributor_backlog backlog[RTE_MAX_LCORE];
+ uint32_t in_flight_tags[RTE_DISTRIB_MAX_WORKERS];
+ /**< Tracks the tag being processed per core */
+ uint64_t in_flight_bitmask;
+ /**< On/off bits for in-flight tags: bit n is set while worker n
+ * has a packet in flight.
+ * Note that if RTE_DISTRIB_MAX_WORKERS is made larger than 64 then
+ * this bitmask has to be widened accordingly.
+ */
+
+ struct rte_distributor_backlog backlog[RTE_DISTRIB_MAX_WORKERS];
- union rte_distributor_buffer bufs[RTE_MAX_LCORE];
+ union rte_distributor_buffer bufs[RTE_DISTRIB_MAX_WORKERS];
struct rte_distributor_returned_pkts returns;
};
handle_worker_shutdown(struct rte_distributor *d, unsigned wkr)
{
d->in_flight_tags[wkr] = 0;
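+ /* worker wkr no longer has a packet in flight, so clear its bit */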
+ d->in_flight_bitmask &= ~(1UL << wkr);
d->bufs[wkr].bufptr64 = 0;
if (unlikely(d->backlog[wkr].count != 0)) {
/* On return of a packet, we need to move the
pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
RTE_DISTRIB_FLAG_BITS));
}
- /* recursive call */
+ /* recursive call.
+ * Note that the tags were set before the first-level call
+ * to rte_distributor_process().
+ */
rte_distributor_process(d, pkts, i);
bl->count = bl->start = 0;
}
else {
d->bufs[wkr].bufptr64 = RTE_DISTRIB_GET_BUF;
d->in_flight_tags[wkr] = 0;
+ d->in_flight_bitmask &= ~(1UL << wkr);
}
oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
} else if (data & RTE_DISTRIB_RETURN_BUF) {
next_value = (((int64_t)(uintptr_t)next_mb)
<< RTE_DISTRIB_FLAG_BITS);
/*
- * Set the low bit on the tag, so we can guarantee that
- * we never store a tag value of zero. That means we can
- * use the zero-value to indicate that no packet is
- * being processed by a worker.
+ * The user is advised to set the tag value for each
+ * mbuf before calling rte_distributor_process().
+ * User-defined tags are used to identify flows or
+ * sessions. A tag of zero is now a valid value, since
+ * in_flight_bitmask, not a reserved tag value, tracks
+ * whether a worker has a packet in flight.
*/
- new_tag = (next_mb->hash.usr | 1);
+ new_tag = next_mb->hash.usr;
- uint32_t match = 0;
+ /*
+ * Note that if RTE_DISTRIB_MAX_WORKERS is made larger than 64
+ * then the type of match has to be widened as well.
+ */
+ uint64_t match = 0;
unsigned i;
/*
* to scan for a match use "xor" and "not" to get a 0/1
match |= (!(d->in_flight_tags[i] ^ new_tag)
<< i);
+ /* Only bits set in in_flight_bitmask are valid matches. */
+ match &= d->in_flight_bitmask;
+
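+ /*
+ * Illustrative example (hypothetical values): if worker 2 currently
+ * has tag 5 in flight (bit 2 set in in_flight_bitmask) and
+ * new_tag == 5, the loop above sets bit 2 in match, so
+ * __builtin_ctzl(match) below selects worker 2 and the packet is
+ * queued on that worker's backlog.
+ */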
if (match) {
next_mb = NULL;
- unsigned worker = __builtin_ctz(match);
+ unsigned worker = __builtin_ctzl(match);
if (add_to_backlog(&d->backlog[worker],
next_value) < 0)
next_idx--;
else {
d->bufs[wkr].bufptr64 = next_value;
d->in_flight_tags[wkr] = new_tag;
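+ /* worker wkr now has new_tag in flight, so set its bit */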
+ d->in_flight_bitmask |= (1UL << wkr);
next_mb = NULL;
}
oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
static inline unsigned
total_outstanding(const struct rte_distributor *d)
{
- unsigned wkr, total_outstanding = 0;
+ unsigned wkr, total_outstanding;
+
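+ /* each bit set in in_flight_bitmask is one packet currently with a worker */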
+ total_outstanding = __builtin_popcountl(d->in_flight_bitmask);
for (wkr = 0; wkr < d->num_workers; wkr++)
- total_outstanding += d->backlog[wkr].count +
- !!(d->in_flight_tags[wkr]);
+ total_outstanding += d->backlog[wkr].count;
+
return total_outstanding;
}
/* compilation-time checks */
RTE_BUILD_BUG_ON((sizeof(*d) & CACHE_LINE_MASK) != 0);
- RTE_BUILD_BUG_ON((RTE_MAX_LCORE & 7) != 0);
+ RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
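+ /* the bitmask must be wide enough to hold one bit per worker */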
+ RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
+ sizeof(d->in_flight_bitmask) * CHAR_BIT);
- if (name == NULL || num_workers >= RTE_MAX_LCORE) {
+ if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
rte_errno = EINVAL;
return NULL;
}