/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2017 Intel Corporation
 */

#include "test.h"

#include <unistd.h>
#include <string.h>
#include <rte_cycles.h>
#include <rte_errno.h>
#include <rte_mempool.h>
#include <rte_mbuf.h>
#include <rte_distributor.h>
#include <rte_string_fns.h>

#define ITER_POWER 20 /* log 2 of how many iterations we do when timing. */
#define BURST 32
#define BIG_BATCH 1024

struct worker_params {
        char name[64];
        struct rte_distributor *dist;
};

struct worker_params worker_params;

/* statics - all zero-initialized by default */
static volatile int quit;      /**< general quit variable for all threads */
static volatile int zero_quit; /**< var for when we just want thr0 to quit */
static volatile int zero_sleep; /**< thr0 has quit basic loop and is sleeping */
static volatile unsigned worker_idx;
static volatile unsigned zero_idx;

struct worker_stats {
        volatile unsigned handled_packets;
} __rte_cache_aligned;
struct worker_stats worker_stats[RTE_MAX_LCORE];

/* returns the total number of packets handled by the worker
 * functions given below.
 */
static inline unsigned
total_packet_count(void)
{
        unsigned i, count = 0;
        for (i = 0; i < worker_idx; i++)
                count += __atomic_load_n(&worker_stats[i].handled_packets,
                                __ATOMIC_RELAXED);
        return count;
}

/* resets the packet counts for a new test */
static inline void
clear_packet_count(void)
{
        unsigned int i;
        for (i = 0; i < RTE_MAX_LCORE; i++)
                __atomic_store_n(&worker_stats[i].handled_packets, 0,
                        __ATOMIC_RELAXED);
}

/* this is the basic worker function for the sanity tests;
 * it does nothing but return packets and count them.
 */
static int
handle_work(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *db = wp->dist;
        unsigned int num;
        unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
        while (!quit) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                num = rte_distributor_get_pkt(db, id,
                                buf, buf, num);
        }
        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        rte_distributor_return_pkt(db, id, buf, num);
        return 0;
}
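
/* Note on the worker API pattern used throughout this file: the first
 * rte_distributor_get_pkt() call passes NULL/0 for the oldpkt/retcount
 * arguments because the worker has nothing to hand back yet; later calls
 * may return the previous batch while requesting a new one, and on exit
 * any packets still held are given back via rte_distributor_return_pkt().
 */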

/* do basic sanity testing of the distributor. This test checks the following:
 * - send 32 packets through the distributor with the same tag and ensure they
 *   all go to the one worker
 * - send 32 packets through the distributor with two different tags and
 *   verify that they go equally to two different workers.
 * - send 32 packets with different tags through the distributor and
 *   just verify we get all packets back.
 * - send 1024 packets through the distributor, gathering the returned packets
 *   as we go. Then verify that we correctly got all 1024 pointers back again,
 *   not necessarily in the same order (as different flows).
 */
static int
sanity_test(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *db = wp->dist;
        struct rte_mbuf *bufs[BURST];
        struct rte_mbuf *returns[BURST*2];
        unsigned int i, count;
        unsigned int retries;
        unsigned int processed;

        printf("=== Basic distributor sanity tests ===\n");
        clear_packet_count();
        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /* now set all hash values in all buffers to zero, so all pkts go to the
         * one worker thread */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 0;

        processed = 0;
        while (processed < BURST)
                processed += rte_distributor_process(db, &bufs[processed],
                        BURST - processed);

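        /*
         * Note: rte_distributor_flush() pushes any packets still held in the
         * distributor's backlog out to the workers. It is called inside the
         * loop below because workers hand packets back asynchronously, so a
         * single flush is not guaranteed to make every return visible.
         */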
        count = 0;
        do {
                rte_distributor_flush(db);
                count += rte_distributor_returned_pkts(db,
                                returns, BURST*2);
        } while (count < BURST);

        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                return -1;
        }

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
        printf("Sanity test with all zero hashes done.\n");

        /* pick two flows and check they go correctly */
        if (rte_lcore_count() >= 3) {
                clear_packet_count();
                for (i = 0; i < BURST; i++)
                        bufs[i]->hash.usr = (i & 1) << 8;

                rte_distributor_process(db, bufs, BURST);
                count = 0;
                do {
                        rte_distributor_flush(db);
                        count += rte_distributor_returned_pkts(db,
                                        returns, BURST*2);
                } while (count < BURST);
                if (total_packet_count() != BURST) {
                        printf("Line %d: Error, not all packets flushed. "
                                        "Expected %u, got %u\n",
                                        __LINE__, BURST, total_packet_count());
                        return -1;
                }

                for (i = 0; i < rte_lcore_count() - 1; i++)
                        printf("Worker %u handled %u packets\n", i,
                                __atomic_load_n(
                                        &worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
                printf("Sanity test with two hash values done\n");
        }

        /* give a different hash value to each packet,
         * so load gets distributed */
        clear_packet_count();
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = i+1;

        rte_distributor_process(db, bufs, BURST);
        count = 0;
        do {
                rte_distributor_flush(db);
                count += rte_distributor_returned_pkts(db,
                                returns, BURST*2);
        } while (count < BURST);
        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                return -1;
        }

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));
        printf("Sanity test with non-zero hashes done\n");

        rte_mempool_put_bulk(p, (void *)bufs, BURST);

        /* sanity test with BIG_BATCH packets to ensure they all arrived back
         * from the returned packets function */
        clear_packet_count();
        struct rte_mbuf *many_bufs[BIG_BATCH], *return_bufs[BIG_BATCH];
        unsigned num_returned = 0;

        /* flush out any remaining packets */
        rte_distributor_flush(db);
        rte_distributor_clear_returns(db);

        if (rte_mempool_get_bulk(p, (void *)many_bufs, BIG_BATCH) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }
        for (i = 0; i < BIG_BATCH; i++)
                many_bufs[i]->hash.usr = i << 2;

        printf("=== testing big burst (%s) ===\n", wp->name);
        for (i = 0; i < BIG_BATCH/BURST; i++) {
                rte_distributor_process(db,
                                &many_bufs[i*BURST], BURST);
                count = rte_distributor_returned_pkts(db,
                                &return_bufs[num_returned],
                                BIG_BATCH - num_returned);
                num_returned += count;
        }
        rte_distributor_flush(db);
        count = rte_distributor_returned_pkts(db,
                        &return_bufs[num_returned],
                        BIG_BATCH - num_returned);
        num_returned += count;
        retries = 0;
        do {
                rte_distributor_flush(db);
                count = rte_distributor_returned_pkts(db,
                                &return_bufs[num_returned],
                                BIG_BATCH - num_returned);
                num_returned += count;
                retries++;
        } while ((num_returned < BIG_BATCH) && (retries < 100));
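
        /*
         * A bounded retry keeps the test from spinning forever if packets
         * really were lost; 100 flush attempts is far more than the
         * distributor should need to drain its backlog.
         */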
        if (num_returned != BIG_BATCH) {
                printf("line %d: Missing packets, expected %d, got %d\n",
                                __LINE__, BIG_BATCH, num_returned);
                return -1;
        }

        /* big check - make sure all packets made it back!! */
        for (i = 0; i < BIG_BATCH; i++) {
                unsigned j;
                struct rte_mbuf *src = many_bufs[i];
                for (j = 0; j < BIG_BATCH; j++) {
                        if (return_bufs[j] == src)
                                break;
                }

                if (j == BIG_BATCH) {
                        printf("Error: could not find source packet #%u\n", i);
                        return -1;
                }
        }
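
        /*
         * Note: the linear search above is O(BIG_BATCH^2) overall, which is
         * acceptable for 1024 packets in a test; a hash or sort would be
         * preferable at larger scale.
         */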
        printf("Sanity test of returned packets done\n");

        rte_mempool_put_bulk(p, (void *)many_bufs, BIG_BATCH);

        printf("\n");
        return 0;
}


/* to test that the distributor does not lose packets, we use this worker
 * function, which frees mbufs when it gets them. The distributor thread does
 * the mbuf allocation. If the distributor drops packets we'll eventually run
 * out of mbufs.
 */
static int
handle_work_with_free_mbufs(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *d = wp->dist;
        unsigned int i;
        unsigned int num;
        unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
        while (!quit) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                for (i = 0; i < num; i++)
                        rte_pktmbuf_free(buf[i]);
                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
        }
        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        rte_distributor_return_pkt(d, id, buf, num);
        return 0;
}

/* Perform a sanity test of the distributor with a large number of packets,
 * where we allocate a new set of mbufs for each burst. The workers then
 * free the mbufs. This ensures that we don't have any packet leaks in the
 * library.
 */
static int
sanity_test_with_mbuf_alloc(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        unsigned i;
        struct rte_mbuf *bufs[BURST];
        unsigned int processed;

        printf("=== Sanity test with mbuf alloc/free (%s) ===\n", wp->name);

        clear_packet_count();
        for (i = 0; i < ((1<<ITER_POWER)); i += BURST) {
                unsigned j;
                while (rte_mempool_get_bulk(p, (void *)bufs, BURST) < 0)
                        rte_distributor_process(d, NULL, 0);
                for (j = 0; j < BURST; j++) {
                        bufs[j]->hash.usr = (i+j) << 1;
                }

                processed = 0;
                while (processed < BURST)
                        processed += rte_distributor_process(d,
                                &bufs[processed], BURST - processed);
        }

        rte_distributor_flush(d);

        rte_delay_us(10000);

        if (total_packet_count() < (1<<ITER_POWER)) {
                printf("Line %u: Packet count is incorrect, %u, expected %u\n",
                                __LINE__, total_packet_count(),
                                (1<<ITER_POWER));
                return -1;
        }

        printf("Sanity test with mbuf alloc/free passed\n\n");
        return 0;
}

static int
handle_work_for_shutdown_test(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *d = wp->dist;
        unsigned int num;
        unsigned int zero_id = 0;
        unsigned int zero_unset;
        const unsigned int id = __atomic_fetch_add(&worker_idx, 1,
                        __ATOMIC_RELAXED);

        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

        if (num > 0) {
                zero_unset = RTE_MAX_LCORE;
                __atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
                        false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
        }
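
        /*
         * The compare-exchange above elects the first worker that actually
         * receives packets as "worker zero": the slot can only be claimed
         * while zero_idx still holds its reset value of RTE_MAX_LCORE.
         * zero_unset must be re-initialised before every attempt because a
         * failed CAS overwrites it with the value observed in zero_idx.
         */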
        zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);

        /* wait for the quit signal globally, or, for worker zero, wait
         * for zero_quit */
        while (!quit && !(id == zero_id && zero_quit)) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

                if (num > 0) {
                        zero_unset = RTE_MAX_LCORE;
                        __atomic_compare_exchange_n(&zero_idx, &zero_unset, id,
                                false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE);
                }
                zero_id = __atomic_load_n(&zero_idx, __ATOMIC_ACQUIRE);
        }

        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        if (id == zero_id) {
                rte_distributor_return_pkt(d, id, NULL, 0);

                /* for worker zero, allow it to restart to pick up last packet
                 * when all workers are shutting down.
                 */
                __atomic_store_n(&zero_sleep, 1, __ATOMIC_RELEASE);
                while (zero_quit)
                        usleep(100);
                __atomic_store_n(&zero_sleep, 0, __ATOMIC_RELEASE);

                num = rte_distributor_get_pkt(d, id, buf, NULL, 0);

                while (!quit) {
                        __atomic_fetch_add(&worker_stats[id].handled_packets,
                                        num, __ATOMIC_RELAXED);
                        num = rte_distributor_get_pkt(d, id, buf, NULL, 0);
                }
        }
        rte_distributor_return_pkt(d, id, buf, num);
        return 0;
}


/* Perform a sanity test of the distributor when a worker shuts down
 * mid-stream: queue packets for a single flow, have the worker owning that
 * flow quit, and check that its backlog is redistributed to the remaining
 * workers so that no packets are lost.
 */
static int
sanity_test_with_worker_shutdown(struct worker_params *wp,
                struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        struct rte_mbuf *bufs[BURST];
        struct rte_mbuf *bufs2[BURST];
        unsigned int i;
        unsigned int failed = 0;
        unsigned int processed = 0;

        printf("=== Sanity test of worker shutdown ===\n");

        clear_packet_count();

        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /*
         * Now set all hash values in all buffers to same value so all
         * pkts go to the one worker thread
         */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 1;

        processed = 0;
        while (processed < BURST)
                processed += rte_distributor_process(d, &bufs[processed],
                        BURST - processed);
        rte_distributor_flush(d);

        /* at this point, we will have processed some packets and have a full
         * backlog for the other ones at worker 0.
         */

        /* get more buffers to queue up, again setting them to the same flow */
        if (rte_mempool_get_bulk(p, (void *)bufs2, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                rte_mempool_put_bulk(p, (void *)bufs, BURST);
                return -1;
        }
        for (i = 0; i < BURST; i++)
                bufs2[i]->hash.usr = 1;

        /* get worker zero to quit */
        zero_quit = 1;
        rte_distributor_process(d, bufs2, BURST);

        /* flush the distributor */
        rte_distributor_flush(d);
        while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_distributor_flush(d);
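
        /*
         * Handshake with worker zero: flush until it parks itself
         * (zero_sleep == 1), then clear zero_quit so it can restart and pick
         * up any packets redistributed from its backlog, and wait until it
         * has actually left its sleep loop before counting packets.
         */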
        zero_quit = 0;
        while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_delay_us(100);

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));

        if (total_packet_count() != BURST * 2) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST * 2, total_packet_count());
                failed = 1;
        }

        rte_mempool_put_bulk(p, (void *)bufs, BURST);
        rte_mempool_put_bulk(p, (void *)bufs2, BURST);

        if (failed)
                return -1;

        printf("Sanity test with worker shutdown passed\n\n");
        return 0;
}

/* Test that the flush function is able to move packets between workers when
 * one worker shuts down.
 */
static int
test_flush_with_worker_shutdown(struct worker_params *wp,
                struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        struct rte_mbuf *bufs[BURST];
        unsigned int i;
        unsigned int failed = 0;
        unsigned int processed;

        printf("=== Test flush fn with worker shutdown (%s) ===\n", wp->name);

        clear_packet_count();
        if (rte_mempool_get_bulk(p, (void *)bufs, BURST) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /* now set all hash values in all buffers to zero, so all pkts go to the
         * one worker thread */
        for (i = 0; i < BURST; i++)
                bufs[i]->hash.usr = 0;

        processed = 0;
        while (processed < BURST)
                processed += rte_distributor_process(d, &bufs[processed],
                        BURST - processed);
        /* at this point, we will have processed some packets and have a full
         * backlog for the other ones at worker 0.
         */

        /* get worker zero to quit */
        zero_quit = 1;

        /* flush the distributor */
        rte_distributor_flush(d);

        while (!__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_distributor_flush(d);

        zero_quit = 0;

        while (__atomic_load_n(&zero_sleep, __ATOMIC_ACQUIRE))
                rte_delay_us(100);

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));

        if (total_packet_count() != BURST) {
                printf("Line %d: Error, not all packets flushed. "
                                "Expected %u, got %u\n",
                                __LINE__, BURST, total_packet_count());
                failed = 1;
        }

        rte_mempool_put_bulk(p, (void *)bufs, BURST);

        if (failed)
                return -1;

        printf("Flush test with worker shutdown passed\n\n");
        return 0;
}

static int
handle_and_mark_work(void *arg)
{
        struct rte_mbuf *buf[8] __rte_cache_aligned;
        struct worker_params *wp = arg;
        struct rte_distributor *db = wp->dist;
        unsigned int num, i;
        unsigned int id = __atomic_fetch_add(&worker_idx, 1, __ATOMIC_RELAXED);
        num = rte_distributor_get_pkt(db, id, buf, NULL, 0);
        while (!quit) {
                __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                                __ATOMIC_RELAXED);
                for (i = 0; i < num; i++)
                        buf[i]->udata64 += id + 1;
                num = rte_distributor_get_pkt(db, id,
                                buf, buf, num);
        }
        __atomic_fetch_add(&worker_stats[id].handled_packets, num,
                        __ATOMIC_RELAXED);
        rte_distributor_return_pkt(db, id, buf, num);
        return 0;
}

/* sanity_mark_test sends packets to workers, which mark them.
 * Every packet also carries an encoded sequence number.
 * The returned packets are sorted and checked to verify that they were
 * handled by the proper workers.
 */
static int
sanity_mark_test(struct worker_params *wp, struct rte_mempool *p)
{
        const unsigned int buf_count = 24;
        const unsigned int burst = 8;
        const unsigned int shift = 12;
        const unsigned int seq_shift = 10;

        struct rte_distributor *db = wp->dist;
        struct rte_mbuf *bufs[buf_count];
        struct rte_mbuf *returns[buf_count];
        unsigned int i, count, id;
        unsigned int sorted[buf_count], seq;
        unsigned int failed = 0;
        unsigned int processed;

        printf("=== Marked packets test ===\n");
        clear_packet_count();
        if (rte_mempool_get_bulk(p, (void *)bufs, buf_count) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return -1;
        }

        /* bufs' hashes will be like these below, but shifted left.
         * The shifting is for avoiding collisions with backlogs
         * and in-flight tags left by previous tests.
         * [1, 1, 1, 1, 1, 1, 1, 1
         *  1, 1, 1, 1, 2, 2, 2, 2
         *  2, 2, 2, 2, 1, 1, 1, 1]
         */
        for (i = 0; i < burst; i++) {
                bufs[0 * burst + i]->hash.usr = 1 << shift;
                bufs[1 * burst + i]->hash.usr = ((i < burst / 2) ? 1 : 2)
                        << shift;
                bufs[2 * burst + i]->hash.usr = ((i < burst / 2) ? 2 : 1)
                        << shift;
        }
        /* Assign a sequence number to each packet. The sequence is shifted,
         * so that the lower bits of udata64 will hold the mark from the worker.
         */
        for (i = 0; i < buf_count; i++)
                bufs[i]->udata64 = i << seq_shift;

        count = 0;
        for (i = 0; i < buf_count/burst; i++) {
                processed = 0;
                while (processed < burst)
                        processed += rte_distributor_process(db,
                                &bufs[i * burst + processed],
                                burst - processed);
                count += rte_distributor_returned_pkts(db, &returns[count],
                        buf_count - count);
        }

        do {
                rte_distributor_flush(db);
                count += rte_distributor_returned_pkts(db, &returns[count],
                        buf_count - count);
        } while (count < buf_count);

        for (i = 0; i < rte_lcore_count() - 1; i++)
                printf("Worker %u handled %u packets\n", i,
                        __atomic_load_n(&worker_stats[i].handled_packets,
                                        __ATOMIC_RELAXED));

        /* Sort returned packets by sent order (sequence numbers). */
        for (i = 0; i < buf_count; i++) {
                seq = returns[i]->udata64 >> seq_shift;
                id = returns[i]->udata64 - (seq << seq_shift);
                sorted[seq] = id;
        }
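
        /*
         * After this loop, sorted[] maps each packet's original send order
         * (the upper bits of udata64) to the id + 1 mark written by the
         * worker that handled it, which the checks below compare.
         */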

        /* Verify that packets [0-11] and [20-23] were processed
         * by the same worker
         */
        for (i = 1; i < 12; i++) {
                if (sorted[i] != sorted[0]) {
                        printf("Packet number %u processed by worker %u,"
                                " but should be processed by worker %u\n",
                                i, sorted[i], sorted[0]);
                        failed = 1;
                }
        }
        for (i = 20; i < 24; i++) {
                if (sorted[i] != sorted[0]) {
                        printf("Packet number %u processed by worker %u,"
                                " but should be processed by worker %u\n",
                                i, sorted[i], sorted[0]);
                        failed = 1;
                }
        }
        /* And verify that packets [12-19] were processed
         * by another worker
         */
        for (i = 13; i < 20; i++) {
                if (sorted[i] != sorted[12]) {
                        printf("Packet number %u processed by worker %u,"
                                " but should be processed by worker %u\n",
                                i, sorted[i], sorted[12]);
                        failed = 1;
                }
        }

        rte_mempool_put_bulk(p, (void *)bufs, buf_count);

        if (failed)
                return -1;

        printf("Marked packets test passed\n");
        return 0;
}

static
int test_error_distributor_create_name(void)
{
        struct rte_distributor *d = NULL;
        struct rte_distributor *db = NULL;
        char *name = NULL;

        d = rte_distributor_create(name, rte_socket_id(),
                        rte_lcore_count() - 1,
                        RTE_DIST_ALG_SINGLE);
        if (d != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with NULL name param\n");
                return -1;
        }

        db = rte_distributor_create(name, rte_socket_id(),
                        rte_lcore_count() - 1,
                        RTE_DIST_ALG_BURST);
        if (db != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with NULL name param\n");
                return -1;
        }

        return 0;
}


static
int test_error_distributor_create_numworkers(void)
{
        struct rte_distributor *ds = NULL;
        struct rte_distributor *db = NULL;

        ds = rte_distributor_create("test_numworkers", rte_socket_id(),
                        RTE_MAX_LCORE + 10,
                        RTE_DIST_ALG_SINGLE);
        if (ds != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with num_workers > MAX\n");
                return -1;
        }

        db = rte_distributor_create("test_numworkers", rte_socket_id(),
                        RTE_MAX_LCORE + 10,
                        RTE_DIST_ALG_BURST);
        if (db != NULL || rte_errno != EINVAL) {
                printf("ERROR: No error on create() with num_workers > MAX\n");
                return -1;
        }

        return 0;
}


/* Helper function which ensures that all worker threads terminate */
static void
quit_workers(struct worker_params *wp, struct rte_mempool *p)
{
        struct rte_distributor *d = wp->dist;
        const unsigned num_workers = rte_lcore_count() - 1;
        unsigned i;
        struct rte_mbuf *bufs[RTE_MAX_LCORE];
        struct rte_mbuf *returns[RTE_MAX_LCORE];
        if (rte_mempool_get_bulk(p, (void *)bufs, num_workers) != 0) {
                printf("line %d: Error getting mbufs from pool\n", __LINE__);
                return;
        }

        zero_quit = 0;
        quit = 1;
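
        /*
         * Send one packet to each worker, each with a distinct tag, so that
         * every worker blocked in rte_distributor_get_pkt() is handed work,
         * observes the quit flag, and exits its loop.
         */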
        for (i = 0; i < num_workers; i++) {
                bufs[i]->hash.usr = i << 1;
                rte_distributor_process(d, &bufs[i], 1);
        }

        rte_distributor_process(d, NULL, 0);
        rte_distributor_flush(d);
        rte_eal_mp_wait_lcore();

        while (rte_distributor_returned_pkts(d, returns, RTE_MAX_LCORE))
                ;

        rte_distributor_clear_returns(d);
        rte_mempool_put_bulk(p, (void *)bufs, num_workers);

        quit = 0;
        worker_idx = 0;
        zero_idx = RTE_MAX_LCORE;
        zero_quit = 0;
        zero_sleep = 0;
}

static int
test_distributor(void)
{
        static struct rte_distributor *ds;
        static struct rte_distributor *db;
        static struct rte_distributor *dist[2];
        static struct rte_mempool *p;
        int i;

        if (rte_lcore_count() < 2) {
                printf("Not enough cores for distributor_autotest, expecting at least 2\n");
                return TEST_SKIPPED;
        }

        if (db == NULL) {
                db = rte_distributor_create("Test_dist_burst", rte_socket_id(),
                                rte_lcore_count() - 1,
                                RTE_DIST_ALG_BURST);
                if (db == NULL) {
                        printf("Error creating burst distributor\n");
                        return -1;
                }
        } else {
                rte_distributor_flush(db);
                rte_distributor_clear_returns(db);
        }

        if (ds == NULL) {
                ds = rte_distributor_create("Test_dist_single",
                                rte_socket_id(),
                                rte_lcore_count() - 1,
                                RTE_DIST_ALG_SINGLE);
                if (ds == NULL) {
                        printf("Error creating single distributor\n");
                        return -1;
                }
        } else {
                rte_distributor_flush(ds);
                rte_distributor_clear_returns(ds);
        }

        const unsigned nb_bufs = (511 * rte_lcore_count()) < BIG_BATCH ?
                        (BIG_BATCH * 2) - 1 : (511 * rte_lcore_count());
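
        /*
         * Note: nb_bufs scales with the core count but always exceeds
         * BIG_BATCH, so the big-burst test can draw its 1024 mbufs from the
         * pool even on machines with few cores.
         */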
        if (p == NULL) {
                p = rte_pktmbuf_pool_create("DT_MBUF_POOL", nb_bufs, BURST,
                        0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
                if (p == NULL) {
                        printf("Error creating mempool\n");
                        return -1;
                }
        }

        dist[0] = ds;
        dist[1] = db;

        for (i = 0; i < 2; i++) {
                worker_params.dist = dist[i];
                if (i)
                        strlcpy(worker_params.name, "burst",
                                        sizeof(worker_params.name));
                else
                        strlcpy(worker_params.name, "single",
                                        sizeof(worker_params.name));

                rte_eal_mp_remote_launch(handle_work,
                                &worker_params, SKIP_MASTER);
                if (sanity_test(&worker_params, p) < 0)
                        goto err;
                quit_workers(&worker_params, p);

                rte_eal_mp_remote_launch(handle_work_with_free_mbufs,
                                &worker_params, SKIP_MASTER);
                if (sanity_test_with_mbuf_alloc(&worker_params, p) < 0)
                        goto err;
                quit_workers(&worker_params, p);

                if (rte_lcore_count() > 2) {
                        rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
                                        &worker_params,
                                        SKIP_MASTER);
                        if (sanity_test_with_worker_shutdown(&worker_params,
                                        p) < 0)
                                goto err;
                        quit_workers(&worker_params, p);

                        rte_eal_mp_remote_launch(handle_work_for_shutdown_test,
                                        &worker_params,
                                        SKIP_MASTER);
                        if (test_flush_with_worker_shutdown(&worker_params,
                                        p) < 0)
                                goto err;
                        quit_workers(&worker_params, p);

                        rte_eal_mp_remote_launch(handle_and_mark_work,
                                        &worker_params, SKIP_MASTER);
                        if (sanity_mark_test(&worker_params, p) < 0)
                                goto err;
                        quit_workers(&worker_params, p);

                } else {
                        printf("Too few cores to run worker shutdown test\n");
                }
        }

        if (test_error_distributor_create_numworkers() == -1 ||
                        test_error_distributor_create_name() == -1) {
                printf("rte_distributor_create parameter check tests failed\n");
                return -1;
        }

        return 0;

err:
        quit_workers(&worker_params, p);
        return -1;
}

REGISTER_TEST_COMMAND(distributor_autotest, test_distributor);