1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2010-2017 Intel Corporation
9 #include <rte_cycles.h>
10 #include <rte_errno.h>
11 #include <rte_mempool.h>
13 #include <rte_distributor.h>
14 #include <rte_string_fns.h>
16 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
18 #define BIG_BATCH 1024
20 struct worker_params {
22 	struct rte_distributor *dist;

/* Single shared parameter blob handed to every launched worker lcore. */
25 struct worker_params worker_params;

27 /* statics - all zero-initialized by default */
28 static volatile int quit; /**< general quit variable for all threads */
29 static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
30 static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/
31 static volatile unsigned worker_idx; /**< next worker id to hand out (atomic add) */
32 static volatile unsigned zero_idx; /**< lcore id elected as "worker zero" for shutdown tests */

/* Per-worker packet counter; cache-aligned to avoid false sharing between
 * worker lcores that update their own slot concurrently.
 */
35 	volatile unsigned handled_packets;
36 } __rte_cache_aligned;
37 struct worker_stats worker_stats[RTE_MAX_LCORE];
39 /* returns the total count of the number of packets handled by the worker
40  * functions given below.
42 static inline unsigned
43 total_packet_count(void)
45 	unsigned i, count = 0;
	/* Only sum slots for workers that actually registered (worker_idx is
	 * bumped once per launched worker in the handle_work* functions). */
