/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_function_versioning.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_v20.h"
#include "rte_distributor_private.h"

TAILQ_HEAD(rte_distributor_list, rte_distributor_v20);

static struct rte_tailq_elem rte_distributor_tailq = {
	.name = "RTE_DISTRIBUTOR",
};
EAL_REGISTER_TAILQ(rte_distributor_tailq)
/**** APIs called by workers ****/

void
rte_distributor_request_pkt_v20(struct rte_distributor_v20 *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
	int64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_GET_BUF;
	while (unlikely(__atomic_load_n(&buf->bufptr64, __ATOMIC_RELAXED)
			& RTE_DISTRIB_FLAGS_MASK))
		rte_pause();

	/* Sync with distributor on GET_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
}
VERSION_SYMBOL(rte_distributor_request_pkt, _v20, 2.0);

struct rte_mbuf *
rte_distributor_poll_pkt_v20(struct rte_distributor_v20 *d,
		unsigned worker_id)
{
	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
	/* Sync with distributor. Acquire bufptr64. */
	if (__atomic_load_n(&buf->bufptr64, __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)
		return NULL;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	int64_t ret = buf->bufptr64 >> RTE_DISTRIB_FLAG_BITS;
	return (struct rte_mbuf *)((uintptr_t)ret);
}
VERSION_SYMBOL(rte_distributor_poll_pkt, _v20, 2.0);

struct rte_mbuf *
rte_distributor_get_pkt_v20(struct rte_distributor_v20 *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	struct rte_mbuf *ret;

	rte_distributor_request_pkt_v20(d, worker_id, oldpkt);
	while ((ret = rte_distributor_poll_pkt_v20(d, worker_id)) == NULL)
		rte_pause();
	return ret;
}
VERSION_SYMBOL(rte_distributor_get_pkt, _v20, 2.0);

int
rte_distributor_return_pkt_v20(struct rte_distributor_v20 *d,
		unsigned worker_id, struct rte_mbuf *oldpkt)
{
	union rte_distributor_buffer_v20 *buf = &d->bufs[worker_id];
	uint64_t req = (((int64_t)(uintptr_t)oldpkt) << RTE_DISTRIB_FLAG_BITS)
			| RTE_DISTRIB_RETURN_BUF;
	/* Sync with distributor on RETURN_BUF flag. */
	__atomic_store_n(&(buf->bufptr64), req, __ATOMIC_RELEASE);
	return 0;
}
VERSION_SYMBOL(rte_distributor_return_pkt, _v20, 2.0);
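
/*
 * Illustrative sketch (not part of the original sources): a minimal worker
 * loop built on the v20 worker API above. The worker_arg struct, the
 * quit_signal flag and handle_packet() are hypothetical names used only for
 * this example.
 *
 *	struct worker_arg {
 *		struct rte_distributor_v20 *d;
 *		unsigned worker_id;
 *	};
 *
 *	static int
 *	worker_lcore(void *arg)
 *	{
 *		struct worker_arg *w = arg;
 *		struct rte_mbuf *pkt = NULL;
 *
 *		while (!quit_signal) {
 *			pkt = rte_distributor_get_pkt_v20(w->d,
 *					w->worker_id, pkt);
 *			handle_packet(pkt);
 *		}
 *		rte_distributor_return_pkt_v20(w->d, w->worker_id, pkt);
 *		return 0;
 *	}
 */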

/**** APIs called on distributor core ***/

/* as name suggests, adds a packet to the backlog for a particular worker */
static int
add_to_backlog(struct rte_distributor_backlog *bl, int64_t item)
{
	if (bl->count == RTE_DISTRIB_BACKLOG_SIZE)
		return -1;

	bl->pkts[(bl->start + bl->count++) & (RTE_DISTRIB_BACKLOG_MASK)]
			= item;
	return 0;
}

/* takes the next packet for a worker off the backlog */
static int64_t
backlog_pop(struct rte_distributor_backlog *bl)
{
	bl->count--;
	return bl->pkts[bl->start++ & RTE_DISTRIB_BACKLOG_MASK];
}

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor_v20 *d,
		unsigned *ret_start, unsigned *ret_count)
{
	/* store returns in a circular buffer - code is branch-free */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK) & !!(oldbuf);
}
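
/*
 * Worked example (illustrative): !!(oldbuf) is 0 for a NULL return and 1
 * otherwise, so a NULL oldbuf leaves both ret_start and ret_count unchanged
 * and the slot written above is simply overwritten by the next call. With a
 * real mbuf and ret_count below RTE_DISTRIB_RETURNS_MASK only ret_count
 * grows; once ret_count has reached the mask, ret_start advances instead,
 * so the ring drops its oldest entry to make room for the newest.
 */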

/* a worker requesting shutdown (RETURN_BUF) has its in-flight state cleared
 * and any backlogged packets redistributed to the remaining workers
 */
static inline void
handle_worker_shutdown(struct rte_distributor_v20 *d, unsigned int wkr)
{
	d->in_flight_tags[wkr] = 0;
	d->in_flight_bitmask &= ~(1UL << wkr);
	/* Sync with worker. Release bufptr64. */
	__atomic_store_n(&(d->bufs[wkr].bufptr64), 0, __ATOMIC_RELEASE);
	if (unlikely(d->backlog[wkr].count != 0)) {
		/* On return of a packet, we need to move the
		 * queued packets for this core elsewhere.
		 * Easiest solution is to set things up for
		 * a recursive call. That will cause those
		 * packets to be queued up for the next free
		 * core, i.e. it will return as soon as a
		 * core becomes free to accept the first
		 * packet, as subsequent ones will be added to
		 * the backlog for that core.
		 */
		struct rte_mbuf *pkts[RTE_DISTRIB_BACKLOG_SIZE];
		unsigned i;
		struct rte_distributor_backlog *bl = &d->backlog[wkr];

		for (i = 0; i < bl->count; i++) {
			unsigned idx = (bl->start + i) &
					RTE_DISTRIB_BACKLOG_MASK;
			pkts[i] = (void *)((uintptr_t)(bl->pkts[idx] >>
					RTE_DISTRIB_FLAG_BITS));
		}
		/* recursive call.
		 * Note that the tags were set before first level call
		 * to rte_distributor_process.
		 */
		rte_distributor_process_v20(d, pkts, i);
		bl->count = bl->start = 0;
	}
}

/* this function is called when process() fn is called without any new
 * packets. It goes through all the workers and clears any returned packets
 * to do a partial flush.
 */
static int
process_returns(struct rte_distributor_v20 *d)
{
	unsigned wkr;
	unsigned flushed = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	for (wkr = 0; wkr < d->num_workers; wkr++) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		const int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
							__ATOMIC_ACQUIRE);

		if (data & RTE_DISTRIB_GET_BUF) {
			flushed++;
			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					backlog_pop(&d->backlog[wkr]),
					__ATOMIC_RELEASE);
			else {
				/* Sync with worker on GET_BUF flag. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
					RTE_DISTRIB_GET_BUF,
					__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = 0;
				d->in_flight_bitmask &= ~(1UL << wkr);
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		store_return(oldbuf, d, &ret_start, &ret_count);
	}

	d->returns.start = ret_start;
	d->returns.count = ret_count;

	return flushed;
}

/* process a set of packets to distribute them to workers */
int
rte_distributor_process_v20(struct rte_distributor_v20 *d,
		struct rte_mbuf **mbufs, unsigned num_mbufs)
{
	unsigned next_idx = 0;
	unsigned wkr = 0;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint32_t new_tag = 0;
	unsigned ret_start = d->returns.start,
			ret_count = d->returns.count;

	if (unlikely(num_mbufs == 0))
		return process_returns(d);

	while (next_idx < num_mbufs || next_mb != NULL) {
		uintptr_t oldbuf = 0;
		/* Sync with worker. Acquire bufptr64. */
		int64_t data = __atomic_load_n(&(d->bufs[wkr].bufptr64),
				__ATOMIC_ACQUIRE);

		if (!next_mb) {
			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb)
					<< RTE_DISTRIB_FLAG_BITS);
			/*
			 * The user is expected to set the tag value for each
			 * mbuf before calling rte_distributor_process.
			 * User defined tags are used to identify flows,
			 * or sessions.
			 */
			new_tag = next_mb->hash.usr;

			/*
			 * Note that if RTE_DISTRIB_MAX_WORKERS is larger than 64
			 * then the size of match has to be expanded.
			 */
			uint64_t match = 0;
			unsigned i;
			/*
			 * to scan for a match, use "xor" and "not" to get a 0/1
			 * value, then use shifting to merge into a single "match"
			 * variable, where a set bit indicates a match for the
			 * worker given by the bit position
			 */
			for (i = 0; i < d->num_workers; i++)
				match |= (!(d->in_flight_tags[i] ^ new_tag)
					<< i);

			/* Only turned-on bits are considered as matches */
			match &= d->in_flight_bitmask;
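
			/*
			 * Worked example (illustrative): with new_tag == 5,
			 * in_flight_tags == {5, 7, 5, 0} and
			 * in_flight_bitmask == 0x5 (workers 0 and 2 busy),
			 * the loop yields match == 0x5, so the packet is
			 * queued behind worker 0, the lowest set bit found
			 * by __builtin_ctzl() below.
			 */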

			if (match) {
				next_mb = NULL;
				unsigned worker = __builtin_ctzl(match);
				if (add_to_backlog(&d->backlog[worker],
						next_value) < 0)
					next_idx--;
			}
		}

		if ((data & RTE_DISTRIB_GET_BUF) &&
				(d->backlog[wkr].count || next_mb)) {

			if (d->backlog[wkr].count)
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						backlog_pop(&d->backlog[wkr]),
						__ATOMIC_RELEASE);
			else {
				/* Sync with worker. Release bufptr64. */
				__atomic_store_n(&(d->bufs[wkr].bufptr64),
						next_value,
						__ATOMIC_RELEASE);
				d->in_flight_tags[wkr] = new_tag;
				d->in_flight_bitmask |= (1UL << wkr);
				next_mb = NULL;
			}
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		} else if (data & RTE_DISTRIB_RETURN_BUF) {
			handle_worker_shutdown(d, wkr);
			oldbuf = data >> RTE_DISTRIB_FLAG_BITS;
		}

		/* store returns in a circular buffer */
		store_return(oldbuf, d, &ret_start, &ret_count);

		if (++wkr == d->num_workers)
			wkr = 0;
	}

	/* to finish, check all workers for backlog and schedule work for them
	 * if they are ready */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		if (d->backlog[wkr].count &&
				/* Sync with worker. Acquire bufptr64. */
				(__atomic_load_n(&(d->bufs[wkr].bufptr64),
				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {

			int64_t oldbuf = d->bufs[wkr].bufptr64 >>
					RTE_DISTRIB_FLAG_BITS;

			store_return(oldbuf, d, &ret_start, &ret_count);

			/* Sync with worker. Release bufptr64. */
			__atomic_store_n(&(d->bufs[wkr].bufptr64),
					backlog_pop(&d->backlog[wkr]),
					__ATOMIC_RELEASE);
		}

	d->returns.start = ret_start;
	d->returns.count = ret_count;
	return num_mbufs;
}
VERSION_SYMBOL(rte_distributor_process, _v20, 2.0);
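
/*
 * Illustrative sketch (not part of the original sources): a typical
 * distributor-core loop feeding bursts into the distributor and draining
 * completed packets. rx_ring, BURST_SIZE, quit_signal and the handling of
 * returned packets are hypothetical; tags (mbuf->hash.usr) are assumed to
 * have been set before the packets reach this point.
 *
 *	struct rte_mbuf *bufs[BURST_SIZE];
 *	struct rte_mbuf *done[BURST_SIZE];
 *
 *	while (!quit_signal) {
 *		const unsigned nb_rx = rte_ring_dequeue_burst(rx_ring,
 *				(void **)bufs, BURST_SIZE, NULL);
 *		rte_distributor_process_v20(d, bufs, nb_rx);
 *
 *		const unsigned nb_done =
 *			rte_distributor_returned_pkts_v20(d, done, BURST_SIZE);
 *		(transmit or free the nb_done completed packets here)
 *	}
 *	rte_distributor_flush_v20(d);
 */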

/* return to the caller, packets returned from workers */
int
rte_distributor_returned_pkts_v20(struct rte_distributor_v20 *d,
		struct rte_mbuf **mbufs, unsigned max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned i;

	for (i = 0; i < retval; i++) {
		unsigned idx = (returns->start + i) & RTE_DISTRIB_RETURNS_MASK;
		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}
VERSION_SYMBOL(rte_distributor_returned_pkts, _v20, 2.0);

/* return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned
total_outstanding(const struct rte_distributor_v20 *d)
{
	unsigned wkr, total_outstanding;

	total_outstanding = __builtin_popcountl(d->in_flight_bitmask);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}

/* flush the distributor, so that there are no outstanding packets in flight or
 * queued up.
 */
int
rte_distributor_flush_v20(struct rte_distributor_v20 *d)
{
	const unsigned flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process_v20(d, NULL, 0);

	return flushed;
}
VERSION_SYMBOL(rte_distributor_flush, _v20, 2.0);

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_v20(struct rte_distributor_v20 *d)
{
	d->returns.start = d->returns.count = 0;
	memset(d->returns.mbufs, 0, sizeof(d->returns.mbufs));
}
VERSION_SYMBOL(rte_distributor_clear_returns, _v20, 2.0);

/* creates a distributor instance */
struct rte_distributor_v20 *
rte_distributor_create_v20(const char *name,
		unsigned socket_id,
		unsigned num_workers)
{
	struct rte_distributor_v20 *d;
	struct rte_distributor_list *distributor_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;

	/* compilation-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
	RTE_BUILD_BUG_ON(RTE_DISTRIB_MAX_WORKERS >
				sizeof(d->in_flight_bitmask) * CHAR_BIT);

	if (name == NULL || num_workers >= RTE_DISTRIB_MAX_WORKERS) {
		rte_errno = EINVAL;
		return NULL;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;

	distributor_list = RTE_TAILQ_CAST(rte_distributor_tailq.head,
					  rte_distributor_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(distributor_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}
VERSION_SYMBOL(rte_distributor_create, _v20, 2.0);
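
/*
 * Illustrative sketch (not part of the original sources): creating a
 * distributor and launching one worker per remaining lcore with the v20 API.
 * worker_lcore(), worker_args[] and struct worker_arg are the hypothetical
 * names sketched in the worker example above.
 *
 *	struct rte_distributor_v20 *d = rte_distributor_create_v20("pkt_dist",
 *			rte_socket_id(), rte_lcore_count() - 1);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create distributor\n");
 *
 *	unsigned worker_id = 0, lcore_id;
 *	RTE_LCORE_FOREACH_SLAVE(lcore_id) {
 *		worker_args[worker_id] =
 *			(struct worker_arg){ d, worker_id };
 *		rte_eal_remote_launch(worker_lcore,
 *				&worker_args[worker_id], lcore_id);
 *		worker_id++;
 *	}
 */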