1 /* SPDX-License-Identifier: BSD-3-Clause
2 * Copyright(c) 2010-2017 Intel Corporation
9 #include <rte_cycles.h>
10 #include <rte_errno.h>
11 #include <rte_mempool.h>
13 #include <rte_mbuf_dyn.h>
14 #include <rte_distributor.h>
15 #include <rte_string_fns.h>
17 #define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
19 #define BIG_BATCH 1024
/* Per-mbuf sequence number kept in a dynamic mbuf field; the marked-packets
 * test uses it to restore the original send order of returned packets.
 */
typedef uint32_t seq_dynfield_t;
/* Offset of the dynamic field inside the mbuf; -1 until registered
 * (registration happens in test_distributor()).
 */
static int seq_dynfield_offset = -1;
/* Return a pointer to @mbuf's sequence-number dynamic field.
 * Only valid after seq_dynfield_offset has been registered.
 */
static inline seq_dynfield_t *
seq_field(struct rte_mbuf *mbuf)
return RTE_MBUF_DYNFIELD(mbuf, seq_dynfield_offset, seq_dynfield_t *);
/* Parameters handed to each worker lcore: the distributor instance under
 * test. NOTE(review): a "name" member is referenced elsewhere in this file
 * (wp->name, worker_params.name) but its declaration is not visible in this
 * excerpt — confirm against the full file.
 */
struct worker_params {
struct rte_distributor *dist;

/* Single shared instance reused for each algorithm variant under test. */
struct worker_params worker_params;
/* statics - all zero-initialized by default */
static volatile int quit; /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit*/
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping*/
/* Count of workers launched so far; each worker atomically claims an id
 * by fetch-and-incrementing this.
 */
static volatile unsigned worker_idx;
/* Id of the worker elected as "worker zero" for the shutdown tests;
 * quit_workers() resets it to RTE_MAX_LCORE (the "unset" sentinel).
 */
static volatile unsigned zero_idx;
/* Per-worker packet counter, updated with __atomic builtins from each
 * worker lcore. The struct is cache-aligned so each worker's counter sits
 * on its own cache line (avoids false sharing).
 * NOTE(review): the struct's opening line falls in a gap of this excerpt.
 */
volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];
/* returns the total count of the number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
unsigned i, count = 0;
/* Only sum workers that have registered (claimed an index so far).
 * NOTE(review): the __ATOMIC_* ordering argument of the load is on a
 * line not visible in this excerpt.
 */
