distributor: fix livelock on flush
lib/librte_distributor/rte_distributor.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_cycles.h>
#include <rte_compat.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor_private.h"
#include "rte_distributor.h"
#include "rte_distributor_v20.h"
#include "rte_distributor_v1705.h"

TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
        .name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/

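/*
 * Brief summary of the handshake used below (derived from the code in this
 * file, stated here for readability): each worker owns one cache-line-sized
 * buffer inside the distributor. retptr64[] carries packets travelling
 * worker -> distributor and bufptr64[] carries packets travelling
 * distributor -> worker. Each 64-bit slot holds an mbuf pointer shifted
 * left by RTE_DISTRIB_FLAG_BITS, with the low bits used as handshake flags
 * (RTE_DISTRIB_GET_BUF, RTE_DISTRIB_RETURN_BUF, RTE_DISTRIB_VALID_BUF).
 */
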
void
rte_distributor_request_pkt_v1705(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt,
                unsigned int count)
{
        struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
        unsigned int i;

        volatile int64_t *retptr64;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                rte_distributor_request_pkt_v20(d->d_v20,
                        worker_id, oldpkt[0]);
                return;
        }

        retptr64 = &(buf->retptr64[0]);
        /* Spin while the handshake bits are set (the scheduler clears them) */
        while (unlikely(*retptr64 & RTE_DISTRIB_GET_BUF)) {
                rte_pause();
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();
        }

        /*
         * OK, if we've got here, then the scheduler has just cleared the
         * handshake bits. Populate the retptrs with returning packets.
         */

        for (i = count; i < RTE_DIST_BURST_SIZE; i++)
                buf->retptr64[i] = 0;

        /* Set the return bit for each packet returned */
        for (i = count; i-- > 0; )
                buf->retptr64[i] =
                        (((int64_t)(uintptr_t)(oldpkt[i])) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

        /*
         * Finally, set the GET_BUF flag to signal to the distributor that
         * the cache line is ready for processing.
         */
        *retptr64 |= RTE_DISTRIB_GET_BUF;
}
BIND_DEFAULT_SYMBOL(rte_distributor_request_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(void rte_distributor_request_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt,
                unsigned int count),
                rte_distributor_request_pkt_v1705);

int
rte_distributor_poll_pkt_v1705(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        uint64_t ret;
        int count = 0;
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                pkts[0] = rte_distributor_poll_pkt_v20(d->d_v20, worker_id);
                return (pkts[0]) ? 1 : 0;
        }

        /* If the GET_BUF flag is still set, there are no packets for us yet */
        if (buf->bufptr64[0] & RTE_DISTRIB_GET_BUF)
                return -1;

        /* since bufptr64 is signed, this should be an arithmetic shift */
        for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
                        ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
                        pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
                }
        }

        /*
         * So now we've got the contents of the cacheline into an array of
         * mbuf pointers, so toggle the bit so the scheduler can start working
         * on the next cacheline while we're working.
         */
        buf->bufptr64[0] |= RTE_DISTRIB_GET_BUF;

        return count;
}
BIND_DEFAULT_SYMBOL(rte_distributor_poll_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_poll_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts),
                rte_distributor_poll_pkt_v1705);

int
rte_distributor_get_pkt_v1705(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts,
                struct rte_mbuf **oldpkt, unsigned int return_count)
{
        int count;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (return_count <= 1) {
                        pkts[0] = rte_distributor_get_pkt_v20(d->d_v20,
                                worker_id, oldpkt[0]);
                        return (pkts[0]) ? 1 : 0;
                } else
                        return -EINVAL;
        }

        rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

        count = rte_distributor_poll_pkt(d, worker_id, pkts);
        while (count == -1) {
                uint64_t t = rte_rdtsc() + 100;

                while (rte_rdtsc() < t)
                        rte_pause();

                count = rte_distributor_poll_pkt(d, worker_id, pkts);
        }
        return count;
}
BIND_DEFAULT_SYMBOL(rte_distributor_get_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_get_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **pkts,
                struct rte_mbuf **oldpkt, unsigned int return_count),
                rte_distributor_get_pkt_v1705);

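/*
 * Illustrative worker loop (not part of the library): a minimal sketch of
 * how an application lcore might use the burst API above. "quit_signal"
 * and the processing step are hypothetical application-side code.
 *
 *      struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
 *      unsigned int count = 0;
 *
 *      while (!quit_signal) {
 *              count = rte_distributor_get_pkt(d, worker_id, bufs,
 *                              bufs, count);
 *              (process the 'count' packets in bufs[] here)
 *      }
 *      rte_distributor_return_pkt(d, worker_id, bufs, count);
 */
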
int
rte_distributor_return_pkt_v1705(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
        struct rte_distributor_buffer *buf = &d->bufs[worker_id];
        unsigned int i;

        if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
                if (num == 1)
                        return rte_distributor_return_pkt_v20(d->d_v20,
                                worker_id, oldpkt[0]);
                else
                        return -EINVAL;
        }

        for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
                /* Switch off the return bit first */
                buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;

        for (i = num; i-- > 0; )
                buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
                        RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

        /* Set the GET_BUF bit even if we got no returns */
        buf->retptr64[0] |= RTE_DISTRIB_GET_BUF;

        return 0;
}
BIND_DEFAULT_SYMBOL(rte_distributor_return_pkt, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_return_pkt(struct rte_distributor *d,
                unsigned int worker_id, struct rte_mbuf **oldpkt, int num),
                rte_distributor_return_pkt_v1705);

/**** APIs called on distributor core ***/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
                unsigned int *ret_start, unsigned int *ret_count)
{
        if (!oldbuf)
                return;
        /* store returns in a circular buffer */
        d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
                        = (void *)oldbuf;
        *ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
        *ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}
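
/*
 * Note on the arithmetic above: the new return is written at
 * (start + count) masked to the buffer size. While the buffer is not full,
 * count grows and start stays put; once count reaches
 * RTE_DISTRIB_RETURNS_MASK the buffer is full, so start advances instead
 * and the oldest stored return is overwritten.
 */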

/*
 * Match the flow_ids (tags) of the incoming packets to the flow_ids
 * of the inflight packets (both inflight on the workers and in each worker
 * backlog). This will then allow us to pin those packets to the relevant
 * workers to give us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
                        uint16_t *data_ptr,
                        uint16_t *output_ptr)
{
        struct rte_distributor_backlog *bl;
        uint16_t i, j, w;

        /*
         * Function overview:
         * 1. Loop through all worker IDs
         * 2. Compare the current inflights to the incoming tags
         * 3. Compare the current backlog to the incoming tags
         * 4. Add any matches to the output
         */

        for (j = 0 ; j < RTE_DIST_BURST_SIZE; j++)
                output_ptr[j] = 0;

        for (i = 0; i < d->num_workers; i++) {
                bl = &d->backlog[i];

                for (j = 0; j < RTE_DIST_BURST_SIZE ; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (d->in_flight_tags[i][j] == data_ptr[w]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
                for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
                        for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
                                if (bl->tags[j] == data_ptr[w]) {
                                        output_ptr[j] = i+1;
                                        break;
                                }
        }

        /*
         * At this stage, the output contains 8 16-bit values, with
         * each non-zero value giving the worker ID (plus one) to which the
         * corresponding flow is pinned.
         */
}
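
/*
 * For example (illustrative): if worker 2 already has a packet with tag
 * 0x1234 in flight and the incoming burst also carries tag 0x1234, the
 * output entry for that flow is set to 3 (worker ID + 1). Entries left at
 * zero have no pinned worker, and rte_distributor_process() below assigns
 * those packets to workers in round-robin order.
 */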

/*
 * When the handshake bits indicate that there are packets coming
 * back from the worker, this function is called to copy and store
 * the valid returned pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        uintptr_t oldbuf;
        unsigned int ret_start = d->returns.start,
                        ret_count = d->returns.count;
        unsigned int count = 0;
        unsigned int i;

        if (buf->retptr64[0] & RTE_DISTRIB_GET_BUF) {
                for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
                        if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
                                oldbuf = ((uintptr_t)(buf->retptr64[i] >>
                                        RTE_DISTRIB_FLAG_BITS));
                                /* store returns in a circular buffer */
                                store_return(oldbuf, d, &ret_start, &ret_count);
                                count++;
                                buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
                        }
                }
                d->returns.start = ret_start;
                d->returns.count = ret_count;
                /* Clear for the worker to populate with more returns */
                buf->retptr64[0] = 0;
        }
        return count;
}

/*
 * This function releases a burst (cache line) to a worker.
 * It is called from the process function when a cacheline is
 * full to make room for more packets for that worker, or when
 * all packets have been assigned to bursts and need to be flushed
 * to the workers.
 * It also needs to wait for any outstanding packets from the worker
 * before sending out new packets.
 */
static unsigned int
release(struct rte_distributor *d, unsigned int wkr)
{
        struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
        unsigned int i;

        while (!(d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF))
                rte_pause();

        handle_returns(d, wkr);

        buf->count = 0;

        for (i = 0; i < d->backlog[wkr].count; i++) {
                d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
                                RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
                d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
        }
        buf->count = i;
        for ( ; i < RTE_DIST_BURST_SIZE ; i++) {
                buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
                d->in_flight_tags[wkr][i] = 0;
        }

        d->backlog[wkr].count = 0;

        /* Clear the GET bit */
        buf->bufptr64[0] &= ~RTE_DISTRIB_GET_BUF;
        return buf->count;
}

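/*
 * Note on the handshake above (restating what release() does): the
 * distributor waits for the worker to set GET_BUF on bufptr64[0], fills
 * the cache line with backlogged packets (pointer << RTE_DISTRIB_FLAG_BITS,
 * real packets marked RTE_DISTRIB_VALID_BUF), and then clears GET_BUF on
 * slot 0 to hand the line back to the worker.
 */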

/* process a set of packets to distribute them to workers */
int
rte_distributor_process_v1705(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
        unsigned int next_idx = 0;
        static unsigned int wkr;
        struct rte_mbuf *next_mb = NULL;
        int64_t next_value = 0;
        uint16_t new_tag = 0;
        uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
        unsigned int i, j, w, wid;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_process_v20(d->d_v20, mbufs, num_mbufs);
        }

        if (unlikely(num_mbufs == 0)) {
                /* Flush out all non-full cache-lines to workers. */
                for (wid = 0 ; wid < d->num_workers; wid++) {
                        if (d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF) {
                                release(d, wid);
                                handle_returns(d, wid);
                        }
                }
                return 0;
        }

        while (next_idx < num_mbufs) {
                uint16_t matches[RTE_DIST_BURST_SIZE];
                unsigned int pkts;

                if (d->bufs[wkr].bufptr64[0] & RTE_DISTRIB_GET_BUF)
                        d->bufs[wkr].count = 0;

                if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
                        pkts = num_mbufs - next_idx;
                else
                        pkts = RTE_DIST_BURST_SIZE;

                for (i = 0; i < pkts; i++) {
                        if (mbufs[next_idx + i]) {
                                /* flows have to be non-zero */
                                flows[i] = mbufs[next_idx + i]->hash.usr | 1;
                        } else
                                flows[i] = 0;
                }
                for (; i < RTE_DIST_BURST_SIZE; i++)
                        flows[i] = 0;

                switch (d->dist_match_fn) {
                case RTE_DIST_MATCH_VECTOR:
                        find_match_vec(d, &flows[0], &matches[0]);
                        break;
                default:
                        find_match_scalar(d, &flows[0], &matches[0]);
                }

                /*
                 * The matches array now contains the intended worker ID (+1)
                 * of the incoming packets. Any zeroes need to be assigned
                 * workers.
                 */

                for (j = 0; j < pkts; j++) {

                        next_mb = mbufs[next_idx++];
                        next_value = (((int64_t)(uintptr_t)next_mb) <<
                                        RTE_DISTRIB_FLAG_BITS);
                        /*
                         * The user is advised to set the tag value for each
                         * mbuf before calling rte_distributor_process.
                         * User defined tags are used to identify flows,
                         * or sessions.
                         */
                        /* flows MUST be non-zero */
                        new_tag = (uint16_t)(next_mb->hash.usr) | 1;

                        /*
                         * Uncommenting the next line will cause the find_match
                         * function to be optimized out, making this function
                         * do parallel (non-atomic) distribution
                         */
                        /* matches[j] = 0; */

                        if (matches[j]) {
                                struct rte_distributor_backlog *bl =
                                                &d->backlog[matches[j]-1];
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, matches[j]-1);
                                }

                                /* Add to worker that already has flow */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;

                        } else {
                                struct rte_distributor_backlog *bl =
                                                &d->backlog[wkr];
                                if (unlikely(bl->count ==
                                                RTE_DIST_BURST_SIZE)) {
                                        release(d, wkr);
                                }

                                /* Add to current worker */
                                unsigned int idx = bl->count++;

                                bl->tags[idx] = new_tag;
                                bl->pkts[idx] = next_value;
                                /*
                                 * Now that we've just added an unpinned flow
                                 * to a worker, we need to ensure that all
                                 * other packets with that same flow will go
                                 * to the same worker in this burst.
                                 */
                                for (w = j; w < pkts; w++)
                                        if (flows[w] == new_tag)
                                                matches[w] = wkr+1;
                        }
                }
                wkr++;
                if (wkr >= d->num_workers)
                        wkr = 0;
        }

        /* Flush out all non-full cache-lines to workers. */
        for (wid = 0 ; wid < d->num_workers; wid++)
                if ((d->bufs[wid].bufptr64[0] & RTE_DISTRIB_GET_BUF))
                        release(d, wid);

        return num_mbufs;
}
BIND_DEFAULT_SYMBOL(rte_distributor_process, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_process(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int num_mbufs),
                rte_distributor_process_v1705);

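/*
 * Illustrative distributor-core loop (not part of the library): a minimal
 * sketch of how an application might feed this API. The rx call, the
 * BURST_SIZE constant and "quit_signal" are hypothetical application-side
 * code.
 *
 *      struct rte_mbuf *bufs[BURST_SIZE];
 *      struct rte_mbuf *ret[BURST_SIZE];
 *
 *      while (!quit_signal) {
 *              const uint16_t nb_rx = rte_eth_rx_burst(port, 0, bufs,
 *                              BURST_SIZE);
 *              rte_distributor_process(d, bufs, nb_rx);
 *              const int nb_ret = rte_distributor_returned_pkts(d, ret,
 *                              BURST_SIZE);
 *              (transmit or free the nb_ret packets in ret[] here)
 *      }
 */
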
/* return to the caller, packets returned from workers */
int
rte_distributor_returned_pkts_v1705(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
        struct rte_distributor_returned_pkts *returns = &d->returns;
        unsigned int retval = (max_mbufs < returns->count) ?
                        max_mbufs : returns->count;
        unsigned int i;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_returned_pkts_v20(d->d_v20,
                                mbufs, max_mbufs);
        }

        for (i = 0; i < retval; i++) {
                unsigned int idx = (returns->start + i) &
                                RTE_DISTRIB_RETURNS_MASK;

                mbufs[i] = returns->mbufs[idx];
        }
        returns->start += i;
        returns->count -= i;

        return retval;
}
BIND_DEFAULT_SYMBOL(rte_distributor_returned_pkts, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_returned_pkts(struct rte_distributor *d,
                struct rte_mbuf **mbufs, unsigned int max_mbufs),
                rte_distributor_returned_pkts_v1705);

/*
 * Return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
        unsigned int wkr, total_outstanding = 0;

        for (wkr = 0; wkr < d->num_workers; wkr++)
                total_outstanding += d->backlog[wkr].count;

        return total_outstanding;
}

/*
 * Flush the distributor, so that there are no outstanding packets in flight or
 * queued up.
 */
int
rte_distributor_flush_v1705(struct rte_distributor *d)
{
        unsigned int flushed;
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                return rte_distributor_flush_v20(d->d_v20);
        }

        flushed = total_outstanding(d);

        while (total_outstanding(d) > 0)
                rte_distributor_process(d, NULL, 0);

        /* wait 10ms to allow all workers to drain the packets */
        rte_delay_us(10000);

        /*
         * Send empty burst to all workers to allow them to exit
         * gracefully, should they need to.
         */
        rte_distributor_process(d, NULL, 0);

        for (wkr = 0; wkr < d->num_workers; wkr++)
                handle_returns(d, wkr);

        return flushed;
}
BIND_DEFAULT_SYMBOL(rte_distributor_flush, _v1705, 17.05);
MAP_STATIC_SYMBOL(int rte_distributor_flush(struct rte_distributor *d),
                rte_distributor_flush_v1705);

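/*
 * Illustrative shutdown order (application-side, not mandated by this
 * library): stop feeding new packets, call rte_distributor_flush(d) on the
 * distributor core to drain the backlogs, then rte_distributor_clear_returns(d)
 * so that any worker still blocked on a handshake can exit (see the comment
 * in clear_returns below).
 */
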
/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns_v1705(struct rte_distributor *d)
{
        unsigned int wkr;

        if (d->alg_type == RTE_DIST_ALG_SINGLE) {
                /* Call the old API */
                rte_distributor_clear_returns_v20(d->d_v20);
                return;
        }

        /* throw away returns, so workers can exit */
        for (wkr = 0; wkr < d->num_workers; wkr++)
                d->bufs[wkr].retptr64[0] = 0;
}
BIND_DEFAULT_SYMBOL(rte_distributor_clear_returns, _v1705, 17.05);
MAP_STATIC_SYMBOL(void rte_distributor_clear_returns(struct rte_distributor *d),
                rte_distributor_clear_returns_v1705);

/* creates a distributor instance */
struct rte_distributor *
rte_distributor_create_v1705(const char *name,
                unsigned int socket_id,
                unsigned int num_workers,
                unsigned int alg_type)
{
        struct rte_distributor *d;
        struct rte_dist_burst_list *dist_burst_list;
        char mz_name[RTE_MEMZONE_NAMESIZE];
        const struct rte_memzone *mz;
        unsigned int i;

        /* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

        /* compilation-time checks */
        RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
        RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

        if (name == NULL || num_workers >=
                (unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
                rte_errno = EINVAL;
                return NULL;
        }

        if (alg_type == RTE_DIST_ALG_SINGLE) {
                d = malloc(sizeof(struct rte_distributor));
                if (d == NULL) {
                        rte_errno = ENOMEM;
                        return NULL;
                }
                d->d_v20 = rte_distributor_create_v20(name,
                                socket_id, num_workers);
                if (d->d_v20 == NULL) {
                        free(d);
                        /* rte_errno will have been set */
                        return NULL;
                }
                d->alg_type = alg_type;
                return d;
        }

        snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
        mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
        if (mz == NULL) {
                rte_errno = ENOMEM;
                return NULL;
        }

        d = mz->addr;
        strlcpy(d->name, name, sizeof(d->name));
        d->num_workers = num_workers;
        d->alg_type = alg_type;

        d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
        d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif

        /*
         * Set up the backlog tags so they're pointing at the second cache
         * line for performance during flow matching
         */
        for (i = 0 ; i < num_workers ; i++)
                d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

        dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
                                          rte_dist_burst_list);

        rte_mcfg_tailq_write_lock();
        TAILQ_INSERT_TAIL(dist_burst_list, d, next);
        rte_mcfg_tailq_write_unlock();

        return d;
}
BIND_DEFAULT_SYMBOL(rte_distributor_create, _v1705, 17.05);
MAP_STATIC_SYMBOL(struct rte_distributor *rte_distributor_create(
                const char *name, unsigned int socket_id,
                unsigned int num_workers, unsigned int alg_type),
                rte_distributor_create_v1705);
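
/*
 * Illustrative creation call (application-side sketch; "num_workers" here is
 * a placeholder for the application's worker count):
 *
 *      struct rte_distributor *dist = rte_distributor_create("dist",
 *                      rte_socket_id(), num_workers, RTE_DIST_ALG_BURST);
 *      if (dist == NULL)
 *              rte_exit(EXIT_FAILURE, "cannot create distributor\n");
 */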