[dpdk.git] lib/distributor/rte_distributor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2017 Intel Corporation
3  */
4
5 #include <stdio.h>
6 #include <sys/queue.h>
7 #include <string.h>
8 #include <rte_mbuf.h>
9 #include <rte_cycles.h>
10 #include <rte_memzone.h>
11 #include <rte_errno.h>
12 #include <rte_string_fns.h>
13 #include <rte_eal_memconfig.h>
14 #include <rte_pause.h>
15 #include <rte_tailq.h>
16
17 #include "rte_distributor.h"
18 #include "rte_distributor_single.h"
19 #include "distributor_private.h"
20
21 TAILQ_HEAD(rte_dist_burst_list, rte_distributor);
22
23 static struct rte_tailq_elem rte_dist_burst_tailq = {
24         .name = "RTE_DIST_BURST",
25 };
26 EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
27
28 /**** APIs called by workers ****/
29
30 /**** Burst Packet APIs called by workers ****/
31
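/*
 * Illustrative sketch (not part of the library): a minimal worker loop built
 * on the burst APIs defined below. It assumes the application has already
 * created the distributor "d" and knows its own "worker_id"; handle_packet()
 * and the "quit" flag are hypothetical application-side names.
 *
 *	struct rte_mbuf *bufs[8];	// at least RTE_DIST_BURST_SIZE entries
 *	int n = 0;
 *
 *	while (!quit) {
 *		// hands back the n packets finished in the previous iteration
 *		// and blocks until new packets arrive
 *		n = rte_distributor_get_pkt(d, worker_id, bufs, bufs, n);
 *		for (int k = 0; k < n; k++)
 *			handle_packet(bufs[k]);
 *	}
 *	// return any remaining packets and signal shutdown to the distributor
 *	rte_distributor_return_pkt(d, worker_id, bufs, n);
 */
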
32 void
33 rte_distributor_request_pkt(struct rte_distributor *d,
34                 unsigned int worker_id, struct rte_mbuf **oldpkt,
35                 unsigned int count)
36 {
37         struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
38         unsigned int i;
39
40         volatile int64_t *retptr64;
41
42         if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
43                 rte_distributor_request_pkt_single(d->d_single,
44                         worker_id, count ? oldpkt[0] : NULL);
45                 return;
46         }
47
48         retptr64 = &(buf->retptr64[0]);
49         /* Spin while the handshake bits are set (the distributor clears them).
50          * Sync with distributor on GET_BUF flag.
51          */
52         while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
53                         & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
54                 rte_pause();
55                 uint64_t t = rte_rdtsc()+100;
56
57                 while (rte_rdtsc() < t)
58                         rte_pause();
59         }
60
61         /*
62          * OK, if we've got here, then the distributor has just cleared the
63          * handshake bits. Populate the retptrs with the returning packets.
64          */
65
66         for (i = count; i < RTE_DIST_BURST_SIZE; i++)
67                 buf->retptr64[i] = 0;
68
69         /* Set VALID_BUF bit for each packet returned */
70         for (i = count; i-- > 0; )
71                 buf->retptr64[i] =
72                         (((int64_t)(uintptr_t)(oldpkt[i])) <<
73                         RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
74
75         /*
76          * Finally, set the GET_BUF flag to signal to the distributor that
77          * the cache line is ready for processing.
78          * Sync with distributor to release retptrs.
79          */
80         __atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
81                         __ATOMIC_RELEASE);
82 }
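
/*
 * A note on the 64-bit slot encoding used above and throughout this file,
 * restated here purely as an illustration of what the shifts and masks do:
 * each retptr64/bufptr64 entry carries an mbuf pointer shifted left by
 * RTE_DISTRIB_FLAG_BITS, with the handshake/valid flags kept in the
 * freed-up low bits.
 *
 *	// encode: pointer in the high bits, flags in the low bits
 *	slot = (((int64_t)(uintptr_t)mbuf) << RTE_DISTRIB_FLAG_BITS)
 *			| RTE_DISTRIB_VALID_BUF;
 *	// decode: the shift drops the flag bits again
 *	mbuf = (struct rte_mbuf *)(uintptr_t)(slot >> RTE_DISTRIB_FLAG_BITS);
 */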
83
84 int
85 rte_distributor_poll_pkt(struct rte_distributor *d,
86                 unsigned int worker_id, struct rte_mbuf **pkts)
87 {
88         struct rte_distributor_buffer *buf = &d->bufs[worker_id];
89         uint64_t ret;
90         int count = 0;
91         unsigned int i;
92
93         if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
94                 pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
95                         worker_id);
96                 return (pkts[0]) ? 1 : 0;
97         }
98
99         /* If any of the below bits is set, return:
100          * GET_BUF is set when the distributor hasn't sent any packets yet,
101          * RETURN_BUF is set when the distributor must retrieve in-flight packets.
102          * Sync with distributor to acquire bufptrs.
103          */
104         if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
105                 & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
106                 return -1;
107
108         /* since bufptr64 is signed, this should be an arithmetic shift */
109         for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
110                 if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
111                         ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
112                         pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
113                 }
114         }
115
116         /*
117          * Now that the contents of the cache line have been copied into an
118          * array of mbuf pointers, toggle the bit so the distributor can start
119          * working on the next cache line while we process these packets.
120          * Sync with distributor on GET_BUF flag. Release bufptrs.
121          */
122         __atomic_store_n(&(buf->bufptr64[0]),
123                 buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
124
125         return count;
126 }
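
/*
 * Illustrative sketch (not part of the library): the request/poll pair above
 * can also be used directly when a worker wants to overlap other work with
 * waiting for packets; do_other_work() is a hypothetical application helper,
 * and bufs/n are the returned packets from the previous round as in
 * rte_distributor_get_pkt().
 *
 *	int count;
 *
 *	rte_distributor_request_pkt(d, worker_id, bufs, n);
 *	while ((count = rte_distributor_poll_pkt(d, worker_id, bufs)) == -1)
 *		do_other_work();
 */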
127
128 int
129 rte_distributor_get_pkt(struct rte_distributor *d,
130                 unsigned int worker_id, struct rte_mbuf **pkts,
131                 struct rte_mbuf **oldpkt, unsigned int return_count)
132 {
133         int count;
134
135         if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
136                 if (return_count <= 1) {
137                         pkts[0] = rte_distributor_get_pkt_single(d->d_single,
138                                 worker_id, return_count ? oldpkt[0] : NULL);
139                         return (pkts[0]) ? 1 : 0;
140                 } else
141                         return -EINVAL;
142         }
143
144         rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);
145
146         count = rte_distributor_poll_pkt(d, worker_id, pkts);
147         while (count == -1) {
148                 uint64_t t = rte_rdtsc() + 100;
149
150                 while (rte_rdtsc() < t)
151                         rte_pause();
152
153                 count = rte_distributor_poll_pkt(d, worker_id, pkts);
154         }
155         return count;
156 }
157
158 int
159 rte_distributor_return_pkt(struct rte_distributor *d,
160                 unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
161 {
162         struct rte_distributor_buffer *buf = &d->bufs[worker_id];
163         unsigned int i;
164
165         if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
166                 if (num == 1)
167                         return rte_distributor_return_pkt_single(d->d_single,
168                                 worker_id, oldpkt[0]);
169                 else if (num == 0)
170                         return rte_distributor_return_pkt_single(d->d_single,
171                                 worker_id, NULL);
172                 else
173                         return -EINVAL;
174         }
175
176         /* Spin while the handshake bits are set (the distributor clears them).
177          * Sync with distributor on GET_BUF flag.
178          */
179         while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
180                         & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
181                 rte_pause();
182                 uint64_t t = rte_rdtsc()+100;
183
184                 while (rte_rdtsc() < t)
185                         rte_pause();
186         }
187
188         /* Sync with distributor to acquire retptrs */
189         __atomic_thread_fence(__ATOMIC_ACQUIRE);
190         for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
191                 /* Switch off the return bit first */
192                 buf->retptr64[i] = 0;
193
194         for (i = num; i-- > 0; )
195                 buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
196                         RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;
197
198         /* Use RETURN_BUF on bufptr64 to notify the distributor that
199          * we won't read any mbufs from there even if GET_BUF is set.
200          * This allows the distributor to retrieve already-sent in-flight packets.
201          */
202         __atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
203                 __ATOMIC_ACQ_REL);
204
205         /* Set the RETURN_BUF on retptr64 even if we got no returns.
206          * Sync with distributor on RETURN_BUF flag. Release retptrs.
207          * Notify the distributor that we will not request any more packets.
208          */
209         __atomic_store_n(&(buf->retptr64[0]),
210                 buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);
211
212         return 0;
213 }
214
215 /**** APIs called on distributor core ***/
216
217 /* stores a packet returned from a worker inside the returns array */
218 static inline void
219 store_return(uintptr_t oldbuf, struct rte_distributor *d,
220                 unsigned int *ret_start, unsigned int *ret_count)
221 {
222         if (!oldbuf)
223                 return;
224         /* store returns in a circular buffer */
225         d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
226                         = (void *)oldbuf;
227         *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
228         *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
229 }
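
/*
 * Illustrative example of the circular-buffer arithmetic above (values chosen
 * only for the example, assuming the mask is one less than the power-of-two
 * size of the returns array): a new return always lands at
 * (start + count) & mask; once count has reached the mask value, start
 * advances instead so the oldest entry is dropped from the window.
 *
 *	// e.g. size 8 (mask 7), start 6, count 3:
 *	// index = (6 + 3) & 7 = 1, then count becomes 4 and start stays 6
 */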
230
231 /*
232  * Match the flow_ids (tags) of the incoming packets to the flow_ids
233  * of the in-flight packets (both in flight on the workers and in each
234  * worker's backlog). This then allows us to pin those packets to the
235  * relevant workers to give us our atomic flow pinning.
236  */
237 void
238 find_match_scalar(struct rte_distributor *d,
239                         uint16_t *data_ptr,
240                         uint16_t *output_ptr)
241 {
242         struct rte_distributor_backlog *bl;
243         uint16_t i, j, w;
244
245         /*
246          * Function overview:
247          * 1. Loop through all worker IDs
248          * 2. Compare the current inflights to the incoming tags
249          * 3. Compare the current backlog to the incoming tags
250          * 4. Add any matches to the output
251          */
252
253         for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
254                 output_ptr[j] = 0;
255
256         for (i = 0; i < d->num_workers; i++) {
257                 bl = &d->backlog[i];
258
259                 for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
260                         for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
261                                 if (d->in_flight_tags[i][w] == data_ptr[j]) {
262                                         output_ptr[j] = i+1;
263                                         break;
264                                 }
265                 for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
266                         for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
267                                 if (bl->tags[w] == data_ptr[j]) {
268                                         output_ptr[j] = i+1;
269                                         break;
270                                 }
271         }
272
273         /*
274          * At this stage, the output contains 8 16-bit values, with
275          * each non-zero value holding one plus the ID of the worker
276          * to which the corresponding flow is pinned.
277          */
278 }
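
/*
 * Worked example of the matching above (illustrative values only): with
 * incoming tags {5, 9, 5, 7, ...} (tags are forced non-zero by
 * rte_distributor_process()), worker 0 holding tag 5 in flight, worker 2
 * holding tag 9 in its backlog, and no other tags held anywhere, the output
 * becomes {1, 3, 1, 0, ...}: non-zero entries are worker ID + 1, zero means
 * the flow is not currently pinned to any worker.
 */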
279
280 /*
281  * When worker called rte_distributor_return_pkt()
282  * and passed RTE_DISTRIB_RETURN_BUF handshake through retptr64,
283  * distributor must retrieve both inflight and backlog packets assigned
284  * to the worker and reprocess them to another worker.
285  */
286 static void
287 handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
288 {
289         struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
290         /* double BURST size for storing both inflights and backlog */
291         struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
292         unsigned int pkts_count = 0;
293         unsigned int i;
294
295         /* If GET_BUF is cleared, there are in-flight packets sent
296          * to a worker which no longer requests new packets.
297          * They must be retrieved and assigned to another worker.
298          */
299         if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
300                 & RTE_DISTRIB_GET_BUF))
301                 for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
302                         if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
303                                 pkts[pkts_count++] = (void *)((uintptr_t)
304                                         (buf->bufptr64[i]
305                                                 >> RTE_DISTRIB_FLAG_BITS));
306
307         /* Perform the following operations on the bufptr64 handshake flags:
308          * - set GET_BUF to indicate that the distributor can overwrite the
309          *     buffer with new packets if the worker makes a new request.
310          * - clear RETURN_BUF to unlock reads on the worker side.
311          */
312         __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
313                 __ATOMIC_RELEASE);
314
315         /* Collect backlog packets from worker */
316         for (i = 0; i < d->backlog[wkr].count; i++)
317                 pkts[pkts_count++] = (void *)((uintptr_t)
318                         (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));
319
320         d->backlog[wkr].count = 0;
321
322         /* Clear both inflight and backlog tags */
323         for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
324                 d->in_flight_tags[wkr][i] = 0;
325                 d->backlog[wkr].tags[i] = 0;
326         }
327
328         /* Recursive call: redistribute the retrieved packets to other workers */
329         if (pkts_count > 0)
330                 rte_distributor_process(d, pkts, pkts_count);
331 }
332
333
334 /*
335  * When the handshake bits indicate that there are packets coming
336  * back from the worker, this function is called to copy and store
337  * the valid returned pointers (store_return).
338  */
339 static unsigned int
340 handle_returns(struct rte_distributor *d, unsigned int wkr)
341 {
342         struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
343         uintptr_t oldbuf;
344         unsigned int ret_start = d->returns.start,
345                         ret_count = d->returns.count;
346         unsigned int count = 0;
347         unsigned int i;
348
349         /* Sync on GET_BUF flag. Acquire retptrs. */
350         if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
351                 & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
352                 for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
353                         if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
354                                 oldbuf = ((uintptr_t)(buf->retptr64[i] >>
355                                         RTE_DISTRIB_FLAG_BITS));
356                                 /* store returns in a circular buffer */
357                                 store_return(oldbuf, d, &ret_start, &ret_count);
358                                 count++;
359                                 buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
360                         }
361                 }
362                 d->returns.start = ret_start;
363                 d->returns.count = ret_count;
364
365                 /* If the worker requested packets with GET_BUF, mark it active;
366                  * otherwise (RETURN_BUF), mark it inactive.
367                  */
368                 d->activesum -= d->active[wkr];
369                 d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
370                 d->activesum += d->active[wkr];
371
372                 /* If the worker returned packets without requesting new ones,
373                  * handle all in-flight and backlog packets assigned to it.
374                  */
375                 if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
376                         handle_worker_shutdown(d, wkr);
377
378                 /* Clear for the worker to populate with more returns.
379                  * Sync with worker on GET_BUF flag. Release retptrs.
380                  */
381                 __atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
382         }
383         return count;
384 }
385
386 /*
387  * This function releases a burst (cache line) to a worker.
388  * It is called from the process function when a cacheline is
389  * full to make room for more packets for that worker, or when
390  * all packets have been assigned to bursts and need to be flushed
391  * to the workers.
392  * It also needs to wait for any outstanding packets from the worker
393  * before sending out new packets.
394  */
395 static unsigned int
396 release(struct rte_distributor *d, unsigned int wkr)
397 {
398         struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
399         unsigned int i;
400
401         handle_returns(d, wkr);
402         if (unlikely(!d->active[wkr]))
403                 return 0;
404
405         /* Sync with worker on GET_BUF flag */
406         while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
407                 & RTE_DISTRIB_GET_BUF)) {
408                 handle_returns(d, wkr);
409                 if (unlikely(!d->active[wkr]))
410                         return 0;
411                 rte_pause();
412         }
413
414         buf->count = 0;
415
416         for (i = 0; i < d->backlog[wkr].count; i++) {
417                 d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
418                                 RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
419                 d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
420         }
421         buf->count = i;
422         for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
423                 buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
424                 d->in_flight_tags[wkr][i] = 0;
425         }
426
427         d->backlog[wkr].count = 0;
428
429         /* Clear the GET_BUF bit.
430          * Sync with worker on GET_BUF flag. Release bufptrs.
431          */
432         __atomic_store_n(&(buf->bufptr64[0]),
433                 buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
434         return buf->count;
435
436 }
437
438
439 /* process a set of packets to distribute them to workers */
440 int
441 rte_distributor_process(struct rte_distributor *d,
442                 struct rte_mbuf **mbufs, unsigned int num_mbufs)
443 {
444         unsigned int next_idx = 0;
445         static unsigned int wkr;
446         struct rte_mbuf *next_mb = NULL;
447         int64_t next_value = 0;
448         uint16_t new_tag = 0;
449         uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
450         unsigned int i, j, w, wid, matching_required;
451
452         if (d->alg_type == RTE_DIST_ALG_SINGLE) {
453                 /* Call the old API */
454                 return rte_distributor_process_single(d->d_single,
455                         mbufs, num_mbufs);
456         }
457
458         for (wid = 0 ; wid < d->num_workers; wid++)
459                 handle_returns(d, wid);
460
461         if (unlikely(num_mbufs == 0)) {
462                 /* Flush out all non-full cache-lines to workers. */
463                 for (wid = 0 ; wid < d->num_workers; wid++) {
464                         /* Sync with worker on GET_BUF flag. */
465                         if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
466                                 __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
467                                 d->bufs[wid].count = 0;
468                                 release(d, wid);
469                                 handle_returns(d, wid);
470                         }
471                 }
472                 return 0;
473         }
474
475         if (unlikely(!d->activesum))
476                 return 0;
477
478         while (next_idx < num_mbufs) {
479                 uint16_t matches[RTE_DIST_BURST_SIZE] __rte_aligned(128);
480                 unsigned int pkts;
481
482                 if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
483                         pkts = num_mbufs - next_idx;
484                 else
485                         pkts = RTE_DIST_BURST_SIZE;
486
487                 for (i = 0; i < pkts; i++) {
488                         if (mbufs[next_idx + i]) {
489                                 /* flows have to be non-zero */
490                                 flows[i] = mbufs[next_idx + i]->hash.usr | 1;
491                         } else
492                                 flows[i] = 0;
493                 }
494                 for (; i < RTE_DIST_BURST_SIZE; i++)
495                         flows[i] = 0;
496
497                 matching_required = 1;
498
499                 for (j = 0; j < pkts; j++) {
500                         if (unlikely(!d->activesum))
501                                 return next_idx;
502
503                         if (unlikely(matching_required)) {
504                                 switch (d->dist_match_fn) {
505                                 case RTE_DIST_MATCH_VECTOR:
506                                         find_match_vec(d, &flows[0],
507                                                 &matches[0]);
508                                         break;
509                                 default:
510                                         find_match_scalar(d, &flows[0],
511                                                 &matches[0]);
512                                 }
513                                 matching_required = 0;
514                         }
515                         /*
516                          * The matches array now contains the intended worker ID
517                          * (+1) for each incoming packet. Any zeroes need to be
518                          * assigned workers.
519                          */
520
521                         next_mb = mbufs[next_idx++];
522                         next_value = (((int64_t)(uintptr_t)next_mb) <<
523                                         RTE_DISTRIB_FLAG_BITS);
524                         /*
525                          * The user is advised to set a tag value for each
526                          * mbuf before calling rte_distributor_process().
527                          * User-defined tags are used to identify flows
528                          * or sessions.
529                          */
530                         /* flows MUST be non-zero */
531                         new_tag = (uint16_t)(next_mb->hash.usr) | 1;
532
533                         /*
534                          * Uncommenting the next line will cause the find_match
535                          * function to be optimized out, making this function
536                          * do parallel (non-atomic) distribution
537                          */
538                         /* matches[j] = 0; */
539
540                         if (matches[j] && d->active[matches[j]-1]) {
541                                 struct rte_distributor_backlog *bl =
542                                                 &d->backlog[matches[j]-1];
543                                 if (unlikely(bl->count ==
544                                                 RTE_DIST_BURST_SIZE)) {
545                                         release(d, matches[j]-1);
546                                         if (!d->active[matches[j]-1]) {
547                                                 j--;
548                                                 next_idx--;
549                                                 matching_required = 1;
550                                                 continue;
551                                         }
552                                 }
553
554                                 /* Add to the worker that already has this flow */
555                                 unsigned int idx = bl->count++;
556
557                                 bl->tags[idx] = new_tag;
558                                 bl->pkts[idx] = next_value;
559
560                         } else {
561                                 struct rte_distributor_backlog *bl;
562
563                                 while (unlikely(!d->active[wkr]))
564                                         wkr = (wkr + 1) % d->num_workers;
565                                 bl = &d->backlog[wkr];
566
567                                 if (unlikely(bl->count ==
568                                                 RTE_DIST_BURST_SIZE)) {
569                                         release(d, wkr);
570                                         if (!d->active[wkr]) {
571                                                 j--;
572                                                 next_idx--;
573                                                 matching_required = 1;
574                                                 continue;
575                                         }
576                                 }
577
578                                 /* Add to the current worker */
579                                 unsigned int idx = bl->count++;
580
581                                 bl->tags[idx] = new_tag;
582                                 bl->pkts[idx] = next_value;
583                                 /*
584                                  * Now that we've just added an unpinned flow
585                                  * to a worker, we need to ensure that all
586                                  * other packets with that same flow will go
587                                  * to the same worker in this burst.
588                                  */
589                                 for (w = j; w < pkts; w++)
590                                         if (flows[w] == new_tag)
591                                                 matches[w] = wkr+1;
592                         }
593                 }
594                 wkr = (wkr + 1) % d->num_workers;
595         }
596
597         /* Flush out all non-full cache-lines to workers. */
598         for (wid = 0 ; wid < d->num_workers; wid++)
599                 /* Sync with worker on GET_BUF flag. */
600                 if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
601                         __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
602                         d->bufs[wid].count = 0;
603                         release(d, wid);
604                 }
605
606         return num_mbufs;
607 }
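
/*
 * Illustrative sketch (not part of the library): a typical loop on the
 * distributor core feeding this function. rx_burst() stands in for whatever
 * packet source the application uses (e.g. an ethdev RX queue) and is a
 * hypothetical name, as is the "quit" flag.
 *
 *	struct rte_mbuf *mbufs[64];
 *	struct rte_mbuf *done[64];
 *
 *	while (!quit) {
 *		unsigned int n = rx_burst(mbufs, 64);
 *		rte_distributor_process(d, mbufs, n);
 *		int r = rte_distributor_returned_pkts(d, done, 64);
 *		// forward or free the r completed packets in done[]
 *	}
 */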
608
609 /* return to the caller the packets returned from workers */
610 int
611 rte_distributor_returned_pkts(struct rte_distributor *d,
612                 struct rte_mbuf **mbufs, unsigned int max_mbufs)
613 {
614         struct rte_distributor_returned_pkts *returns = &d->returns;
615         unsigned int retval = (max_mbufs < returns->count) ?
616                         max_mbufs : returns->count;
617         unsigned int i;
618
619         if (d->alg_type == RTE_DIST_ALG_SINGLE) {
620                 /* Call the old API */
621                 return rte_distributor_returned_pkts_single(d->d_single,
622                                 mbufs, max_mbufs);
623         }
624
625         for (i = 0; i < retval; i++) {
626                 unsigned int idx = (returns->start + i) &
627                                 RTE_DISTRIB_RETURNS_MASK;
628
629                 mbufs[i] = returns->mbufs[idx];
630         }
631         returns->start += i;
632         returns->count -= i;
633
634         return retval;
635 }
636
637 /*
638  * Return the number of packets in-flight in a distributor, i.e. packets
639  * being worked on or queued up in a backlog.
640  */
641 static inline unsigned int
642 total_outstanding(const struct rte_distributor *d)
643 {
644         unsigned int wkr, total_outstanding = 0;
645
646         for (wkr = 0; wkr < d->num_workers; wkr++)
647                 total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;
648
649         return total_outstanding;
650 }
651
652 /*
653  * Flush the distributor, so that there are no outstanding packets in flight or
654  * queued up.
655  */
656 int
657 rte_distributor_flush(struct rte_distributor *d)
658 {
659         unsigned int flushed;
660         unsigned int wkr;
661
662         if (d->alg_type == RTE_DIST_ALG_SINGLE) {
663                 /* Call the old API */
664                 return rte_distributor_flush_single(d->d_single);
665         }
666
667         flushed = total_outstanding(d);
668
669         while (total_outstanding(d) > 0)
670                 rte_distributor_process(d, NULL, 0);
671
672         /* wait 10ms to allow all workers to drain the packets */
673         rte_delay_us(10000);
674
675         /*
676          * Send empty burst to all workers to allow them to exit
677          * gracefully, should they need to.
678          */
679         rte_distributor_process(d, NULL, 0);
680
681         for (wkr = 0; wkr < d->num_workers; wkr++)
682                 handle_returns(d, wkr);
683
684         return flushed;
685 }
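
/*
 * Illustrative teardown sketch (application-side, not part of this file):
 * once the application has told its workers to stop, the distributor core
 * can drain outstanding packets and reset the returns state.
 *
 *	rte_distributor_flush(d);
 *	rte_distributor_clear_returns(d);
 */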
686
687 /* clears the internal returns array in the distributor */
688 void
689 rte_distributor_clear_returns(struct rte_distributor *d)
690 {
691         unsigned int wkr;
692
693         if (d->alg_type == RTE_DIST_ALG_SINGLE) {
694                 /* Call the old API */
695                 rte_distributor_clear_returns_single(d->d_single);
696                 return;
697         }
698
699         /* throw away returns, so workers can exit */
700         for (wkr = 0; wkr < d->num_workers; wkr++)
701                 /* Sync with worker. Release retptrs. */
702                 __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
703                                 __ATOMIC_RELEASE);
704
705         d->returns.start = d->returns.count = 0;
706 }
707
708 /* creates a distributor instance */
709 struct rte_distributor *
710 rte_distributor_create(const char *name,
711                 unsigned int socket_id,
712                 unsigned int num_workers,
713                 unsigned int alg_type)
714 {
715         struct rte_distributor *d;
716         struct rte_dist_burst_list *dist_burst_list;
717         char mz_name[RTE_MEMZONE_NAMESIZE];
718         const struct rte_memzone *mz;
719         unsigned int i;
720
721         /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */
722
723         /* compilation-time checks */
724         RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
725         RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);
726
727         if (name == NULL || num_workers >=
728                 (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
729                 rte_errno = EINVAL;
730                 return NULL;
731         }
732
733         if (alg_type == RTE_DIST_ALG_SINGLE) {
734                 d = malloc(sizeof(struct rte_distributor));
735                 if (d == NULL) {
736                         rte_errno = ENOMEM;
737                         return NULL;
738                 }
739                 d->d_single = rte_distributor_create_single(name,
740                                 socket_id, num_workers);
741                 if (d->d_single == NULL) {
742                         free(d);
743                         /* rte_errno will have been set */
744                         return NULL;
745                 }
746                 d->alg_type = alg_type;
747                 return d;
748         }
749
750         snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
751         mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
752         if (mz == NULL) {
753                 rte_errno = ENOMEM;
754                 return NULL;
755         }
756
757         d = mz->addr;
758         strlcpy(d->name, name, sizeof(d->name));
759         d->num_workers = num_workers;
760         d->alg_type = alg_type;
761
762         d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
763 #if defined(RTE_ARCH_X86)
764         if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
765                 d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
766 #endif
767
768         /*
769          * Set up the backlog tags so they're pointing at the second cache
770          * line for performance during flow matching
771          */
772         for (i = 0 ; i < num_workers ; i++)
773                 d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];
774
775         memset(d->active, 0, sizeof(d->active));
776         d->activesum = 0;
777
778         dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
779                                           rte_dist_burst_list);
780
781
782         rte_mcfg_tailq_write_lock();
783         TAILQ_INSERT_TAIL(dist_burst_list, d, next);
784         rte_mcfg_tailq_write_unlock();
785
786         return d;
787 }
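
/*
 * Illustrative creation sketch (not part of the library): a burst-mode
 * distributor for, say, 4 workers on the caller's NUMA socket; error handling
 * is reduced to a bare check, and "pkt_dist" is an arbitrary example name.
 *
 *	struct rte_distributor *d;
 *
 *	d = rte_distributor_create("pkt_dist", rte_socket_id(), 4,
 *			RTE_DIST_ALG_BURST);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "cannot create distributor: %d\n",
 *				rte_errno);
 */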