/* app/test/test_distributor.c */
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

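/* Parameters passed to each worker thread: a printable name used in log
 * output and the distributor instance under test.
 */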
struct worker_params {
        char name[64];
        struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx;
static volatile unsigned zero_idx;

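/* Per-worker packet counters; the struct is cache-aligned so that workers
 * on different lcores do not false-share a counter.
 */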
struct worker_stats {
        volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
        unsigned i, count = 0;
        for (i = 0; i < worker_idx; i++)
                count += __atomic_load_n(&worker_stats[i].handled_packets,
                                __ATOMIC_RELAXED);
        return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
        unsigned int i;
        for (i = 0; i < RTE_MAX_LCORE; i++)
                __atomic_store_n(&worker_stats[i].handled_packets, 0,
                        __ATOMIC_RELAXED);
}

/* this is the basic worker function for the sanity tests; it does nothing
 * but count packets and return them.
 */
static int
handle_work(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *db = wp->dist;
        unsigned int num;
        unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
        while (!quit) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                num = rte_distributor_get_pkt(db, id,
                                buf, buf, num);
        }
        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        rte_distributor_return_pkt(db, id, buf, num);
        return 0;
}

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *db = wp->dist;
        struct rte_mbuf *bufs[BURST];
        struct rte_mbuf *returns[BURST*2];
        unsigned int i, count;
        unsigned int retries;

        printf("=== Basic distributor sanity tests ===\n");
        clear_packet_count();
        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /* now set all hash values in all buffers to zero, so all pkts go to
         * the one worker thread */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 0;

        rte_distributor_process(db, bufs, BURST);
        count = 0;
        do {
                rte_distributor_flush(db);
                count += rte_distributor_returned_pkts(db,
                                returns, BURST*2);
        } while (count < BURST);

        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                return -1;
        }

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
        printf("Sanity test with all zero hashes done.\n");

        /* pick two flows and check they go correctly */
        if (rte_lcore_count() >= 3) {
                clear_packet_count();
                for (i = 0; i < BURST; i++)
                        bufs[i]->hash.usr = (i & 1) << 8;

                rte_distributor_process(db, bufs, BURST);
                count = 0;
                do {
                        rte_distributor_flush(db);
                        count += rte_distributor_returned_pkts(db,
                                        returns, BURST*2);
                } while (count < BURST);
                if (total_packet_count() != BURST) {
                        printf("Line %d: Error, not all packets flushed. "
                                        "Expected %u, got %u\n",
                                        __LINE__, BURST, total_packet_count());
                        return -1;
                }

                for (i = 0; i < rte_lcore_count() - 1; i++)
                        printf("Worker %u handled %u packets\n", i,
                                __atomic_load_n(
                                        &worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
                printf("Sanity test with two hash values done\n");
        }

        /* give a different hash value to each packet,
         * so load gets distributed */
        clear_packet_count();
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = i+1;

        rte_distributor_process(db, bufs, BURST);
        count = 0;
        do {
                rte_distributor_flush(db);
                count += rte_distributor_returned_pkts(db,
                                returns, BURST*2);
        } while (count < BURST);
        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                return -1;
        }

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
        printf("Sanity test with non-zero hashes done\n");

        rte_mempool_put_bulk(p, (void *)bufs, BURST);

        /* sanity test with BIG_BATCH packets to ensure they all arrived back
         * from the returned packets function */
        clear_packet_count();
        struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
        unsigned num_returned = 0;

        /* flush out any remaining packets */
        rte_distributor_flush(db);
        rte_distributor_clear_returns(db);

        if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }
        for (i = 0; i < BIG_BATCH; i++)
                many_bufs[i]->hash.usr = i << 2;

        printf("=== testing big burst (%s) ===\n", wp->name);
        for (i = 0; i < BIG_BATCH/BURST; i++) {
                rte_distributor_process(db,
                                &many_bufs[i*BURST], BURST);
                count = rte_distributor_returned_pkts(db,
                                &return_bufs[num_returned],
                                BIG_BATCH - num_returned);
                num_returned += count;
        }
        rte_distributor_flush(db);
        count = rte_distributor_returned_pkts(db,
                        &return_bufs[num_returned],
                        BIG_BATCH - num_returned);
        num_returned += count;
        retries = 0;
        do {
                rte_distributor_flush(db);
                count = rte_distributor_returned_pkts(db,
                                &return_bufs[num_returned],
                                BIG_BATCH - num_returned);
                num_returned += count;
                retries++;
        } while ((num_returned < BIG_BATCH) && (retries < 100));

        if (num_returned != BIG_BATCH) {
                printf("line %d: Missing packets, expected %u, got %u\n",
                                __LINE__, BIG_BATCH, num_returned);
                return -1;
        }

        /* big check - make sure all packets made it back!! */
        for (i = 0; i < BIG_BATCH; i++) {
                unsigned j;
                struct rte_mbuf *src = many_bufs[i];
                for (j = 0; j < BIG_BATCH; j++) {
                        if (return_bufs[j] == src)
                                break;
                }

                if (j == BIG_BATCH) {
                        printf("Error: could not find source packet #%u\n", i);
                        return -1;
                }
        }
        printf("Sanity test of returned packets done\n");

        rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

        printf("\n");
        return 0;
}

/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *d = wp->dist;
        unsigned int i;
        unsigned int num;
        unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
        while (!quit) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                for (i = 0; i < num; i++)
                        rte_pktmbuf_free(buf[i]);
                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
        }
        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        rte_distributor_return_pkt(d, id, buf, num);
        return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        unsigned i;
        struct rte_mbuf *bufs[BURST];

        printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

        clear_packet_count();
        for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
                unsigned j;
                while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
                        rte_distributor_process(d, NULL, 0);
                for (j = 0; j < BURST; j++) {
                        bufs[j]->hash.usr = (i+j) << 1;
                }

                rte_distributor_process(d, bufs, BURST);
        }

        rte_distributor_flush(d);

        rte_delay_us(10000);

        if (total_packet_count() < (1<<ITER_POWER)) {
                printf("Line %u: Packet count is incorrect, %u, expected %u\n",
                                __LINE__, total_packet_count(),
                                (1<<ITER_POWER));
                return -1;
        }

        printf("Sanity test with mbuf alloc/free passed\n\n");
        return 0;
}

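/* Worker function for the shutdown tests. The first worker to receive
 * packets elects itself "worker zero" via the compare-and-swap on zero_idx.
 * Worker zero leaves the main loop when zero_quit is set, signals through
 * zero_sleep that it has stopped, and resumes once zero_quit is cleared so
 * it can pick up any packets still queued for it.
 */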
static int
handle_work_for_shutdown_test(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *d = wp->dist;
        unsigned int num;
        unsigned int zero_id = 0;
        unsigned int zero_unset;
        const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
                        __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

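        /* if we received packets, try to claim zero_idx (still unset at
         * RTE_MAX_LCORE) so that exactly one worker becomes "worker zero"
         */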
        if (num > 0) {
                zero_unset = RTE_MAX_LCORE;
                __atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
                        false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
        }
        zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

        /* wait for the quit signal globally, or, for worker zero, wait
         * for zero_quit */
        while (!quit && !(id == zero_id && zero_quit)) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

                if (num > 0) {
                        zero_unset = RTE_MAX_LCORE;
                        __atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
                                false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
                }
                zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
        }

        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        if (id == zero_id) {
                rte_distributor_return_pkt(d, id, NULL, 0);

                /* for worker zero, allow it to restart to pick up last packet
                 * when all workers are shutting down.
                 */
                __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
                while (zero_quit)
                        usleep(100);
                __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

                while (!quit) {
                        __atomic_fetch_add(&worker_stats[id].handled_packets,
                                        num, __ATOMIC_RELAXED);
                        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
                }
        }
        rte_distributor_return_pkt(d, id, buf, num);
        return 0;
}

/* Perform a sanity test of the distributor when one worker shuts down
 * mid-stream. All packets are given the same flow tag so they queue up
 * for a single worker; that worker is then told to quit, and we verify
 * that its backlog is redistributed and no packets are lost.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
                struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        struct rte_mbuf *bufs[BURST];
        struct rte_mbuf *bufs2[BURST];
        unsigned int i;
        unsigned int failed = 0;

        printf("=== Sanity test of worker shutdown ===\n");

        clear_packet_count();

        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /*
         * Now set all hash values in all buffers to same value so all
         * pkts go to the one worker thread
         */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 1;

        rte_distributor_process(d, bufs, BURST);
        rte_distributor_flush(d);

        /* at this point, we will have processed some packets and have a full
         * backlog for the other ones at worker 0.
         */

        /* get more buffers to queue up, again setting them to the same flow */
        if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                rte_mempool_put_bulk(p, (void *)bufs, BURST);
                return -1;
        }
        for (i = 0; i < BURST; i++)
                bufs2[i]->hash.usr = 1;

        /* get worker zero to quit */
        zero_quit = 1;
        rte_distributor_process(d, bufs2, BURST);

        /* flush the distributor */
        rte_distributor_flush(d);
        while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_distributor_flush(d);

        zero_quit = 0;
        while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_delay_us(100);

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));

        if (total_packet_count() != BURST * 2) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST * 2, total_packet_count());
                failed = 1;
        }

        rte_mempool_put_bulk(p, (void *)bufs, BURST);
        rte_mempool_put_bulk(p, (void *)bufs2, BURST);

        if (failed)
                return -1;

        printf("Sanity test with worker shutdown passed\n\n");
        return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
                struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        struct rte_mbuf *bufs[BURST];
        unsigned int i;
        unsigned int failed = 0;

        printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

        clear_packet_count();
        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /* now set all hash values in all buffers to zero, so all pkts go to
         * the one worker thread */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 0;

        rte_distributor_process(d, bufs, BURST);
        /* at this point, we will have processed some packets and have a full
         * backlog for the other ones at worker 0.
         */

        /* get worker zero to quit */
        zero_quit = 1;

        /* flush the distributor */
        rte_distributor_flush(d);

        while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_distributor_flush(d);

        zero_quit = 0;

        while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_delay_us(100);

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));

        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                failed = 1;
        }

        rte_mempool_put_bulk(p, (void *)bufs, BURST);

        if (failed)
                return -1;

        printf("Flush test with worker shutdown passed\n\n");
        return 0;
}

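/* check that rte_distributor_create() rejects a NULL name with EINVAL,
 * for both the single and burst implementations
 */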
static int
test_error_distributor_create_name(void)
{
        struct rte_distributor *d = NULL;
        struct rte_distributor *db = NULL;
        char *name = NULL;

        d = rte_distributor_create(name, rte_socket_id(),
                        rte_lcore_count() - 1,
                        RTE_DIST_ALG_SINGLE);
        if (d != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with NULL name param\n");
                return -1;
        }

        db = rte_distributor_create(name, rte_socket_id(),
                        rte_lcore_count() - 1,
                        RTE_DIST_ALG_BURST);
        if (db != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with NULL name param\n");
                return -1;
        }

        return 0;
}

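/* check that rte_distributor_create() rejects a worker count above
 * RTE_MAX_LCORE with EINVAL, for both implementations
 */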
static int
test_error_distributor_create_numworkers(void)
{
        struct rte_distributor *ds = NULL;
        struct rte_distributor *db = NULL;

        ds = rte_distributor_create("test_numworkers", rte_socket_id(),
                        RTE_MAX_LCORE + 10,
                        RTE_DIST_ALG_SINGLE);
        if (ds != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with num_workers > MAX\n");
                return -1;
        }

        db = rte_distributor_create("test_numworkers", rte_socket_id(),
                        RTE_MAX_LCORE + 10,
                        RTE_DIST_ALG_BURST);
        if (db != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with num_workers > MAX\n");
                return -1;
        }

        return 0;
}

/* helper function which ensures that all worker threads terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        const unsigned num_workers = rte_lcore_count() - 1;
        unsigned i;
        struct rte_mbuf *bufs[RTE_MAX_LCORE];
        struct rte_mbuf *returns[RTE_MAX_LCORE];
        if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return;
        }

        zero_quit = 0;
        quit = 1;
        for (i = 0; i < num_workers; i++)
                bufs[i]->hash.usr = i << 1;
        rte_distributor_process(d, bufs, num_workers);

        rte_distributor_process(d, NULL, 0);
        rte_distributor_flush(d);
        rte_eal_mp_wait_lcore();

        while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
                ;

        rte_distributor_clear_returns(d);
        rte_mempool_put_bulk(p, (void *)bufs, num_workers);

        quit = 0;
        worker_idx = 0;
        zero_idx = RTE_MAX_LCORE;
        zero_quit = 0;
        zero_sleep = 0;
}

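/* test entry point: create a burst-mode and a single-mode distributor plus
 * a shared mbuf pool, then run the full test sequence against each
 * distributor in turn
 */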
static int
test_distributor(void)
{
        static struct rte_distributor *ds;
        static struct rte_distributor *db;
        static struct rte_distributor *dist[2];
        static struct rte_mempool *p;
        int i;

        if (rte_lcore_count() < 2) {
                printf("Not enough cores for distributor_autotest, expecting at least 2\n");
                return TEST_SKIPPED;
        }

        if (db == NULL) {
                db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
                                rte_lcore_count() - 1,
                                RTE_DIST_ALG_BURST);
                if (db == NULL) {
                        printf("Error creating burst distributor\n");
                        return -1;
                }
        } else {
                rte_distributor_flush(db);
                rte_distributor_clear_returns(db);
        }

        if (ds == NULL) {
                ds = rte_distributor_create("Test_dist_single",
                                rte_socket_id(),
                                rte_lcore_count() - 1,
                                RTE_DIST_ALG_SINGLE);
                if (ds == NULL) {
                        printf("Error creating single distributor\n");
                        return -1;
                }
        } else {
                rte_distributor_flush(ds);
                rte_distributor_clear_returns(ds);
        }

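        /* size the pool for 511 in-flight mbufs per lcore, but never smaller
         * than the BIG_BATCH test requires
         */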
        const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
                        (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
        if (p == NULL) {
                p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
                        0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
                if (p == NULL) {
                        printf("Error creating mempool\n");
                        return -1;
                }
        }

        dist[0] = ds;
        dist[1] = db;

        for (i = 0; i < 2; i++) {
                worker_params.dist = dist[i];
                if (i)
                        strlcpy(worker_params.name, "burst",
                                        sizeof(worker_params.name));
                else
                        strlcpy(worker_params.name, "single",
                                        sizeof(worker_params.name));

                rte_eal_mp_remote_launch(handle_work,
                                &worker_params, SKIP_MASTER);
                if (sanity_test(&worker_params, p) < 0)
                        goto err;
                quit_workers(&worker_params, p);

                rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
                                &worker_params, SKIP_MASTER);
                if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
                        goto err;
                quit_workers(&worker_params, p);

                if (rte_lcore_count() > 2) {
                        rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
                                        &worker_params,
                                        SKIP_MASTER);
                        if (sanity_test_with_worker_shutdown(&worker_params,
                                        p) < 0)
                                goto err;
                        quit_workers(&worker_params, p);

                        rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
                                        &worker_params,
                                        SKIP_MASTER);
                        if (test_flush_with_worker_shutdown(&worker_params,
                                        p) < 0)
                                goto err;
                        quit_workers(&worker_params, p);
                } else {
                        printf("Too few cores to run worker shutdown test\n");
                }
        }

        if (test_error_distributor_create_numworkers() == -1 ||
                        test_error_distributor_create_name() == -1) {
                printf("rte_distributor_create parameter check tests failed\n");
                return -1;
        }

        return 0;

err:
        quit_workers(&worker_params, p);
        return -1;
}

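/* Registered as "distributor_autotest"; with a typical DPDK build it can be
 * run from the test binary's RTE>> prompt (the exact invocation may vary
 * between DPDK versions).
 */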
REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);