/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

struct worker_params {
	char name[64];
	struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx; /**< counter that hands out worker ids */
static volatile unsigned zero_idx;   /**< id of the elected "worker zero" */

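/* Per-worker packet counters. __rte_cache_aligned pads each array entry out
 * to its own cache line, so workers incrementing their own counter do not
 * false-share with their neighbours.
 */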
struct worker_stats {
	volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total count of the number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
	unsigned i, count = 0;
	for (i = 0; i < worker_idx; i++)
		count += __atomic_load_n(&worker_stats[i].handled_packets,
				__ATOMIC_RELAXED);
	return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
	unsigned int i;
	for (i = 0; i < RTE_MAX_LCORE; i++)
		__atomic_store_n(&worker_stats[i].handled_packets, 0,
			__ATOMIC_RELAXED);
}

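/* All the worker functions below share one protocol: each worker claims a
 * unique id from worker_idx, then loops on rte_distributor_get_pkt(), which
 * hands the previous burst back to the distributor (the oldpkt argument)
 * and blocks until new packets arrive. On quit, any packets still held are
 * handed back via rte_distributor_return_pkt().
 */
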
/* this is the basic worker function for sanity test
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* do basic sanity testing of the distributor. This test tests the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *returns[BURST*2];
	unsigned int i, count;
	unsigned int retries;
	unsigned int processed;

	printf("=== Basic distributor sanity tests ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(db, &bufs[processed],
			BURST - processed);

	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with all zero hashes done.\n");

	/* pick two flows and check they go correctly */
	if (rte_lcore_count() >= 3) {
		clear_packet_count();
		for (i = 0; i < BURST; i++)
			bufs[i]->hash.usr = (i & 1) << 8;

		rte_distributor_process(db, bufs, BURST);
		count = 0;
		do {
			rte_distributor_flush(db);
			count += rte_distributor_returned_pkts(db,
					returns, BURST*2);
		} while (count < BURST);
		if (total_packet_count() != BURST) {
			printf("Line %d: Error, not all packets flushed. "
					"Expected %u, got %u\n",
					__LINE__, BURST, total_packet_count());
			return -1;
		}

		for (i = 0; i < rte_lcore_count() - 1; i++)
			printf("Worker %u handled %u packets\n", i,
				__atomic_load_n(
					&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
		printf("Sanity test with two hash values done\n");
	}

	/* give a different hash value to each packet,
	 * so load gets distributed */
	clear_packet_count();
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = i+1;

	rte_distributor_process(db, bufs, BURST);
	count = 0;
	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db,
				returns, BURST*2);
	} while (count < BURST);
	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		return -1;
	}

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));
	printf("Sanity test with non-zero hashes done\n");

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	/* sanity test with BIG_BATCH packets to ensure they all arrived back
	 * from the returned packets function */
	clear_packet_count();
	struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
	unsigned num_returned = 0;

	/* flush out any remaining packets */
	rte_distributor_flush(db);
	rte_distributor_clear_returns(db);

	if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}
	for (i = 0; i < BIG_BATCH; i++)
		many_bufs[i]->hash.usr = i << 2;

	printf("=== testing big burst (%s) ===\n", wp->name);
	for (i = 0; i < BIG_BATCH/BURST; i++) {
		rte_distributor_process(db,
				&many_bufs[i*BURST], BURST);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
	}

	rte_distributor_flush(db);
	count = rte_distributor_returned_pkts(db,
			&return_bufs[num_returned],
			BIG_BATCH - num_returned);
	num_returned += count;

	retries = 0;
	do {
		rte_distributor_flush(db);
		count = rte_distributor_returned_pkts(db,
				&return_bufs[num_returned],
				BIG_BATCH - num_returned);
		num_returned += count;
		retries++;
	} while ((num_returned < BIG_BATCH) && (retries < 100));

	if (num_returned != BIG_BATCH) {
		printf("line %d: Missing packets, expected %d, got %d\n",
				__LINE__, BIG_BATCH, num_returned);
		return -1;
	}

	/* big check - make sure all packets made it back!! */
	for (i = 0; i < BIG_BATCH; i++) {
		unsigned j;
		struct rte_mbuf *src = many_bufs[i];
		for (j = 0; j < BIG_BATCH; j++) {
			if (return_bufs[j] == src)
				break;
		}

		if (j == BIG_BATCH) {
			printf("Error: could not find source packet #%u\n", i);
			return -1;
		}
	}
	printf("Sanity test of returned packets done\n");

	rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

	printf("\n");
	return 0;
}

/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	unsigned int num;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			rte_pktmbuf_free(buf[i]);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	unsigned int i;
	struct rte_mbuf *bufs[BURST];
	unsigned int processed;

	printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

	clear_packet_count();
	for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
		unsigned int j;
		while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
			rte_distributor_process(d, NULL, 0);
		for (j = 0; j < BURST; j++) {
			bufs[j]->hash.usr = (i+j) << 1;
		}

		processed = 0;
		while (processed < BURST)
			processed += rte_distributor_process(d,
				&bufs[processed], BURST - processed);
	}

	rte_distributor_flush(d);

	rte_delay_us(10000);

	if (total_packet_count() < (1<<ITER_POWER)) {
		printf("Line %u: Packet count is incorrect, %u, expected %u\n",
				__LINE__, total_packet_count(),
				(1<<ITER_POWER));
		return -1;
	}

	printf("Sanity test with mbuf alloc/free passed\n\n");
	return 0;
}

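/* Worker used by the shutdown tests. The first worker to receive packets
 * elects itself "worker zero" by compare-and-swapping its id into zero_idx.
 * Worker zero alone obeys zero_quit: it returns its packets, signals via
 * zero_sleep that it has parked, then re-enters its receive loop so it can
 * drain any backlog the distributor moves off the "shut down" worker.
 */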
static int
handle_work_for_shutdown_test(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *d = wp->dist;
	unsigned int num;
	unsigned int zero_id = 0;
	unsigned int zero_unset;
	const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
			__ATOMIC_RELAXED);

	num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

	if (num > 0) {
		zero_unset = RTE_MAX_LCORE;
		__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
			false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
	}
	zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

	/* wait for the quit signal globally, or, for worker zero, wait
	 * for zero_quit */
	while (!quit && !(id == zero_id && zero_quit)) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		if (num > 0) {
			zero_unset = RTE_MAX_LCORE;
			__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
				false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
		}
		zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
	}

	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	if (id == zero_id) {
		rte_distributor_return_pkt(d, id, NULL, 0);

		/* for worker zero, allow it to restart to pick up last packet
		 * when all workers are shutting down.
		 */
		__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
		while (zero_quit)
			usleep(100);
		__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

		num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

		while (!quit) {
			__atomic_fetch_add(&worker_stats[id].handled_packets,
					num, __ATOMIC_RELAXED);
			num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
		}
	}
	rte_distributor_return_pkt(d, id, buf, num);
	return 0;
}

/* Perform a sanity test of the distributor with a worker shutting down
 * part-way through: queue two bursts of the same flow to one worker, make
 * that worker quit, and verify that all packets are still processed, i.e.
 * the distributor does not lose the dead worker's backlog.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	struct rte_mbuf *bufs2[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed = 0;

	printf("=== Sanity test of worker shutdown ===\n");

	clear_packet_count();

	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/*
	 * Now set all hash values in all buffers to same value so all
	 * pkts go to the one worker thread
	 */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 1;

	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	rte_distributor_flush(d);

	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get more buffers to queue up, again setting them to the same flow */
	if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		rte_mempool_put_bulk(p, (void *)bufs, BURST);
		return -1;
	}
	for (i = 0; i < BURST; i++)
		bufs2[i]->hash.usr = 1;

	/* get worker zero to quit */
	zero_quit = 1;
	rte_distributor_process(d, bufs2, BURST);

	/* flush the distributor */
	rte_distributor_flush(d);
	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;
	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST * 2) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST * 2, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);
	rte_mempool_put_bulk(p, (void *)bufs2, BURST);

	if (failed)
		return -1;

	printf("Sanity test with worker shutdown passed\n\n");
	return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
		struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	struct rte_mbuf *bufs[BURST];
	unsigned int i;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* now set all hash values in all buffers to zero, so all pkts go to the
	 * one worker thread */
	for (i = 0; i < BURST; i++)
		bufs[i]->hash.usr = 0;

	processed = 0;
	while (processed < BURST)
		processed += rte_distributor_process(d, &bufs[processed],
			BURST - processed);
	/* at this point, we will have processed some packets and have a full
	 * backlog for the other ones at worker 0.
	 */

	/* get worker zero to quit */
	zero_quit = 1;

	/* flush the distributor */
	rte_distributor_flush(d);
	while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_distributor_flush(d);

	zero_quit = 0;
	while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
		rte_delay_us(100);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	if (total_packet_count() != BURST) {
		printf("Line %d: Error, not all packets flushed. "
				"Expected %u, got %u\n",
				__LINE__, BURST, total_packet_count());
		failed = 1;
	}

	rte_mempool_put_bulk(p, (void *)bufs, BURST);

	if (failed)
		return -1;

	printf("Flush test with worker shutdown passed\n\n");
	return 0;
}

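/* Worker for the marked-packets test: adds (id + 1) into the low bits of
 * each mbuf's udata64 so the test can later recover which worker handled
 * which packet; the packet's sequence number lives in the upper bits.
 */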
static int
handle_and_mark_work(void *arg)
{
	struct rte_mbuf *buf[8] __rte_cache_aligned;
	struct worker_params *wp = arg;
	struct rte_distributor *db = wp->dist;
	unsigned int num, i;
	unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
	num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
	while (!quit) {
		__atomic_fetch_add(&worker_stats[id].handled_packets, num,
				__ATOMIC_RELAXED);
		for (i = 0; i < num; i++)
			buf[i]->udata64 += id + 1;
		num = rte_distributor_get_pkt(db, id,
				buf, buf, num);
	}
	__atomic_fetch_add(&worker_stats[id].handled_packets, num,
			__ATOMIC_RELAXED);
	rte_distributor_return_pkt(db, id, buf, num);
	return 0;
}

/* sanity_mark_test sends packets to workers which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and verified to have been handled
 * by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
	const unsigned int buf_count = 24;
	const unsigned int burst = 8;
	const unsigned int shift = 12;
	const unsigned int seq_shift = 10;

	struct rte_distributor *db = wp->dist;
	struct rte_mbuf *bufs[buf_count];
	struct rte_mbuf *returns[buf_count];
	unsigned int i, count, id;
	unsigned int sorted[buf_count], seq;
	unsigned int failed = 0;
	unsigned int processed;

	printf("=== Marked packets test ===\n");
	clear_packet_count();
	if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return -1;
	}

	/* bufs' hashes will be like these below, but shifted left.
	 * The shifting is for avoiding collisions with backlogs
	 * and in-flight tags left by previous tests.
	 * [1, 1, 1, 1, 1, 1, 1, 1
	 *  1, 1, 1, 1, 2, 2, 2, 2
	 *  2, 2, 2, 2, 1, 1, 1, 1]
	 */
	for (i = 0; i < burst; i++) {
		bufs[0 * burst + i]->hash.usr = 1 << shift;
		bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
			<< shift;
		bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
			<< shift;
	}

	/* Assign a sequence number to each packet. The sequence is shifted,
	 * so that the lower bits of udata64 will hold the mark from the
	 * worker.
	 */
	for (i = 0; i < buf_count; i++)
		bufs[i]->udata64 = i << seq_shift;

	count = 0;
	for (i = 0; i < buf_count/burst; i++) {
		processed = 0;
		while (processed < burst)
			processed += rte_distributor_process(db,
				&bufs[i * burst + processed],
				burst - processed);
		count += rte_distributor_returned_pkts(db, &returns[count],
			buf_count - count);
	}

	do {
		rte_distributor_flush(db);
		count += rte_distributor_returned_pkts(db, &returns[count],
				buf_count - count);
	} while (count < buf_count);

	for (i = 0; i < rte_lcore_count() - 1; i++)
		printf("Worker %u handled %u packets\n", i,
			__atomic_load_n(&worker_stats[i].handled_packets,
					__ATOMIC_RELAXED));

	/* Sort returned packets by sent order (sequence numbers). */
	for (i = 0; i < buf_count; i++) {
		seq = returns[i]->udata64 >> seq_shift;
		id = returns[i]->udata64 - (seq << seq_shift);
		sorted[seq] = id;
	}

	/* Verify that packets [0-11] and [20-23] were processed
	 * by the same worker
	 */
	for (i = 1; i < 12; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	for (i = 20; i < 24; i++) {
		if (sorted[i] != sorted[0]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[0]);
			failed = 1;
		}
	}
	/* And verify that packets [12-19] were processed
	 * by another worker
	 */
	for (i = 13; i < 20; i++) {
		if (sorted[i] != sorted[12]) {
			printf("Packet number %u processed by worker %u,"
				" but should be processed by worker %u\n",
				i, sorted[i], sorted[12]);
			failed = 1;
		}
	}

	rte_mempool_put_bulk(p, (void *)bufs, buf_count);

	if (failed)
		return -1;

	printf("Marked packets test passed\n");
	return 0;
}

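/* Negative test: rte_distributor_create() must return NULL and set
 * rte_errno to EINVAL when given a NULL name, for both algorithms.
 */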
static int
test_error_distributor_create_name(void)
{
	struct rte_distributor *d = NULL;
	struct rte_distributor *db = NULL;
	char *name = NULL;

	d = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_SINGLE);
	if (d != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	db = rte_distributor_create(name, rte_socket_id(),
			rte_lcore_count() - 1,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with NULL name param\n");
		return -1;
	}

	return 0;
}

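/* Negative test: rte_distributor_create() must likewise fail with EINVAL
 * when num_workers exceeds RTE_MAX_LCORE.
 */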
static int
test_error_distributor_create_numworkers(void)
{
	struct rte_distributor *ds = NULL;
	struct rte_distributor *db = NULL;

	ds = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_SINGLE);
	if (ds != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	db = rte_distributor_create("test_numworkers", rte_socket_id(),
			RTE_MAX_LCORE + 10,
			RTE_DIST_ALG_BURST);
	if (db != NULL || rte_errno != EINVAL) {
		printf("ERROR: No error on create() with num_workers > MAX\n");
		return -1;
	}

	return 0;
}

/* Useful function which ensures that all worker functions terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
	struct rte_distributor *d = wp->dist;
	const unsigned num_workers = rte_lcore_count() - 1;
	unsigned int i;
	struct rte_mbuf *bufs[RTE_MAX_LCORE];
	struct rte_mbuf *returns[RTE_MAX_LCORE];
	if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
		printf("line %d: Error getting mbufs from pool\n", __LINE__);
		return;
	}

	zero_quit = 0;
	quit = 1;
	for (i = 0; i < num_workers; i++)
		bufs[i]->hash.usr = i << 1;
	rte_distributor_process(d, bufs, num_workers);

	rte_distributor_process(d, NULL, 0);
	rte_distributor_flush(d);
	rte_eal_mp_wait_lcore();

	while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
		;

	rte_distributor_clear_returns(d);
	rte_mempool_put_bulk(p, (void *)bufs, num_workers);

	/* reset all test state for the next run */
	quit = 0;
	worker_idx = 0;
	zero_idx = RTE_MAX_LCORE;
	zero_quit = 0;
	zero_sleep = 0;
}

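/* Entry point for the autotest: create one burst and one single
 * distributor (reused across re-runs), size a shared mbuf pool, then run
 * each of the tests above against both algorithms in turn.
 */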
static int
test_distributor(void)
{
	static struct rte_distributor *ds;
	static struct rte_distributor *db;
	static struct rte_distributor *dist[2];
	static struct rte_mempool *p;
	int i;

	if (rte_lcore_count() < 2) {
		printf("Not enough cores for distributor_autotest, expecting at least 2\n");
		return TEST_SKIPPED;
	}

	if (db == NULL) {
		db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_BURST);
		if (db == NULL) {
			printf("Error creating burst distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(db);
		rte_distributor_clear_returns(db);
	}

	if (ds == NULL) {
		ds = rte_distributor_create("Test_dist_single",
				rte_socket_id(),
				rte_lcore_count() - 1,
				RTE_DIST_ALG_SINGLE);
		if (ds == NULL) {
			printf("Error creating single distributor\n");
			return -1;
		}
	} else {
		rte_distributor_flush(ds);
		rte_distributor_clear_returns(ds);
	}

	const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
			(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
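	/* Pool sizing note (inferred from the expression above): the pool
	 * normally holds 511 mbufs per lcore, but if that would not cover
	 * BIG_BATCH it falls back to (BIG_BATCH * 2) - 1, since the
	 * big-burst test bulk-allocates BIG_BATCH mbufs at once.
	 */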
	if (p == NULL) {
		p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
			0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
		if (p == NULL) {
			printf("Error creating mempool\n");
			return -1;
		}
	}

	dist[0] = ds;
	dist[1] = db;

	for (i = 0; i < 2; i++) {

		worker_params.dist = dist[i];
		if (i)
			strlcpy(worker_params.name, "burst",
					sizeof(worker_params.name));
		else
			strlcpy(worker_params.name, "single",
					sizeof(worker_params.name));

		rte_eal_mp_remote_launch(handle_work,
				&worker_params, SKIP_MASTER);
		if (sanity_test(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
				&worker_params, SKIP_MASTER);
		if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
			goto err;
		quit_workers(&worker_params, p);

		if (rte_lcore_count() > 2) {
			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (sanity_test_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
					&worker_params,
					SKIP_MASTER);
			if (test_flush_with_worker_shutdown(&worker_params,
					p) < 0)
				goto err;
			quit_workers(&worker_params, p);

			rte_eal_mp_remote_launch(handle_and_mark_work,
					&worker_params, SKIP_MASTER);
			if (sanity_mark_test(&worker_params, p) < 0)
				goto err;
			quit_workers(&worker_params, p);

		} else {
			printf("Too few cores to run worker shutdown test\n");
		}
	}

	if (test_error_distributor_create_numworkers() == -1 ||
			test_error_distributor_create_name() == -1) {
		printf("rte_distributor_create parameter check tests failed\n");
		return -1;
	}

	return 0;

err:
	quit_workers(&worker_params, p);
	return -1;
}

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);