for (i = 0; i < worker_idx; i++)
count += __atomic_load_n(&worker_stats[i].handled_packets,
/* resets the packet counts for a new test */
clear_packet_count(void)
/* Zero every slot, not just registered workers, so stale counts from a
 * previous test cannot leak into the next one.
 */
for (i = 0; i < RTE_MAX_LCORE; i++)
__atomic_store_n(&worker_stats[i].handled_packets, 0,
/* this is the basic worker function for sanity test
 * it does nothing but return packets and count them.
 */
handle_work(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
/* Atomically claim a unique worker id for the distributor API calls. */
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
/* Main work loop (its header falls in a gap of this excerpt): count each
 * received burst, then exchange it for the next one.
 */
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
num = rte_distributor_get_pkt(db, id,
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
/* On exit, hand back any packets still held so the distributor can drain. */
rte_distributor_return_pkt(db, id, buf, num);
/* do basic sanity testing of the distributor. This test tests the following:
 * - send 32 packets through distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributors and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
sanity_test(struct worker_params *wp, struct rte_mempool *p)
struct rte_distributor *db = wp->dist;
struct rte_mbuf *bufs[BURST];
struct rte_mbuf *returns[BURST*2];
unsigned int i, count;
unsigned int retries;
unsigned int processed;
printf("=== Basic distributor sanity tests ===\n");
clear_packet_count();
if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/* now set all hash values in all buffers to zero, so all pkts go to the
 * one worker thread */
for (i = 0; i < BURST; i++)
bufs[i]->hash.usr = 0;
/* rte_distributor_process() may not accept the whole burst at once, so
 * keep feeding until all BURST packets are taken.
 */
while (processed < BURST)
processed += rte_distributor_process(db, &bufs[processed],
/* Flush and collect returns until every packet has come back. */
rte_distributor_flush(db);
count += rte_distributor_returned_pkts(db,
} while (count < BURST);
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
rte_mempool_put_bulk(p, (void *)bufs, BURST);
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
printf("Sanity test with all zero hashes done.\n");
/* pick two flows and check they go correctly */
if (rte_lcore_count() >= 3) {
clear_packet_count();
/* Alternate between two tags (0 and 0x100) across the burst. */
for (i = 0; i < BURST; i++)
bufs[i]->hash.usr = (i & 1) << 8;
rte_distributor_process(db, bufs, BURST);
rte_distributor_flush(db);
count += rte_distributor_returned_pkts(db,
} while (count < BURST);
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
rte_mempool_put_bulk(p, (void *)bufs, BURST);
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
&worker_stats[i].handled_packets,
printf("Sanity test with two hash values done\n");
/* give a different hash value to each packet,
 * so load gets distributed */
clear_packet_count();
for (i = 0; i < BURST; i++)
bufs[i]->hash.usr = i+1;
rte_distributor_process(db, bufs, BURST);
rte_distributor_flush(db);
count += rte_distributor_returned_pkts(db,
} while (count < BURST);
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
rte_mempool_put_bulk(p, (void *)bufs, BURST);
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
printf("Sanity test with non-zero hashes done\n");
rte_mempool_put_bulk(p, (void *)bufs, BURST);
/* sanity test with BIG_BATCH packets to ensure they all arrived back
 * from the returned packets function */
clear_packet_count();
struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
unsigned num_returned = 0;
/* flush out any remaining packets */
rte_distributor_flush(db);
rte_distributor_clear_returns(db);
if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/* Shift tags left to avoid colliding with tags used above. */
for (i = 0; i < BIG_BATCH; i++)
many_bufs[i]->hash.usr = i << 2;
printf("=== testing big burst (%s) ===\n", wp->name);
/* Feed BURST-sized slices, harvesting returns as we go so the
 * distributor's return ring never fills up.
 */
for (i = 0; i < BIG_BATCH/BURST; i++) {
rte_distributor_process(db,
&many_bufs[i*BURST], BURST);
count = rte_distributor_returned_pkts(db,
&return_bufs[num_returned],
BIG_BATCH - num_returned);
num_returned += count;
rte_distributor_flush(db);
count = rte_distributor_returned_pkts(db,
&return_bufs[num_returned],
BIG_BATCH - num_returned);
num_returned += count;
/* Retry flushing/collecting; the retry bound stops an infinite loop
 * if packets are genuinely lost.
 */
rte_distributor_flush(db);
count = rte_distributor_returned_pkts(db,
&return_bufs[num_returned],
BIG_BATCH - num_returned);
num_returned += count;
} while ((num_returned < BIG_BATCH) && (retries < 100));
if (num_returned != BIG_BATCH) {
printf("line %d: Missing packets, expected %d\n",
__LINE__, num_returned);
rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
/* big check - make sure all packets made it back!! */
/* O(n^2) pointer search — fine for a test of 1024 packets. */
for (i = 0; i < BIG_BATCH; i++) {
struct rte_mbuf *src = many_bufs[i];
for (j = 0; j < BIG_BATCH; j++) {
if (return_bufs[j] == src)
if (j == BIG_BATCH) {
printf("Error: could not find source packet #%u\n", i);
rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
printf("Sanity test of returned packets done\n");
rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);
/* to test that the distributor does not lose packets, we use this worker
 * function which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If distributor drops packets we'll eventually run out
 * of mbufs in the pool.
 */
handle_work_with_free_mbufs(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
/* Atomically claim a unique worker id. */
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
/* Main loop (header not visible here): count the burst, free every mbuf
 * back to the pool, then ask for more work.
 */
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
for (i = 0; i < num; i++)
rte_pktmbuf_free(buf[i]);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
rte_distributor_return_pkt(d, id, buf, num);
/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * distributor.
 */
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
unsigned int processed;
printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);
clear_packet_count();
/* Push 2^ITER_POWER packets through in BURST-sized allocations. */
for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
/* If the pool is empty, keep ticking the distributor (a NULL/0
 * process call) so workers can free mbufs back to the pool.
 */
while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
rte_distributor_process(d, NULL, 0);
for (j = 0; j < BURST; j++) {
bufs[j]->hash.usr = (i+j) << 1;
while (processed < BURST)
processed += rte_distributor_process(d,
&bufs[processed], BURST - processed);
rte_distributor_flush(d);
/* Workers may still be counting; check for "at least" the total.
 * NOTE(review): comparison uses '<', not '!=', presumably to tolerate
 * in-progress counting — confirm against the full file.
 */
if (total_packet_count() < (1<<ITER_POWER)) {
printf("Line %u: Packet count is incorrect, %u, expected %u\n",
__LINE__, total_packet_count(),
printf("Sanity test with mbuf alloc/free passed\n\n");
/* Worker used by the shutdown tests. The first worker to receive a packet
 * elects itself "worker zero" via compare-and-swap on zero_idx; that worker
 * obeys zero_quit (simulating a worker shutting down) while the others keep
 * running until the global quit flag is set.
 */
handle_work_for_shutdown_test(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *d = wp->dist;
unsigned int zero_id = 0;
unsigned int zero_unset;
const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
/* Try to claim the "worker zero" role: CAS succeeds only if zero_idx
 * still holds the RTE_MAX_LCORE sentinel (i.e. nobody claimed it yet).
 */
zero_unset = RTE_MAX_LCORE;
__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
/* wait for quit single globally, or for worker zero, wait
 * for zero_quit */
while (!quit && !(id == zero_id && zero_quit)) {
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
/* Re-attempt the claim each iteration in case worker zero was
 * reset (sentinel restored) while we were working.
 */
zero_unset = RTE_MAX_LCORE;
__atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
/* "Shut down": return all packets so the distributor can reassign
 * this worker's backlog elsewhere.
 */
rte_distributor_return_pkt(d, id, NULL, 0);
/* for worker zero, allow it to restart to pick up last packet
 * when all workers are shutting down.
 */
/* zero_sleep release/acquire pairs with the main thread's polling in
 * the shutdown tests, signalling "worker zero is parked".
 */
__atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
__atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);
/* Resume collecting any remaining packets until global quit. */
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
__atomic_fetch_add(&worker_stats[id].handled_packets,
num, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
rte_distributor_return_pkt(d, id, buf, num);
/* Sanity test of worker shutdown: direct all packets to one worker, shut
 * that worker down while it holds a backlog, and verify the distributor
 * reassigns the outstanding packets so none are lost.
 */
sanity_test_with_worker_shutdown(struct worker_params *wp,
struct rte_mempool *p)
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
struct rte_mbuf *bufs2[BURST];
unsigned int failed = 0;
unsigned int processed = 0;
printf("=== Sanity test of worker shutdown ===\n");
clear_packet_count();
if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/*
 * Now set all hash values in all buffers to same value so all
 * pkts go to the one worker thread
 */
for (i = 0; i < BURST; i++)
bufs[i]->hash.usr = 1;
while (processed < BURST)
processed += rte_distributor_process(d, &bufs[processed],
rte_distributor_flush(d);
/* at this point, we will have processed some packets and have a full
 * backlog for the other ones at worker 0.
 */
/* get more buffers to queue up, again setting them to the same flow */
if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
rte_mempool_put_bulk(p, (void *)bufs, BURST);
for (i = 0; i < BURST; i++)
bufs2[i]->hash.usr = 1;
/* get worker zero to quit */
rte_distributor_process(d, bufs2, BURST);
/* flush the distributor */
rte_distributor_flush(d);
/* Keep flushing until worker zero signals it has parked (zero_sleep
 * set with release semantics on the worker side).
 */
while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
rte_distributor_flush(d);
/* Then wait for it to wake again and drain its last packets. */
while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
/* Both bursts must be accounted for despite the shutdown. */
if (total_packet_count() != BURST * 2) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST * 2, total_packet_count());
rte_mempool_put_bulk(p, (void *)bufs, BURST);
rte_mempool_put_bulk(p, (void *)bufs2, BURST);
printf("Sanity test with worker shutdown passed\n\n");
/* Test that the flush function is able to move packets between workers when
 * one worker shuts down..
 */
test_flush_with_worker_shutdown(struct worker_params *wp,
struct rte_mempool *p)
struct rte_distributor *d = wp->dist;
struct rte_mbuf *bufs[BURST];
unsigned int failed = 0;
unsigned int processed;
printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);
clear_packet_count();
if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/* now set all hash values in all buffers to zero, so all pkts go to the
 * one worker thread */
for (i = 0; i < BURST; i++)
bufs[i]->hash.usr = 0;
while (processed < BURST)
processed += rte_distributor_process(d, &bufs[processed],
/* at this point, we will have processed some packets and have a full
 * backlog for the other ones at worker 0.
 */
/* get worker zero to quit */
/* flush the distributor */
rte_distributor_flush(d);
/* Flush repeatedly until worker zero parks (zero_sleep set), then wait
 * for it to resume — mirrors sanity_test_with_worker_shutdown().
 */
while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
rte_distributor_flush(d);
while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
/* Only one burst was injected here, so expect exactly BURST. */
if (total_packet_count() != BURST) {
printf("Line %d: Error, not all packets flushed. "
"Expected %u, got %u\n",
__LINE__, BURST, total_packet_count());
rte_mempool_put_bulk(p, (void *)bufs, BURST);
printf("Flush test with worker shutdown passed\n\n");
/* Worker for the marked-packets test: like handle_work(), but additionally
 * stamps each packet's sequence dynfield with (worker id + 1) so the main
 * thread can later tell which worker processed which packet.
 */
handle_and_mark_work(void *arg)
struct rte_mbuf *buf[8] __rte_cache_aligned;
struct worker_params *wp = arg;
struct rte_distributor *db = wp->dist;
unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
/* Mark: low bits of the sequence field carry the worker id + 1. */
for (i = 0; i < num; i++)
*seq_field(buf[i]) += id + 1;
num = rte_distributor_get_pkt(db, id,
__atomic_fetch_add(&worker_stats[id].handled_packets, num,
rte_distributor_return_pkt(db, id, buf, num);
/* sanity_mark_test sends packets to workers which mark them.
 * Every packet has also encoded sequence number.
 * The returned packets are sorted and verified if they were handled
 * by the expected workers.
 */
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
const unsigned int buf_count = 24;
const unsigned int burst = 8;
const unsigned int shift = 12;
const unsigned int seq_shift = 10;
struct rte_distributor *db = wp->dist;
struct rte_mbuf *bufs[buf_count];
struct rte_mbuf *returns[buf_count];
unsigned int i, count, id;
unsigned int sorted[buf_count], seq;
unsigned int failed = 0;
unsigned int processed;
printf("=== Marked packets test ===\n");
clear_packet_count();
if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/* bufs' hashes will be like these below, but shifted left.
 * The shifting is for avoiding collisions with backlogs
 * and in-flight tags left by previous tests.
 * [1, 1, 1, 1, 1, 1, 1, 1
 *  1, 1, 1, 1, 2, 2, 2, 2
 *  2, 2, 2, 2, 1, 1, 1, 1]
 */
for (i = 0; i < burst; i++) {
bufs[0 * burst + i]->hash.usr = 1 << shift;
bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
/* Assign a sequence number to each packet. The sequence is shifted,
 * so that lower bits will hold mark from worker.
 */
for (i = 0; i < buf_count; i++)
*seq_field(bufs[i]) = i << seq_shift;
/* Feed one burst at a time, harvesting returns between bursts. */
for (i = 0; i < buf_count/burst; i++) {
while (processed < burst)
processed += rte_distributor_process(db,
&bufs[i * burst + processed],
count += rte_distributor_returned_pkts(db, &returns[count],
rte_distributor_flush(db);
count += rte_distributor_returned_pkts(db, &returns[count],
} while (count < buf_count);
for (i = 0; i < rte_lcore_count() - 1; i++)
printf("Worker %u handled %u packets\n", i,
__atomic_load_n(&worker_stats[i].handled_packets,
/* Sort returned packets by sent order (sequence numbers). */
for (i = 0; i < buf_count; i++) {
seq = *seq_field(returns[i]) >> seq_shift;
/* Recover the worker mark from the low bits of the field. */
id = *seq_field(returns[i]) - (seq << seq_shift);
/* Verify that packets [0-11] and [20-23] were processed
 * by the same worker (they all carry tag 1).
 */
for (i = 1; i < 12; i++) {
if (sorted[i] != sorted[0]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[0]);
for (i = 20; i < 24; i++) {
if (sorted[i] != sorted[0]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[0]);
/* And verify that packets [12-19] were processed
 * by the another worker
 */
for (i = 13; i < 20; i++) {
if (sorted[i] != sorted[12]) {
printf("Packet number %u processed by worker %u,"
" but should be processes by worker %u\n",
i, sorted[i], sorted[12]);
rte_mempool_put_bulk(p, (void *)bufs, buf_count);
printf("Marked packets test passed\n");
/* Negative test: rte_distributor_create() must fail with rte_errno == EINVAL
 * when given a NULL name, for both the single and burst algorithms.
 * Returns 0 on success, -1 if either create unexpectedly succeeds.
 */
int test_error_distributor_create_name(void)
struct rte_distributor *d = NULL;
struct rte_distributor *db = NULL;
d = rte_distributor_create(name, rte_socket_id(),
rte_lcore_count() - 1,
RTE_DIST_ALG_SINGLE);
if (d != NULL || rte_errno != EINVAL) {
printf("ERROR: No error on create() with NULL name param\n");
/* Repeat with the burst algorithm (ALG argument not visible here). */
db = rte_distributor_create(name, rte_socket_id(),
rte_lcore_count() - 1,
if (db != NULL || rte_errno != EINVAL) {
printf("ERROR: No error on create() with NULL param\n");
/* Negative test: rte_distributor_create() must fail with rte_errno == EINVAL
 * when num_workers exceeds the maximum, for both algorithms.
 * Returns 0 on success, -1 if either create unexpectedly succeeds.
 */
int test_error_distributor_create_numworkers(void)
struct rte_distributor *ds = NULL;
struct rte_distributor *db = NULL;
ds = rte_distributor_create("test_numworkers", rte_socket_id(),
RTE_DIST_ALG_SINGLE);
if (ds != NULL || rte_errno != EINVAL) {
printf("ERROR: No error on create() with num_workers > MAX\n");
db = rte_distributor_create("test_numworkers", rte_socket_id(),
if (db != NULL || rte_errno != EINVAL) {
printf("ERROR: No error on create() num_workers > MAX\n");
/* Useful function which ensures that all worker functions terminate */
quit_workers(struct worker_params *wp, struct rte_mempool *p)
struct rte_distributor *d = wp->dist;
const unsigned num_workers = rte_lcore_count() - 1;
struct rte_mbuf *bufs[RTE_MAX_LCORE];
struct rte_mbuf *returns[RTE_MAX_LCORE];
if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
printf("line %d: Error getting mbufs from pool\n", __LINE__);
/* Send one packet per worker (distinct tags) so every worker wakes up
 * and observes the quit flag.
 */
for (i = 0; i < num_workers; i++) {
bufs[i]->hash.usr = i << 1;
rte_distributor_process(d, &bufs[i], 1);
rte_distributor_process(d, NULL, 0);
rte_distributor_flush(d);
/* Wait for all worker lcores to return from their launch functions. */
rte_eal_mp_wait_lcore();
/* Drain any packets still sitting in the return ring. */
while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
rte_distributor_clear_returns(d);
rte_mempool_put_bulk(p, (void *)bufs, num_workers);
/* Reset the "worker zero" election sentinel for the next test. */
zero_idx = RTE_MAX_LCORE;
/* Test entry point: registers the sequence dynfield, creates one burst and
 * one single-mode distributor plus an mbuf pool, then runs the full suite
 * (sanity, alloc/free, shutdown, flush, marking, error-path tests) against
 * each distributor variant in turn.
 */
test_distributor(void)
static struct rte_distributor *ds;
static struct rte_distributor *db;
static struct rte_distributor *dist[2];
static struct rte_mempool *p;
static const struct rte_mbuf_dynfield seq_dynfield_desc = {
.name = "test_distributor_dynfield_seq",
.size = sizeof(seq_dynfield_t),
.align = __alignof__(seq_dynfield_t),
seq_dynfield_offset =
rte_mbuf_dynfield_register(&seq_dynfield_desc);
if (seq_dynfield_offset < 0) {
printf("Error registering mbuf field\n");
/* Need at least one main lcore plus one worker. */
if (rte_lcore_count() < 2) {
printf("Not enough cores for distributor_autotest, expecting at least 2\n");
db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
rte_lcore_count() - 1,
printf("Error creating burst distributor\n");
rte_distributor_flush(db);
rte_distributor_clear_returns(db);
ds = rte_distributor_create("Test_dist_single",
rte_lcore_count() - 1,
RTE_DIST_ALG_SINGLE);
printf("Error creating single distributor\n");
rte_distributor_flush(ds);
rte_distributor_clear_returns(ds);
/* Size the pool so both the BIG_BATCH test and per-worker backlogs fit. */
const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
(BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
printf("Error creating mempool\n");
/* Run the whole suite once per distributor variant (burst, single). */
for (i = 0; i < 2; i++) {
worker_params.dist = dist[i];
strlcpy(worker_params.name, "burst",
sizeof(worker_params.name));
strlcpy(worker_params.name, "single",
sizeof(worker_params.name));
rte_eal_mp_remote_launch(handle_work,
&worker_params, SKIP_MAIN);
if (sanity_test(&worker_params, p) < 0)
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
&worker_params, SKIP_MAIN);
if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
quit_workers(&worker_params, p);
/* Shutdown tests need a worker left over after one quits. */
if (rte_lcore_count() > 2) {
rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
if (sanity_test_with_worker_shutdown(&worker_params,
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
if (test_flush_with_worker_shutdown(&worker_params,
quit_workers(&worker_params, p);
rte_eal_mp_remote_launch(handle_and_mark_work,
&worker_params, SKIP_MAIN);
if (sanity_mark_test(&worker_params, p) < 0)
quit_workers(&worker_params, p);
printf("Too few cores to run worker shutdown test\n");
if (test_error_distributor_create_numworkers() == -1 ||
test_error_distributor_create_name() == -1) {
printf("rte_distributor_create parameter check tests failed");
quit_workers(&worker_params, p);
940 REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);