update copyright date to 2013
[dpdk.git] / app / test / test_ring.c
1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without 
8  *   modification, are permitted provided that the following conditions 
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright 
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright 
14  *       notice, this list of conditions and the following disclaimer in 
15  *       the documentation and/or other materials provided with the 
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its 
18  *       contributors may be used to endorse or promote products derived 
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  * 
33  */
34
35 #include <string.h>
36 #include <stdarg.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <stdint.h>
40 #include <inttypes.h>
41 #include <errno.h>
42 #include <sys/queue.h>
43
44 #include <rte_common.h>
45 #include <rte_log.h>
46 #include <rte_memory.h>
47 #include <rte_memzone.h>
48 #include <rte_launch.h>
49 #include <rte_cycles.h>
50 #include <rte_tailq.h>
51 #include <rte_eal.h>
52 #include <rte_per_lcore.h>
53 #include <rte_lcore.h>
54 #include <rte_atomic.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_malloc.h>
57 #include <rte_ring.h>
58 #include <rte_random.h>
59 #include <rte_common.h>
60 #include <rte_errno.h>
61
62 #include <cmdline_parse.h>
63
64 #include "test.h"
65
66 /*
67  * Ring
68  * ====
69  *
70  * #. Basic tests: done on one core:
71  *
72  *    - Using single producer/single consumer functions:
73  *
74  *      - Enqueue one object, two objects, MAX_BULK objects
75  *      - Dequeue one object, two objects, MAX_BULK objects
76  *      - Check that dequeued pointers are correct
77  *
78  *    - Using multi producers/multi consumers functions:
79  *
80  *      - Enqueue one object, two objects, MAX_BULK objects
81  *      - Dequeue one object, two objects, MAX_BULK objects
82  *      - Check that dequeued pointers are correct
83  *
84  *    - Test watermark and default bulk enqueue/dequeue:
85  *
86  *      - Set watermark
87  *      - Set default bulk value
88  *      - Enqueue objects, check that -EDQUOT is returned when
89  *        watermark is exceeded
90  *      - Check that dequeued pointers are correct
91  *
92  * #. Check live watermark change
93  *
94  *    - Start a loop on another lcore that will enqueue and dequeue
95  *      objects in a ring. It will monitor the value of watermark.
96  *    - At the same time, change the watermark on the master lcore.
97  *    - The slave lcore will check that watermark changes from 16 to 32.
98  *
99  * #. Performance tests.
100  *
101  *    This test is done on the following configurations:
102  *
103  *    - One core enqueuing, one core dequeuing
104  *    - One core enqueuing, other cores dequeuing
105  *    - One core dequeuing, other cores enqueuing
106  *    - Half of the cores enqueuing, the other half dequeuing
107  *
108  *    When only one core enqueues/dequeues, the test is done with the
109  *    SP/SC functions in addition to the MP/MC functions.
110  *
111  *    The test is done with different bulk size.
112  *
113  *    On each core, the test enqueues or dequeues objects during
114  *    TIME_S seconds. The number of successes and failures are stored on
115  *    each core, then summed and displayed.
116  *
117  *    The test checks that the number of enqueues is equal to the
118  *    number of dequeues.
119  */
120
121 #define RING_SIZE 4096
122 #define MAX_BULK 32
123 #define N 65536
124 #define TIME_S 5
125
126 static rte_atomic32_t synchro;
127
128 static struct rte_ring *r;
129
130 struct test_stats {
131         unsigned enq_success ;
132         unsigned enq_quota;
133         unsigned enq_fail;
134
135         unsigned deq_success;
136         unsigned deq_fail;
137 } __rte_cache_aligned;
138
139 static struct test_stats test_stats[RTE_MAX_LCORE];
140
141 static int
142 ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned),
143                 void* arg, unsigned bulk_or_burst)
144 {
145         unsigned success = 0;
146         unsigned quota = 0;
147         unsigned fail = 0;
148         unsigned i;
149         unsigned long dummy_obj;
150         void *obj_table[MAX_BULK];
151         int ret;
152         unsigned lcore_id = rte_lcore_id();
153         unsigned count = *((unsigned*)arg);
154         uint64_t start_cycles, end_cycles;
155         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
156
157         /* init dummy object table */
158         for (i = 0; i< MAX_BULK; i++) {
159                 dummy_obj = lcore_id + 0x1000 + i;
160                 obj_table[i] = (void *)dummy_obj;
161         }
162
163         /* wait synchro for slaves */
164         if (lcore_id != rte_get_master_lcore())
165                 while (rte_atomic32_read(&synchro) == 0);
166
167         start_cycles = rte_get_hpet_cycles();
168
169         /* enqueue as many object as possible */
170         while (time_diff/hz < TIME_S) {
171                 for (i = 0; likely(i < N); i++) {
172                         ret = que_func(r, obj_table, count);
173                         /*
174                          * bulk_or_burst
175                          *       1: for bulk operation
176                          *   0: for burst operation
177                          */
178                         if (bulk_or_burst) {
179                                 /* The *count* objects enqueued, unless fail */
180                                 if (ret == 0)
181                                         success += count;
182                                 else if (ret == -EDQUOT)
183                                         quota += count;
184                                 else
185                                         fail++;
186                         } else {
187                                 /* The actual objects enqueued */
188                                 if (ret != 0)
189                                         success += (ret & RTE_RING_SZ_MASK);
190                                 else
191                                         fail++;
192                         }
193                 }
194                 end_cycles = rte_get_hpet_cycles();
195                 time_diff = end_cycles - start_cycles;
196         }
197
198         /* write statistics in a shared structure */
199         test_stats[lcore_id].enq_success = success;
200         test_stats[lcore_id].enq_quota = quota;
201         test_stats[lcore_id].enq_fail = fail;
202
203         return 0;
204 }
205
206 static int
207 ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned),
208                 void* arg, unsigned bulk_or_burst)
209 {
210         unsigned success = 0;
211         unsigned fail = 0;
212         unsigned i;
213         void *obj_table[MAX_BULK];
214         int ret;
215         unsigned lcore_id = rte_lcore_id();
216         unsigned count = *((unsigned*)arg);
217         uint64_t start_cycles, end_cycles;
218         uint64_t time_diff = 0, hz = rte_get_hpet_hz();
219
220         /* wait synchro for slaves */
221         if (lcore_id != rte_get_master_lcore())
222                 while (rte_atomic32_read(&synchro) == 0);
223
224         start_cycles = rte_get_hpet_cycles();
225
226         /* dequeue as many object as possible */
227         while (time_diff/hz < TIME_S) {
228                 for (i = 0; likely(i < N); i++) {
229                         ret = que_func(r, obj_table, count);
230                         /*
231                          * bulk_or_burst
232                          *       1: for bulk operation
233                          *   0: for burst operation
234                          */
235                         if (bulk_or_burst) {
236                                 if (ret == 0)
237                                         success += count;
238                                 else
239                                         fail++;
240                         } else {
241                                 if (ret != 0)
242                                         success += ret;
243                                 else
244                                         fail++;
245                         }
246                 }
247                 end_cycles = rte_get_hpet_cycles();
248                 time_diff = end_cycles - start_cycles;
249         }
250
251         /* write statistics in a shared structure */
252         test_stats[lcore_id].deq_success = success;
253         test_stats[lcore_id].deq_fail = fail;
254
255         return 0;
256 }
257
258 static int
259 test_ring_per_core_sp_enqueue(void *arg)
260 {
261         return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
262 }
263
264 static int
265 test_ring_per_core_mp_enqueue(void *arg)
266 {
267         return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
268 }
269
270 static int
271 test_ring_per_core_mc_dequeue(void *arg)
272 {
273         return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
274 }
275
276 static int
277 test_ring_per_core_sc_dequeue(void *arg)
278 {
279         return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
280 }
281
282 static int
283 test_ring_per_core_sp_enqueue_burst(void *arg)
284 {
285         return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
286 }
287
288 static int
289 test_ring_per_core_mp_enqueue_burst(void *arg)
290 {
291         return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
292 }
293
294 static int
295 test_ring_per_core_mc_dequeue_burst(void *arg)
296 {
297         return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
298 }
299
300 static int
301 test_ring_per_core_sc_dequeue_burst(void *arg)
302 {
303         return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
304 }
305
306 #define TEST_RING_VERIFY(exp)                                           \
307         if (!(exp)) {                                                   \
308                 printf("error at %s:%d\tcondition " #exp " failed\n",   \
309                     __func__, __LINE__);                                \
310                 rte_ring_dump(r);                                       \
311                 return (-1);                                            \
312         }
313
314 #define TEST_RING_FULL_EMTPY_ITER       8
315
316
317 static int
318 launch_cores(unsigned enq_core_count, unsigned deq_core_count,
319                 unsigned n_enq_bulk, unsigned n_deq_bulk,
320                 int sp, int sc, int bulk_not_burst)
321 {
322         void *obj;
323         unsigned lcore_id;
324         unsigned rate, deq_remain = 0;
325         unsigned enq_total, deq_total;
326         struct test_stats sum;
327         int (*enq_f)(void *);
328         int (*deq_f)(void *);
329         unsigned cores = enq_core_count + deq_core_count;
330         int ret;
331
332         rte_atomic32_set(&synchro, 0);
333
334         printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
335                enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
336         printf("sp=%d sc=%d ", sp, sc);
337
338         if (bulk_not_burst) {
339                 /* set enqueue function to be used */
340                 if (sp)
341                         enq_f = test_ring_per_core_sp_enqueue;
342                 else
343                         enq_f = test_ring_per_core_mp_enqueue;
344
345                 /* set dequeue function to be used */
346                 if (sc)
347                         deq_f = test_ring_per_core_sc_dequeue;
348                 else
349                         deq_f = test_ring_per_core_mc_dequeue;
350
351         } else {
352                 /* set enqueue function to be used */
353                 if (sp)
354                         enq_f = test_ring_per_core_sp_enqueue_burst;
355                 else
356                         enq_f = test_ring_per_core_mp_enqueue_burst;
357
358                 /* set dequeue function to be used */
359                 if (sc)
360                         deq_f = test_ring_per_core_sc_dequeue_burst;
361                 else
362                         deq_f = test_ring_per_core_mc_dequeue_burst;
363         }
364
365         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
366                 if (enq_core_count != 0) {
367                         enq_core_count--;
368                         rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
369                 }
370                 if (deq_core_count != 1) {
371                         deq_core_count--;
372                         rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
373                 }
374         }
375
376         memset(test_stats, 0, sizeof(test_stats));
377
378         /* start synchro and launch test on master */
379         rte_atomic32_set(&synchro, 1);
380         ret = deq_f(&n_deq_bulk);
381
382         /* wait all cores */
383         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
384                 if (cores == 1)
385                         break;
386                 cores--;
387                 if (rte_eal_wait_lcore(lcore_id) < 0)
388                         ret = -1;
389         }
390
391         memset(&sum, 0, sizeof(sum));
392         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
393                 sum.enq_success += test_stats[lcore_id].enq_success;
394                 sum.enq_quota += test_stats[lcore_id].enq_quota;
395                 sum.enq_fail += test_stats[lcore_id].enq_fail;
396                 sum.deq_success += test_stats[lcore_id].deq_success;
397                 sum.deq_fail += test_stats[lcore_id].deq_fail;
398         }
399
400         /* empty the ring */
401         while (rte_ring_sc_dequeue(r, &obj) == 0)
402                 deq_remain += 1;
403
404         if (ret < 0) {
405                 printf("per-lcore test returned -1\n");
406                 return -1;
407         }
408
409         enq_total = sum.enq_success + sum.enq_quota;
410         deq_total = sum.deq_success + deq_remain;
411
412         rate = deq_total/TIME_S;
413
414         printf("rate_persec=%u\n", rate);
415
416         if (enq_total != deq_total) {
417                 printf("invalid enq/deq_success counter: %u %u\n",
418                        enq_total, deq_total);
419                 return -1;
420         }
421
422         return 0;
423 }
424
425 static int
426 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
427                   unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
428 {
429         int sp, sc;
430         int do_sp, do_sc;
431         int ret;
432
433         do_sp = (enq_core_count == 1) ? 1 : 0;
434         do_sc = (deq_core_count  == 1) ? 1 : 0;
435
436         for (sp = 0; sp <= do_sp; sp ++) {
437                 for (sc = 0; sc <= do_sc; sc ++) {
438                         ret = launch_cores(enq_core_count, deq_core_count,
439                                         n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
440                         if (ret < 0)
441                                 return -1;
442                 }
443         }
444         return 0;
445 }
446
447 static int
448 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
449                 unsigned bulk_or_burst)
450 {
451         unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
452         unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
453         unsigned *bulk_enqueue_ptr;
454         unsigned *bulk_dequeue_ptr;
455         int ret;
456
457         for (bulk_enqueue_ptr = bulk_enqueue_tab;
458              *bulk_enqueue_ptr;
459              bulk_enqueue_ptr++) {
460
461                 for (bulk_dequeue_ptr = bulk_dequeue_tab;
462                      *bulk_dequeue_ptr;
463                      bulk_dequeue_ptr++) {
464
465                         ret = do_one_ring_test2(enq_core_count, deq_core_count,
466                                                 *bulk_enqueue_ptr,
467                                                 *bulk_dequeue_ptr,
468                                                 bulk_or_burst);
469                         if (ret < 0)
470                                 return -1;
471                 }
472         }
473         return 0;
474 }
475
476 static int
477 check_live_watermark_change(__attribute__((unused)) void *dummy)
478 {
479         uint64_t hz = rte_get_hpet_hz();
480         void *obj_table[MAX_BULK];
481         unsigned watermark, watermark_old = 16;
482         uint64_t cur_time, end_time;
483         int64_t diff = 0;
484         int i, ret;
485         unsigned count = 4;
486
487         /* init the object table */
488         memset(obj_table, 0, sizeof(obj_table));
489         end_time = rte_get_hpet_cycles() + (hz * 2);
490
491         /* check that bulk and watermark are 4 and 32 (respectively) */
492         while (diff >= 0) {
493
494                 /* add in ring until we reach watermark */
495                 ret = 0;
496                 for (i = 0; i < 16; i ++) {
497                         if (ret != 0)
498                                 break;
499                         ret = rte_ring_enqueue_bulk(r, obj_table, count);
500                 }
501
502                 if (ret != -EDQUOT) {
503                         printf("Cannot enqueue objects, or watermark not "
504                                "reached (ret=%d)\n", ret);
505                         return -1;
506                 }
507
508                 /* read watermark, the only change allowed is from 16 to 32 */
509                 watermark = r->prod.watermark;
510                 if (watermark != watermark_old &&
511                     (watermark_old != 16 || watermark != 32)) {
512                         printf("Bad watermark change %u -> %u\n", watermark_old,
513                                watermark);
514                         return -1;
515                 }
516                 watermark_old = watermark;
517
518                 /* dequeue objects from ring */
519                 while (i--) {
520                         ret = rte_ring_dequeue_bulk(r, obj_table, count);
521                         if (ret != 0) {
522                                 printf("Cannot dequeue (ret=%d)\n", ret);
523                                 return -1;
524                         }
525                 }
526
527                 cur_time = rte_get_hpet_cycles();
528                 diff = end_time - cur_time;
529         }
530
531         if (watermark_old != 32 ) {
532                 printf(" watermark was not updated (wm=%u)\n",
533                        watermark_old);
534                 return -1;
535         }
536
537         return 0;
538 }
539
540 static int
541 test_live_watermark_change(void)
542 {
543         unsigned lcore_id = rte_lcore_id();
544         unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
545
546         printf("Test watermark live modification\n");
547         rte_ring_set_water_mark(r, 16);
548
549         /* launch a thread that will enqueue and dequeue, checking
550          * watermark and quota */
551         rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
552
553         rte_delay_ms(1000);
554         rte_ring_set_water_mark(r, 32);
555         rte_delay_ms(1000);
556
557         if (rte_eal_wait_lcore(lcore_id2) < 0)
558                 return -1;
559
560         return 0;
561 }
562
563 /* Test for catch on invalid watermark values */
564 static int
565 test_set_watermark( void ){
566         unsigned count;
567         int setwm;
568
569         struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
570         if(r == NULL){
571                 printf( " ring lookup failed\n" );
572                 goto error;
573         }
574         count = r->prod.size*2;
575         setwm = rte_ring_set_water_mark(r, count);
576         if (setwm != -EINVAL){
577                 printf("Test failed to detect invalid watermark count value\n");
578                 goto error;
579         }
580
581         count = 0;
582         rte_ring_set_water_mark(r, count);
583         if (r->prod.watermark != r->prod.size) {
584                 printf("Test failed to detect invalid watermark count value\n");
585                 goto error;
586         }
587         return 0;
588
589 error:
590         return -1;
591 }
592
593 /*
594  * helper routine for test_ring_basic
595  */
596 static int
597 test_ring_basic_full_empty(void * const src[], void *dst[])
598 {
599         unsigned i, rand;
600         const unsigned rsz = RING_SIZE - 1;
601
602         printf("Basic full/empty test\n");
603
604         for (i = 0; TEST_RING_FULL_EMTPY_ITER != i; i++) {
605
606                 /* random shift in the ring */
607                 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
608                 printf("%s: iteration %u, random shift: %u;\n",
609                     __func__, i, rand);
610                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
611                     rand));
612                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
613
614                 /* fill the ring */
615                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
616                     rsz));
617                 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
618                 TEST_RING_VERIFY(rsz == rte_ring_count(r));
619                 TEST_RING_VERIFY(rte_ring_full(r));
620                 TEST_RING_VERIFY(0 == rte_ring_empty(r));
621
622                 /* empty the ring */
623                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
624                 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
625                 TEST_RING_VERIFY(0 == rte_ring_count(r));
626                 TEST_RING_VERIFY(0 == rte_ring_full(r));
627                 TEST_RING_VERIFY(rte_ring_empty(r));
628
629                 /* check data */
630                 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz));
631                 rte_ring_dump(r);
632         }
633         return (0);
634 }
635
636 static int
637 test_ring_basic(void)
638 {
639         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
640         int ret;
641         unsigned i, num_elems;
642
643         /* alloc dummy object pointers */
644         src = malloc(RING_SIZE*2*sizeof(void *));
645         if (src == NULL)
646                 goto fail;
647
648         for (i = 0; i < RING_SIZE*2 ; i++) {
649                 src[i] = (void *)(unsigned long)i;
650         }
651         cur_src = src;
652
653         /* alloc some room for copied objects */
654         dst = malloc(RING_SIZE*2*sizeof(void *));
655         if (dst == NULL)
656                 goto fail;
657
658         memset(dst, 0, RING_SIZE*2*sizeof(void *));
659         cur_dst = dst;
660
661         printf("enqueue 1 obj\n");
662         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
663         cur_src += 1;
664         if (ret != 0)
665                 goto fail;
666
667         printf("enqueue 2 objs\n");
668         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
669         cur_src += 2;
670         if (ret != 0)
671                 goto fail;
672
673         printf("enqueue MAX_BULK objs\n");
674         ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
675         cur_src += MAX_BULK;
676         if (ret != 0)
677                 goto fail;
678
679         printf("dequeue 1 obj\n");
680         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
681         cur_dst += 1;
682         if (ret != 0)
683                 goto fail;
684
685         printf("dequeue 2 objs\n");
686         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
687         cur_dst += 2;
688         if (ret != 0)
689                 goto fail;
690
691         printf("dequeue MAX_BULK objs\n");
692         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
693         cur_dst += MAX_BULK;
694         if (ret != 0)
695                 goto fail;
696
697         /* check data */
698         if (memcmp(src, dst, cur_dst - dst)) {
699                 test_hexdump("src", src, cur_src - src);
700                 test_hexdump("dst", dst, cur_dst - dst);
701                 printf("data after dequeue is not the same\n");
702                 goto fail;
703         }
704         cur_src = src;
705         cur_dst = dst;
706
707         printf("enqueue 1 obj\n");
708         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
709         cur_src += 1;
710         if (ret != 0)
711                 goto fail;
712
713         printf("enqueue 2 objs\n");
714         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
715         cur_src += 2;
716         if (ret != 0)
717                 goto fail;
718
719         printf("enqueue MAX_BULK objs\n");
720         ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
721         cur_src += MAX_BULK;
722         if (ret != 0)
723                 goto fail;
724
725         printf("dequeue 1 obj\n");
726         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
727         cur_dst += 1;
728         if (ret != 0)
729                 goto fail;
730
731         printf("dequeue 2 objs\n");
732         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
733         cur_dst += 2;
734         if (ret != 0)
735                 goto fail;
736
737         printf("dequeue MAX_BULK objs\n");
738         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
739         cur_dst += MAX_BULK;
740         if (ret != 0)
741                 goto fail;
742
743         /* check data */
744         if (memcmp(src, dst, cur_dst - dst)) {
745                 test_hexdump("src", src, cur_src - src);
746                 test_hexdump("dst", dst, cur_dst - dst);
747                 printf("data after dequeue is not the same\n");
748                 goto fail;
749         }
750         cur_src = src;
751         cur_dst = dst;
752
753         printf("fill and empty the ring\n");
754         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
755                 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
756                 cur_src += MAX_BULK;
757                 if (ret != 0)
758                         goto fail;
759                 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
760                 cur_dst += MAX_BULK;
761                 if (ret != 0)
762                         goto fail;
763         }
764
765         /* check data */
766         if (memcmp(src, dst, cur_dst - dst)) {
767                 test_hexdump("src", src, cur_src - src);
768                 test_hexdump("dst", dst, cur_dst - dst);
769                 printf("data after dequeue is not the same\n");
770                 goto fail;
771         }
772
773         if (test_ring_basic_full_empty(src, dst) != 0)
774                 goto fail;
775
776         cur_src = src;
777         cur_dst = dst;
778
779         printf("test watermark and default bulk enqueue / dequeue\n");
780         rte_ring_set_water_mark(r, 20);
781         num_elems = 16;
782
783         cur_src = src;
784         cur_dst = dst;
785
786         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
787         cur_src += num_elems;
788         if (ret != 0) {
789                 printf("Cannot enqueue\n");
790                 goto fail;
791         }
792         ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
793         cur_src += num_elems;
794         if (ret != -EDQUOT) {
795                 printf("Watermark not exceeded\n");
796                 goto fail;
797         }
798         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
799         cur_dst += num_elems;
800         if (ret != 0) {
801                 printf("Cannot dequeue\n");
802                 goto fail;
803         }
804         ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
805         cur_dst += num_elems;
806         if (ret != 0) {
807                 printf("Cannot dequeue2\n");
808                 goto fail;
809         }
810
811         /* check data */
812         if (memcmp(src, dst, cur_dst - dst)) {
813                 test_hexdump("src", src, cur_src - src);
814                 test_hexdump("dst", dst, cur_dst - dst);
815                 printf("data after dequeue is not the same\n");
816                 goto fail;
817         }
818
819         cur_src = src;
820         cur_dst = dst;
821
822         ret = rte_ring_mp_enqueue(r, cur_src);
823         if (ret != 0)
824                 goto fail;
825
826         ret = rte_ring_mc_dequeue(r, cur_dst);
827         if (ret != 0)
828                 goto fail;
829
830         if (src)
831                 free(src);
832         if (dst)
833                 free(dst);
834         return 0;
835
836  fail:
837         if (src)
838                 free(src);
839         if (dst)
840                 free(dst);
841         return -1;
842 }
843
844 static int
845 test_ring_burst_basic(void)
846 {
847         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
848         int ret;
849         unsigned i;
850
851         /* alloc dummy object pointers */
852         src = malloc(RING_SIZE*2*sizeof(void *));
853         if (src == NULL)
854                 goto fail;
855
856         for (i = 0; i < RING_SIZE*2 ; i++) {
857                 src[i] = (void *)(unsigned long)i;
858         }
859         cur_src = src;
860
861         /* alloc some room for copied objects */
862         dst = malloc(RING_SIZE*2*sizeof(void *));
863         if (dst == NULL)
864                 goto fail;
865
866         memset(dst, 0, RING_SIZE*2*sizeof(void *));
867         cur_dst = dst;
868
869         printf("Test SP & SC basic functions \n");
870         printf("enqueue 1 obj\n");
871         ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
872         cur_src += 1;
873         if ((ret & RTE_RING_SZ_MASK) != 1)
874                 goto fail;
875
876         printf("enqueue 2 objs\n");
877         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
878         cur_src += 2;
879         if ((ret & RTE_RING_SZ_MASK) != 2)
880                 goto fail;
881
882         printf("enqueue MAX_BULK objs\n");
883         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
884         cur_src += MAX_BULK;
885         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
886                 goto fail;
887
888         printf("dequeue 1 obj\n");
889         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
890         cur_dst += 1;
891         if ((ret & RTE_RING_SZ_MASK) != 1)
892                 goto fail;
893
894         printf("dequeue 2 objs\n");
895         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
896         cur_dst += 2;
897         if ((ret & RTE_RING_SZ_MASK) != 2)
898                 goto fail;
899
900         printf("dequeue MAX_BULK objs\n");
901         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
902         cur_dst += MAX_BULK;
903         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
904                 goto fail;
905
906         /* check data */
907         if (memcmp(src, dst, cur_dst - dst)) {
908                 test_hexdump("src", src, cur_src - src);
909                 test_hexdump("dst", dst, cur_dst - dst);
910                 printf("data after dequeue is not the same\n");
911                 goto fail;
912         }
913
914         cur_src = src;
915         cur_dst = dst;
916
917         printf("Test enqueue without enough memory space \n");
918         for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
919                 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
920                 cur_src += MAX_BULK;
921                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
922                         goto fail;
923                 }
924         }
925
926         printf("Enqueue 2 objects, free entries = MAX_BULK - 2  \n");
927         ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
928         cur_src += 2;
929         if ((ret & RTE_RING_SZ_MASK) != 2)
930                 goto fail;
931
932         printf("Enqueue the remaining entries = MAX_BULK - 2  \n");
933         /* Always one free entry left */
934         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
935         cur_src += MAX_BULK - 3;
936         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
937                 goto fail;
938
939         printf("Test if ring is full  \n");
940         if (rte_ring_full(r) != 1)
941                 goto fail;
942
943         printf("Test enqueue for a full entry  \n");
944         ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
945         if ((ret & RTE_RING_SZ_MASK) != 0)
946                 goto fail;
947
948         printf("Test dequeue without enough objects \n");
949         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
950                 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
951                 cur_dst += MAX_BULK;
952                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
953                         goto fail;
954         }
955
956         /* Available memory space for the exact MAX_BULK entries */
957         ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
958         cur_dst += 2;
959         if ((ret & RTE_RING_SZ_MASK) != 2)
960                 goto fail;
961
962         ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
963         cur_dst += MAX_BULK - 3;
964         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
965                 goto fail;
966
967         printf("Test if ring is empty \n");
968         /* Check if ring is empty */
969         if (1 != rte_ring_empty(r))
970                 goto fail;
971
972         /* check data */
973         if (memcmp(src, dst, cur_dst - dst)) {
974                 test_hexdump("src", src, cur_src - src);
975                 test_hexdump("dst", dst, cur_dst - dst);
976                 printf("data after dequeue is not the same\n");
977                 goto fail;
978         }
979
980         cur_src = src;
981         cur_dst = dst;
982
983         printf("Test MP & MC basic functions \n");
984
985         printf("enqueue 1 obj\n");
986         ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
987         cur_src += 1;
988         if ((ret & RTE_RING_SZ_MASK) != 1)
989                 goto fail;
990
991         printf("enqueue 2 objs\n");
992         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
993         cur_src += 2;
994         if ((ret & RTE_RING_SZ_MASK) != 2)
995                 goto fail;
996
997         printf("enqueue MAX_BULK objs\n");
998         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
999         cur_src += MAX_BULK;
1000         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1001                 goto fail;
1002
1003         printf("dequeue 1 obj\n");
1004         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
1005         cur_dst += 1;
1006         if ((ret & RTE_RING_SZ_MASK) != 1)
1007                 goto fail;
1008
1009         printf("dequeue 2 objs\n");
1010         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1011         cur_dst += 2;
1012         if ((ret & RTE_RING_SZ_MASK) != 2)
1013                 goto fail;
1014
1015         printf("dequeue MAX_BULK objs\n");
1016         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1017         cur_dst += MAX_BULK;
1018         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1019                 goto fail;
1020
1021         /* check data */
1022         if (memcmp(src, dst, cur_dst - dst)) {
1023                 test_hexdump("src", src, cur_src - src);
1024                 test_hexdump("dst", dst, cur_dst - dst);
1025                 printf("data after dequeue is not the same\n");
1026                 goto fail;
1027         }
1028
1029         cur_src = src;
1030         cur_dst = dst;
1031
1032         printf("fill and empty the ring\n");
1033         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1034                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1035                 cur_src += MAX_BULK;
1036                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1037                         goto fail;
1038                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1039                 cur_dst += MAX_BULK;
1040                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1041                         goto fail;
1042         }
1043
1044         /* check data */
1045         if (memcmp(src, dst, cur_dst - dst)) {
1046                 test_hexdump("src", src, cur_src - src);
1047                 test_hexdump("dst", dst, cur_dst - dst);
1048                 printf("data after dequeue is not the same\n");
1049                 goto fail;
1050         }
1051
1052         cur_src = src;
1053         cur_dst = dst;
1054
1055         printf("Test enqueue without enough memory space \n");
1056         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1057                 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1058                 cur_src += MAX_BULK;
1059                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1060                         goto fail;
1061         }
1062
1063         /* Available memory space for the exact MAX_BULK objects */
1064         ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
1065         cur_src += 2;
1066         if ((ret & RTE_RING_SZ_MASK) != 2)
1067                 goto fail;
1068
1069         ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1070         cur_src += MAX_BULK - 3;
1071         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1072                 goto fail;
1073
1074
1075         printf("Test dequeue without enough objects \n");
1076         for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1077                 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1078                 cur_dst += MAX_BULK;
1079                 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1080                         goto fail;
1081         }
1082
1083         /* Available objects - the exact MAX_BULK */
1084         ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1085         cur_dst += 2;
1086         if ((ret & RTE_RING_SZ_MASK) != 2)
1087                 goto fail;
1088
1089         ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1090         cur_dst += MAX_BULK - 3;
1091         if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1092                 goto fail;
1093
1094         /* check data */
1095         if (memcmp(src, dst, cur_dst - dst)) {
1096                 test_hexdump("src", src, cur_src - src);
1097                 test_hexdump("dst", dst, cur_dst - dst);
1098                 printf("data after dequeue is not the same\n");
1099                 goto fail;
1100         }
1101
1102         cur_src = src;
1103         cur_dst = dst;
1104
1105         printf("Covering rte_ring_enqueue_burst functions \n");
1106
1107         ret = rte_ring_enqueue_burst(r, cur_src, 2);
1108         cur_src += 2;
1109         if ((ret & RTE_RING_SZ_MASK) != 2)
1110                 goto fail;
1111
1112         ret = rte_ring_dequeue_burst(r, cur_dst, 2);
1113         cur_dst += 2;
1114         if (ret != 2)
1115                 goto fail;
1116
1117         /* Free memory before test completed */
1118         if (src)
1119                 free(src);
1120         if (dst)
1121                 free(dst);
1122         return 0;
1123
1124  fail:
1125         if (src)
1126                 free(src);
1127         if (dst)
1128                 free(dst);
1129         return -1;
1130 }
1131
1132 static int
1133 test_ring_stats(void)
1134 {
1135
1136 #ifndef RTE_LIBRTE_RING_DEBUG
1137         printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
1138         return 0;
1139 #else
1140         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
1141         int ret;
1142         unsigned i;
1143         unsigned num_items            = 0;
1144         unsigned failed_enqueue_ops   = 0;
1145         unsigned failed_enqueue_items = 0;
1146         unsigned failed_dequeue_ops   = 0;
1147         unsigned failed_dequeue_items = 0;
1148         unsigned last_enqueue_ops     = 0;
1149         unsigned last_enqueue_items   = 0;
1150         unsigned last_quota_ops       = 0;
1151         unsigned last_quota_items     = 0;
1152         unsigned lcore_id = rte_lcore_id();
1153         struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
1154
1155         printf("Test the ring stats.\n");
1156
1157         /* Reset the watermark in case it was set in another test. */
1158         rte_ring_set_water_mark(r, 0);
1159
1160         /* Reset the ring stats. */
1161         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1162
1163         /* Allocate some dummy object pointers. */
1164         src = malloc(RING_SIZE*2*sizeof(void *));
1165         if (src == NULL)
1166                 goto fail;
1167
1168         for (i = 0; i < RING_SIZE*2 ; i++) {
1169                 src[i] = (void *)(unsigned long)i;
1170         }
1171
1172         /* Allocate some memory for copied objects. */
1173         dst = malloc(RING_SIZE*2*sizeof(void *));
1174         if (dst == NULL)
1175                 goto fail;
1176
1177         memset(dst, 0, RING_SIZE*2*sizeof(void *));
1178
1179         /* Set the head and tail pointers. */
1180         cur_src = src;
1181         cur_dst = dst;
1182
1183         /* Do Enqueue tests. */
1184         printf("Test the dequeue stats.\n");
1185
1186         /* Fill the ring up to RING_SIZE -1. */
1187         printf("Fill the ring.\n");
1188         for (i = 0; i< (RING_SIZE/MAX_BULK); i++) {
1189                 rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
1190                 cur_src += MAX_BULK;
1191         }
1192
1193         /* Adjust for final enqueue = MAX_BULK -1. */
1194         cur_src--;
1195
1196         printf("Verify that the ring is full.\n");
1197         if (rte_ring_full(r) != 1)
1198                 goto fail;
1199
1200
1201         printf("Verify the enqueue success stats.\n");
1202         /* Stats should match above enqueue operations to fill the ring. */
1203         if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
1204                 goto fail;
1205
1206         /* Current max objects is RING_SIZE -1. */
1207         if (ring_stats->enq_success_objs != RING_SIZE -1)
1208                 goto fail;
1209
1210         /* Shouldn't have any failures yet. */
1211         if (ring_stats->enq_fail_bulk != 0)
1212                 goto fail;
1213         if (ring_stats->enq_fail_objs != 0)
1214                 goto fail;
1215
1216
1217         printf("Test stats for SP burst enqueue to a full ring.\n");
1218         num_items = 2;
1219         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1220         if ((ret & RTE_RING_SZ_MASK) != 0)
1221                 goto fail;
1222
1223         failed_enqueue_ops   += 1;
1224         failed_enqueue_items += num_items;
1225
1226         /* The enqueue should have failed. */
1227         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1228                 goto fail;
1229         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1230                 goto fail;
1231
1232
1233         printf("Test stats for SP bulk enqueue to a full ring.\n");
1234         num_items = 4;
1235         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1236         if (ret != -ENOBUFS)
1237                 goto fail;
1238
1239         failed_enqueue_ops   += 1;
1240         failed_enqueue_items += num_items;
1241
1242         /* The enqueue should have failed. */
1243         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1244                 goto fail;
1245         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1246                 goto fail;
1247
1248
1249         printf("Test stats for MP burst enqueue to a full ring.\n");
1250         num_items = 8;
1251         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1252         if ((ret & RTE_RING_SZ_MASK) != 0)
1253                 goto fail;
1254
1255         failed_enqueue_ops   += 1;
1256         failed_enqueue_items += num_items;
1257
1258         /* The enqueue should have failed. */
1259         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1260                 goto fail;
1261         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1262                 goto fail;
1263
1264
1265         printf("Test stats for MP bulk enqueue to a full ring.\n");
1266         num_items = 16;
1267         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1268         if (ret != -ENOBUFS)
1269                 goto fail;
1270
1271         failed_enqueue_ops   += 1;
1272         failed_enqueue_items += num_items;
1273
1274         /* The enqueue should have failed. */
1275         if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1276                 goto fail;
1277         if (ring_stats->enq_fail_objs != failed_enqueue_items)
1278                 goto fail;
1279
1280
1281         /* Do Dequeue tests. */
1282         printf("Test the dequeue stats.\n");
1283
1284         printf("Empty the ring.\n");
1285         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1286                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1287                 cur_dst += MAX_BULK;
1288         }
1289
1290         /* There was only RING_SIZE -1 objects to dequeue. */
1291         cur_dst++;
1292
1293         printf("Verify ring is empty.\n");
1294         if (1 != rte_ring_empty(r))
1295                 goto fail;
1296
1297         printf("Verify the dequeue success stats.\n");
1298         /* Stats should match above dequeue operations. */
1299         if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
1300                 goto fail;
1301
1302         /* Objects dequeued is RING_SIZE -1. */
1303         if (ring_stats->deq_success_objs != RING_SIZE -1)
1304                 goto fail;
1305
1306         /* Shouldn't have any dequeue failure stats yet. */
1307         if (ring_stats->deq_fail_bulk != 0)
1308                 goto fail;
1309
1310         printf("Test stats for SC burst dequeue with an empty ring.\n");
1311         num_items = 2;
1312         ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
1313         if ((ret & RTE_RING_SZ_MASK) != 0)
1314                 goto fail;
1315
1316         failed_dequeue_ops   += 1;
1317         failed_dequeue_items += num_items;
1318
1319         /* The dequeue should have failed. */
1320         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1321                 goto fail;
1322         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1323                 goto fail;
1324
1325
1326         printf("Test stats for SC bulk dequeue with an empty ring.\n");
1327         num_items = 4;
1328         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
1329         if (ret != -ENOENT)
1330                 goto fail;
1331
1332         failed_dequeue_ops   += 1;
1333         failed_dequeue_items += num_items;
1334
1335         /* The dequeue should have failed. */
1336         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1337                 goto fail;
1338         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1339                 goto fail;
1340
1341
1342         printf("Test stats for MC burst dequeue with an empty ring.\n");
1343         num_items = 8;
1344         ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
1345         if ((ret & RTE_RING_SZ_MASK) != 0)
1346                 goto fail;
1347         failed_dequeue_ops   += 1;
1348         failed_dequeue_items += num_items;
1349
1350         /* The dequeue should have failed. */
1351         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1352                 goto fail;
1353         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1354                 goto fail;
1355
1356
1357         printf("Test stats for MC bulk dequeue with an empty ring.\n");
1358         num_items = 16;
1359         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
1360         if (ret != -ENOENT)
1361                 goto fail;
1362
1363         failed_dequeue_ops   += 1;
1364         failed_dequeue_items += num_items;
1365
1366         /* The dequeue should have failed. */
1367         if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1368                 goto fail;
1369         if (ring_stats->deq_fail_objs != failed_dequeue_items)
1370                 goto fail;
1371
1372
1373         printf("Test total enqueue/dequeue stats.\n");
1374         /* At this point the enqueue and dequeue stats should be the same. */
1375         if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
1376                 goto fail;
1377         if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
1378                 goto fail;
1379         if (ring_stats->enq_fail_bulk    != ring_stats->deq_fail_bulk)
1380                 goto fail;
1381         if (ring_stats->enq_fail_objs    != ring_stats->deq_fail_objs)
1382                 goto fail;
1383
1384
1385         /* Watermark Tests. */
1386         printf("Test the watermark/quota stats.\n");
1387
1388         printf("Verify the initial watermark stats.\n");
1389         /* Watermark stats should be 0 since there is no watermark. */
1390         if (ring_stats->enq_quota_bulk != 0)
1391                 goto fail;
1392         if (ring_stats->enq_quota_objs != 0)
1393                 goto fail;
1394
1395         /* Set a watermark. */
1396         rte_ring_set_water_mark(r, 16);
1397
1398         /* Reset pointers. */
1399         cur_src = src;
1400         cur_dst = dst;
1401
1402         last_enqueue_ops   = ring_stats->enq_success_bulk;
1403         last_enqueue_items = ring_stats->enq_success_objs;
1404
1405
1406         printf("Test stats for SP burst enqueue below watermark.\n");
1407         num_items = 8;
1408         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1409         if ((ret & RTE_RING_SZ_MASK) != num_items)
1410                 goto fail;
1411
1412         /* Watermark stats should still be 0. */
1413         if (ring_stats->enq_quota_bulk != 0)
1414                 goto fail;
1415         if (ring_stats->enq_quota_objs != 0)
1416                 goto fail;
1417
1418         /* Success stats should have increased. */
1419         if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
1420                 goto fail;
1421         if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
1422                 goto fail;
1423
1424         last_enqueue_ops   = ring_stats->enq_success_bulk;
1425         last_enqueue_items = ring_stats->enq_success_objs;
1426
1427
1428         printf("Test stats for SP burst enqueue at watermark.\n");
1429         num_items = 8;
1430         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1431         if ((ret & RTE_RING_SZ_MASK) != num_items)
1432                 goto fail;
1433
1434         /* Watermark stats should have changed. */
1435         if (ring_stats->enq_quota_bulk != 1)
1436                 goto fail;
1437         if (ring_stats->enq_quota_objs != num_items)
1438                 goto fail;
1439
1440         last_quota_ops   = ring_stats->enq_quota_bulk;
1441         last_quota_items = ring_stats->enq_quota_objs;
1442
1443
1444         printf("Test stats for SP burst enqueue above watermark.\n");
1445         num_items = 1;
1446         ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1447         if ((ret & RTE_RING_SZ_MASK) != num_items)
1448                 goto fail;
1449
1450         /* Watermark stats should have changed. */
1451         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1452                 goto fail;
1453         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1454                 goto fail;
1455
1456         last_quota_ops   = ring_stats->enq_quota_bulk;
1457         last_quota_items = ring_stats->enq_quota_objs;
1458
1459
1460         printf("Test stats for MP burst enqueue above watermark.\n");
1461         num_items = 2;
1462         ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1463         if ((ret & RTE_RING_SZ_MASK) != num_items)
1464                 goto fail;
1465
1466         /* Watermark stats should have changed. */
1467         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1468                 goto fail;
1469         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1470                 goto fail;
1471
1472         last_quota_ops   = ring_stats->enq_quota_bulk;
1473         last_quota_items = ring_stats->enq_quota_objs;
1474
1475
1476         printf("Test stats for SP bulk enqueue above watermark.\n");
1477         num_items = 4;
1478         ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1479         if (ret != -EDQUOT)
1480                 goto fail;
1481
1482         /* Watermark stats should have changed. */
1483         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1484                 goto fail;
1485         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1486                 goto fail;
1487
1488         last_quota_ops   = ring_stats->enq_quota_bulk;
1489         last_quota_items = ring_stats->enq_quota_objs;
1490
1491
1492         printf("Test stats for MP bulk enqueue above watermark.\n");
1493         num_items = 8;
1494         ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1495         if (ret != -EDQUOT)
1496                 goto fail;
1497
1498         /* Watermark stats should have changed. */
1499         if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1500                 goto fail;
1501         if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1502                 goto fail;
1503
1504         printf("Test watermark success stats.\n");
1505         /* Success stats should be same as last non-watermarked enqueue. */
1506         if (ring_stats->enq_success_bulk != last_enqueue_ops)
1507                 goto fail;
1508         if (ring_stats->enq_success_objs != last_enqueue_items)
1509                 goto fail;
1510
1511
1512         /* Cleanup. */
1513
1514         /* Empty the ring. */
1515         for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1516                 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1517                 cur_dst += MAX_BULK;
1518         }
1519
1520         /* Reset the watermark. */
1521         rte_ring_set_water_mark(r, 0);
1522
1523         /* Reset the ring stats. */
1524         memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1525
1526         /* Free memory before test completed */
1527         if (src)
1528                 free(src);
1529         if (dst)
1530                 free(dst);
1531         return 0;
1532
1533 fail:
1534         if (src)
1535                 free(src);
1536         if (dst)
1537                 free(dst);
1538         return -1;
1539 #endif
1540 }
1541
1542 /*
1543  * it will always fail to create ring with a wrong ring size number in this function
1544  */
1545 static int
1546 test_ring_creation_with_wrong_size(void)
1547 {
1548         struct rte_ring * rp = NULL;
1549
1550         /* Test if ring size is not power of 2 */
1551         rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
1552         if (NULL != rp) {
1553                 return -1;
1554         }
1555
1556         /* Test if ring size is exceeding the limit */
1557         rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
1558         if (NULL != rp) {
1559                 return -1;
1560         }
1561         return 0;
1562 }
1563
1564 /*
1565  * it tests if it would always fail to create ring with an used ring name
1566  */
1567 static int
1568 test_ring_creation_with_an_used_name(void)
1569 {
1570         struct rte_ring * rp;
1571
1572         rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1573         if (NULL != rp)
1574                 return -1;
1575
1576         return 0;
1577 }
1578
1579 /*
1580  * Test to if a non-power of 2 count causes the create
1581  * function to fail correctly
1582  */
1583 static int
1584 test_create_count_odd(void)
1585 {
1586         struct rte_ring *r = rte_ring_create("test_ring_count",
1587                         4097, SOCKET_ID_ANY, 0 );
1588         if(r != NULL){
1589                 return -1;
1590         }
1591         return 0;
1592 }
1593
1594 static int
1595 test_lookup_null(void)
1596 {
1597         struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
1598         if (rlp ==NULL)
1599         if (rte_errno != ENOENT){
1600                 printf( "test failed to returnn error on null pointer\n");
1601                 return -1;
1602         }
1603         return 0;
1604 }
1605
1606 /*
1607  * it tests some more basic ring operations
1608  */
1609 static int
1610 test_ring_basic_ex(void)
1611 {
1612         int ret = -1;
1613         unsigned i;
1614         struct rte_ring * rp;
1615         void **obj = NULL;
1616
1617         obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
1618         if (obj == NULL) {
1619                 printf("test_ring_basic_ex fail to rte_malloc\n");
1620                 goto fail_test;
1621         }
1622
1623         rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
1624                         RING_F_SP_ENQ | RING_F_SC_DEQ);
1625         if (rp == NULL) {
1626                 printf("test_ring_basic_ex fail to create ring\n");
1627                 goto fail_test;
1628         }
1629
1630         if (rte_ring_lookup("test_ring_basic_ex") != rp) {
1631                 goto fail_test;
1632         }
1633
1634         if (rte_ring_empty(rp) != 1) {
1635                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1636                 goto fail_test;
1637         }
1638
1639         printf("%u ring entries are now free\n", rte_ring_free_count(rp));
1640
1641         for (i = 0; i < RING_SIZE; i ++) {
1642                 rte_ring_enqueue(rp, obj[i]);
1643         }
1644
1645         if (rte_ring_full(rp) != 1) {
1646                 printf("test_ring_basic_ex ring is not full but it should be\n");
1647                 goto fail_test;
1648         }
1649
1650         for (i = 0; i < RING_SIZE; i ++) {
1651                 rte_ring_dequeue(rp, &obj[i]);
1652         }
1653
1654         if (rte_ring_empty(rp) != 1) {
1655                 printf("test_ring_basic_ex ring is not empty but it should be\n");
1656                 goto fail_test;
1657         }
1658
1659         /* Covering the ring burst operation */
1660         ret = rte_ring_enqueue_burst(rp, obj, 2);
1661         if ((ret & RTE_RING_SZ_MASK) != 2) {
1662                 printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
1663                 goto fail_test;
1664         }
1665
1666         ret = rte_ring_dequeue_burst(rp, obj, 2);
1667         if (ret != 2) {
1668                 printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
1669                 goto fail_test;
1670         }
1671
1672         ret = 0;
1673 fail_test:
1674         if (obj != NULL)
1675                 rte_free(obj);
1676
1677         return ret;
1678 }
1679
1680 int
1681 test_ring(void)
1682 {
1683         unsigned enq_core_count, deq_core_count;
1684
1685         /* some more basic operations */
1686         if (test_ring_basic_ex() < 0)
1687                 return -1;
1688
1689         rte_atomic32_init(&synchro);
1690
1691         if (r == NULL)
1692                 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1693         if (r == NULL)
1694                 return -1;
1695
1696         /* retrieve the ring from its name */
1697         if (rte_ring_lookup("test") != r) {
1698                 printf("Cannot lookup ring from its name\n");
1699                 return -1;
1700         }
1701
1702         /* burst operations */
1703         if (test_ring_burst_basic() < 0)
1704                 return -1;
1705
1706         /* basic operations */
1707         if (test_ring_basic() < 0)
1708                 return -1;
1709
1710         /* ring stats */
1711         if (test_ring_stats() < 0)
1712                 return -1;
1713
1714         /* basic operations */
1715         if (test_live_watermark_change() < 0)
1716                 return -1;
1717
1718         if ( test_set_watermark() < 0){
1719                 printf ("Test failed to detect invalid parameter\n");
1720                 return -1;
1721         }
1722         else
1723                 printf ( "Test detected forced bad watermark values\n");
1724
1725         if ( test_create_count_odd() < 0){
1726                         printf ("Test failed to detect odd count\n");
1727                         return -1;
1728                 }
1729                 else
1730                         printf ( "Test detected odd count\n");
1731
1732         if ( test_lookup_null() < 0){
1733                                 printf ("Test failed to detect NULL ring lookup\n");
1734                                 return -1;
1735                         }
1736                         else
1737                                 printf ( "Test detected NULL ring lookup \n");
1738
1739         printf("start performance tests \n");
1740
1741         /* one lcore for enqueue, one for dequeue */
1742         enq_core_count = 1;
1743         deq_core_count = 1;
1744         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1745                 return -1;
1746
1747         /* max cores for enqueue, one for dequeue */
1748         enq_core_count = rte_lcore_count() - 1;
1749         deq_core_count = 1;
1750         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1751                 return -1;
1752
1753         /* max cores for dequeue, one for enqueue */
1754         enq_core_count = 1;
1755         deq_core_count = rte_lcore_count() - 1;
1756         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1757                 return -1;
1758
1759         /* half for enqueue and half for dequeue */
1760         enq_core_count = rte_lcore_count() / 2;
1761         deq_core_count = rte_lcore_count() / 2;
1762         if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1763                 return -1;
1764
1765         printf("start performance tests - burst operations \n");
1766
1767         /* one lcore for enqueue, one for dequeue */
1768         enq_core_count = 1;
1769         deq_core_count = 1;
1770         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1771                 return -1;
1772
1773         /* max cores for enqueue, one for dequeue */
1774         enq_core_count = rte_lcore_count() - 1;
1775         deq_core_count = 1;
1776         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1777                 return -1;
1778
1779         /* max cores for dequeue, one for enqueue */
1780         enq_core_count = 1;
1781         deq_core_count = rte_lcore_count() - 1;
1782         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1783                 return -1;
1784
1785         /* half for enqueue and half for dequeue */
1786         enq_core_count = rte_lcore_count() / 2;
1787         deq_core_count = rte_lcore_count() / 2;
1788         if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1789                 return -1;
1790
1791         /* test of creating ring with wrong size */
1792         if (test_ring_creation_with_wrong_size() < 0)
1793                 return -1;
1794
1795         /* test of creation ring with an used name */
1796         if (test_ring_creation_with_an_used_name() < 0)
1797                 return -1;
1798
1799         /* dump the ring status */
1800         rte_ring_list_dump();
1801
1802         return 0;
1803 }