ring: add burst API
[dpdk.git] / app / test / test_ring.c
1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2012 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without 
8  *   modification, are permitted provided that the following conditions 
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright 
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright 
14  *       notice, this list of conditions and the following disclaimer in 
15  *       the documentation and/or other materials provided with the 
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its 
18  *       contributors may be used to endorse or promote products derived 
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  * 
33  */
34
35 #include <string.h>
36 #include <stdarg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <stdint.h>
40 #include <inttypes.h>
41 #include <errno.h>
42 #include <sys/queue.h>
43
44 #include <rte_common.h>
45 #include <rte_log.h>
46 #include <rte_memory.h>
47 #include <rte_memzone.h>
48 #include <rte_launch.h>
49 #include <rte_cycles.h>
50 #include <rte_tailq.h>
51 #include <rte_eal.h>
52 #include <rte_per_lcore.h>
53 #include <rte_lcore.h>
54 #include <rte_atomic.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_malloc.h>
57 #include <rte_ring.h>
58 #include <rte_random.h>
59 #include <rte_common.h>
60 #include <rte_errno.h>
61
62 #include <cmdline_parse.h>
63
64 #include "test.h"
65
66 /*
67  * Ring
68  * ====
69  *
70  * #. Basic tests: done on one core:
71  *
72  *    - Using single producer/single consumer functions:
73  *
74  *      - Enqueue one object, two objects, MAX_BULK objects
75  *      - Dequeue one object, two objects, MAX_BULK objects
76  *      - Check that dequeued pointers are correct
77  *
78  *    - Using multi producers/multi consumers functions:
79  *
80  *      - Enqueue one object, two objects, MAX_BULK objects
81  *      - Dequeue one object, two objects, MAX_BULK objects
82  *      - Check that dequeued pointers are correct
83  *
84  *    - Test watermark and default bulk enqueue/dequeue:
85  *
86  *      - Set watermark
87  *      - Set default bulk value
88  *      - Enqueue objects, check that -EDQUOT is returned when
89  *        watermark is exceeded
90  *      - Check that dequeued pointers are correct
91  *
92  * #. Check live watermark change
93  *
94  *    - Start a loop on another lcore that will enqueue and dequeue
95  *      objects in a ring. It will monitor the value of watermark.
96  *    - At the same time, change the watermark on the master lcore.
97  *    - The slave lcore will check that watermark changes from 16 to 32.
98  *
99  * #. Performance tests.
100  *
101  *    This test is done on the following configurations:
102  *
103  *    - One core enqueuing, one core dequeuing
104  *    - One core enqueuing, other cores dequeuing
105  *    - One core dequeuing, other cores enqueuing
106  *    - Half of the cores enqueuing, the other half dequeuing
107  *
108  *    When only one core enqueues/dequeues, the test is done with the
109  *    SP/SC functions in addition to the MP/MC functions.
110  *
111  *    The test is done with different bulk size.
112  *
113  *    On each core, the test enqueues or dequeues objects during
114  *    TIME_S seconds. The number of successes and failures are stored on
115  *    each core, then summed and displayed.
116  *
117  *    The test checks that the number of enqueues is equal to the
118  *    number of dequeues.
119  */
120
121 #define RING_SIZE 4096
122 #define MAX_BULK 32
123 #define N 65536
124 #define TIME_S 5
125
126 static rte_atomic32_t synchro;
127
128 static struct rte_ring *r;
129
130 struct test_stats {
131         unsigned enq_success ;
132         unsigned enq_quota;
133         unsigned enq_fail;
134
135         unsigned deq_success;
136         unsigned deq_fail;
137 } __rte_cache_aligned;
138
139 static struct test_stats test_stats[RTE_MAX_LCORE];
140
141 static int
142 ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned),
143                 void* arg, unsigned bulk_or_burst)
144 {
145         unsigned success = 0;
146         unsigned quota = 0;
147         unsigned fail = 0;
148         unsigned i;
149         unsigned long dummy_obj;
150         void *obj_table[MAX_BULK];
151         int ret;
152         unsigned lcore_id = rte_lcore_id();
153         unsigned count = *((unsigned*)arg);
154         uint64_t start_cycles, end_cycles;
155         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
156
157         /* init dummy object table */
158         for (i = 0; i< MAX_BULK; i++) {
159                 dummy_obj = lcore_id + 0x1000 + i;
160                 obj_table[i] = (void *)dummy_obj;
161         }
162
163         /* wait synchro for slaves */
164         if (lcore_id != rte_get_master_lcore())
165                 while (rte_atomic32_read(&synchro) == 0);
166
167         start_cycles = rte_get_hpet_cycles();
168
169         /* enqueue as many object as possible */
170         while (time_diff/hz < TIME_S) {
171                 for (i = 0; likely(i < N); i++) {
172                         ret = que_func(r, obj_table, count);
173                         /*
174                          * bulk_or_burst
175                          *       1: for bulk operation
176                          *   0: for burst operation
177                          */
178                         if (bulk_or_burst) {
179                                 /* The *count* objects enqueued, unless fail */
180                                 if (ret == 0)
181                                         success += count;
182                                 else if (ret == -EDQUOT)
183                                         quota += count;
184                                 else
185                                         fail++;
186                         } else {
187                                 /* The actual objects enqueued */
188                                 if (ret != 0)
189                                         success += (ret & RTE_RING_SZ_MASK);
190                                 else
191                                         fail++;
192                         }
193                 }
194                 end_cycles = rte_get_hpet_cycles();
195                 time_diff = end_cycles - start_cycles;
196         }
197
198         /* write statistics in a shared structure */
199         test_stats[lcore_id].enq_success = success;
200         test_stats[lcore_id].enq_quota = quota;
201         test_stats[lcore_id].enq_fail = fail;
202
203         return 0;
204 }
205
206 static int
207 ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned),
208                 void* arg, unsigned bulk_or_burst)
209 {
210         unsigned success = 0;
211         unsigned fail = 0;
212         unsigned i;
213         void *obj_table[MAX_BULK];
214         int ret;
215         unsigned lcore_id = rte_lcore_id();
216         unsigned count = *((unsigned*)arg);
217         uint64_t start_cycles, end_cycles;
218         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
219
220         /* wait synchro for slaves */
221         if (lcore_id != rte_get_master_lcore())
222                 while (rte_atomic32_read(&synchro) == 0);
223
224         start_cycles = rte_get_hpet_cycles();
225
226         /* dequeue as many object as possible */
227         while (time_diff/hz < TIME_S) {
228                 for (i = 0; likely(i < N); i++) {
229                         ret = que_func(r, obj_table, count);
230                         /*
231                          * bulk_or_burst
232                          *       1: for bulk operation
233                          *   0: for burst operation
234                          */
235                         if (bulk_or_burst) {
236                                 if (ret == 0)
237                                         success += count;
238                                 else
239                                         fail++;
240                         } else {
241                                 if (ret != 0)
242                                         success += ret;
243                                 else
244                                         fail++;
245                         }
246                 }
247                 end_cycles = rte_get_hpet_cycles();
248                 time_diff = end_cycles - start_cycles;
249         }
250
251         /* write statistics in a shared structure */
252         test_stats[lcore_id].deq_success = success;
253         test_stats[lcore_id].deq_fail = fail;
254
255         return 0;
256 }
257
258 static int
259 test_ring_per_core_sp_enqueue(void *arg)
260 {
261         return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
262 }
263
264 static int
265 test_ring_per_core_mp_enqueue(void *arg)
266 {
267         return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
268 }
269
270 static int
271 test_ring_per_core_mc_dequeue(void *arg)
272 {
273         return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
274 }
275
276 static int
277 test_ring_per_core_sc_dequeue(void *arg)
278 {
279         return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
280 }
281
282 static int
283 test_ring_per_core_sp_enqueue_burst(void *arg)
284 {
285         return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
286 }
287
288 static int
289 test_ring_per_core_mp_enqueue_burst(void *arg)
290 {
291         return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
292 }
293
294 static int
295 test_ring_per_core_mc_dequeue_burst(void *arg)
296 {
297         return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
298 }
299
300 static int
301 test_ring_per_core_sc_dequeue_burst(void *arg)
302 {
303         return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
304 }
305
306 #define TEST_RING_VERIFY(exp)                                           \
307         if (!(exp)) {                                                   \
308                 printf("error at %s:%d\tcondition " #exp " failed\n",   \
309                     __func__, __LINE__);                                \
310                 rte_ring_dump(r);                                       \
311                 return (-1);                                            \
312         }
313
314 #define TEST_RING_FULL_EMTPY_ITER       8
315
316
317 static int
318 launch_cores(unsigned enq_core_count, unsigned deq_core_count,
319                 unsigned n_enq_bulk, unsigned n_deq_bulk,
320                 int sp, int sc, int bulk_not_burst)
321 {
322         void *obj;
323         unsigned lcore_id;
324         unsigned rate, deq_remain = 0;
325         unsigned enq_total, deq_total;
326         struct test_stats sum;
327         int (*enq_f)(void *);
328         int (*deq_f)(void *);
329         unsigned cores = enq_core_count + deq_core_count;
330         int ret;
331
332         rte_atomic32_set(&synchro, 0);
333
334         printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
335                enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
336         printf("sp=%d sc=%d ", sp, sc);
337
338         if (bulk_not_burst) {
339                 /* set enqueue function to be used */
340                 if (sp)
341                         enq_f = test_ring_per_core_sp_enqueue;
342                 else
343                         enq_f = test_ring_per_core_mp_enqueue;
344
345                 /* set dequeue function to be used */
346                 if (sc)
347                         deq_f = test_ring_per_core_sc_dequeue;
348                 else
349                         deq_f = test_ring_per_core_mc_dequeue;
350
351         } else {
352                 /* set enqueue function to be used */
353                 if (sp)
354                         enq_f = test_ring_per_core_sp_enqueue_burst;
355                 else
356                         enq_f = test_ring_per_core_mp_enqueue_burst;
357
358                 /* set dequeue function to be used */
359                 if (sc)
360                         deq_f = test_ring_per_core_sc_dequeue_burst;
361                 else
362                         deq_f = test_ring_per_core_mc_dequeue_burst;
363         }
364
365         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
366                 if (enq_core_count != 0) {
367                         enq_core_count--;
368                         rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
369                 }
370                 if (deq_core_count != 1) {
371                         deq_core_count--;
372                         rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
373                 }
374         }
375
376         memset(test_stats, 0, sizeof(test_stats));
377
378         /* start synchro and launch test on master */
379         rte_atomic32_set(&synchro, 1);
380         ret = deq_f(&n_deq_bulk);
381
382         /* wait all cores */
383         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
384                 if (cores == 1)
385                         break;
386                 cores--;
387                 if (rte_eal_wait_lcore(lcore_id) < 0)
388                         ret = -1;
389         }
390
391         memset(&sum, 0, sizeof(sum));
392         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
393                 sum.enq_success += test_stats[lcore_id].enq_success;
394                 sum.enq_quota += test_stats[lcore_id].enq_quota;
395                 sum.enq_fail += test_stats[lcore_id].enq_fail;
396                 sum.deq_success += test_stats[lcore_id].deq_success;
397                 sum.deq_fail += test_stats[lcore_id].deq_fail;
398         }
399
400         /* empty the ring */
401         while (rte_ring_sc_dequeue(r, &obj) == 0)
402                 deq_remain += 1;
403
404         if (ret < 0) {
405                 printf("per-lcore test returned -1\n");
406                 return -1;
407         }
408
409         enq_total = sum.enq_success + sum.enq_quota;
410         deq_total = sum.deq_success + deq_remain;
411
412         rate = deq_total/TIME_S;
413
414         printf("rate_persec=%u\n", rate);
415
416         if (enq_total != deq_total) {
417                 printf("invalid enq/deq_success counter: %u %u\n",
418                        enq_total, deq_total);
419                 return -1;
420         }
421
422         return 0;
423 }
424
425 static int
426 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
427                   unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
428 {
429         int sp, sc;
430         int do_sp, do_sc;
431         int ret;
432
433         do_sp = (enq_core_count == 1) ? 1 : 0;
434         do_sc = (deq_core_count  == 1) ? 1 : 0;
435
436         for (sp = 0; sp <= do_sp; sp ++) {
437                 for (sc = 0; sc <= do_sc; sc ++) {
438                         ret = launch_cores(enq_core_count, deq_core_count,
439                                         n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
440                         if (ret < 0)
441                                 return -1;
442                 }
443         }
444         return 0;
445 }
446
447 static int
448 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
449                 unsigned bulk_or_burst)
450 {
451         unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
452         unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
453         unsigned *bulk_enqueue_ptr;
454         unsigned *bulk_dequeue_ptr;
455         int ret;
456
457         for (bulk_enqueue_ptr = bulk_enqueue_tab;
458              *bulk_enqueue_ptr;
459              bulk_enqueue_ptr++) {
460
461                 for (bulk_dequeue_ptr = bulk_dequeue_tab;
462                      *bulk_dequeue_ptr;
463                      bulk_dequeue_ptr++) {
464
465                         ret = do_one_ring_test2(enq_core_count, deq_core_count,
466                                                 *bulk_enqueue_ptr,
467                                                 *bulk_dequeue_ptr,
468                                                 bulk_or_burst);
469                         if (ret < 0)
470                                 return -1;
471                 }
472         }
473         return 0;
474 }
475
476 static int
477 check_live_watermark_change(__attribute__((unused)) void *dummy)
478 {
479         uint64_t hz = rte_get_hpet_hz();
480         void *obj_table[MAX_BULK];
481         unsigned watermark, watermark_old = 16;
482         uint64_t cur_time, end_time;
483         int64_t diff = 0;
484         int i, ret;
485         unsigned count = 4;
486
487         /* init the object table */
488         memset(obj_table, 0, sizeof(obj_table));
489         end_time = rte_get_hpet_cycles() + (hz * 2);
490
491         /* check that bulk and watermark are 4 and 32 (respectively) */
492         while (diff >= 0) {
493
494                 /* add in ring until we reach watermark */
495                 ret = 0;
496                 for (i = 0; i < 16; i ++) {
497                         if (ret != 0)
498                                 break;
499                         ret = rte_ring_enqueue_bulk(r, obj_table, count);
500                 }
501
502                 if (ret != -EDQUOT) {
503                         printf("Cannot enqueue objects, or watermark not "
504                                "reached (ret=%d)\n", ret);
505                         return -1;
506                 }
507
508                 /* read watermark, the only change allowed is from 16 to 32 */
509                 watermark = r->prod.watermark;
510                 if (watermark != watermark_old &&
511                     (watermark_old != 16 || watermark != 32)) {
512                         printf("Bad watermark change %u -> %u\n", watermark_old,
513                                watermark);
514                         return -1;
515                 }
516                 watermark_old = watermark;
517
518                 /* dequeue objects from ring */
519                 while (i--) {
520                         ret = rte_ring_dequeue_bulk(r, obj_table, count);
521                         if (ret != 0) {
522                                 printf("Cannot dequeue (ret=%d)\n", ret);
523                                 return -1;
524                         }
525                 }
526
527                 cur_time = rte_get_hpet_cycles();
528                 diff = end_time - cur_time;
529         }
530
531         if (watermark_old != 32 ) {
532                 printf(" watermark was not updated (wm=%u)\n",
533                        watermark_old);
534                 return -1;
535         }
536
537         return 0;
538 }
539
540 static int
541 test_live_watermark_change(void)
542 {
543         unsigned lcore_id = rte_lcore_id();
544         unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
545
546         printf("Test watermark live modification\n");
547         rte_ring_set_water_mark(r, 16);
548
549         /* launch a thread that will enqueue and dequeue, checking
550          * watermark and quota */
551         rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
552
553         rte_delay_ms(1000);
554         rte_ring_set_water_mark(r, 32);
555         rte_delay_ms(1000);
556
557         if (rte_eal_wait_lcore(lcore_id2) < 0)
558                 return -1;
559
560         return 0;
561 }
562
563 /* Test for catch on invalid watermark values */
564 static int
565 test_set_watermark( void ){
566         unsigned count;
567         int setwm;
568
569         struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
570         if(r == NULL){
571                 printf( " ring lookup failed\n" );
572                 goto error;
573         }
574         count = r->prod.size*2;
575         setwm = rte_ring_set_water_mark(r, count);
576         if (setwm != -EINVAL){
577                 printf("Test failed to detect invalid watermark count value\n");
578                 goto error;
579         }
580
581         count = 0;
582         setwm = rte_ring_set_water_mark(r, count);
583         if (r->prod.watermark != r->prod.size) {
584                 printf("Test failed to detect invalid watermark count value\n");
585                 goto error;
586         }
587         return 0;
588
589 error:
590         return -1;
591 }
592
593 /*
594  * helper routine for test_ring_basic
595  */
596 static int
597 test_ring_basic_full_empty(void * const src[], void *dst[])
598 {
599         unsigned i, rand;
600         const unsigned rsz = RING_SIZE - 1;
601
602         printf("Basic full/empty test\n");
603
604         for (i = 0; TEST_RING_FULL_EMTPY_ITER != i; i++) {
605
606                 /* random shift in the ring */
607                 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
608                 printf("%s: iteration %u, random shift: %u;\n",
609                     __func__, i, rand);
610                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
611                     rand));
612                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
613
614                 /* fill the ring */
615                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
616                     rsz));
617                 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
618                 TEST_RING_VERIFY(rsz == rte_ring_count(r));
619                 TEST_RING_VERIFY(rte_ring_full(r));
620                 TEST_RING_VERIFY(0 == rte_ring_empty(r));
621
622                 /* empty the ring */
623                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
624                 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
625                 TEST_RING_VERIFY(0 == rte_ring_count(r));
626                 TEST_RING_VERIFY(0 == rte_ring_full(r));
627                 TEST_RING_VERIFY(rte_ring_empty(r));
628
629                 /* check data */
630                 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz));
631                 rte_ring_dump(r);
632         }
633         return (0);
634 }
635
636 static int
637 test_ring_basic(void)
638 {
639         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
640         int ret;
641         unsigned i, num_elems;
642
643         /* alloc dummy object pointers */
644         src = malloc(RING_SIZE*2*sizeof(void *));
645         if (src == NULL)
646                 goto fail;
647
648         for (i = 0; i < RING_SIZE*2 ; i++) {
649                 src[i] = (void *)(unsigned long)i;
650         }
651         cur_src = src;
652
653         /* alloc some room for copied objects */
654         dst = malloc(RING_SIZE*2*sizeof(void *));
655         if (dst == NULL)
656                 goto fail;
657
658         memset(dst, 0, RING_SIZE*2*sizeof(void *));
659         cur_dst = dst;
660
661         printf("enqueue 1 obj\n");
662         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
663         cur_src += 1;
664         if (ret != 0)
665                 goto fail;
666
667         printf("enqueue 2 objs\n");
668         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
669         cur_src += 2;
670         if (ret != 0)
671                 goto fail;
672
673         printf("enqueue MAX_BULK objs\n");
674         ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
675         cur_src += MAX_BULK;
676         if (ret != 0)
677                 goto fail;
678
679         printf("dequeue 1 obj\n");
680         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
681         cur_dst += 1;
682         if (ret != 0)
683                 goto fail;
684
685         printf("dequeue 2 objs\n");
686         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
687         cur_dst += 2;
688         if (ret != 0)
689                 goto fail;
690
691         printf("dequeue MAX_BULK objs\n");
692         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
693         cur_dst += MAX_BULK;
694         if (ret != 0)
695                 goto fail;
696
697         /* check data */
698         if (memcmp(src, dst, cur_dst - dst)) {
699                 test_hexdump("src", src, cur_src - src);
700                 test_hexdump("dst", dst, cur_dst - dst);
701                 printf("data after dequeue is not the same\n");
702                 goto fail;
703         }
704         cur_src = src;
705         cur_dst = dst;
706
707         printf("enqueue 1 obj\n");
708         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
709         cur_src += 1;
710         if (ret != 0)
711                 goto fail;
712
713         printf("enqueue 2 objs\n");
714         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
715         cur_src += 2;
716         if (ret != 0)
717                 goto fail;
718
719         printf("enqueue MAX_BULK objs\n");
720         ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
721         cur_src += MAX_BULK;
722         if (ret != 0)
723                 goto fail;
724
725         printf("dequeue 1 obj\n");
726         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
727         cur_dst += 1;
728         if (ret != 0)
729                 goto fail;
730
731         printf("dequeue 2 objs\n");
732         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
733         cur_dst += 2;
734         if (ret != 0)
735                 goto fail;
736
737         printf("dequeue MAX_BULK objs\n");
738         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
739         cur_dst += MAX_BULK;
740         if (ret != 0)
741                 goto fail;
742
743         /* check data */
744         if (memcmp(src, dst, cur_dst - dst)) {
745                 test_hexdump("src", src, cur_src - src);
746                 test_hexdump("dst", dst, cur_dst - dst);
747                 printf("data after dequeue is not the same\n");
748                 goto fail;
749         }
750         cur_src = src;
751         cur_dst = dst;
752
753         printf("fill and empty the ring\n");
754         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
755                 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
756                 cur_src += MAX_BULK;
757                 if (ret != 0)
758                         goto fail;
759                 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
760                 cur_dst += MAX_BULK;
761                 if (ret != 0)
762                         goto fail;
763         }
764
765         /* check data */
766         if (memcmp(src, dst, cur_dst - dst)) {
767                 test_hexdump("src", src, cur_src - src);
768                 test_hexdump("dst", dst, cur_dst - dst);
769                 printf("data after dequeue is not the same\n");
770                 goto fail;
771         }
772
773         if (test_ring_basic_full_empty(src, dst) != 0)
774                 goto fail;
775
776         cur_src = src;
777         cur_dst = dst;
778
779         printf("test watermark and default bulk enqueue / dequeue\n");
780         rte_ring_set_water_mark(r, 20);
781         num_elems = 16;
782
783         cur_src = src;
784         cur_dst = dst;
785
786         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
787         cur_src += num_elems;
788         if (ret != 0) {
789                 printf("Cannot enqueue\n");
790                 goto fail;
791         }
792         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
793         cur_src += num_elems;
794         if (ret != -EDQUOT) {
795                 printf("Watermark not exceeded\n");
796                 goto fail;
797         }
798         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
799         cur_dst += num_elems;
800         if (ret != 0) {
801                 printf("Cannot dequeue\n");
802                 goto fail;
803         }
804         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
805         cur_dst += num_elems;
806         if (ret != 0) {
807                 printf("Cannot dequeue2\n");
808                 goto fail;
809         }
810
811         /* check data */
812         if (memcmp(src, dst, cur_dst - dst)) {
813                 test_hexdump("src", src, cur_src - src);
814                 test_hexdump("dst", dst, cur_dst - dst);
815                 printf("data after dequeue is not the same\n");
816                 goto fail;
817         }
818
819         cur_src = src;
820         cur_dst = dst;
821
822         ret = rte_ring_mp_enqueue(r, cur_src);
823         cur_src += 1;
824         if (ret != 0)
825                 goto fail;
826
827         ret = rte_ring_mc_dequeue(r, cur_dst);
828         cur_dst += 1;
829         if (ret != 0)
830                 goto fail;
831
832         cur_src = src;
833         cur_dst = dst;
834
835         if (src)
836                 free(src);
837         if (dst)
838                 free(dst);
839         return 0;
840
841  fail:
842         if (src)
843                 free(src);
844         if (dst)
845                 free(dst);
846         return -1;
847 }
848
849 static int
850 test_ring_burst_basic(void)
851 {
852         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
853         int ret;
854         unsigned i;
855
856         /* alloc dummy object pointers */
857         src = malloc(RING_SIZE*2*sizeof(void *));
858         if (src == NULL)
859                 goto fail;
860
861         for (i = 0; i < RING_SIZE*2 ; i++) {
862                 src[i] = (void *)(unsigned long)i;
863         }
864         cur_src = src;
865
866         /* alloc some room for copied objects */
867         dst = malloc(RING_SIZE*2*sizeof(void *));
868         if (dst == NULL)
869                 goto fail;
870
871         memset(dst, 0, RING_SIZE*2*sizeof(void *));
872         cur_dst = dst;
873
874         printf("Test SP & SC basic functions \n");
875         printf("enqueue 1 obj\n");
876         ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
877         cur_src += 1;
878         if ((ret & RTE_RING_SZ_MASK) != 1)
879                 goto fail;
880
881         printf("enqueue 2 objs\n");
882         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
883         cur_src += 2;
884         if ((ret & RTE_RING_SZ_MASK) != 2)
885                 goto fail;
886
887         printf("enqueue MAX_BULK objs\n");
888         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
889         cur_src += MAX_BULK;
890         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
891                 goto fail;
892
893         printf("dequeue 1 obj\n");
894         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
895         cur_dst += 1;
896         if ((ret & RTE_RING_SZ_MASK) != 1)
897                 goto fail;
898
899         printf("dequeue 2 objs\n");
900         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
901         cur_dst += 2;
902         if ((ret & RTE_RING_SZ_MASK) != 2)
903                 goto fail;
904
905         printf("dequeue MAX_BULK objs\n");
906         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
907         cur_dst += MAX_BULK;
908         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
909                 goto fail;
910
911         /* check data */
912         if (memcmp(src, dst, cur_dst - dst)) {
913                 test_hexdump("src", src, cur_src - src);
914                 test_hexdump("dst", dst, cur_dst - dst);
915                 printf("data after dequeue is not the same\n");
916                 goto fail;
917         }
918
919         cur_src = src;
920         cur_dst = dst;
921
922         printf("Test enqueue without enough memory space \n");
923         for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
924                 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
925                 cur_src += MAX_BULK;
926                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
927                         goto fail;
928                 }
929         }
930
931         printf("Enqueue 2 objects, free entries = MAX_BULK - 2  \n");
932         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
933         cur_src += 2;
934         if ((ret & RTE_RING_SZ_MASK) != 2)
935                 goto fail;
936
937         printf("Enqueue the remaining entries = MAX_BULK - 2  \n");
938         /* Always one free entry left */
939         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
940         cur_src += MAX_BULK - 3;
941         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
942                 goto fail;
943
944         printf("Test if ring is full  \n");
945         if (rte_ring_full(r) != 1)
946                 goto fail;
947
948         printf("Test enqueue for a full entry  \n");
949         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
950         if ((ret & RTE_RING_SZ_MASK) != 0)
951                 goto fail;
952
953         printf("Test dequeue without enough objects \n");
954         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
955                 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
956                 cur_dst += MAX_BULK;
957                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
958                         goto fail;
959         }
960
961         /* Available memory space for the exact MAX_BULK entries */
962         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
963         cur_dst += 2;
964         if ((ret & RTE_RING_SZ_MASK) != 2)
965                 goto fail;
966
967         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
968         cur_dst += MAX_BULK - 3;
969         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
970                 goto fail;
971
972         printf("Test if ring is empty \n");
973         /* Check if ring is empty */
974         if (1 != rte_ring_empty(r))
975                 goto fail;
976
977         /* check data */
978         if (memcmp(src, dst, cur_dst - dst)) {
979                 test_hexdump("src", src, cur_src - src);
980                 test_hexdump("dst", dst, cur_dst - dst);
981                 printf("data after dequeue is not the same\n");
982                 goto fail;
983         }
984
985         cur_src = src;
986         cur_dst = dst;
987
988         printf("Test MP & MC basic functions \n");
989
990         printf("enqueue 1 obj\n");
991         ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
992         cur_src += 1;
993         if ((ret & RTE_RING_SZ_MASK) != 1)
994                 goto fail;
995
996         printf("enqueue 2 objs\n");
997         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
998         cur_src += 2;
999         if ((ret & RTE_RING_SZ_MASK) != 2)
1000                 goto fail;
1001
1002         printf("enqueue MAX_BULK objs\n");
1003         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1004         cur_src += MAX_BULK;
1005         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1006                 goto fail;
1007
1008         printf("dequeue 1 obj\n");
1009         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
1010         cur_dst += 1;
1011         if ((ret & RTE_RING_SZ_MASK) != 1)
1012                 goto fail;
1013
1014         printf("dequeue 2 objs\n");
1015         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1016         cur_dst += 2;
1017         if ((ret & RTE_RING_SZ_MASK) != 2)
1018                 goto fail;
1019
1020         printf("dequeue MAX_BULK objs\n");
1021         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1022         cur_dst += MAX_BULK;
1023         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1024                 goto fail;
1025
1026         /* check data */
1027         if (memcmp(src, dst, cur_dst - dst)) {
1028                 test_hexdump("src", src, cur_src - src);
1029                 test_hexdump("dst", dst, cur_dst - dst);
1030                 printf("data after dequeue is not the same\n");
1031                 goto fail;
1032         }
1033
1034         cur_src = src;
1035         cur_dst = dst;
1036
1037         printf("fill and empty the ring\n");
1038         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1039                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1040                 cur_src += MAX_BULK;
1041                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1042                         goto fail;
1043                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1044                 cur_dst += MAX_BULK;
1045                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1046                         goto fail;
1047         }
1048
1049         /* check data */
1050         if (memcmp(src, dst, cur_dst - dst)) {
1051                 test_hexdump("src", src, cur_src - src);
1052                 test_hexdump("dst", dst, cur_dst - dst);
1053                 printf("data after dequeue is not the same\n");
1054                 goto fail;
1055         }
1056
1057         cur_src = src;
1058         cur_dst = dst;
1059
1060         printf("Test enqueue without enough memory space \n");
1061         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1062                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1063                 cur_src += MAX_BULK;
1064                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1065                         goto fail;
1066         }
1067
1068         /* Available memory space for the exact MAX_BULK objects */
1069         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
1070         cur_src += 2;
1071         if ((ret & RTE_RING_SZ_MASK) != 2)
1072                 goto fail;
1073
1074         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1075         cur_src += MAX_BULK - 3;
1076         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1077                 goto fail;
1078
1079
1080         printf("Test dequeue without enough objects \n");
1081         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1082                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1083                 cur_dst += MAX_BULK;
1084                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1085                         goto fail;
1086         }
1087
1088         /* Available objects - the exact MAX_BULK */
1089         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1090         cur_dst += 2;
1091         if ((ret & RTE_RING_SZ_MASK) != 2)
1092                 goto fail;
1093
1094         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1095         cur_dst += MAX_BULK - 3;
1096         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1097                 goto fail;
1098
1099         /* check data */
1100         if (memcmp(src, dst, cur_dst - dst)) {
1101                 test_hexdump("src", src, cur_src - src);
1102                 test_hexdump("dst", dst, cur_dst - dst);
1103                 printf("data after dequeue is not the same\n");
1104                 goto fail;
1105         }
1106
1107         cur_src = src;
1108         cur_dst = dst;
1109
1110         printf("Covering rte_ring_enqueue_burst functions \n");
1111
1112         ret = rte_ring_enqueue_burst(r, cur_src, 2);
1113         cur_src += 2;
1114         if ((ret & RTE_RING_SZ_MASK) != 2)
1115                 goto fail;
1116
1117         ret = rte_ring_dequeue_burst(r, cur_dst, 2);
1118         cur_dst += 2;
1119         if (ret != 2)
1120                 goto fail;
1121
1122         /* Free memory before test completed */
1123         if (src)
1124                 free(src);
1125         if (dst)
1126                 free(dst);
1127         return 0;
1128
1129  fail:
1130         if (src)
1131                 free(src);
1132         if (dst)
1133                 free(dst);
1134         return -1;
1135 }
1136
1137 static int
1138 test_ring_stats(void)
1139 {
1140
1141 #ifndef RTE_LIBRTE_RING_DEBUG
1142         printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
1143         return 0;
1144 #else
1145         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
1146         int ret;
1147         unsigned i;
1148         unsigned num_items            = 0;
1149         unsigned failed_enqueue_ops   = 0;
1150         unsigned failed_enqueue_items = 0;
1151         unsigned failed_dequeue_ops   = 0;
1152         unsigned failed_dequeue_items = 0;
1153         unsigned last_enqueue_ops     = 0;
1154         unsigned last_enqueue_items   = 0;
1155         unsigned last_quota_ops       = 0;
1156         unsigned last_quota_items     = 0;
1157         unsigned lcore_id = rte_lcore_id();
1158         struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
1159
1160         printf("Test the ring stats.\n");
1161
1162         /* Reset the watermark in case it was set in another test. */
1163         rte_ring_set_water_mark(r, 0);
1164
1165         /* Reset the ring stats. */
1166         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1167
1168         /* Allocate some dummy object pointers. */
1169         src = malloc(RING_SIZE*2*sizeof(void *));
1170         if (src == NULL)
1171                 goto fail;
1172
1173         for (i = 0; i < RING_SIZE*2 ; i++) {
1174                 src[i] = (void *)(unsigned long)i;
1175         }
1176
1177         /* Allocate some memory for copied objects. */
1178         dst = malloc(RING_SIZE*2*sizeof(void *));
1179         if (dst == NULL)
1180                 goto fail;
1181
1182         memset(dst, 0, RING_SIZE*2*sizeof(void *));
1183
1184         /* Set the head and tail pointers. */
1185         cur_src = src;
1186         cur_dst = dst;
1187
1188         /* Do Enqueue tests. */
1189         printf("Test the dequeue stats.\n");
1190
1191         /* Fill the ring up to RING_SIZE -1. */
1192         printf("Fill the ring.\n");
1193         for (i = 0; i< (RING_SIZE/MAX_BULK); i++) {
1194                 rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
1195                 cur_src += MAX_BULK;
1196         }
1197
1198         /* Adjust for final enqueue = MAX_BULK -1. */
1199         cur_src--;
1200
1201         printf("Verify that the ring is full.\n");
1202         if (rte_ring_full(r) != 1)
1203                 goto fail;
1204
1205
1206         printf("Verify the enqueue success stats.\n");
1207         /* Stats should match above enqueue operations to fill the ring. */
1208         if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
1209                 goto fail;
1210
1211         /* Current max objects is RING_SIZE -1. */
1212         if (ring_stats->enq_success_objs != RING_SIZE -1)
1213                 goto fail;
1214
1215         /* Shouldn't have any failures yet. */
1216         if (ring_stats->enq_fail_bulk != 0)
1217                 goto fail;
1218         if (ring_stats->enq_fail_objs != 0)
1219                 goto fail;
1220
1221
1222         printf("Test stats for SP burst enqueue to a full ring.\n");
1223         num_items = 2;
1224         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1225         if ((ret & RTE_RING_SZ_MASK) != 0)
1226                 goto fail;
1227
1228         failed_enqueue_ops   += 1;
1229         failed_enqueue_items += num_items;
1230
1231         /* The enqueue should have failed. */
1232         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1233                 goto fail;
1234         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1235                 goto fail;
1236
1237
1238         printf("Test stats for SP bulk enqueue to a full ring.\n");
1239         num_items = 4;
1240         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1241         if (ret != -ENOBUFS)
1242                 goto fail;
1243
1244         failed_enqueue_ops   += 1;
1245         failed_enqueue_items += num_items;
1246
1247         /* The enqueue should have failed. */
1248         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1249                 goto fail;
1250         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1251                 goto fail;
1252
1253
1254         printf("Test stats for MP burst enqueue to a full ring.\n");
1255         num_items = 8;
1256         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1257         if ((ret & RTE_RING_SZ_MASK) != 0)
1258                 goto fail;
1259
1260         failed_enqueue_ops   += 1;
1261         failed_enqueue_items += num_items;
1262
1263         /* The enqueue should have failed. */
1264         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1265                 goto fail;
1266         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1267                 goto fail;
1268
1269
1270         printf("Test stats for MP bulk enqueue to a full ring.\n");
1271         num_items = 16;
1272         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1273         if (ret != -ENOBUFS)
1274                 goto fail;
1275
1276         failed_enqueue_ops   += 1;
1277         failed_enqueue_items += num_items;
1278
1279         /* The enqueue should have failed. */
1280         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1281                 goto fail;
1282         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1283                 goto fail;
1284
1285
1286         /* Do Dequeue tests. */
1287         printf("Test the dequeue stats.\n");
1288
1289         printf("Empty the ring.\n");
1290         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1291                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1292                 cur_dst += MAX_BULK;
1293         }
1294
1295         /* There was only RING_SIZE -1 objects to dequeue. */
1296         cur_dst++;
1297
1298         printf("Verify ring is empty.\n");
1299         if (1 != rte_ring_empty(r))
1300                 goto fail;
1301
1302         printf("Verify the dequeue success stats.\n");
1303         /* Stats should match above dequeue operations. */
1304         if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
1305                 goto fail;
1306
1307         /* Objects dequeued is RING_SIZE -1. */
1308         if (ring_stats->deq_success_objs != RING_SIZE -1)
1309                 goto fail;
1310
1311         /* Shouldn't have any dequeue failure stats yet. */
1312         if (ring_stats->deq_fail_bulk != 0)
1313                 goto fail;
1314
1315         printf("Test stats for SC burst dequeue with an empty ring.\n");
1316         num_items = 2;
1317         ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
1318         if ((ret & RTE_RING_SZ_MASK) != 0)
1319                 goto fail;
1320
1321         failed_dequeue_ops   += 1;
1322         failed_dequeue_items += num_items;
1323
1324         /* The dequeue should have failed. */
1325         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1326                 goto fail;
1327         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1328                 goto fail;
1329
1330
1331         printf("Test stats for SC bulk dequeue with an empty ring.\n");
1332         num_items = 4;
1333         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
1334         if (ret != -ENOENT)
1335                 goto fail;
1336
1337         failed_dequeue_ops   += 1;
1338         failed_dequeue_items += num_items;
1339
1340         /* The dequeue should have failed. */
1341         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1342                 goto fail;
1343         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1344                 goto fail;
1345
1346
1347         printf("Test stats for MC burst dequeue with an empty ring.\n");
1348         num_items = 8;
1349         ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
1350         if ((ret & RTE_RING_SZ_MASK) != 0)
1351                 goto fail;
1352         failed_dequeue_ops   += 1;
1353         failed_dequeue_items += num_items;
1354
1355         /* The dequeue should have failed. */
1356         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1357                 goto fail;
1358         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1359                 goto fail;
1360
1361
1362         printf("Test stats for MC bulk dequeue with an empty ring.\n");
1363         num_items = 16;
1364         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
1365         if (ret != -ENOENT)
1366                 goto fail;
1367
1368         failed_dequeue_ops   += 1;
1369         failed_dequeue_items += num_items;
1370
1371         /* The dequeue should have failed. */
1372         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1373                 goto fail;
1374         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1375                 goto fail;
1376
1377
1378         printf("Test total enqueue/dequeue stats.\n");
1379         /* At this point the enqueue and dequeue stats should be the same. */
1380         if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
1381                 goto fail;
1382         if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
1383                 goto fail;
1384         if (ring_stats->enq_fail_bulk    != ring_stats->deq_fail_bulk)
1385                 goto fail;
1386         if (ring_stats->enq_fail_objs    != ring_stats->deq_fail_objs)
1387                 goto fail;
1388
1389
1390         /* Watermark Tests. */
1391         printf("Test the watermark/quota stats.\n");
1392
1393         printf("Verify the initial watermark stats.\n");
1394         /* Watermark stats should be 0 since there is no watermark. */
1395         if (ring_stats->enq_quota_bulk != 0)
1396                 goto fail;
1397         if (ring_stats->enq_quota_objs != 0)
1398                 goto fail;
1399
1400         /* Set a watermark. */
1401         rte_ring_set_water_mark(r, 16);
1402
1403         /* Reset pointers. */
1404         cur_src = src;
1405         cur_dst = dst;
1406
1407         last_enqueue_ops   = ring_stats->enq_success_bulk;
1408         last_enqueue_items = ring_stats->enq_success_objs;
1409
1410
1411         printf("Test stats for SP burst enqueue below watermark.\n");
1412         num_items = 8;
1413         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1414         if ((ret & RTE_RING_SZ_MASK) != num_items)
1415                 goto fail;
1416
1417         /* Watermark stats should still be 0. */
1418         if (ring_stats->enq_quota_bulk != 0)
1419                 goto fail;
1420         if (ring_stats->enq_quota_objs != 0)
1421                 goto fail;
1422
1423         /* Success stats should have increased. */
1424         if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
1425                 goto fail;
1426         if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
1427                 goto fail;
1428
1429         last_enqueue_ops   = ring_stats->enq_success_bulk;
1430         last_enqueue_items = ring_stats->enq_success_objs;
1431
1432
1433         printf("Test stats for SP burst enqueue at watermark.\n");
1434         num_items = 8;
1435         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1436         if ((ret & RTE_RING_SZ_MASK) != num_items)
1437                 goto fail;
1438
1439         /* Watermark stats should have changed. */
1440         if (ring_stats->enq_quota_bulk != 1)
1441                 goto fail;
1442         if (ring_stats->enq_quota_objs != num_items)
1443                 goto fail;
1444
1445         last_quota_ops   = ring_stats->enq_quota_bulk;
1446         last_quota_items = ring_stats->enq_quota_objs;
1447
1448
1449         printf("Test stats for SP burst enqueue above watermark.\n");
1450         num_items = 1;
1451         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1452         if ((ret & RTE_RING_SZ_MASK) != num_items)
1453                 goto fail;
1454
1455         /* Watermark stats should have changed. */
1456         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1457                 goto fail;
1458         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1459                 goto fail;
1460
1461         last_quota_ops   = ring_stats->enq_quota_bulk;
1462         last_quota_items = ring_stats->enq_quota_objs;
1463
1464
1465         printf("Test stats for MP burst enqueue above watermark.\n");
1466         num_items = 2;
1467         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1468         if ((ret & RTE_RING_SZ_MASK) != num_items)
1469                 goto fail;
1470
1471         /* Watermark stats should have changed. */
1472         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1473                 goto fail;
1474         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1475                 goto fail;
1476
1477         last_quota_ops   = ring_stats->enq_quota_bulk;
1478         last_quota_items = ring_stats->enq_quota_objs;
1479
1480
1481         printf("Test stats for SP bulk enqueue above watermark.\n");
1482         num_items = 4;
1483         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1484         if (ret != -EDQUOT)
1485                 goto fail;
1486
1487         /* Watermark stats should have changed. */
1488         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1489                 goto fail;
1490         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1491                 goto fail;
1492
1493         last_quota_ops   = ring_stats->enq_quota_bulk;
1494         last_quota_items = ring_stats->enq_quota_objs;
1495
1496
1497         printf("Test stats for MP bulk enqueue above watermark.\n");
1498         num_items = 8;
1499         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1500         if (ret != -EDQUOT)
1501                 goto fail;
1502
1503         /* Watermark stats should have changed. */
1504         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1505                 goto fail;
1506         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1507                 goto fail;
1508
1509         printf("Test watermark success stats.\n");
1510         /* Success stats should be same as last non-watermarked enqueue. */
1511         if (ring_stats->enq_success_bulk != last_enqueue_ops)
1512                 goto fail;
1513         if (ring_stats->enq_success_objs != last_enqueue_items)
1514                 goto fail;
1515
1516
1517         /* Cleanup. */
1518
1519         /* Empty the ring. */
1520         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1521                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1522                 cur_dst += MAX_BULK;
1523         }
1524
1525         /* Reset the watermark. */
1526         rte_ring_set_water_mark(r, 0);
1527
1528         /* Reset the ring stats. */
1529         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1530
1531         /* Free memory before test completed */
1532         if (src)
1533                 free(src);
1534         if (dst)
1535                 free(dst);
1536         return 0;
1537
1538 fail:
1539         if (src)
1540                 free(src);
1541         if (dst)
1542                 free(dst);
1543         return -1;
1544 #endif
1545 }
1546
1547 /*
1548  * it will always fail to create ring with a wrong ring size number in this function
1549  */
1550 static int
1551 test_ring_creation_with_wrong_size(void)
1552 {
1553         struct rte_ring * rp = NULL;
1554
1555         /* Test if ring size is not power of 2 */
1556         rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
1557         if (NULL != rp) {
1558                 return -1;
1559         }
1560
1561         /* Test if ring size is exceeding the limit */
1562         rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
1563         if (NULL != rp) {
1564                 return -1;
1565         }
1566         return 0;
1567 }
1568
1569 /*
1570  * it tests if it would always fail to create ring with an used ring name
1571  */
1572 static int
1573 test_ring_creation_with_an_used_name(void)
1574 {
1575         struct rte_ring * rp;
1576
1577         rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1578         if (NULL != rp)
1579                 return -1;
1580
1581         return 0;
1582 }
1583
1584 /*
1585  * Test to if a non-power of 2 count causes the create
1586  * function to fail correctly
1587  */
1588 static int
1589 test_create_count_odd(void)
1590 {
1591         struct rte_ring *r = rte_ring_create("test_ring_count",
1592                         4097, SOCKET_ID_ANY, 0 );
1593         if(r != NULL){
1594                 return -1;
1595         }
1596         return 0;
1597 }
1598
1599 static int
1600 test_lookup_null(void)
1601 {
1602         struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
1603         if (rlp ==NULL)
1604         if (rte_errno != ENOENT){
1605                 printf( "test failed to returnn error on null pointer\n");
1606                 return -1;
1607         }
1608         return 0;
1609 }
1610
1611 /*
1612  * it tests some more basic ring operations
1613  */
1614 static int
1615 test_ring_basic_ex(void)
1616 {
1617         int ret = -1;
1618         unsigned i;
1619         struct rte_ring * rp;
1620         void **obj = NULL;
1621
1622         obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
1623         if (obj == NULL) {
1624                 printf("test_ring_basic_ex fail to rte_malloc\n");
1625                 goto fail_test;
1626         }
1627
1628         rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
1629                         RING_F_SP_ENQ | RING_F_SC_DEQ);
1630         if (rp == NULL) {
1631                 printf("test_ring_basic_ex fail to create ring\n");
1632                 goto fail_test;
1633         }
1634
1635         if (rte_ring_lookup("test_ring_basic_ex") != rp) {
1636                 goto fail_test;
1637         }
1638
1639         if (rte_ring_empty(rp) != 1) {
1640                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1641                 goto fail_test;
1642         }
1643
1644         printf("%u ring entries are now free\n", rte_ring_free_count(rp));
1645
1646         for (i = 0; i < RING_SIZE; i ++) {
1647                 rte_ring_enqueue(rp, obj[i]);
1648         }
1649
1650         if (rte_ring_full(rp) != 1) {
1651                 printf("test_ring_basic_ex ring is not full but it should be\n");
1652                 goto fail_test;
1653         }
1654
1655         for (i = 0; i < RING_SIZE; i ++) {
1656                 rte_ring_dequeue(rp, &obj[i]);
1657         }
1658
1659         if (rte_ring_empty(rp) != 1) {
1660                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1661                 goto fail_test;
1662         }
1663
1664         /* Covering the ring burst operation */
1665         ret = rte_ring_enqueue_burst(rp, obj, 2);
1666         if ((ret & RTE_RING_SZ_MASK) != 2) {
1667                 printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
1668                 goto fail_test;
1669         }
1670
1671         ret = rte_ring_dequeue_burst(rp, obj, 2);
1672         if (ret != 2) {
1673                 printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
1674                 goto fail_test;
1675         }
1676
1677         ret = 0;
1678 fail_test:
1679         if (obj != NULL)
1680                 rte_free(obj);
1681
1682         return ret;
1683 }
1684
1685 int
1686 test_ring(void)
1687 {
1688         unsigned enq_core_count, deq_core_count;
1689
1690         /* some more basic operations */
1691         if (test_ring_basic_ex() < 0)
1692                 return -1;
1693
1694         rte_atomic32_init(&synchro);
1695
1696         if (r == NULL)
1697                 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1698         if (r == NULL)
1699                 return -1;
1700
1701         /* retrieve the ring from its name */
1702         if (rte_ring_lookup("test") != r) {
1703                 printf("Cannot lookup ring from its name\n");
1704                 return -1;
1705         }
1706
1707         /* burst operations */
1708         if (test_ring_burst_basic() < 0)
1709                 return -1;
1710
1711         /* basic operations */
1712         if (test_ring_basic() < 0)
1713                 return -1;
1714
1715         /* ring stats */
1716         if (test_ring_stats() < 0)
1717                 return -1;
1718
1719         /* basic operations */
1720         if (test_live_watermark_change() < 0)
1721                 return -1;
1722
1723         if ( test_set_watermark() < 0){
1724                 printf ("Test failed to detect invalid parameter\n");
1725                 return -1;
1726         }
1727         else
1728                 printf ( "Test detected forced bad watermark values\n");
1729
1730         if ( test_create_count_odd() < 0){
1731                         printf ("Test failed to detect odd count\n");
1732                         return -1;
1733                 }
1734                 else
1735                         printf ( "Test detected odd count\n");
1736
1737         if ( test_lookup_null() < 0){
1738                                 printf ("Test failed to detect NULL ring lookup\n");
1739                                 return -1;
1740                         }
1741                         else
1742                                 printf ( "Test detected NULL ring lookup \n");
1743
1744         printf("start performance tests \n");
1745
1746         /* one lcore for enqueue, one for dequeue */
1747         enq_core_count = 1;
1748         deq_core_count = 1;
1749         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1750                 return -1;
1751
1752         /* max cores for enqueue, one for dequeue */
1753         enq_core_count = rte_lcore_count() - 1;
1754         deq_core_count = 1;
1755         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1756                 return -1;
1757
1758         /* max cores for dequeue, one for enqueue */
1759         enq_core_count = 1;
1760         deq_core_count = rte_lcore_count() - 1;
1761         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1762                 return -1;
1763
1764         /* half for enqueue and half for dequeue */
1765         enq_core_count = rte_lcore_count() / 2;
1766         deq_core_count = rte_lcore_count() / 2;
1767         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1768                 return -1;
1769
1770         printf("start performance tests - burst operations \n");
1771
1772         /* one lcore for enqueue, one for dequeue */
1773         enq_core_count = 1;
1774         deq_core_count = 1;
1775         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1776                 return -1;
1777
1778         /* max cores for enqueue, one for dequeue */
1779         enq_core_count = rte_lcore_count() - 1;
1780         deq_core_count = 1;
1781         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1782                 return -1;
1783
1784         /* max cores for dequeue, one for enqueue */
1785         enq_core_count = 1;
1786         deq_core_count = rte_lcore_count() - 1;
1787         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1788                 return -1;
1789
1790         /* half for enqueue and half for dequeue */
1791         enq_core_count = rte_lcore_count() / 2;
1792         deq_core_count = rte_lcore_count() / 2;
1793         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1794                 return -1;
1795
1796         /* test of creating ring with wrong size */
1797         if (test_ring_creation_with_wrong_size() < 0)
1798                 return -1;
1799
1800         /* test of creation ring with an used name */
1801         if (test_ring_creation_with_an_used_name() < 0)
1802                 return -1;
1803
1804         /* dump the ring status */
1805         rte_ring_list_dump();
1806
1807         return 0;
1808 }