lib/distributor/rte_distributor.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_cycles.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>
#include <rte_vect.h>

#include "rte_distributor.h"
#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
        .name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)
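
/*
 * Note on the exchange format used throughout this file: each 64-bit slot
 * in bufptr64[] and retptr64[] holds an mbuf pointer shifted left by
 * RTE_DISTRIB_FLAG_BITS, with the low bits reserved for the handshake flags
 * defined in distributor_private.h. VALID_BUF marks individual slots that
 * carry a packet, while GET_BUF and RETURN_BUF are exchanged on element 0
 * of the cache line only.
 */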

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/

void
rte_distributor_request_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt,
                unsigned int count)
{
        struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
        unsigned int i;

        volatile int64_t *retptr64;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                rte_distributor_request_pkt_single(d->d_single,
                        worker_id, count ? oldpkt[0] : NULL);
                return;
        }

        retptr64 = &(buf->retptr64[0]);
        /* Spin while handshake bits are set (the scheduler clears them).
         * Sync with distributor on GET_BUF flag.
         */
        while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
                        & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
                rte_pause();
                uint64_t t = rte_rdtsc()+100;

                while (rte_rdtsc() < t)
                        rte_pause();
        }

        /*
         * OK, if we've got here, then the scheduler has just cleared the
         * handshake bits. Populate the retptrs with returning packets.
         */

        for (i = count; i < RTE_DIST_BURST_SIZE; i++)
                buf->retptr64[i] = 0;

        /* Set VALID_BUF bit for each packet returned */
        for (i = count; i-- > 0; )
                buf->retptr64[i] =
                        (((int64_t)(uintptr_t)(oldpkt[i])) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

        /*
         * Finally, set GET_BUF to signal to the distributor that the cache
         * line is ready for processing.
         * Sync with distributor to release retptrs.
         */
        __atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
                        __ATOMIC_RELEASE);
}

int
rte_distributor_poll_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        uint64_t ret;
        int count = 0;
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
                        worker_id);
                return (pkts[0]) ? 1 : 0;
        }

        /* If any of the below bits is set, return.
         * GET_BUF is set when the distributor hasn't sent any packets yet.
         * RETURN_BUF is set when the distributor must retrieve in-flight
         * packets.
         * Sync with distributor to acquire bufptrs.
         */
        if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
                & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))
                return -1;

        /* since bufptr64 is signed, this should be an arithmetic shift */
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
                        ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
                        pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
                }
        }

        /*
         * Now that we have the contents of the cache line in an array of
         * mbuf pointers, toggle the bit so the scheduler can start working
         * on the next cache line while we work on this one.
         * Sync with distributor on GET_BUF flag. Release bufptrs.
         */
        __atomic_store_n(&(buf->bufptr64[0]),
                buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

        return count;
}

int
rte_distributor_get_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts,
                struct rte_mbuf **oldpkt, unsigned int return_count)
{
        int count;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (return_count <= 1) {
                        pkts[0] = rte_distributor_get_pkt_single(d->d_single,
                                worker_id, return_count ? oldpkt[0] : NULL);
                        return (pkts[0]) ? 1 : 0;
                } else
                        return -EINVAL;
        }

        rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

        count = rte_distributor_poll_pkt(d, worker_id, pkts);
        while (count == -1) {
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();

                count = rte_distributor_poll_pkt(d, worker_id, pkts);
        }
        return count;
}
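
/*
 * Illustrative worker loop (a minimal sketch only; "d", "worker_id", "bufs"
 * and "quit" are application-defined and shutdown handling is omitted):
 *
 *      struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
 *      unsigned int n = rte_distributor_get_pkt(d, worker_id, bufs, NULL, 0);
 *
 *      while (!quit) {
 *              ... process bufs[0] .. bufs[n - 1] ...
 *              n = rte_distributor_get_pkt(d, worker_id, bufs, bufs, n);
 *      }
 *      rte_distributor_return_pkt(d, worker_id, bufs, n);
 */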

int
rte_distributor_return_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (num == 1)
                        return rte_distributor_return_pkt_single(d->d_single,
                                worker_id, oldpkt[0]);
                else if (num == 0)
                        return rte_distributor_return_pkt_single(d->d_single,
                                worker_id, NULL);
                else
                        return -EINVAL;
        }

        /* Spin while handshake bits are set (the scheduler clears them).
         * Sync with distributor on GET_BUF flag.
         */
        while (unlikely(__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_RELAXED)
                        & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF))) {
                rte_pause();
                uint64_t t = rte_rdtsc()+100;

                while (rte_rdtsc() < t)
                        rte_pause();
        }

        /* Sync with distributor to acquire retptrs */
        __atomic_thread_fence(__ATOMIC_ACQUIRE);
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                /* Switch off the return bit first */
                buf->retptr64[i] = 0;

        for (i = num; i-- > 0; )
                buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_VALID_BUF;

        /* Use RETURN_BUF on bufptr64 to notify the distributor that
         * we won't read any mbufs from there even if GET_BUF is set.
         * This allows the distributor to retrieve the in-flight packets
         * it has already sent.
         */
        __atomic_or_fetch(&(buf->bufptr64[0]), RTE_DISTRIB_RETURN_BUF,
                __ATOMIC_ACQ_REL);

        /* Set RETURN_BUF on retptr64 even if we got no returns.
         * Sync with distributor on RETURN_BUF flag. Release retptrs.
         * Notify the distributor that we are not requesting any more packets.
         */
        __atomic_store_n(&(buf->retptr64[0]),
                buf->retptr64[0] | RTE_DISTRIB_RETURN_BUF, __ATOMIC_RELEASE);

        return 0;
}
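
/*
 * Note: rte_distributor_return_pkt() also acts as the worker's shutdown
 * notification. Setting RETURN_BUF on both retptr64 and bufptr64 tells the
 * distributor that this worker will neither request nor read any further
 * packets, which lets handle_worker_shutdown() below reclaim and
 * redistribute anything still assigned to it.
 */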

/**** APIs called on distributor core ****/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
                unsigned int *ret_start, unsigned int *ret_count)
{
        if (!oldbuf)
                return;
        /* store returns in a circular buffer */
        d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
                        = (void *)oldbuf;
        *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
        *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}
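
/*
 * The returns ring above is fixed-size: ret_start only advances once
 * ret_count has reached RTE_DISTRIB_RETURNS_MASK, so storing into a full
 * ring drops the oldest return from the window rather than growing the
 * count any further.
 */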

/*
 * Match the flow_ids (tags) of the incoming packets to the flow_ids
 * of the inflight packets (both inflight on the workers and in each worker
 * backlog). This will then allow us to pin those packets to the relevant
 * workers to give us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
                        uint16_t *data_ptr,
                        uint16_t *output_ptr)
{
        struct rte_distributor_backlog *bl;
        uint16_t i, j, w;

        /*
         * Function overview:
         * 1. Loop through all worker IDs
         * 2. Compare the current inflights to the incoming tags
         * 3. Compare the current backlog to the incoming tags
         * 4. Add any matches to the output
         */

        for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
                output_ptr[j] = 0;

        for (i = 0; i < d->num_workers; i++) {
                bl = &d->backlog[i];

                for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (d->in_flight_tags[i][w] == data_ptr[j]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
                for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (bl->tags[w] == data_ptr[j]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
        }

        /*
         * At this stage, the output contains 8 16-bit values, with
         * each non-zero value containing the worker ID to which the
         * corresponding flow is pinned.
         */
}
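
/*
 * Example: with RTE_DIST_BURST_SIZE == 8, if data_ptr[2] matches a tag that
 * worker 3 currently has in flight (or in its backlog), output_ptr[2] is set
 * to 4 (worker ID + 1). Slots left at zero mean the corresponding flow is
 * not yet pinned to any worker.
 */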

/*
 * When a worker has called rte_distributor_return_pkt()
 * and passed the RTE_DISTRIB_RETURN_BUF handshake through retptr64,
 * the distributor must retrieve both the inflight and backlog packets
 * assigned to that worker and redistribute them to other workers.
 */
static void
handle_worker_shutdown(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        /* double BURST size for storing both inflights and backlog */
        struct rte_mbuf *pkts[RTE_DIST_BURST_SIZE * 2];
        unsigned int pkts_count = 0;
        unsigned int i;

        /* If GET_BUF is cleared, there are in-flight packets sent
         * to a worker which no longer requires new packets.
         * They must be retrieved and assigned to another worker.
         */
        if (!(__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
                & RTE_DISTRIB_GET_BUF))
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                        if (buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)
                                pkts[pkts_count++] = (void *)((uintptr_t)
                                        (buf->bufptr64[i]
                                                >> RTE_DISTRIB_FLAG_BITS));

        /* Perform the following operations on the bufptr64 handshake flags:
         * - set GET_BUF to indicate that the distributor can overwrite the
         *     buffer with new packets if the worker makes a new request.
         * - clear RETURN_BUF to unlock reads on the worker side.
         */
        __atomic_store_n(&(buf->bufptr64[0]), RTE_DISTRIB_GET_BUF,
                __ATOMIC_RELEASE);

        /* Collect backlog packets from the worker */
        for (i = 0; i < d->backlog[wkr].count; i++)
                pkts[pkts_count++] = (void *)((uintptr_t)
                        (d->backlog[wkr].pkts[i] >> RTE_DISTRIB_FLAG_BITS));

        d->backlog[wkr].count = 0;

        /* Clear both inflight and backlog tags */
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                d->in_flight_tags[wkr][i] = 0;
                d->backlog[wkr].tags[i] = 0;
        }

        /* Recursive call */
        if (pkts_count > 0)
                rte_distributor_process(d, pkts, pkts_count);
}
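
/*
 * The recursive rte_distributor_process() call above redistributes the
 * reclaimed packets. By this point handle_returns() has already marked the
 * shutting-down worker inactive, so they can only be assigned to the
 * remaining active workers; if none remain, rte_distributor_process()
 * returns early without queueing them.
 */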

/*
 * When the handshake bits indicate that there are packets coming
 * back from the worker, this function is called to copy and store
 * the valid returned pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        uintptr_t oldbuf;
        unsigned int ret_start = d->returns.start,
                        ret_count = d->returns.count;
        unsigned int count = 0;
        unsigned int i;

        /* Sync on GET_BUF flag. Acquire retptrs. */
        if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
                & (RTE_DISTRIB_GET_BUF | RTE_DISTRIB_RETURN_BUF)) {
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                        if (buf->retptr64[i] & RTE_DISTRIB_VALID_BUF) {
                                oldbuf = ((uintptr_t)(buf->retptr64[i] >>
                                        RTE_DISTRIB_FLAG_BITS));
                                /* store returns in a circular buffer */
                                store_return(oldbuf, d, &ret_start, &ret_count);
                                count++;
                                buf->retptr64[i] &= ~RTE_DISTRIB_VALID_BUF;
                        }
                }
                d->returns.start = ret_start;
                d->returns.count = ret_count;

                /* If the worker requested packets with GET_BUF, mark it
                 * active; otherwise (RETURN_BUF), mark it inactive.
                 */
                d->activesum -= d->active[wkr];
                d->active[wkr] = !!(buf->retptr64[0] & RTE_DISTRIB_GET_BUF);
                d->activesum += d->active[wkr];

                /* If the worker returned packets without requesting new ones,
                 * handle all in-flight and backlog packets assigned to it.
                 */
                if (unlikely(buf->retptr64[0] & RTE_DISTRIB_RETURN_BUF))
                        handle_worker_shutdown(d, wkr);

                /* Clear for the worker to populate with more returns.
                 * Sync with worker on GET_BUF flag. Release retptrs.
                 */
                __atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
        }
        return count;
}

/*
 * This function releases a burst (cache line) to a worker.
 * It is called from the process function when a cache line is
 * full to make room for more packets for that worker, or when
 * all packets have been assigned to bursts and need to be flushed
 * to the workers.
 * It also needs to wait for any outstanding packets from the worker
 * before sending out new packets.
 */
static unsigned int
release(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        unsigned int i;

        handle_returns(d, wkr);
        if (unlikely(!d->active[wkr]))
                return 0;

        /* Sync with worker on GET_BUF flag */
        while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
                & RTE_DISTRIB_GET_BUF)) {
                handle_returns(d, wkr);
                if (unlikely(!d->active[wkr]))
                        return 0;
                rte_pause();
        }

        buf->count = 0;

        for (i = 0; i < d->backlog[wkr].count; i++) {
                d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
                                RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
                d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
        }
        buf->count = i;
        for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
                buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
                d->in_flight_tags[wkr][i] = 0;
        }

        d->backlog[wkr].count = 0;

        /* Clear the GET_BUF bit.
         * Sync with worker on GET_BUF flag. Release bufptrs.
         */
        __atomic_store_n(&(buf->bufptr64[0]),
                buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
        return buf->count;
}
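
/*
 * Note: clearing GET_BUF in the final store above is what actually hands
 * the cache line to the worker; until then the worker's poll call keeps
 * returning -1. buf->count now reflects the number of packets in flight to
 * this worker and is counted by total_outstanding() below.
 */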

/* process a set of packets to distribute them to workers */
int
rte_distributor_process(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
        unsigned int next_idx = 0;
        static unsigned int wkr;
        struct rte_mbuf *next_mb = NULL;
        int64_t next_value = 0;
        uint16_t new_tag = 0;
        uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
        unsigned int i, j, w, wid, matching_required;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_process_single(d->d_single,
                        mbufs, num_mbufs);
        }

        for (wid = 0 ; wid < d->num_workers; wid++)
                handle_returns(d, wid);

        if (unlikely(num_mbufs == 0)) {
                /* Flush out all non-full cache-lines to workers. */
                for (wid = 0 ; wid < d->num_workers; wid++) {
                        /* Sync with worker on GET_BUF flag. */
                        if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
                                __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
                                d->bufs[wid].count = 0;
                                release(d, wid);
                                handle_returns(d, wid);
                        }
                }
                return 0;
        }

        if (unlikely(!d->activesum))
                return 0;

        while (next_idx < num_mbufs) {
                uint16_t matches[RTE_DIST_BURST_SIZE] __rte_aligned(128);
                unsigned int pkts;

                if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
                        pkts = num_mbufs - next_idx;
                else
                        pkts = RTE_DIST_BURST_SIZE;

                for (i = 0; i < pkts; i++) {
                        if (mbufs[next_idx + i]) {
                                /* flows have to be non-zero */
                                flows[i] = mbufs[next_idx + i]->hash.usr | 1;
                        } else
                                flows[i] = 0;
                }
                for (; i < RTE_DIST_BURST_SIZE; i++)
                        flows[i] = 0;

                matching_required = 1;

                for (j = 0; j < pkts; j++) {
                        if (unlikely(!d->activesum))
                                return next_idx;

                        if (unlikely(matching_required)) {
                                switch (d->dist_match_fn) {
                                case RTE_DIST_MATCH_VECTOR:
                                        find_match_vec(d, &flows[0],
                                                &matches[0]);
                                        break;
                                default:
                                        find_match_scalar(d, &flows[0],
                                                &matches[0]);
                                }
                                matching_required = 0;
                        }
                        /*
                         * The matches array now contains the intended worker
                         * ID (+1) of the incoming packets. Any zeroes need
                         * to be assigned workers.
                         */

                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb) <<
                                        RTE_DISTRIB_FLAG_BITS);
                        /*
                         * The user is advised to set the tag value for each
                         * mbuf before calling rte_distributor_process().
                         * User-defined tags are used to identify flows,
                         * or sessions.
                         */
                        /* flows MUST be non-zero */
                        new_tag = (uint16_t)(next_mb->hash.usr) | 1;

                        /*
                         * Uncommenting the next line will cause the find_match
                         * function to be optimized out, making this function
                         * do parallel (non-atomic) distribution
                         */
                        /* matches[j] = 0; */

                        if (matches[j] && d->active[matches[j]-1]) {
                                struct rte_distributor_backlog *bl =
                                                &d->backlog[matches[j]-1];
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, matches[j]-1);
                                        if (!d->active[matches[j]-1]) {
                                                j--;
                                                next_idx--;
                                                matching_required = 1;
                                                continue;
                                        }
                                }

                                /* Add to the worker that already has the flow */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;

                        } else {
                                struct rte_distributor_backlog *bl;

                                while (unlikely(!d->active[wkr]))
                                        wkr = (wkr + 1) % d->num_workers;
                                bl = &d->backlog[wkr];

                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, wkr);
                                        if (!d->active[wkr]) {
                                                j--;
                                                next_idx--;
                                                matching_required = 1;
                                                continue;
                                        }
                                }

                                /* Add to the current worker */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;
                                /*
                                 * Now that we've just added an unpinned flow
                                 * to a worker, we need to ensure that all
                                 * other packets with that same flow will go
                                 * to the same worker in this burst.
                                 */
                                for (w = j; w < pkts; w++)
                                        if (flows[w] == new_tag)
                                                matches[w] = wkr+1;
                        }
                }
                wkr = (wkr + 1) % d->num_workers;
        }

        /* Flush out all non-full cache-lines to workers. */
        for (wid = 0 ; wid < d->num_workers; wid++)
                /* Sync with worker on GET_BUF flag. */
                if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
                        __ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)) {
                        d->bufs[wid].count = 0;
                        release(d, wid);
                }

        return num_mbufs;
}
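
/*
 * Illustrative distributor-core loop (a minimal sketch; "d", "port", "quit"
 * and BURST_SIZE are application-defined, and error handling is omitted):
 *
 *      struct rte_mbuf *bufs[BURST_SIZE];
 *      struct rte_mbuf *ret[BURST_SIZE];
 *
 *      while (!quit) {
 *              uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs, BURST_SIZE);
 *              rte_distributor_process(d, bufs, nb_rx);
 *              uint16_t nb_ret = rte_distributor_returned_pkts(d, ret,
 *                              BURST_SIZE);
 *              rte_eth_tx_burst(port, 0, ret, nb_ret);
 *      }
 *
 * Because the worker cursor "wkr" above is static, rte_distributor_process()
 * is not re-entrant; a single lcore is expected to own the distribution side.
 */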

/* return to the caller the packets returned from workers */
int
rte_distributor_returned_pkts(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
        struct rte_distributor_returned_pkts *returns = &d->returns;
        unsigned int retval = (max_mbufs < returns->count) ?
                        max_mbufs : returns->count;
        unsigned int i;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_returned_pkts_single(d->d_single,
                                mbufs, max_mbufs);
        }

        for (i = 0; i < retval; i++) {
                unsigned int idx = (returns->start + i) &
                                RTE_DISTRIB_RETURNS_MASK;

                mbufs[i] = returns->mbufs[idx];
        }
        returns->start += i;
        returns->count -= i;

        return retval;
}

/*
 * Return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
        unsigned int wkr, total_outstanding = 0;

        for (wkr = 0; wkr < d->num_workers; wkr++)
                total_outstanding += d->backlog[wkr].count + d->bufs[wkr].count;

        return total_outstanding;
}

/*
 * Flush the distributor, so that there are no outstanding packets in flight or
 * queued up.
 */
int
rte_distributor_flush(struct rte_distributor *d)
{
        unsigned int flushed;
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_flush_single(d->d_single);
        }

        flushed = total_outstanding(d);

        while (total_outstanding(d) > 0)
                rte_distributor_process(d, NULL, 0);

        /* wait 10ms to allow all workers to drain the packets */
        rte_delay_us(10000);

        /*
         * Send an empty burst to all workers to allow them to exit
         * gracefully, should they need to.
         */
        rte_distributor_process(d, NULL, 0);

        for (wkr = 0; wkr < d->num_workers; wkr++)
                handle_returns(d, wkr);

        return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns(struct rte_distributor *d)
{
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                rte_distributor_clear_returns_single(d->d_single);
                return;
        }

        /* throw away returns, so workers can exit */
        for (wkr = 0; wkr < d->num_workers; wkr++)
                /* Sync with worker. Release retptrs. */
                __atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
                                __ATOMIC_RELEASE);

        d->returns.start = d->returns.count = 0;
}

/* creates a distributor instance */
struct rte_distributor *
rte_distributor_create(const char *name,
                unsigned int socket_id,
                unsigned int num_workers,
                unsigned int alg_type)
{
        struct rte_distributor *d;
        struct rte_dist_burst_list *dist_burst_list;
        char mz_name[RTE_MEMZONE_NAMESIZE];
        const struct rte_memzone *mz;
        unsigned int i;

        /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

        /* compile-time checks */
        RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
        RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

        if (name == NULL || num_workers >=
                (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
                rte_errno = EINVAL;
                return NULL;
        }

        if (alg_type == RTE_DIST_ALG_SINGLE) {
                d = malloc(sizeof(struct rte_distributor));
                if (d == NULL) {
                        rte_errno = ENOMEM;
                        return NULL;
                }
                d->d_single = rte_distributor_create_single(name,
                                socket_id, num_workers);
                if (d->d_single == NULL) {
                        free(d);
                        /* rte_errno will have been set */
                        return NULL;
                }
                d->alg_type = alg_type;
                return d;
        }

        snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
        mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
        if (mz == NULL) {
                rte_errno = ENOMEM;
                return NULL;
        }

        d = mz->addr;
        strlcpy(d->name, name, sizeof(d->name));
        d->num_workers = num_workers;
        d->alg_type = alg_type;

        d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
        if (rte_vect_get_max_simd_bitwidth() >= RTE_VECT_SIMD_128)
                d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif
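
        /* The vector match path is only chosen when the EAL reports at least
         * 128-bit SIMD support at runtime on x86 builds; otherwise the scalar
         * find_match_scalar() implementation is used.
         */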

        /*
         * Set up the backlog tags so they're pointing at the second cache
         * line for performance during flow matching
         */
        for (i = 0 ; i < num_workers ; i++)
                d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

        memset(d->active, 0, sizeof(d->active));
        d->activesum = 0;

        dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
                                          rte_dist_burst_list);

        rte_mcfg_tailq_write_lock();
        TAILQ_INSERT_TAIL(dist_burst_list, d, next);
        rte_mcfg_tailq_write_unlock();

        return d;
}
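
/*
 * Illustrative creation call (a sketch only; the name, socket and worker
 * count below are application-defined):
 *
 *      struct rte_distributor *d = rte_distributor_create("pkt_dist",
 *                      rte_socket_id(), rte_lcore_count() - 1,
 *                      RTE_DIST_ALG_BURST);
 *      if (d == NULL)
 *              rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 */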