lib: add rte_hexdump and remove duplicated code
[dpdk.git] / app / test / test_ring.c
1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without 
8  *   modification, are permitted provided that the following conditions 
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright 
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright 
14  *       notice, this list of conditions and the following disclaimer in 
15  *       the documentation and/or other materials provided with the 
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its 
18  *       contributors may be used to endorse or promote products derived 
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  * 
33  */
34
35 #include <string.h>
36 #include <stdarg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <stdint.h>
40 #include <inttypes.h>
41 #include <errno.h>
42 #include <sys/queue.h>
43
44 #include <rte_common.h>
45 #include <rte_log.h>
46 #include <rte_memory.h>
47 #include <rte_memzone.h>
48 #include <rte_launch.h>
49 #include <rte_cycles.h>
50 #include <rte_tailq.h>
51 #include <rte_eal.h>
52 #include <rte_per_lcore.h>
53 #include <rte_lcore.h>
54 #include <rte_atomic.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_malloc.h>
57 #include <rte_ring.h>
58 #include <rte_random.h>
59 #include <rte_common.h>
60 #include <rte_errno.h>
61 #include <rte_hexdump.h>
62
63 #include <cmdline_parse.h>
64
65 #include "test.h"
66
67 /*
68  * Ring
69  * ====
70  *
71  * #. Basic tests: done on one core:
72  *
73  *    - Using single producer/single consumer functions:
74  *
75  *      - Enqueue one object, two objects, MAX_BULK objects
76  *      - Dequeue one object, two objects, MAX_BULK objects
77  *      - Check that dequeued pointers are correct
78  *
79  *    - Using multi producers/multi consumers functions:
80  *
81  *      - Enqueue one object, two objects, MAX_BULK objects
82  *      - Dequeue one object, two objects, MAX_BULK objects
83  *      - Check that dequeued pointers are correct
84  *
85  *    - Test watermark and default bulk enqueue/dequeue:
86  *
87  *      - Set watermark
88  *      - Set default bulk value
89  *      - Enqueue objects, check that -EDQUOT is returned when
90  *        watermark is exceeded
91  *      - Check that dequeued pointers are correct
92  *
93  * #. Check live watermark change
94  *
95  *    - Start a loop on another lcore that will enqueue and dequeue
96  *      objects in a ring. It will monitor the value of watermark.
97  *    - At the same time, change the watermark on the master lcore.
98  *    - The slave lcore will check that watermark changes from 16 to 32.
99  *
100  * #. Performance tests.
101  *
102  *    This test is done on the following configurations:
103  *
104  *    - One core enqueuing, one core dequeuing
105  *    - One core enqueuing, other cores dequeuing
106  *    - One core dequeuing, other cores enqueuing
107  *    - Half of the cores enqueuing, the other half dequeuing
108  *
109  *    When only one core enqueues/dequeues, the test is done with the
110  *    SP/SC functions in addition to the MP/MC functions.
111  *
112  *    The test is done with different bulk size.
113  *
114  *    On each core, the test enqueues or dequeues objects during
115  *    TIME_S seconds. The number of successes and failures are stored on
116  *    each core, then summed and displayed.
117  *
118  *    The test checks that the number of enqueues is equal to the
119  *    number of dequeues.
120  */
121
122 #define RING_SIZE 4096
123 #define MAX_BULK 32
124 #define N 65536
125 #define TIME_S 5
126
127 static rte_atomic32_t synchro;
128
129 static struct rte_ring *r;
130
131 struct test_stats {
132         unsigned enq_success ;
133         unsigned enq_quota;
134         unsigned enq_fail;
135
136         unsigned deq_success;
137         unsigned deq_fail;
138 } __rte_cache_aligned;
139
140 static struct test_stats test_stats[RTE_MAX_LCORE];
141
142 static int
143 ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned),
144                 void* arg, unsigned bulk_or_burst)
145 {
146         unsigned success = 0;
147         unsigned quota = 0;
148         unsigned fail = 0;
149         unsigned i;
150         unsigned long dummy_obj;
151         void *obj_table[MAX_BULK];
152         int ret;
153         unsigned lcore_id = rte_lcore_id();
154         unsigned count = *((unsigned*)arg);
155         uint64_t start_cycles, end_cycles;
156         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
157
158         /* init dummy object table */
159         for (i = 0; i< MAX_BULK; i++) {
160                 dummy_obj = lcore_id + 0x1000 + i;
161                 obj_table[i] = (void *)dummy_obj;
162         }
163
164         /* wait synchro for slaves */
165         if (lcore_id != rte_get_master_lcore())
166                 while (rte_atomic32_read(&synchro) == 0);
167
168         start_cycles = rte_get_hpet_cycles();
169
170         /* enqueue as many object as possible */
171         while (time_diff/hz < TIME_S) {
172                 for (i = 0; likely(i < N); i++) {
173                         ret = que_func(r, obj_table, count);
174                         /*
175                          * bulk_or_burst
176                          *       1: for bulk operation
177                          *   0: for burst operation
178                          */
179                         if (bulk_or_burst) {
180                                 /* The *count* objects enqueued, unless fail */
181                                 if (ret == 0)
182                                         success += count;
183                                 else if (ret == -EDQUOT)
184                                         quota += count;
185                                 else
186                                         fail++;
187                         } else {
188                                 /* The actual objects enqueued */
189                                 if (ret != 0)
190                                         success += (ret & RTE_RING_SZ_MASK);
191                                 else
192                                         fail++;
193                         }
194                 }
195                 end_cycles = rte_get_hpet_cycles();
196                 time_diff = end_cycles - start_cycles;
197         }
198
199         /* write statistics in a shared structure */
200         test_stats[lcore_id].enq_success = success;
201         test_stats[lcore_id].enq_quota = quota;
202         test_stats[lcore_id].enq_fail = fail;
203
204         return 0;
205 }
206
207 static int
208 ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned),
209                 void* arg, unsigned bulk_or_burst)
210 {
211         unsigned success = 0;
212         unsigned fail = 0;
213         unsigned i;
214         void *obj_table[MAX_BULK];
215         int ret;
216         unsigned lcore_id = rte_lcore_id();
217         unsigned count = *((unsigned*)arg);
218         uint64_t start_cycles, end_cycles;
219         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
220
221         /* wait synchro for slaves */
222         if (lcore_id != rte_get_master_lcore())
223                 while (rte_atomic32_read(&synchro) == 0);
224
225         start_cycles = rte_get_hpet_cycles();
226
227         /* dequeue as many object as possible */
228         while (time_diff/hz < TIME_S) {
229                 for (i = 0; likely(i < N); i++) {
230                         ret = que_func(r, obj_table, count);
231                         /*
232                          * bulk_or_burst
233                          *       1: for bulk operation
234                          *   0: for burst operation
235                          */
236                         if (bulk_or_burst) {
237                                 if (ret == 0)
238                                         success += count;
239                                 else
240                                         fail++;
241                         } else {
242                                 if (ret != 0)
243                                         success += ret;
244                                 else
245                                         fail++;
246                         }
247                 }
248                 end_cycles = rte_get_hpet_cycles();
249                 time_diff = end_cycles - start_cycles;
250         }
251
252         /* write statistics in a shared structure */
253         test_stats[lcore_id].deq_success = success;
254         test_stats[lcore_id].deq_fail = fail;
255
256         return 0;
257 }
258
259 static int
260 test_ring_per_core_sp_enqueue(void *arg)
261 {
262         return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
263 }
264
265 static int
266 test_ring_per_core_mp_enqueue(void *arg)
267 {
268         return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
269 }
270
271 static int
272 test_ring_per_core_mc_dequeue(void *arg)
273 {
274         return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
275 }
276
277 static int
278 test_ring_per_core_sc_dequeue(void *arg)
279 {
280         return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
281 }
282
283 static int
284 test_ring_per_core_sp_enqueue_burst(void *arg)
285 {
286         return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
287 }
288
289 static int
290 test_ring_per_core_mp_enqueue_burst(void *arg)
291 {
292         return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
293 }
294
295 static int
296 test_ring_per_core_mc_dequeue_burst(void *arg)
297 {
298         return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
299 }
300
301 static int
302 test_ring_per_core_sc_dequeue_burst(void *arg)
303 {
304         return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
305 }
306
307 #define TEST_RING_VERIFY(exp)                                           \
308         if (!(exp)) {                                                   \
309                 printf("error at %s:%d\tcondition " #exp " failed\n",   \
310                     __func__, __LINE__);                                \
311                 rte_ring_dump(r);                                       \
312                 return (-1);                                            \
313         }
314
315 #define TEST_RING_FULL_EMTPY_ITER       8
316
317
318 static int
319 launch_cores(unsigned enq_core_count, unsigned deq_core_count,
320                 unsigned n_enq_bulk, unsigned n_deq_bulk,
321                 int sp, int sc, int bulk_not_burst)
322 {
323         void *obj;
324         unsigned lcore_id;
325         unsigned rate, deq_remain = 0;
326         unsigned enq_total, deq_total;
327         struct test_stats sum;
328         int (*enq_f)(void *);
329         int (*deq_f)(void *);
330         unsigned cores = enq_core_count + deq_core_count;
331         int ret;
332
333         rte_atomic32_set(&synchro, 0);
334
335         printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
336                enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
337         printf("sp=%d sc=%d ", sp, sc);
338
339         if (bulk_not_burst) {
340                 /* set enqueue function to be used */
341                 if (sp)
342                         enq_f = test_ring_per_core_sp_enqueue;
343                 else
344                         enq_f = test_ring_per_core_mp_enqueue;
345
346                 /* set dequeue function to be used */
347                 if (sc)
348                         deq_f = test_ring_per_core_sc_dequeue;
349                 else
350                         deq_f = test_ring_per_core_mc_dequeue;
351
352         } else {
353                 /* set enqueue function to be used */
354                 if (sp)
355                         enq_f = test_ring_per_core_sp_enqueue_burst;
356                 else
357                         enq_f = test_ring_per_core_mp_enqueue_burst;
358
359                 /* set dequeue function to be used */
360                 if (sc)
361                         deq_f = test_ring_per_core_sc_dequeue_burst;
362                 else
363                         deq_f = test_ring_per_core_mc_dequeue_burst;
364         }
365
366         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
367                 if (enq_core_count != 0) {
368                         enq_core_count--;
369                         rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
370                 }
371                 if (deq_core_count != 1) {
372                         deq_core_count--;
373                         rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
374                 }
375         }
376
377         memset(test_stats, 0, sizeof(test_stats));
378
379         /* start synchro and launch test on master */
380         rte_atomic32_set(&synchro, 1);
381         ret = deq_f(&n_deq_bulk);
382
383         /* wait all cores */
384         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
385                 if (cores == 1)
386                         break;
387                 cores--;
388                 if (rte_eal_wait_lcore(lcore_id) < 0)
389                         ret = -1;
390         }
391
392         memset(&sum, 0, sizeof(sum));
393         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
394                 sum.enq_success += test_stats[lcore_id].enq_success;
395                 sum.enq_quota += test_stats[lcore_id].enq_quota;
396                 sum.enq_fail += test_stats[lcore_id].enq_fail;
397                 sum.deq_success += test_stats[lcore_id].deq_success;
398                 sum.deq_fail += test_stats[lcore_id].deq_fail;
399         }
400
401         /* empty the ring */
402         while (rte_ring_sc_dequeue(r, &obj) == 0)
403                 deq_remain += 1;
404
405         if (ret < 0) {
406                 printf("per-lcore test returned -1\n");
407                 return -1;
408         }
409
410         enq_total = sum.enq_success + sum.enq_quota;
411         deq_total = sum.deq_success + deq_remain;
412
413         rate = deq_total/TIME_S;
414
415         printf("rate_persec=%u\n", rate);
416
417         if (enq_total != deq_total) {
418                 printf("invalid enq/deq_success counter: %u %u\n",
419                        enq_total, deq_total);
420                 return -1;
421         }
422
423         return 0;
424 }
425
426 static int
427 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
428                   unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
429 {
430         int sp, sc;
431         int do_sp, do_sc;
432         int ret;
433
434         do_sp = (enq_core_count == 1) ? 1 : 0;
435         do_sc = (deq_core_count  == 1) ? 1 : 0;
436
437         for (sp = 0; sp <= do_sp; sp ++) {
438                 for (sc = 0; sc <= do_sc; sc ++) {
439                         ret = launch_cores(enq_core_count, deq_core_count,
440                                         n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
441                         if (ret < 0)
442                                 return -1;
443                 }
444         }
445         return 0;
446 }
447
448 static int
449 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
450                 unsigned bulk_or_burst)
451 {
452         unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
453         unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
454         unsigned *bulk_enqueue_ptr;
455         unsigned *bulk_dequeue_ptr;
456         int ret;
457
458         for (bulk_enqueue_ptr = bulk_enqueue_tab;
459              *bulk_enqueue_ptr;
460              bulk_enqueue_ptr++) {
461
462                 for (bulk_dequeue_ptr = bulk_dequeue_tab;
463                      *bulk_dequeue_ptr;
464                      bulk_dequeue_ptr++) {
465
466                         ret = do_one_ring_test2(enq_core_count, deq_core_count,
467                                                 *bulk_enqueue_ptr,
468                                                 *bulk_dequeue_ptr,
469                                                 bulk_or_burst);
470                         if (ret < 0)
471                                 return -1;
472                 }
473         }
474         return 0;
475 }
476
477 static int
478 check_live_watermark_change(__attribute__((unused)) void *dummy)
479 {
480         uint64_t hz = rte_get_hpet_hz();
481         void *obj_table[MAX_BULK];
482         unsigned watermark, watermark_old = 16;
483         uint64_t cur_time, end_time;
484         int64_t diff = 0;
485         int i, ret;
486         unsigned count = 4;
487
488         /* init the object table */
489         memset(obj_table, 0, sizeof(obj_table));
490         end_time = rte_get_hpet_cycles() + (hz * 2);
491
492         /* check that bulk and watermark are 4 and 32 (respectively) */
493         while (diff >= 0) {
494
495                 /* add in ring until we reach watermark */
496                 ret = 0;
497                 for (i = 0; i < 16; i ++) {
498                         if (ret != 0)
499                                 break;
500                         ret = rte_ring_enqueue_bulk(r, obj_table, count);
501                 }
502
503                 if (ret != -EDQUOT) {
504                         printf("Cannot enqueue objects, or watermark not "
505                                "reached (ret=%d)\n", ret);
506                         return -1;
507                 }
508
509                 /* read watermark, the only change allowed is from 16 to 32 */
510                 watermark = r->prod.watermark;
511                 if (watermark != watermark_old &&
512                     (watermark_old != 16 || watermark != 32)) {
513                         printf("Bad watermark change %u -> %u\n", watermark_old,
514                                watermark);
515                         return -1;
516                 }
517                 watermark_old = watermark;
518
519                 /* dequeue objects from ring */
520                 while (i--) {
521                         ret = rte_ring_dequeue_bulk(r, obj_table, count);
522                         if (ret != 0) {
523                                 printf("Cannot dequeue (ret=%d)\n", ret);
524                                 return -1;
525                         }
526                 }
527
528                 cur_time = rte_get_hpet_cycles();
529                 diff = end_time - cur_time;
530         }
531
532         if (watermark_old != 32 ) {
533                 printf(" watermark was not updated (wm=%u)\n",
534                        watermark_old);
535                 return -1;
536         }
537
538         return 0;
539 }
540
541 static int
542 test_live_watermark_change(void)
543 {
544         unsigned lcore_id = rte_lcore_id();
545         unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
546
547         printf("Test watermark live modification\n");
548         rte_ring_set_water_mark(r, 16);
549
550         /* launch a thread that will enqueue and dequeue, checking
551          * watermark and quota */
552         rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
553
554         rte_delay_ms(1000);
555         rte_ring_set_water_mark(r, 32);
556         rte_delay_ms(1000);
557
558         if (rte_eal_wait_lcore(lcore_id2) < 0)
559                 return -1;
560
561         return 0;
562 }
563
564 /* Test for catch on invalid watermark values */
565 static int
566 test_set_watermark( void ){
567         unsigned count;
568         int setwm;
569
570         struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
571         if(r == NULL){
572                 printf( " ring lookup failed\n" );
573                 goto error;
574         }
575         count = r->prod.size*2;
576         setwm = rte_ring_set_water_mark(r, count);
577         if (setwm != -EINVAL){
578                 printf("Test failed to detect invalid watermark count value\n");
579                 goto error;
580         }
581
582         count = 0;
583         rte_ring_set_water_mark(r, count);
584         if (r->prod.watermark != r->prod.size) {
585                 printf("Test failed to detect invalid watermark count value\n");
586                 goto error;
587         }
588         return 0;
589
590 error:
591         return -1;
592 }
593
594 /*
595  * helper routine for test_ring_basic
596  */
597 static int
598 test_ring_basic_full_empty(void * const src[], void *dst[])
599 {
600         unsigned i, rand;
601         const unsigned rsz = RING_SIZE - 1;
602
603         printf("Basic full/empty test\n");
604
605         for (i = 0; TEST_RING_FULL_EMTPY_ITER != i; i++) {
606
607                 /* random shift in the ring */
608                 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
609                 printf("%s: iteration %u, random shift: %u;\n",
610                     __func__, i, rand);
611                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
612                     rand));
613                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
614
615                 /* fill the ring */
616                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
617                     rsz));
618                 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
619                 TEST_RING_VERIFY(rsz == rte_ring_count(r));
620                 TEST_RING_VERIFY(rte_ring_full(r));
621                 TEST_RING_VERIFY(0 == rte_ring_empty(r));
622
623                 /* empty the ring */
624                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
625                 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
626                 TEST_RING_VERIFY(0 == rte_ring_count(r));
627                 TEST_RING_VERIFY(0 == rte_ring_full(r));
628                 TEST_RING_VERIFY(rte_ring_empty(r));
629
630                 /* check data */
631                 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz));
632                 rte_ring_dump(r);
633         }
634         return (0);
635 }
636
637 static int
638 test_ring_basic(void)
639 {
640         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
641         int ret;
642         unsigned i, num_elems;
643
644         /* alloc dummy object pointers */
645         src = malloc(RING_SIZE*2*sizeof(void *));
646         if (src == NULL)
647                 goto fail;
648
649         for (i = 0; i < RING_SIZE*2 ; i++) {
650                 src[i] = (void *)(unsigned long)i;
651         }
652         cur_src = src;
653
654         /* alloc some room for copied objects */
655         dst = malloc(RING_SIZE*2*sizeof(void *));
656         if (dst == NULL)
657                 goto fail;
658
659         memset(dst, 0, RING_SIZE*2*sizeof(void *));
660         cur_dst = dst;
661
662         printf("enqueue 1 obj\n");
663         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
664         cur_src += 1;
665         if (ret != 0)
666                 goto fail;
667
668         printf("enqueue 2 objs\n");
669         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
670         cur_src += 2;
671         if (ret != 0)
672                 goto fail;
673
674         printf("enqueue MAX_BULK objs\n");
675         ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
676         cur_src += MAX_BULK;
677         if (ret != 0)
678                 goto fail;
679
680         printf("dequeue 1 obj\n");
681         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
682         cur_dst += 1;
683         if (ret != 0)
684                 goto fail;
685
686         printf("dequeue 2 objs\n");
687         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
688         cur_dst += 2;
689         if (ret != 0)
690                 goto fail;
691
692         printf("dequeue MAX_BULK objs\n");
693         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
694         cur_dst += MAX_BULK;
695         if (ret != 0)
696                 goto fail;
697
698         /* check data */
699         if (memcmp(src, dst, cur_dst - dst)) {
700                 rte_hexdump("src", src, cur_src - src);
701                 rte_hexdump("dst", dst, cur_dst - dst);
702                 printf("data after dequeue is not the same\n");
703                 goto fail;
704         }
705         cur_src = src;
706         cur_dst = dst;
707
708         printf("enqueue 1 obj\n");
709         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
710         cur_src += 1;
711         if (ret != 0)
712                 goto fail;
713
714         printf("enqueue 2 objs\n");
715         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
716         cur_src += 2;
717         if (ret != 0)
718                 goto fail;
719
720         printf("enqueue MAX_BULK objs\n");
721         ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
722         cur_src += MAX_BULK;
723         if (ret != 0)
724                 goto fail;
725
726         printf("dequeue 1 obj\n");
727         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
728         cur_dst += 1;
729         if (ret != 0)
730                 goto fail;
731
732         printf("dequeue 2 objs\n");
733         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
734         cur_dst += 2;
735         if (ret != 0)
736                 goto fail;
737
738         printf("dequeue MAX_BULK objs\n");
739         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
740         cur_dst += MAX_BULK;
741         if (ret != 0)
742                 goto fail;
743
744         /* check data */
745         if (memcmp(src, dst, cur_dst - dst)) {
746                 rte_hexdump("src", src, cur_src - src);
747                 rte_hexdump("dst", dst, cur_dst - dst);
748                 printf("data after dequeue is not the same\n");
749                 goto fail;
750         }
751         cur_src = src;
752         cur_dst = dst;
753
754         printf("fill and empty the ring\n");
755         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
756                 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
757                 cur_src += MAX_BULK;
758                 if (ret != 0)
759                         goto fail;
760                 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
761                 cur_dst += MAX_BULK;
762                 if (ret != 0)
763                         goto fail;
764         }
765
766         /* check data */
767         if (memcmp(src, dst, cur_dst - dst)) {
768                 rte_hexdump("src", src, cur_src - src);
769                 rte_hexdump("dst", dst, cur_dst - dst);
770                 printf("data after dequeue is not the same\n");
771                 goto fail;
772         }
773
774         if (test_ring_basic_full_empty(src, dst) != 0)
775                 goto fail;
776
777         cur_src = src;
778         cur_dst = dst;
779
780         printf("test watermark and default bulk enqueue / dequeue\n");
781         rte_ring_set_water_mark(r, 20);
782         num_elems = 16;
783
784         cur_src = src;
785         cur_dst = dst;
786
787         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
788         cur_src += num_elems;
789         if (ret != 0) {
790                 printf("Cannot enqueue\n");
791                 goto fail;
792         }
793         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
794         cur_src += num_elems;
795         if (ret != -EDQUOT) {
796                 printf("Watermark not exceeded\n");
797                 goto fail;
798         }
799         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
800         cur_dst += num_elems;
801         if (ret != 0) {
802                 printf("Cannot dequeue\n");
803                 goto fail;
804         }
805         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
806         cur_dst += num_elems;
807         if (ret != 0) {
808                 printf("Cannot dequeue2\n");
809                 goto fail;
810         }
811
812         /* check data */
813         if (memcmp(src, dst, cur_dst - dst)) {
814                 rte_hexdump("src", src, cur_src - src);
815                 rte_hexdump("dst", dst, cur_dst - dst);
816                 printf("data after dequeue is not the same\n");
817                 goto fail;
818         }
819
820         cur_src = src;
821         cur_dst = dst;
822
823         ret = rte_ring_mp_enqueue(r, cur_src);
824         if (ret != 0)
825                 goto fail;
826
827         ret = rte_ring_mc_dequeue(r, cur_dst);
828         if (ret != 0)
829                 goto fail;
830
831         if (src)
832                 free(src);
833         if (dst)
834                 free(dst);
835         return 0;
836
837  fail:
838         if (src)
839                 free(src);
840         if (dst)
841                 free(dst);
842         return -1;
843 }
844
845 static int
846 test_ring_burst_basic(void)
847 {
848         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
849         int ret;
850         unsigned i;
851
852         /* alloc dummy object pointers */
853         src = malloc(RING_SIZE*2*sizeof(void *));
854         if (src == NULL)
855                 goto fail;
856
857         for (i = 0; i < RING_SIZE*2 ; i++) {
858                 src[i] = (void *)(unsigned long)i;
859         }
860         cur_src = src;
861
862         /* alloc some room for copied objects */
863         dst = malloc(RING_SIZE*2*sizeof(void *));
864         if (dst == NULL)
865                 goto fail;
866
867         memset(dst, 0, RING_SIZE*2*sizeof(void *));
868         cur_dst = dst;
869
870         printf("Test SP & SC basic functions \n");
871         printf("enqueue 1 obj\n");
872         ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
873         cur_src += 1;
874         if ((ret & RTE_RING_SZ_MASK) != 1)
875                 goto fail;
876
877         printf("enqueue 2 objs\n");
878         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
879         cur_src += 2;
880         if ((ret & RTE_RING_SZ_MASK) != 2)
881                 goto fail;
882
883         printf("enqueue MAX_BULK objs\n");
884         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
885         cur_src += MAX_BULK;
886         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
887                 goto fail;
888
889         printf("dequeue 1 obj\n");
890         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
891         cur_dst += 1;
892         if ((ret & RTE_RING_SZ_MASK) != 1)
893                 goto fail;
894
895         printf("dequeue 2 objs\n");
896         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
897         cur_dst += 2;
898         if ((ret & RTE_RING_SZ_MASK) != 2)
899                 goto fail;
900
901         printf("dequeue MAX_BULK objs\n");
902         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
903         cur_dst += MAX_BULK;
904         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
905                 goto fail;
906
907         /* check data */
908         if (memcmp(src, dst, cur_dst - dst)) {
909                 rte_hexdump("src", src, cur_src - src);
910                 rte_hexdump("dst", dst, cur_dst - dst);
911                 printf("data after dequeue is not the same\n");
912                 goto fail;
913         }
914
915         cur_src = src;
916         cur_dst = dst;
917
918         printf("Test enqueue without enough memory space \n");
919         for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
920                 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
921                 cur_src += MAX_BULK;
922                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
923                         goto fail;
924                 }
925         }
926
927         printf("Enqueue 2 objects, free entries = MAX_BULK - 2  \n");
928         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
929         cur_src += 2;
930         if ((ret & RTE_RING_SZ_MASK) != 2)
931                 goto fail;
932
933         printf("Enqueue the remaining entries = MAX_BULK - 2  \n");
934         /* Always one free entry left */
935         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
936         cur_src += MAX_BULK - 3;
937         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
938                 goto fail;
939
940         printf("Test if ring is full  \n");
941         if (rte_ring_full(r) != 1)
942                 goto fail;
943
944         printf("Test enqueue for a full entry  \n");
945         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
946         if ((ret & RTE_RING_SZ_MASK) != 0)
947                 goto fail;
948
949         printf("Test dequeue without enough objects \n");
950         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
951                 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
952                 cur_dst += MAX_BULK;
953                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
954                         goto fail;
955         }
956
957         /* Available memory space for the exact MAX_BULK entries */
958         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
959         cur_dst += 2;
960         if ((ret & RTE_RING_SZ_MASK) != 2)
961                 goto fail;
962
963         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
964         cur_dst += MAX_BULK - 3;
965         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
966                 goto fail;
967
968         printf("Test if ring is empty \n");
969         /* Check if ring is empty */
970         if (1 != rte_ring_empty(r))
971                 goto fail;
972
973         /* check data */
974         if (memcmp(src, dst, cur_dst - dst)) {
975                 rte_hexdump("src", src, cur_src - src);
976                 rte_hexdump("dst", dst, cur_dst - dst);
977                 printf("data after dequeue is not the same\n");
978                 goto fail;
979         }
980
981         cur_src = src;
982         cur_dst = dst;
983
984         printf("Test MP & MC basic functions \n");
985
986         printf("enqueue 1 obj\n");
987         ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
988         cur_src += 1;
989         if ((ret & RTE_RING_SZ_MASK) != 1)
990                 goto fail;
991
992         printf("enqueue 2 objs\n");
993         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
994         cur_src += 2;
995         if ((ret & RTE_RING_SZ_MASK) != 2)
996                 goto fail;
997
998         printf("enqueue MAX_BULK objs\n");
999         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1000         cur_src += MAX_BULK;
1001         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1002                 goto fail;
1003
1004         printf("dequeue 1 obj\n");
1005         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
1006         cur_dst += 1;
1007         if ((ret & RTE_RING_SZ_MASK) != 1)
1008                 goto fail;
1009
1010         printf("dequeue 2 objs\n");
1011         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1012         cur_dst += 2;
1013         if ((ret & RTE_RING_SZ_MASK) != 2)
1014                 goto fail;
1015
1016         printf("dequeue MAX_BULK objs\n");
1017         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1018         cur_dst += MAX_BULK;
1019         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1020                 goto fail;
1021
1022         /* check data */
1023         if (memcmp(src, dst, cur_dst - dst)) {
1024                 rte_hexdump("src", src, cur_src - src);
1025                 rte_hexdump("dst", dst, cur_dst - dst);
1026                 printf("data after dequeue is not the same\n");
1027                 goto fail;
1028         }
1029
1030         cur_src = src;
1031         cur_dst = dst;
1032
1033         printf("fill and empty the ring\n");
1034         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1035                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1036                 cur_src += MAX_BULK;
1037                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1038                         goto fail;
1039                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1040                 cur_dst += MAX_BULK;
1041                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1042                         goto fail;
1043         }
1044
1045         /* check data */
1046         if (memcmp(src, dst, cur_dst - dst)) {
1047                 rte_hexdump("src", src, cur_src - src);
1048                 rte_hexdump("dst", dst, cur_dst - dst);
1049                 printf("data after dequeue is not the same\n");
1050                 goto fail;
1051         }
1052
1053         cur_src = src;
1054         cur_dst = dst;
1055
1056         printf("Test enqueue without enough memory space \n");
1057         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1058                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1059                 cur_src += MAX_BULK;
1060                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1061                         goto fail;
1062         }
1063
1064         /* Available memory space for the exact MAX_BULK objects */
1065         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
1066         cur_src += 2;
1067         if ((ret & RTE_RING_SZ_MASK) != 2)
1068                 goto fail;
1069
1070         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1071         cur_src += MAX_BULK - 3;
1072         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1073                 goto fail;
1074
1075
1076         printf("Test dequeue without enough objects \n");
1077         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1078                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1079                 cur_dst += MAX_BULK;
1080                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1081                         goto fail;
1082         }
1083
1084         /* Available objects - the exact MAX_BULK */
1085         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1086         cur_dst += 2;
1087         if ((ret & RTE_RING_SZ_MASK) != 2)
1088                 goto fail;
1089
1090         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1091         cur_dst += MAX_BULK - 3;
1092         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1093                 goto fail;
1094
1095         /* check data */
1096         if (memcmp(src, dst, cur_dst - dst)) {
1097                 rte_hexdump("src", src, cur_src - src);
1098                 rte_hexdump("dst", dst, cur_dst - dst);
1099                 printf("data after dequeue is not the same\n");
1100                 goto fail;
1101         }
1102
1103         cur_src = src;
1104         cur_dst = dst;
1105
1106         printf("Covering rte_ring_enqueue_burst functions \n");
1107
1108         ret = rte_ring_enqueue_burst(r, cur_src, 2);
1109         cur_src += 2;
1110         if ((ret & RTE_RING_SZ_MASK) != 2)
1111                 goto fail;
1112
1113         ret = rte_ring_dequeue_burst(r, cur_dst, 2);
1114         cur_dst += 2;
1115         if (ret != 2)
1116                 goto fail;
1117
1118         /* Free memory before test completed */
1119         if (src)
1120                 free(src);
1121         if (dst)
1122                 free(dst);
1123         return 0;
1124
1125  fail:
1126         if (src)
1127                 free(src);
1128         if (dst)
1129                 free(dst);
1130         return -1;
1131 }
1132
1133 static int
1134 test_ring_stats(void)
1135 {
1136
1137 #ifndef RTE_LIBRTE_RING_DEBUG
1138         printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
1139         return 0;
1140 #else
1141         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
1142         int ret;
1143         unsigned i;
1144         unsigned num_items            = 0;
1145         unsigned failed_enqueue_ops   = 0;
1146         unsigned failed_enqueue_items = 0;
1147         unsigned failed_dequeue_ops   = 0;
1148         unsigned failed_dequeue_items = 0;
1149         unsigned last_enqueue_ops     = 0;
1150         unsigned last_enqueue_items   = 0;
1151         unsigned last_quota_ops       = 0;
1152         unsigned last_quota_items     = 0;
1153         unsigned lcore_id = rte_lcore_id();
1154         struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
1155
1156         printf("Test the ring stats.\n");
1157
1158         /* Reset the watermark in case it was set in another test. */
1159         rte_ring_set_water_mark(r, 0);
1160
1161         /* Reset the ring stats. */
1162         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1163
1164         /* Allocate some dummy object pointers. */
1165         src = malloc(RING_SIZE*2*sizeof(void *));
1166         if (src == NULL)
1167                 goto fail;
1168
1169         for (i = 0; i < RING_SIZE*2 ; i++) {
1170                 src[i] = (void *)(unsigned long)i;
1171         }
1172
1173         /* Allocate some memory for copied objects. */
1174         dst = malloc(RING_SIZE*2*sizeof(void *));
1175         if (dst == NULL)
1176                 goto fail;
1177
1178         memset(dst, 0, RING_SIZE*2*sizeof(void *));
1179
1180         /* Set the head and tail pointers. */
1181         cur_src = src;
1182         cur_dst = dst;
1183
1184         /* Do Enqueue tests. */
1185         printf("Test the dequeue stats.\n");
1186
1187         /* Fill the ring up to RING_SIZE -1. */
1188         printf("Fill the ring.\n");
1189         for (i = 0; i< (RING_SIZE/MAX_BULK); i++) {
1190                 rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
1191                 cur_src += MAX_BULK;
1192         }
1193
1194         /* Adjust for final enqueue = MAX_BULK -1. */
1195         cur_src--;
1196
1197         printf("Verify that the ring is full.\n");
1198         if (rte_ring_full(r) != 1)
1199                 goto fail;
1200
1201
1202         printf("Verify the enqueue success stats.\n");
1203         /* Stats should match above enqueue operations to fill the ring. */
1204         if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
1205                 goto fail;
1206
1207         /* Current max objects is RING_SIZE -1. */
1208         if (ring_stats->enq_success_objs != RING_SIZE -1)
1209                 goto fail;
1210
1211         /* Shouldn't have any failures yet. */
1212         if (ring_stats->enq_fail_bulk != 0)
1213                 goto fail;
1214         if (ring_stats->enq_fail_objs != 0)
1215                 goto fail;
1216
1217
1218         printf("Test stats for SP burst enqueue to a full ring.\n");
1219         num_items = 2;
1220         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1221         if ((ret & RTE_RING_SZ_MASK) != 0)
1222                 goto fail;
1223
1224         failed_enqueue_ops   += 1;
1225         failed_enqueue_items += num_items;
1226
1227         /* The enqueue should have failed. */
1228         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1229                 goto fail;
1230         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1231                 goto fail;
1232
1233
1234         printf("Test stats for SP bulk enqueue to a full ring.\n");
1235         num_items = 4;
1236         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1237         if (ret != -ENOBUFS)
1238                 goto fail;
1239
1240         failed_enqueue_ops   += 1;
1241         failed_enqueue_items += num_items;
1242
1243         /* The enqueue should have failed. */
1244         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1245                 goto fail;
1246         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1247                 goto fail;
1248
1249
1250         printf("Test stats for MP burst enqueue to a full ring.\n");
1251         num_items = 8;
1252         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1253         if ((ret & RTE_RING_SZ_MASK) != 0)
1254                 goto fail;
1255
1256         failed_enqueue_ops   += 1;
1257         failed_enqueue_items += num_items;
1258
1259         /* The enqueue should have failed. */
1260         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1261                 goto fail;
1262         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1263                 goto fail;
1264
1265
1266         printf("Test stats for MP bulk enqueue to a full ring.\n");
1267         num_items = 16;
1268         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1269         if (ret != -ENOBUFS)
1270                 goto fail;
1271
1272         failed_enqueue_ops   += 1;
1273         failed_enqueue_items += num_items;
1274
1275         /* The enqueue should have failed. */
1276         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1277                 goto fail;
1278         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1279                 goto fail;
1280
1281
1282         /* Do Dequeue tests. */
1283         printf("Test the dequeue stats.\n");
1284
1285         printf("Empty the ring.\n");
1286         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1287                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1288                 cur_dst += MAX_BULK;
1289         }
1290
1291         /* There was only RING_SIZE -1 objects to dequeue. */
1292         cur_dst++;
1293
1294         printf("Verify ring is empty.\n");
1295         if (1 != rte_ring_empty(r))
1296                 goto fail;
1297
1298         printf("Verify the dequeue success stats.\n");
1299         /* Stats should match above dequeue operations. */
1300         if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
1301                 goto fail;
1302
1303         /* Objects dequeued is RING_SIZE -1. */
1304         if (ring_stats->deq_success_objs != RING_SIZE -1)
1305                 goto fail;
1306
1307         /* Shouldn't have any dequeue failure stats yet. */
1308         if (ring_stats->deq_fail_bulk != 0)
1309                 goto fail;
1310
1311         printf("Test stats for SC burst dequeue with an empty ring.\n");
1312         num_items = 2;
1313         ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
1314         if ((ret & RTE_RING_SZ_MASK) != 0)
1315                 goto fail;
1316
1317         failed_dequeue_ops   += 1;
1318         failed_dequeue_items += num_items;
1319
1320         /* The dequeue should have failed. */
1321         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1322                 goto fail;
1323         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1324                 goto fail;
1325
1326
1327         printf("Test stats for SC bulk dequeue with an empty ring.\n");
1328         num_items = 4;
1329         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
1330         if (ret != -ENOENT)
1331                 goto fail;
1332
1333         failed_dequeue_ops   += 1;
1334         failed_dequeue_items += num_items;
1335
1336         /* The dequeue should have failed. */
1337         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1338                 goto fail;
1339         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1340                 goto fail;
1341
1342
1343         printf("Test stats for MC burst dequeue with an empty ring.\n");
1344         num_items = 8;
1345         ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
1346         if ((ret & RTE_RING_SZ_MASK) != 0)
1347                 goto fail;
1348         failed_dequeue_ops   += 1;
1349         failed_dequeue_items += num_items;
1350
1351         /* The dequeue should have failed. */
1352         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1353                 goto fail;
1354         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1355                 goto fail;
1356
1357
1358         printf("Test stats for MC bulk dequeue with an empty ring.\n");
1359         num_items = 16;
1360         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
1361         if (ret != -ENOENT)
1362                 goto fail;
1363
1364         failed_dequeue_ops   += 1;
1365         failed_dequeue_items += num_items;
1366
1367         /* The dequeue should have failed. */
1368         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1369                 goto fail;
1370         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1371                 goto fail;
1372
1373
1374         printf("Test total enqueue/dequeue stats.\n");
1375         /* At this point the enqueue and dequeue stats should be the same. */
1376         if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
1377                 goto fail;
1378         if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
1379                 goto fail;
1380         if (ring_stats->enq_fail_bulk    != ring_stats->deq_fail_bulk)
1381                 goto fail;
1382         if (ring_stats->enq_fail_objs    != ring_stats->deq_fail_objs)
1383                 goto fail;
1384
1385
1386         /* Watermark Tests. */
1387         printf("Test the watermark/quota stats.\n");
1388
1389         printf("Verify the initial watermark stats.\n");
1390         /* Watermark stats should be 0 since there is no watermark. */
1391         if (ring_stats->enq_quota_bulk != 0)
1392                 goto fail;
1393         if (ring_stats->enq_quota_objs != 0)
1394                 goto fail;
1395
1396         /* Set a watermark. */
1397         rte_ring_set_water_mark(r, 16);
1398
1399         /* Reset pointers. */
1400         cur_src = src;
1401         cur_dst = dst;
1402
1403         last_enqueue_ops   = ring_stats->enq_success_bulk;
1404         last_enqueue_items = ring_stats->enq_success_objs;
1405
1406
1407         printf("Test stats for SP burst enqueue below watermark.\n");
1408         num_items = 8;
1409         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1410         if ((ret & RTE_RING_SZ_MASK) != num_items)
1411                 goto fail;
1412
1413         /* Watermark stats should still be 0. */
1414         if (ring_stats->enq_quota_bulk != 0)
1415                 goto fail;
1416         if (ring_stats->enq_quota_objs != 0)
1417                 goto fail;
1418
1419         /* Success stats should have increased. */
1420         if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
1421                 goto fail;
1422         if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
1423                 goto fail;
1424
1425         last_enqueue_ops   = ring_stats->enq_success_bulk;
1426         last_enqueue_items = ring_stats->enq_success_objs;
1427
1428
1429         printf("Test stats for SP burst enqueue at watermark.\n");
1430         num_items = 8;
1431         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1432         if ((ret & RTE_RING_SZ_MASK) != num_items)
1433                 goto fail;
1434
1435         /* Watermark stats should have changed. */
1436         if (ring_stats->enq_quota_bulk != 1)
1437                 goto fail;
1438         if (ring_stats->enq_quota_objs != num_items)
1439                 goto fail;
1440
1441         last_quota_ops   = ring_stats->enq_quota_bulk;
1442         last_quota_items = ring_stats->enq_quota_objs;
1443
1444
1445         printf("Test stats for SP burst enqueue above watermark.\n");
1446         num_items = 1;
1447         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1448         if ((ret & RTE_RING_SZ_MASK) != num_items)
1449                 goto fail;
1450
1451         /* Watermark stats should have changed. */
1452         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1453                 goto fail;
1454         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1455                 goto fail;
1456
1457         last_quota_ops   = ring_stats->enq_quota_bulk;
1458         last_quota_items = ring_stats->enq_quota_objs;
1459
1460
1461         printf("Test stats for MP burst enqueue above watermark.\n");
1462         num_items = 2;
1463         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1464         if ((ret & RTE_RING_SZ_MASK) != num_items)
1465                 goto fail;
1466
1467         /* Watermark stats should have changed. */
1468         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1469                 goto fail;
1470         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1471                 goto fail;
1472
1473         last_quota_ops   = ring_stats->enq_quota_bulk;
1474         last_quota_items = ring_stats->enq_quota_objs;
1475
1476
1477         printf("Test stats for SP bulk enqueue above watermark.\n");
1478         num_items = 4;
1479         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1480         if (ret != -EDQUOT)
1481                 goto fail;
1482
1483         /* Watermark stats should have changed. */
1484         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1485                 goto fail;
1486         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1487                 goto fail;
1488
1489         last_quota_ops   = ring_stats->enq_quota_bulk;
1490         last_quota_items = ring_stats->enq_quota_objs;
1491
1492
1493         printf("Test stats for MP bulk enqueue above watermark.\n");
1494         num_items = 8;
1495         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1496         if (ret != -EDQUOT)
1497                 goto fail;
1498
1499         /* Watermark stats should have changed. */
1500         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1501                 goto fail;
1502         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1503                 goto fail;
1504
1505         printf("Test watermark success stats.\n");
1506         /* Success stats should be same as last non-watermarked enqueue. */
1507         if (ring_stats->enq_success_bulk != last_enqueue_ops)
1508                 goto fail;
1509         if (ring_stats->enq_success_objs != last_enqueue_items)
1510                 goto fail;
1511
1512
1513         /* Cleanup. */
1514
1515         /* Empty the ring. */
1516         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1517                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1518                 cur_dst += MAX_BULK;
1519         }
1520
1521         /* Reset the watermark. */
1522         rte_ring_set_water_mark(r, 0);
1523
1524         /* Reset the ring stats. */
1525         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1526
1527         /* Free memory before test completed */
1528         if (src)
1529                 free(src);
1530         if (dst)
1531                 free(dst);
1532         return 0;
1533
1534 fail:
1535         if (src)
1536                 free(src);
1537         if (dst)
1538                 free(dst);
1539         return -1;
1540 #endif
1541 }
1542
1543 /*
1544  * it will always fail to create ring with a wrong ring size number in this function
1545  */
1546 static int
1547 test_ring_creation_with_wrong_size(void)
1548 {
1549         struct rte_ring * rp = NULL;
1550
1551         /* Test if ring size is not power of 2 */
1552         rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
1553         if (NULL != rp) {
1554                 return -1;
1555         }
1556
1557         /* Test if ring size is exceeding the limit */
1558         rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
1559         if (NULL != rp) {
1560                 return -1;
1561         }
1562         return 0;
1563 }
1564
1565 /*
1566  * it tests if it would always fail to create ring with an used ring name
1567  */
1568 static int
1569 test_ring_creation_with_an_used_name(void)
1570 {
1571         struct rte_ring * rp;
1572
1573         rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1574         if (NULL != rp)
1575                 return -1;
1576
1577         return 0;
1578 }
1579
1580 /*
1581  * Test to if a non-power of 2 count causes the create
1582  * function to fail correctly
1583  */
1584 static int
1585 test_create_count_odd(void)
1586 {
1587         struct rte_ring *r = rte_ring_create("test_ring_count",
1588                         4097, SOCKET_ID_ANY, 0 );
1589         if(r != NULL){
1590                 return -1;
1591         }
1592         return 0;
1593 }
1594
1595 static int
1596 test_lookup_null(void)
1597 {
1598         struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
1599         if (rlp ==NULL)
1600         if (rte_errno != ENOENT){
1601                 printf( "test failed to returnn error on null pointer\n");
1602                 return -1;
1603         }
1604         return 0;
1605 }
1606
1607 /*
1608  * it tests some more basic ring operations
1609  */
1610 static int
1611 test_ring_basic_ex(void)
1612 {
1613         int ret = -1;
1614         unsigned i;
1615         struct rte_ring * rp;
1616         void **obj = NULL;
1617
1618         obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
1619         if (obj == NULL) {
1620                 printf("test_ring_basic_ex fail to rte_malloc\n");
1621                 goto fail_test;
1622         }
1623
1624         rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
1625                         RING_F_SP_ENQ | RING_F_SC_DEQ);
1626         if (rp == NULL) {
1627                 printf("test_ring_basic_ex fail to create ring\n");
1628                 goto fail_test;
1629         }
1630
1631         if (rte_ring_lookup("test_ring_basic_ex") != rp) {
1632                 goto fail_test;
1633         }
1634
1635         if (rte_ring_empty(rp) != 1) {
1636                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1637                 goto fail_test;
1638         }
1639
1640         printf("%u ring entries are now free\n", rte_ring_free_count(rp));
1641
1642         for (i = 0; i < RING_SIZE; i ++) {
1643                 rte_ring_enqueue(rp, obj[i]);
1644         }
1645
1646         if (rte_ring_full(rp) != 1) {
1647                 printf("test_ring_basic_ex ring is not full but it should be\n");
1648                 goto fail_test;
1649         }
1650
1651         for (i = 0; i < RING_SIZE; i ++) {
1652                 rte_ring_dequeue(rp, &obj[i]);
1653         }
1654
1655         if (rte_ring_empty(rp) != 1) {
1656                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1657                 goto fail_test;
1658         }
1659
1660         /* Covering the ring burst operation */
1661         ret = rte_ring_enqueue_burst(rp, obj, 2);
1662         if ((ret & RTE_RING_SZ_MASK) != 2) {
1663                 printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
1664                 goto fail_test;
1665         }
1666
1667         ret = rte_ring_dequeue_burst(rp, obj, 2);
1668         if (ret != 2) {
1669                 printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
1670                 goto fail_test;
1671         }
1672
1673         ret = 0;
1674 fail_test:
1675         if (obj != NULL)
1676                 rte_free(obj);
1677
1678         return ret;
1679 }
1680
1681 int
1682 test_ring(void)
1683 {
1684         unsigned enq_core_count, deq_core_count;
1685
1686         /* some more basic operations */
1687         if (test_ring_basic_ex() < 0)
1688                 return -1;
1689
1690         rte_atomic32_init(&synchro);
1691
1692         if (r == NULL)
1693                 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1694         if (r == NULL)
1695                 return -1;
1696
1697         /* retrieve the ring from its name */
1698         if (rte_ring_lookup("test") != r) {
1699                 printf("Cannot lookup ring from its name\n");
1700                 return -1;
1701         }
1702
1703         /* burst operations */
1704         if (test_ring_burst_basic() < 0)
1705                 return -1;
1706
1707         /* basic operations */
1708         if (test_ring_basic() < 0)
1709                 return -1;
1710
1711         /* ring stats */
1712         if (test_ring_stats() < 0)
1713                 return -1;
1714
1715         /* basic operations */
1716         if (test_live_watermark_change() < 0)
1717                 return -1;
1718
1719         if ( test_set_watermark() < 0){
1720                 printf ("Test failed to detect invalid parameter\n");
1721                 return -1;
1722         }
1723         else
1724                 printf ( "Test detected forced bad watermark values\n");
1725
1726         if ( test_create_count_odd() < 0){
1727                         printf ("Test failed to detect odd count\n");
1728                         return -1;
1729                 }
1730                 else
1731                         printf ( "Test detected odd count\n");
1732
1733         if ( test_lookup_null() < 0){
1734                                 printf ("Test failed to detect NULL ring lookup\n");
1735                                 return -1;
1736                         }
1737                         else
1738                                 printf ( "Test detected NULL ring lookup \n");
1739
1740         printf("start performance tests \n");
1741
1742         /* one lcore for enqueue, one for dequeue */
1743         enq_core_count = 1;
1744         deq_core_count = 1;
1745         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1746                 return -1;
1747
1748         /* max cores for enqueue, one for dequeue */
1749         enq_core_count = rte_lcore_count() - 1;
1750         deq_core_count = 1;
1751         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1752                 return -1;
1753
1754         /* max cores for dequeue, one for enqueue */
1755         enq_core_count = 1;
1756         deq_core_count = rte_lcore_count() - 1;
1757         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1758                 return -1;
1759
1760         /* half for enqueue and half for dequeue */
1761         enq_core_count = rte_lcore_count() / 2;
1762         deq_core_count = rte_lcore_count() / 2;
1763         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1764                 return -1;
1765
1766         printf("start performance tests - burst operations \n");
1767
1768         /* one lcore for enqueue, one for dequeue */
1769         enq_core_count = 1;
1770         deq_core_count = 1;
1771         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1772                 return -1;
1773
1774         /* max cores for enqueue, one for dequeue */
1775         enq_core_count = rte_lcore_count() - 1;
1776         deq_core_count = 1;
1777         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1778                 return -1;
1779
1780         /* max cores for dequeue, one for enqueue */
1781         enq_core_count = 1;
1782         deq_core_count = rte_lcore_count() - 1;
1783         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1784                 return -1;
1785
1786         /* half for enqueue and half for dequeue */
1787         enq_core_count = rte_lcore_count() / 2;
1788         deq_core_count = rte_lcore_count() / 2;
1789         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1790                 return -1;
1791
1792         /* test of creating ring with wrong size */
1793         if (test_ring_creation_with_wrong_size() < 0)
1794                 return -1;
1795
1796         /* test of creation ring with an used name */
1797         if (test_ring_creation_with_an_used_name() < 0)
1798                 return -1;
1799
1800         /* dump the ring status */
1801         rte_ring_list_dump();
1802
1803         return 0;
1804 }