46 	for (i = 0; i < worker_idx; i++)
47 		count += __atomic_load_n(&worker_stats[i].handled_packets,
52 /* resets the packet counts for a new test */
54 clear_packet_count(void)
	/* Zero every slot, not just [0, worker_idx): stale counts from a
	 * previous test run must not leak into the next total. */
57 	for (i = 0; i < RTE_MAX_LCORE; i++)
58 		__atomic_store_n(&worker_stats[i].handled_packets, 0,
62 /* this is the basic worker function for sanity test
63  * it does nothing but return packets and count them.
66 handle_work(void *arg)
68 	struct rte_mbuf *buf[8] __rte_cache_aligned;
69 	struct worker_params *wp = arg;
70 	struct rte_distributor *db = wp->dist;
	/* Atomically claim a unique worker id; also acts as the registration
	 * that total_packet_count() relies on. */
72 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
74 	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	/* Count each received burst, then hand the buffers straight back.
	 * NOTE(review): the surrounding !quit loop is elided in this view. */
76 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
78 		num = rte_distributor_get_pkt(db, id,
81 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
	/* Final return of any still-held packets before the worker exits. */
83 	rte_distributor_return_pkt(db, id, buf, num);
87 /* do basic sanity testing of the distributor. This test tests the following:
88  * - send 32 packets through distributor with the same tag and ensure they
89  *   all go to the one worker
90  * - send 32 packets through the distributor with two different tags and
91  *   verify that they go equally to two different workers.
92  * - send 32 packets with different tags through the distributors and
93  *   just verify we get all packets back.
94  * - send 1024 packets through the distributor, gathering the returned packets
95  *   as we go. Then verify that we correctly got all 1024 pointers back again,
96  *   not necessarily in the same order (as different flows).
99 sanity_test(struct worker_params *wp, struct rte_mempool *p)
101 	struct rte_distributor *db = wp->dist;
102 	struct rte_mbuf *bufs[BURST];
103 	struct rte_mbuf *returns[BURST*2];
104 	unsigned int i, count;
105 	unsigned int retries;
106 	unsigned int processed;

108 	printf("=== Basic distributor sanity tests ===\n");
109 	clear_packet_count();
110 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
111 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

115 	/* now set all hash values in all buffers to zero, so all pkts go to the
116 	 * one worker thread */
117 	for (i = 0; i < BURST; i++)
118 		bufs[i]->hash.usr = 0;

	/* process() may accept fewer than BURST pkts per call; loop until the
	 * whole burst has been handed to the distributor. */
121 	while (processed < BURST)
122 		processed += rte_distributor_process(db, &bufs[processed],

	/* Flush repeatedly until all BURST packets have come back. */
128 		rte_distributor_flush(db);
129 		count += rte_distributor_returned_pkts(db,
131 	} while (count < BURST);

133 	if (total_packet_count() != BURST) {
134 		printf("Line %d: Error, not all packets flushed. "
135 				"Expected %u, got %u\n",
136 				__LINE__, BURST, total_packet_count());
137 		rte_mempool_put_bulk(p, (void *)bufs, BURST);

141 	for (i = 0; i < rte_lcore_count() - 1; i++)
142 		printf("Worker %u handled %u packets\n", i,
143 			__atomic_load_n(&worker_stats[i].handled_packets,
145 	printf("Sanity test with all zero hashes done.\n");

147 	/* pick two flows and check they go correctly */
148 	if (rte_lcore_count() >= 3) {
149 		clear_packet_count();
150 		for (i = 0; i < BURST; i++)
			/* two tags: 0 and 256, alternating per packet */
151 			bufs[i]->hash.usr = (i & 1) << 8;

153 		rte_distributor_process(db, bufs, BURST);

156 			rte_distributor_flush(db);
157 			count += rte_distributor_returned_pkts(db,
159 		} while (count < BURST);
160 		if (total_packet_count() != BURST) {
161 			printf("Line %d: Error, not all packets flushed. "
162 					"Expected %u, got %u\n",
163 					__LINE__, BURST, total_packet_count());
164 			rte_mempool_put_bulk(p, (void *)bufs, BURST);

168 		for (i = 0; i < rte_lcore_count() - 1; i++)
169 			printf("Worker %u handled %u packets\n", i,
171 					&worker_stats[i].handled_packets,
173 		printf("Sanity test with two hash values done\n");

176 	/* give a different hash value to each packet,
177 	 * so load gets distributed */
178 	clear_packet_count();
179 	for (i = 0; i < BURST; i++)
180 		bufs[i]->hash.usr = i+1;

182 	rte_distributor_process(db, bufs, BURST);

185 		rte_distributor_flush(db);
186 		count += rte_distributor_returned_pkts(db,
188 	} while (count < BURST);
189 	if (total_packet_count() != BURST) {
190 		printf("Line %d: Error, not all packets flushed. "
191 				"Expected %u, got %u\n",
192 				__LINE__, BURST, total_packet_count());
193 		rte_mempool_put_bulk(p, (void *)bufs, BURST);

197 	for (i = 0; i < rte_lcore_count() - 1; i++)
198 		printf("Worker %u handled %u packets\n", i,
199 			__atomic_load_n(&worker_stats[i].handled_packets,
201 	printf("Sanity test with non-zero hashes done\n");

203 	rte_mempool_put_bulk(p, (void *)bufs, BURST);

205 	/* sanity test with BIG_BATCH packets to ensure they all arrived back
206 	 * from the returned packets function */
207 	clear_packet_count();
208 	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
209 	unsigned num_returned = 0;

211 	/* flush out any remaining packets */
212 	rte_distributor_flush(db);
213 	rte_distributor_clear_returns(db);

215 	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
216 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

	/* shift left so tags don't collide with in-flight tags from above */
219 	for (i = 0; i < BIG_BATCH; i++)
220 		many_bufs[i]->hash.usr = i << 2;

222 	printf("=== testing big burst (%s) ===\n", wp->name);
223 	for (i = 0; i < BIG_BATCH/BURST; i++) {
224 		rte_distributor_process(db,
225 				&many_bufs[i*BURST], BURST);
		/* drain returns as we go so the return ring never overflows */
226 		count = rte_distributor_returned_pkts(db,
227 				&return_bufs[num_returned],
228 				BIG_BATCH - num_returned);
229 		num_returned += count;
231 	rte_distributor_flush(db);
232 	count = rte_distributor_returned_pkts(db,
233 		&return_bufs[num_returned],
234 		BIG_BATCH - num_returned);
235 	num_returned += count;

	/* retry loop: keep flushing until everything is back or we give up */
238 		rte_distributor_flush(db);
239 		count = rte_distributor_returned_pkts(db,
240 				&return_bufs[num_returned],
241 				BIG_BATCH - num_returned);
242 		num_returned += count;
244 	} while ((num_returned < BIG_BATCH) && (retries < 100));

246 	if (num_returned != BIG_BATCH) {
247 		printf("line %d: Missing packets, expected %d\n",
248 				__LINE__, num_returned);
249 		rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

253 	/* big check -  make sure all packets made it back!! */
254 	for (i = 0; i < BIG_BATCH; i++) {
256 		struct rte_mbuf *src = many_bufs[i];
		/* O(n^2) pointer search is fine for a 1024-packet test */
257 		for (j = 0; j < BIG_BATCH; j++) {
258 			if (return_bufs[j] == src)
262 		if (j == BIG_BATCH) {
263 			printf("Error: could not find source packet #%u\n", i);
264 			rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

268 	printf("Sanity test of returned packets done\n");

270 	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
277 /* to test that the distributor does not lose packets, we use this worker
278  * function which frees mbufs when it gets them. The distributor thread does
279  * the mbuf allocation. If distributor drops packets we'll eventually run out
283 handle_work_with_free_mbufs(void *arg)
285 	struct rte_mbuf *buf[8] __rte_cache_aligned;
286 	struct worker_params *wp = arg;
287 	struct rte_distributor *d = wp->dist;
	/* claim a unique worker id (also registers us for stats totals) */
290 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

292 	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	/* free each received mbuf instead of returning it; the sender
	 * re-allocates, so a leak in the distributor exhausts the pool.
	 * NOTE(review): the enclosing !quit loop is elided in this view. */
294 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
296 		for (i = 0; i < num; i++)
297 			rte_pktmbuf_free(buf[i]);
298 		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
300 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
302 	rte_distributor_return_pkt(d, id, buf, num);
306 /* Perform a sanity test of the distributor with a large number of packets,
307  * where we allocate a new set of mbufs for each burst. The workers then
308  * free the mbufs. This ensures that we don't have any packet leaks in the
312 sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
314 	struct rte_distributor *d = wp->dist;
316 	struct rte_mbuf *bufs[BURST];
317 	unsigned int processed;

319 	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

321 	clear_packet_count();
322 	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		/* if the pool is empty, poke the distributor (process with no
		 * new pkts) so workers can free mbufs back to the pool */
324 		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
325 			rte_distributor_process(d, NULL, 0);
326 		for (j = 0; j < BURST; j++) {
327 			bufs[j]->hash.usr = (i+j) << 1;

331 		while (processed < BURST)
332 			processed += rte_distributor_process(d,
333 				&bufs[processed], BURST - processed);

336 	rte_distributor_flush(d);

	/* every allocated mbuf must have been seen by some worker */
340 	if (total_packet_count() < (1<<ITER_POWER)) {
341 		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
342 				__LINE__, total_packet_count(),

347 	printf("Sanity test with mbuf alloc/free passed\n\n");
/* Worker used by the shutdown tests: the first worker to register becomes
 * "worker zero" and obeys zero_quit/zero_sleep so the test can shut just
 * that one worker down while the others keep running.
 */
352 handle_work_for_shutdown_test(void *arg)
354 	struct rte_mbuf *buf[8] __rte_cache_aligned;
355 	struct worker_params *wp = arg;
356 	struct rte_distributor *d = wp->dist;
358 	unsigned int zero_id = 0;
359 	unsigned int zero_unset;
360 	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,

363 	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

	/* CAS zero_idx from its RTE_MAX_LCORE sentinel: the first worker to
	 * succeed becomes worker zero; everyone else just reads the winner. */
366 	zero_unset = RTE_MAX_LCORE;
367 	__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
368 		false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
370 	zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

372 	/* wait for quit single globally, or for worker zero, wait
374 	while (!quit && !(id == zero_id && zero_quit)) {
375 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
377 		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		/* re-run the election in case worker zero has already left */
380 		zero_unset = RTE_MAX_LCORE;
381 		__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
382 			false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
384 		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

387 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
	/* give back our slot with no packets: we are "shutting down" */
390 		rte_distributor_return_pkt(d, id, NULL, 0);

392 		/* for worker zero, allow it to restart to pick up last packet
393 		 * when all workers are shutting down.
		/* zero_sleep is the handshake the main test thread polls on */
395 		__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
398 		__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

400 		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

403 			__atomic_fetch_add(&worker_stats[id].handled_packets,
404 					num, __ATOMIC_RELAXED);
405 			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
408 		rte_distributor_return_pkt(d, id, buf, num);
413 /* Perform a sanity test of the distributor with a large number of packets,
414  * where we allocate a new set of mbufs for each burst. The workers then
415  * free the mbufs. This ensures that we don't have any packet leaks in the
419 sanity_test_with_worker_shutdown(struct worker_params *wp,
420 		struct rte_mempool *p)
422 	struct rte_distributor *d = wp->dist;
423 	struct rte_mbuf *bufs[BURST];
424 	struct rte_mbuf *bufs2[BURST];
426 	unsigned int failed = 0;
427 	unsigned int processed = 0;

429 	printf("=== Sanity test of worker shutdown ===\n");

431 	clear_packet_count();

433 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
434 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

439 	 * Now set all hash values in all buffers to same value so all
440 	 * pkts go to the one worker thread
442 	for (i = 0; i < BURST; i++)
443 		bufs[i]->hash.usr = 1;

446 	while (processed < BURST)
447 		processed += rte_distributor_process(d, &bufs[processed],
449 	rte_distributor_flush(d);

451 	/* at this point, we will have processed some packets and have a full
452 	 * backlog for the other ones at worker 0.

455 	/* get more buffers to queue up, again setting them to the same flow */
456 	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
457 		printf("line %d: Error getting mbufs from pool\n", __LINE__);
458 		rte_mempool_put_bulk(p, (void *)bufs, BURST);
461 	for (i = 0; i < BURST; i++)
462 		bufs2[i]->hash.usr = 1;

464 	/* get worker zero to quit */
466 	rte_distributor_process(d, bufs2, BURST);

468 	/* flush the distributor */
469 	rte_distributor_flush(d);
	/* wait for worker zero to park (zero_sleep set in the worker fn),
	 * flushing so its backlog migrates to the remaining workers */
470 	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
471 		rte_distributor_flush(d);

474 	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))

477 	for (i = 0; i < rte_lcore_count() - 1; i++)
478 		printf("Worker %u handled %u packets\n", i,
479 			__atomic_load_n(&worker_stats[i].handled_packets,

	/* despite the shutdown, both bursts must be fully accounted for */
482 	if (total_packet_count() != BURST * 2) {
483 		printf("Line %d: Error, not all packets flushed. "
484 				"Expected %u, got %u\n",
485 				__LINE__, BURST * 2, total_packet_count());

489 	rte_mempool_put_bulk(p, (void *)bufs, BURST);
490 	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

495 	printf("Sanity test with worker shutdown passed\n\n");
499 /* Test that the flush function is able to move packets between workers when
500  * one worker shuts down..
503 test_flush_with_worker_shutdown(struct worker_params *wp,
504 		struct rte_mempool *p)
506 	struct rte_distributor *d = wp->dist;
507 	struct rte_mbuf *bufs[BURST];
509 	unsigned int failed = 0;
510 	unsigned int processed;

512 	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

514 	clear_packet_count();
515 	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
516 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

520 	/* now set all hash values in all buffers to zero, so all pkts go to the
521 	 * one worker thread */
522 	for (i = 0; i < BURST; i++)
523 		bufs[i]->hash.usr = 0;

526 	while (processed < BURST)
527 		processed += rte_distributor_process(d, &bufs[processed],
529 	/* at this point, we will have processed some packets and have a full
530 	 * backlog for the other ones at worker 0.

533 	/* get worker zero to quit */

536 	/* flush the distributor */
537 	rte_distributor_flush(d);
	/* keep flushing until worker zero has parked, so its backlog is
	 * redistributed to the surviving workers */
539 	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
540 		rte_distributor_flush(d);

544 	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))

547 	for (i = 0; i < rte_lcore_count() - 1; i++)
548 		printf("Worker %u handled %u packets\n", i,
549 			__atomic_load_n(&worker_stats[i].handled_packets,

	/* flush must not have dropped any of the BURST packets */
552 	if (total_packet_count() != BURST) {
553 		printf("Line %d: Error, not all packets flushed. "
554 				"Expected %u, got %u\n",
555 				__LINE__, BURST, total_packet_count());

559 	rte_mempool_put_bulk(p, (void *)bufs, BURST);

564 	printf("Flush test with  worker shutdown passed\n\n");
/* Worker for the marked-packets test: tags each mbuf's udata64 with its
 * worker id (+1) before handing the packets back, so the sender can tell
 * which worker processed which packet.
 */
569 handle_and_mark_work(void *arg)
571 	struct rte_mbuf *buf[8] __rte_cache_aligned;
572 	struct worker_params *wp = arg;
573 	struct rte_distributor *db = wp->dist;
575 	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
576 	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	/* NOTE(review): the enclosing !quit loop is elided in this view. */
578 		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
580 		for (i = 0; i < num; i++)
			/* mark in low bits; +1 so worker 0 is distinguishable */
581 			buf[i]->udata64 += id + 1;
582 		num = rte_distributor_get_pkt(db, id,
585 	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
587 	rte_distributor_return_pkt(db, id, buf, num);
591 /* sanity_mark_test sends packets to workers which mark them.
592  * Every packet has also encoded sequence number.
593  * The returned packets are sorted and verified if they were handled
597 sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
599 	const unsigned int buf_count = 24;
600 	const unsigned int burst = 8;
601 	const unsigned int shift = 12;
602 	const unsigned int seq_shift = 10;

604 	struct rte_distributor *db = wp->dist;
605 	struct rte_mbuf *bufs[buf_count];
606 	struct rte_mbuf *returns[buf_count];
607 	unsigned int i, count, id;
608 	unsigned int sorted[buf_count], seq;
609 	unsigned int failed = 0;
610 	unsigned int processed;

612 	printf("=== Marked packets test ===\n");
613 	clear_packet_count();
614 	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
615 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

619 	/* bufs' hashes will be like these below, but shifted left.
620 	 * The shifting is for avoiding collisions with backlogs
621 	 * and in-flight tags left by previous tests.
622 	 * [1, 1, 1, 1, 1, 1, 1, 1
623 	 *  1, 1, 1, 1, 2, 2, 2, 2
624 	 *  2, 2, 2, 2, 1, 1, 1, 1]
626 	for (i = 0; i < burst; i++) {
627 		bufs[0 * burst + i]->hash.usr = 1 << shift;
628 		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
630 		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)

633 	/* Assign a sequence number to each packet. The sequence is shifted,
634 	 * so that lower bits of the udate64 will hold mark from worker.
636 	for (i = 0; i < buf_count; i++)
637 		bufs[i]->udata64 = i << seq_shift;

640 	for (i = 0; i < buf_count/burst; i++) {
642 		while (processed < burst)
643 			processed += rte_distributor_process(db,
644 				&bufs[i * burst + processed],
		/* collect returns incrementally between bursts */
646 		count += rte_distributor_returned_pkts(db, &returns[count],

651 		rte_distributor_flush(db);
652 		count += rte_distributor_returned_pkts(db, &returns[count],
654 	} while (count < buf_count);

656 	for (i = 0; i < rte_lcore_count() - 1; i++)
657 		printf("Worker %u handled %u packets\n", i,
658 			__atomic_load_n(&worker_stats[i].handled_packets,

661 	/* Sort returned packets by sent order (sequence numbers). */
662 	for (i = 0; i < buf_count; i++) {
663 		seq = returns[i]->udata64 >> seq_shift;
		/* low bits = worker mark added by handle_and_mark_work() */
664 		id = returns[i]->udata64 - (seq << seq_shift);

668 	/* Verify that packets [0-11] and [20-23] were processed
671 	for (i = 1; i < 12; i++) {
672 		if (sorted[i] != sorted[0]) {
673 			printf("Packet number %u processed by worker %u,"
674 				" but should be processes by worker %u\n",
675 				i, sorted[i], sorted[0]);
679 	for (i = 20; i < 24; i++) {
680 		if (sorted[i] != sorted[0]) {
681 			printf("Packet number %u processed by worker %u,"
682 				" but should be processes by worker %u\n",
683 				i, sorted[i], sorted[0]);

687 	/* And verify that packets [12-19] were processed
688 	 * by the another worker
690 	for (i = 13; i < 20; i++) {
691 		if (sorted[i] != sorted[12]) {
692 			printf("Packet number %u processed by worker %u,"
693 				" but should be processes by worker %u\n",
694 				i, sorted[i], sorted[12]);

699 	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

704 	printf("Marked packets test passed\n");
/* Negative test: creating a distributor with a NULL name must fail with
 * rte_errno == EINVAL for both the single and burst variants.
 */
709 int test_error_distributor_create_name(void)
711 	struct rte_distributor *d = NULL;
712 	struct rte_distributor *db = NULL;

	/* NOTE(review): 'name' is presumably set to NULL on an elided line */
715 	d = rte_distributor_create(name, rte_socket_id(),
716 			rte_lcore_count() - 1,
717 			RTE_DIST_ALG_SINGLE);
718 	if (d != NULL || rte_errno != EINVAL) {
719 		printf("ERROR: No error on create() with NULL name param\n");

723 	db = rte_distributor_create(name, rte_socket_id(),
724 			rte_lcore_count() - 1,
726 	if (db != NULL || rte_errno != EINVAL) {
727 		printf("ERROR: No error on create() with NULL param\n");
/* Negative test: creating a distributor with more workers than allowed must
 * fail with rte_errno == EINVAL for both the single and burst variants.
 */
736 int test_error_distributor_create_numworkers(void)
738 	struct rte_distributor *ds = NULL;
739 	struct rte_distributor *db = NULL;

741 	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
743 			RTE_DIST_ALG_SINGLE);
744 	if (ds != NULL || rte_errno != EINVAL) {
745 		printf("ERROR: No error on create() with num_workers > MAX\n");

749 	db = rte_distributor_create("test_numworkers", rte_socket_id(),
752 	if (db != NULL || rte_errno != EINVAL) {
753 		printf("ERROR: No error on create() num_workers > MAX\n");
761 /* Useful function which ensures that all worker functions terminate */
763 quit_workers(struct worker_params *wp, struct rte_mempool *p)
765 	struct rte_distributor *d = wp->dist;
766 	const unsigned num_workers = rte_lcore_count() - 1;
768 	struct rte_mbuf *bufs[RTE_MAX_LCORE];
769 	struct rte_mbuf *returns[RTE_MAX_LCORE];
770 	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
771 		printf("line %d: Error getting mbufs from pool\n", __LINE__);

	/* one packet per worker with distinct tags, so every worker wakes
	 * up, observes the (elided) quit flag, and exits its loop */
777 	for (i = 0; i < num_workers; i++) {
778 		bufs[i]->hash.usr = i << 1;
779 		rte_distributor_process(d, &bufs[i], 1);

782 	rte_distributor_process(d, NULL, 0);
783 	rte_distributor_flush(d);
784 	rte_eal_mp_wait_lcore();

	/* drain any outstanding returned packets */
786 	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))

789 	rte_distributor_clear_returns(d);
790 	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	/* reset worker-zero election for the next shutdown test */
794 	zero_idx = RTE_MAX_LCORE;
/* Test entry point: creates one burst and one single-mode distributor plus a
 * shared mbuf pool, then runs every sub-test against both distributor modes.
 */
800 test_distributor(void)
802 	static struct rte_distributor *ds;
803 	static struct rte_distributor *db;
804 	static struct rte_distributor *dist[2];
805 	static struct rte_mempool *p;

808 	if (rte_lcore_count() < 2) {
809 		printf("Not enough cores for distributor_autotest, expecting at least 2\n");

814 		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
815 				rte_lcore_count() - 1,
818 			printf("Error creating burst distributor\n");
		/* reuse across repeated runs: just drain leftover state */
822 		rte_distributor_flush(db);
823 		rte_distributor_clear_returns(db);

827 		ds = rte_distributor_create("Test_dist_single",
829 				rte_lcore_count() - 1,
830 			RTE_DIST_ALG_SINGLE);
832 			printf("Error creating single distributor\n");
836 		rte_distributor_flush(ds);
837 		rte_distributor_clear_returns(ds);

	/* pool must cover worst case: either per-lcore backlog (511 each) or
	 * the BIG_BATCH test, whichever needs more mbufs */
840 	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
841 			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());

843 		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
844 			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
846 			printf("Error creating mempool\n");

	/* run the full suite once per distributor mode (burst, single) */
854 	for (i = 0; i < 2; i++) {

856 		worker_params.dist = dist[i];
858 			strlcpy(worker_params.name, "burst",
859 					sizeof(worker_params.name));
861 			strlcpy(worker_params.name, "single",
862 					sizeof(worker_params.name));

864 		rte_eal_mp_remote_launch(handle_work,
865 				&worker_params, SKIP_MASTER);
866 		if (sanity_test(&worker_params, p) < 0)
868 		quit_workers(&worker_params, p);

870 		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
871 				&worker_params, SKIP_MASTER);
872 		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
874 		quit_workers(&worker_params, p);

		/* shutdown tests need a spare worker to take over, hence > 2 */
876 		if (rte_lcore_count() > 2) {
877 			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
880 			if (sanity_test_with_worker_shutdown(&worker_params,
883 			quit_workers(&worker_params, p);

885 			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
888 			if (test_flush_with_worker_shutdown(&worker_params,
891 			quit_workers(&worker_params, p);

893 			rte_eal_mp_remote_launch(handle_and_mark_work,
894 				&worker_params, SKIP_MASTER);
895 			if (sanity_mark_test(&worker_params, p) < 0)
897 			quit_workers(&worker_params, p);

900 			printf("Too few cores to run worker shutdown test\n");

905 	if (test_error_distributor_create_numworkers() == -1 ||
906 			test_error_distributor_create_name() == -1) {
907 		printf("rte_distributor_create parameter check tests failed");

914 	quit_workers(&worker_params, p);
918 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);