1 /*-
2  *   BSD LICENSE
3  * 
4  *   Copyright(c) 2010-2012 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  * 
7  *   Redistribution and use in source and binary forms, with or without 
8  *   modification, are permitted provided that the following conditions 
9  *   are met:
10  * 
11  *     * Redistributions of source code must retain the above copyright 
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright 
14  *       notice, this list of conditions and the following disclaimer in 
15  *       the documentation and/or other materials provided with the 
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its 
18  *       contributors may be used to endorse or promote products derived 
19  *       from this software without specific prior written permission.
20  * 
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  * 
33  *  version: DPDK.L.1.2.3-3
34  */
35
36 #include <string.h>
37 #include <stdarg.h>
38 #include <stdio.h>
39 #include <stdlib.h>
40 #include <stdint.h>
41 #include <inttypes.h>
42 #include <errno.h>
43 #include <sys/queue.h>
44
45 #include <rte_common.h>
46 #include <rte_log.h>
47 #include <rte_memory.h>
48 #include <rte_memzone.h>
49 #include <rte_launch.h>
50 #include <rte_cycles.h>
51 #include <rte_tailq.h>
52 #include <rte_eal.h>
53 #include <rte_per_lcore.h>
54 #include <rte_lcore.h>
55 #include <rte_atomic.h>
56 #include <rte_branch_prediction.h>
57 #include <rte_malloc.h>
58 #include <rte_ring.h>
59 #include <rte_random.h>
61 #include <rte_errno.h>
62
63 #include <cmdline_parse.h>
64
65 #include "test.h"
66
67 /*
68  * Ring
69  * ====
70  *
71  * #. Basic tests: done on one core:
72  *
73  *    - Using single producer/single consumer functions:
74  *
75  *      - Enqueue one object, two objects, MAX_BULK objects
76  *      - Dequeue one object, two objects, MAX_BULK objects
77  *      - Check that dequeued pointers are correct
78  *
79  *    - Using multi producers/multi consumers functions:
80  *
81  *      - Enqueue one object, two objects, MAX_BULK objects
82  *      - Dequeue one object, two objects, MAX_BULK objects
83  *      - Check that dequeued pointers are correct
84  *
85  *    - Test watermark and default bulk enqueue/dequeue:
86  *
87  *      - Set watermark
88  *      - Set default bulk value
89  *      - Enqueue objects, check that -EDQUOT is returned when
90  *        watermark is exceeded
91  *      - Check that dequeued pointers are correct
92  *
93  * #. Check quota and watermark
94  *
95  *    - Start a loop on another lcore that will enqueue and dequeue
96  *      objects in a ring. It will monitor the value of quota (default
97  *      bulk count) and watermark.
98  *    - At the same time, change the quota and the watermark on the
99  *      master lcore.
100  *    - The slave lcore will check that bulk count changes from 4 to
101  *      8, and watermark changes from 16 to 32.
102  *
103  * #. Performance tests.
104  *
105  *    This test is done on the following configurations:
106  *
107  *    - One core enqueuing, one core dequeuing
108  *    - One core enqueuing, other cores dequeuing
109  *    - One core dequeuing, other cores enqueuing
110  *    - Half of the cores enqueuing, the other half dequeuing
111  *
112  *    When only one core enqueues/dequeues, the test is done with the
113  *    SP/SC functions in addition to the MP/MC functions.
114  *
115  *    The test is done with different bulk sizes.
116  *
117  *    On each core, the test enqueues or dequeues objects during
118  *    TIME_S seconds. The numbers of successes and failures are stored on
119  *    each core, then summed and displayed.
120  *
121  *    The test checks that the number of enqueues is equal to the
122  *    number of dequeues.
123  */
124
125 #define RING_SIZE 4096
126 #define MAX_BULK 32
127 #define N 65536
128 #define TIME_S 5
129
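/*
 * For reference, the single-producer/single-consumer calls exercised by
 * the basic tests below look roughly like this (illustrative sketch only,
 * error handling omitted; the real tests check every return value):
 *
 *   struct rte_ring *ring = rte_ring_create("example", RING_SIZE,
 *                                           SOCKET_ID_ANY, 0);
 *   void *objs[MAX_BULK];
 *   rte_ring_sp_enqueue_bulk(ring, objs, MAX_BULK);
 *   rte_ring_sc_dequeue_bulk(ring, objs, MAX_BULK);
 *
 * The rte_ring_mp_enqueue_bulk()/rte_ring_mc_dequeue_bulk() variants are
 * the multi-producer/multi-consumer counterparts, used the same way.
 */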
130 static rte_atomic32_t synchro;
131
132 static unsigned bulk_enqueue;
133 static unsigned bulk_dequeue;
134 static struct rte_ring *r;
135
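/*
 * Per-lcore counters filled in by the enqueue/dequeue workers below.
 * The structure is cache aligned so that each lcore writes to its own
 * cache line of the test_stats[] array and does not disturb the
 * statistics of the other lcores.
 */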
136 struct test_stats {
137         unsigned enq_success;
138         unsigned enq_quota;
139         unsigned enq_fail;
140
141         unsigned deq_success;
142         unsigned deq_fail;
143 } __rte_cache_aligned;
144
145 static struct test_stats test_stats[RTE_MAX_LCORE];
146
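/*
 * The two macros below stamp out the per-lcore worker functions used by
 * the performance tests: enq_code/deq_code expands to the rte_ring call
 * under test. Slave lcores wait for the synchro flag set by
 * launch_cores() before starting. Each worker then loops for roughly
 * TIME_S seconds, counting successful calls, enqueues that returned
 * -EDQUOT (watermark exceeded) and failures, and finally stores its
 * totals in test_stats[lcore_id].
 */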
147 #define DEFINE_ENQUEUE_FUNCTION(name, enq_code)                 \
148 static int                                                      \
149 name(__attribute__((unused)) void *arg)                         \
150 {                                                               \
151         unsigned success = 0;                                   \
152         unsigned quota = 0;                                     \
153         unsigned fail = 0;                                      \
154         unsigned i;                                             \
155         unsigned long dummy_obj;                                \
156         void *obj_table[MAX_BULK];                              \
157         int ret;                                                \
158         unsigned lcore_id = rte_lcore_id();                     \
159         uint64_t start_cycles, end_cycles;                      \
160         uint64_t time_diff = 0, hz = rte_get_hpet_hz();         \
161                                                                 \
162         /* init dummy object table */                           \
163         for (i = 0; i < MAX_BULK; i++) {                        \
164                 dummy_obj = lcore_id + 0x1000 + i;              \
165                 obj_table[i] = (void *)dummy_obj;               \
166         }                                                       \
167                                                                 \
168         /* wait synchro for slaves */                           \
169         if (lcore_id != rte_get_master_lcore())                 \
170                 while (rte_atomic32_read(&synchro) == 0);       \
171                                                                 \
172         start_cycles = rte_get_hpet_cycles();                   \
173                                                                 \
174         /* enqueue as many objects as possible */               \
175         while (time_diff/hz < TIME_S) {                         \
176                 for (i = 0; likely(i < N); i++) {               \
177                         ret = enq_code;                         \
178                         if (ret == 0)                           \
179                                 success++;                      \
180                         else if (ret == -EDQUOT)                \
181                                 quota++;                        \
182                         else                                    \
183                                 fail++;                         \
184                 }                                               \
185                 end_cycles = rte_get_hpet_cycles();             \
186                 time_diff = end_cycles - start_cycles;          \
187         }                                                       \
188                                                                 \
189         /* write statistics in a shared structure */            \
190         test_stats[lcore_id].enq_success = success;             \
191         test_stats[lcore_id].enq_quota = quota;                 \
192         test_stats[lcore_id].enq_fail = fail;                   \
193                                                                 \
194         return 0;                                               \
195 }
196
197 #define DEFINE_DEQUEUE_FUNCTION(name, deq_code)                 \
198 static int                                                      \
199 name(__attribute__((unused)) void *arg)                         \
200 {                                                               \
201         unsigned success = 0;                                   \
202         unsigned fail = 0;                                      \
203         unsigned i;                                             \
204         void *obj_table[MAX_BULK];                              \
205         int ret;                                                \
206         unsigned lcore_id = rte_lcore_id();                     \
207         uint64_t start_cycles, end_cycles;                      \
208         uint64_t time_diff = 0, hz = rte_get_hpet_hz();         \
209                                                                 \
210         /* wait synchro for slaves */                           \
211         if (lcore_id != rte_get_master_lcore())                 \
212                 while (rte_atomic32_read(&synchro) == 0);       \
213                                                                 \
214         start_cycles = rte_get_hpet_cycles();                   \
215                                                                 \
216         /* dequeue as many objects as possible */               \
217         while (time_diff/hz < TIME_S) {                         \
218                 for (i = 0; likely(i < N); i++) {               \
219                         ret = deq_code;                         \
220                         if (ret == 0)                           \
221                                 success++;                      \
222                         else                                    \
223                                 fail++;                         \
224                 }                                               \
225                 end_cycles = rte_get_hpet_cycles();             \
226                 time_diff = end_cycles - start_cycles;          \
227         }                                                       \
228                                                                 \
229         /* write statistics in a shared structure */            \
230         test_stats[lcore_id].deq_success = success;             \
231         test_stats[lcore_id].deq_fail = fail;                   \
232                                                                 \
233         return 0;                                               \
234 }
235
236 DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_sp_enqueue,
237                         rte_ring_sp_enqueue_bulk(r, obj_table, bulk_enqueue))
238
239 DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_sc_dequeue,
240                         rte_ring_sc_dequeue_bulk(r, obj_table, bulk_dequeue))
241
242 DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_mp_enqueue,
243                         rte_ring_mp_enqueue_bulk(r, obj_table, bulk_enqueue))
244
245 DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_mc_dequeue,
246                         rte_ring_mc_dequeue_bulk(r, obj_table, bulk_dequeue))
247
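/*
 * Check a condition; on failure print the location, dump the ring state
 * and make the calling function return -1.
 */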
248 #define TEST_RING_VERIFY(exp)                                           \
249         if (!(exp)) {                                                   \
250                 printf("error at %s:%d\tcondition " #exp " failed\n",   \
251                     __func__, __LINE__);                                \
252                 rte_ring_dump(r);                                       \
253                 return (-1);                                            \
254         }
255
256 #define TEST_RING_FULL_EMPTY_ITER       8
257
258
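/*
 * Run one performance measurement: launch enq_core_count enqueuing
 * lcores and deq_core_count dequeuing lcores (the master lcore itself
 * acts as one of the dequeuers), wait for them to finish, then sum the
 * per-lcore statistics and check that every enqueued object was
 * dequeued.
 */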
259 static int
260 launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc)
261 {
262         void *obj;
263         unsigned lcore_id;
264         unsigned rate, deq_remain = 0;
265         unsigned enq_total, deq_total;
266         struct test_stats sum;
267         int (*enq_f)(void *);
268         int (*deq_f)(void *);
269         unsigned cores = enq_core_count + deq_core_count;
270         int ret;
271
272         rte_atomic32_set(&synchro, 0);
273
274         printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
275                enq_core_count, deq_core_count, bulk_enqueue, bulk_dequeue);
276         printf("sp=%d sc=%d ", sp, sc);
277
278         /* set enqueue function to be used */
279         if (sp)
280                 enq_f = test_ring_per_core_sp_enqueue;
281         else
282                 enq_f = test_ring_per_core_mp_enqueue;
283
284         /* set dequeue function to be used */
285         if (sc)
286                 deq_f = test_ring_per_core_sc_dequeue;
287         else
288                 deq_f = test_ring_per_core_mc_dequeue;
289
290         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
291                 if (enq_core_count != 0) {
292                         enq_core_count--;
293                         rte_eal_remote_launch(enq_f, NULL, lcore_id);
294                 }
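                /* keep one dequeue slot for the master lcore, which
                 * runs deq_f itself once the slaves are launched */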
295                 if (deq_core_count != 1) {
296                         deq_core_count--;
297                         rte_eal_remote_launch(deq_f, NULL, lcore_id);
298                 }
299         }
300
301         memset(test_stats, 0, sizeof(test_stats));
302
303         /* start synchro and launch test on master */
304         rte_atomic32_set(&synchro, 1);
305         ret = deq_f(NULL);
306
307         /* wait all cores */
308         RTE_LCORE_FOREACH_SLAVE(lcore_id) {
309                 if (cores == 1)
310                         break;
311                 cores--;
312                 if (rte_eal_wait_lcore(lcore_id) < 0)
313                         ret = -1;
314         }
315
316         memset(&sum, 0, sizeof(sum));
317         for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
318                 sum.enq_success += test_stats[lcore_id].enq_success;
319                 sum.enq_quota += test_stats[lcore_id].enq_quota;
320                 sum.enq_fail += test_stats[lcore_id].enq_fail;
321                 sum.deq_success += test_stats[lcore_id].deq_success;
322                 sum.deq_fail += test_stats[lcore_id].deq_fail;
323         }
324
325         /* empty the ring */
326         while (rte_ring_sc_dequeue(r, &obj) == 0)
327                 deq_remain += 1;
328
329         if (ret < 0) {
330                 printf("per-lcore test returned -1\n");
331                 return -1;
332         }
333
334         enq_total = (sum.enq_success * bulk_enqueue) +
335                 (sum.enq_quota * bulk_enqueue);
336         deq_total = (sum.deq_success * bulk_dequeue) + deq_remain;
337
338         rate = deq_total/TIME_S;
339
340         printf("rate_persec=%u\n", rate);
341
342         if (enq_total != deq_total) {
343                 printf("invalid enq/deq_success counter: %u %u\n",
344                        enq_total, deq_total);
345                 return -1;
346         }
347
348         return 0;
349 }
350
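/*
 * Run launch_cores() for every applicable producer/consumer mode with
 * the given bulk sizes: the single-producer (resp. single-consumer)
 * path is only exercised when exactly one enqueuing (resp. dequeuing)
 * core is used.
 */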
351 static int
352 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
353                   unsigned n_enq_bulk, unsigned n_deq_bulk)
354 {
355         int sp, sc;
356         int do_sp, do_sc;
357         int ret;
358
359         bulk_enqueue = n_enq_bulk;
360         bulk_dequeue = n_deq_bulk;
361
362         do_sp = (enq_core_count == 1) ? 1 : 0;
363         do_sc = (deq_core_count == 1) ? 1 : 0;
364
365         for (sp = 0; sp <= do_sp; sp++) {
366                 for (sc = 0; sc <= do_sc; sc++) {
367                         ret = launch_cores(enq_core_count,
368                                            deq_core_count,
369                                            sp, sc);
370                         if (ret < 0)
371                                 return -1;
372                 }
373         }
374         return 0;
375 }
376
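/*
 * Sweep all combinations of enqueue/dequeue bulk sizes from the
 * zero-terminated tables below for the given core counts.
 */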
377 static int
378 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count)
379 {
380         unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
381         unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
382         unsigned *bulk_enqueue_ptr;
383         unsigned *bulk_dequeue_ptr;
384         int ret;
385
386         for (bulk_enqueue_ptr = bulk_enqueue_tab;
387              *bulk_enqueue_ptr;
388              bulk_enqueue_ptr++) {
389
390                 for (bulk_dequeue_ptr = bulk_dequeue_tab;
391                      *bulk_dequeue_ptr;
392                      bulk_dequeue_ptr++) {
393
394                         ret = do_one_ring_test2(enq_core_count, deq_core_count,
395                                                 *bulk_enqueue_ptr,
396                                                 *bulk_dequeue_ptr);
397                         if (ret < 0)
398                                 return -1;
399                 }
400         }
401         return 0;
402 }
403
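/*
 * Slave side of the quota/watermark live-modification test: for about
 * two seconds, repeatedly read the bulk count, enqueue bulks until the
 * watermark is hit (-EDQUOT) and drain the ring again, checking that
 * the only observed changes are bulk count 4 -> 8 and watermark
 * 16 -> 32 (made by the master lcore in test_quota_and_watermark()).
 */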
404 static int
405 check_quota_and_watermark(__attribute__((unused)) void *dummy)
406 {
407         uint64_t hz = rte_get_hpet_hz();
408         void *obj_table[MAX_BULK];
409         unsigned watermark, watermark_old = 16;
410         uint64_t cur_time, end_time;
411         int64_t diff = 0;
412         int i, ret;
413         unsigned quota, quota_old = 4;
414
415         /* init the object table */
416         memset(obj_table, 0, sizeof(obj_table));
417         end_time = rte_get_hpet_cycles() + (hz * 2);
418
419         /* check that bulk count goes from 4 to 8, and watermark from 16 to 32 */
420         while (diff >= 0) {
421
422                 /* read quota, the only change allowed is from 4 to 8 */
423                 quota = rte_ring_get_bulk_count(r);
424                 if (quota != quota_old && (quota_old != 4 || quota != 8)) {
425                         printf("Bad quota change %u -> %u\n", quota_old,
426                                quota);
427                         return -1;
428                 }
429                 quota_old = quota;
430
431                 /* add in ring until we reach watermark */
432                 ret = 0;
433                 for (i = 0; i < 16; i ++) {
434                         if (ret != 0)
435                                 break;
436                         ret = rte_ring_enqueue_bulk(r, obj_table, quota);
437                 }
438
439                 if (ret != -EDQUOT) {
440                         printf("Cannot enqueue objects, or watermark not "
441                                "reached (ret=%d)\n", ret);
442                         return -1;
443                 }
444
445                 /* infer the watermark from the objects enqueued before
446                  * -EDQUOT; the only change allowed is from 16 to 32 */
446                 watermark = i * quota;
447                 if (watermark != watermark_old &&
448                     (watermark_old != 16 || watermark != 32)) {
449                         printf("Bad watermark change %u -> %u\n", watermark_old,
450                                watermark);
451                         return -1;
452                 }
453                 watermark_old = watermark;
454
455                 /* dequeue objects from ring */
456                 while (i--) {
457                         ret = rte_ring_dequeue_bulk(r, obj_table, quota);
458                         if (ret != 0) {
459                                 printf("Cannot dequeue (ret=%d)\n", ret);
460                                 return -1;
461                         }
462                 }
463
464                 cur_time = rte_get_hpet_cycles();
465                 diff = end_time - cur_time;
466         }
467
468         if (watermark_old != 32 || quota_old != 8) {
469                 printf("quota or watermark was not updated (q=%u wm=%u)\n",
470                        quota_old, watermark_old);
471                 return -1;
472         }
473
474         return 0;
475 }
476
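/*
 * Master side of the quota/watermark test: start the checker on another
 * lcore, let it run for one second with bulk count 4 / watermark 16,
 * switch to 8 / 32, and give it another second to observe the change.
 */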
477 static int
478 test_quota_and_watermark(void)
479 {
480         unsigned lcore_id = rte_lcore_id();
481         unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
482
483         printf("Test quota and watermark live modification\n");
484
485         rte_ring_set_bulk_count(r, 4);
486         rte_ring_set_water_mark(r, 16);
487
488         /* launch a thread that will enqueue and dequeue, checking
489          * watermark and quota */
490         rte_eal_remote_launch(check_quota_and_watermark, NULL, lcore_id2);
491
492         rte_delay_ms(1000);
493         rte_ring_set_bulk_count(r, 8);
494         rte_ring_set_water_mark(r, 32);
495         rte_delay_ms(1000);
496
497         if (rte_eal_wait_lcore(lcore_id2) < 0)
498                 return -1;
499
500         return 0;
501 }
502 /* Test that invalid watermark values are rejected */
503 static int
504 test_set_watermark(void) {
505         unsigned count;
506         int setwm;
507
508         struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
509         if (r == NULL) {
510                 printf("ring lookup failed\n");
511                 goto error;
512         }
513         count = r->prod.size * 2;
514         setwm = rte_ring_set_water_mark(r, count);
515         if (setwm != -EINVAL) {
516                 printf("Test failed to detect invalid watermark count value\n");
517                 goto error;
518         }
519
520         count = 0;
521         setwm = rte_ring_set_water_mark(r, count);
522         if (r->prod.watermark != r->prod.size) {
523                 printf("Test failed to reset watermark to its default (ring size)\n");
524                 goto error;
525         }
526         return 0;
527
528 error:
529         return -1;
530 }
531
532 /*
533  * helper routine for test_ring_basic
534  */
535 static int
536 test_ring_basic_full_empty(void * const src[], void *dst[])
537 {
538         unsigned i, rand;
539         const unsigned rsz = RING_SIZE - 1;
540
541         printf("Basic full/empty test\n");
542
543         for (i = 0; TEST_RING_FULL_EMPTY_ITER != i; i++) {
544
545                 /* random shift in the ring */
546                 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
547                 printf("%s: iteration %u, random shift: %u;\n",
548                     __func__, i, rand);
549                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
550                     rand));
551                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
552
553                 /* fill the ring */
554                 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
555                     rsz));
556                 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
557                 TEST_RING_VERIFY(rsz == rte_ring_count(r));
558                 TEST_RING_VERIFY(rte_ring_full(r));
559                 TEST_RING_VERIFY(0 == rte_ring_empty(r));
560
561                 /* empty the ring */
562                 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
563                 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
564                 TEST_RING_VERIFY(0 == rte_ring_count(r));
565                 TEST_RING_VERIFY(0 == rte_ring_full(r));
566                 TEST_RING_VERIFY(rte_ring_empty(r));
567
568                 /* check data */
569                 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz * sizeof(void *)));
570                 rte_ring_dump(r);
571         }
572         return (0);
573 }
574
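/*
 * Basic single-core tests: SP/SC then MP/MC bulk enqueue/dequeue of 1,
 * 2 and MAX_BULK objects, a fill/empty loop, the full/empty helper
 * above, and finally the watermark / default bulk count checks.
 */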
575 static int
576 test_ring_basic(void)
577 {
578         void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
579         int ret;
580         unsigned i, n;
581
582         /* alloc dummy object pointers */
583         src = malloc(RING_SIZE*2*sizeof(void *));
584         if (src == NULL)
585                 goto fail;
586
587         for (i = 0; i < RING_SIZE*2 ; i++) {
588                 src[i] = (void *)(unsigned long)i;
589         }
590         cur_src = src;
591
592         /* alloc some room for copied objects */
593         dst = malloc(RING_SIZE*2*sizeof(void *));
594         if (dst == NULL)
595                 goto fail;
596
597         memset(dst, 0, RING_SIZE*2*sizeof(void *));
598         cur_dst = dst;
599
600         printf("enqueue 1 obj\n");
601         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
602         cur_src += 1;
603         if (ret != 0)
604                 goto fail;
605
606         printf("enqueue 2 objs\n");
607         ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
608         cur_src += 2;
609         if (ret != 0)
610                 goto fail;
611
612         printf("enqueue MAX_BULK objs\n");
613         ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
614         cur_src += MAX_BULK;
615         if (ret != 0)
616                 goto fail;
617
618         printf("dequeue 1 obj\n");
619         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
620         cur_dst += 1;
621         if (ret != 0)
622                 goto fail;
623
624         printf("dequeue 2 objs\n");
625         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
626         cur_dst += 2;
627         if (ret != 0)
628                 goto fail;
629
630         printf("dequeue MAX_BULK objs\n");
631         ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
632         cur_dst += MAX_BULK;
633         if (ret != 0)
634                 goto fail;
635
636         /* check data */
637         if (memcmp(src, dst, (cur_dst - dst) * sizeof(void *))) {
638                 test_hexdump("src", src, cur_src - src);
639                 test_hexdump("dst", dst, cur_dst - dst);
640                 printf("data after dequeue is not the same\n");
641                 goto fail;
642         }
643         cur_src = src;
644         cur_dst = dst;
645
646         printf("enqueue 1 obj\n");
647         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
648         cur_src += 1;
649         if (ret != 0)
650                 goto fail;
651
652         printf("enqueue 2 objs\n");
653         ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
654         cur_src += 2;
655         if (ret != 0)
656                 goto fail;
657
658         printf("enqueue MAX_BULK objs\n");
659         ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
660         cur_src += MAX_BULK;
661         if (ret != 0)
662                 goto fail;
663
664         printf("dequeue 1 obj\n");
665         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
666         cur_dst += 1;
667         if (ret != 0)
668                 goto fail;
669
670         printf("dequeue 2 objs\n");
671         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
672         cur_dst += 2;
673         if (ret != 0)
674                 goto fail;
675
676         printf("dequeue MAX_BULK objs\n");
677         ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
678         cur_dst += MAX_BULK;
679         if (ret != 0)
680                 goto fail;
681
682         /* check data */
683         if (memcmp(src, dst, (cur_dst - dst) * sizeof(void *))) {
684                 test_hexdump("src", src, cur_src - src);
685                 test_hexdump("dst", dst, cur_dst - dst);
686                 printf("data after dequeue is not the same\n");
687                 goto fail;
688         }
689         cur_src = src;
690         cur_dst = dst;
691
692         printf("fill and empty the ring\n");
693         for (i = 0; i < RING_SIZE/MAX_BULK; i++) {
694                 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
695                 cur_src += MAX_BULK;
696                 if (ret != 0)
697                         goto fail;
698                 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
699                 cur_dst += MAX_BULK;
700                 if (ret != 0)
701                         goto fail;
702         }
703
704         /* check data */
705         if (memcmp(src, dst, (cur_dst - dst) * sizeof(void *))) {
706                 test_hexdump("src", src, cur_src - src);
707                 test_hexdump("dst", dst, cur_dst - dst);
708                 printf("data after dequeue is not the same\n");
709                 goto fail;
710         }
711
712         if (test_ring_basic_full_empty(src, dst) != 0)
713                 goto fail;
714
715         cur_src = src;
716         cur_dst = dst;
717
718         printf("test watermark and default bulk enqueue / dequeue\n");
719         rte_ring_set_bulk_count(r, 16);
720         rte_ring_set_water_mark(r, 20);
721         n = rte_ring_get_bulk_count(r);
722         if (n != 16) {
723                 printf("rte_ring_get_bulk_count() returned %u instead "
724                        "of 16\n", n);
725                 goto fail;
726         }
727
728         cur_src = src;
729         cur_dst = dst;
730         ret = rte_ring_enqueue_bulk(r, cur_src, n);
731         cur_src += 16;
732         if (ret != 0) {
733                 printf("Cannot enqueue\n");
734                 goto fail;
735         }
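        /* the second bulk takes the ring above the watermark: the
         * objects are still enqueued but -EDQUOT must be returned,
         * which is why both dequeues of 16 objects below succeed */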
736         ret = rte_ring_enqueue_bulk(r, cur_src, n);
737         cur_src += 16;
738         if (ret != -EDQUOT) {
739                 printf("Watermark not exceeded\n");
740                 goto fail;
741         }
742         ret = rte_ring_dequeue_bulk(r, cur_dst, n);
743         cur_dst += 16;
744         if (ret != 0) {
745                 printf("Cannot dequeue\n");
746                 goto fail;
747         }
748         ret = rte_ring_dequeue_bulk(r, cur_dst, n);
749         cur_dst += 16;
750         if (ret != 0) {
751                 printf("Cannot dequeue2\n");
752                 goto fail;
753         }
754
755         /* check data */
756         if (memcmp(src, dst, (cur_dst - dst) * sizeof(void *))) {
757                 test_hexdump("src", src, cur_src - src);
758                 test_hexdump("dst", dst, cur_dst - dst);
759                 printf("data after dequeue is not the same\n");
760                 goto fail;
761         }
762         cur_src = src;
763         cur_dst = dst;
764
765         if (src)
766                 free(src);
767         if (dst)
768                 free(dst);
769         return 0;
770
771  fail:
772         if (src)
773                 free(src);
774         if (dst)
775                 free(dst);
776         return -1;
777 }
778
779 /*
780  * Check that ring creation fails when an invalid ring size is requested
781  */
782 static int
783 test_ring_creation_with_wrong_size(void)
784 {
785         struct rte_ring * rp = NULL;
786
787         rp = rte_ring_create("test_bad_ring_size", RING_SIZE+1, SOCKET_ID_ANY, 0);
788         if (NULL != rp) {
789                 return -1;
790         }
791
792         return 0;
793 }
794
795 /*
796  * Check that ring creation fails when the requested name is already in use
797  */
798 static int
799 test_ring_creation_with_an_used_name(void)
800 {
801         struct rte_ring * rp;
802
803         rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
804         if (NULL != rp)
805                 return -1;
806
807         return 0;
808 }
809
810 /*
811  * Test that a non-power-of-2 count causes the create
812  * function to fail as expected
813  */
814 static int
815 test_create_count_odd(void)
816 {
817         struct rte_ring *r = rte_ring_create("test_ring_count",
818                         4097, SOCKET_ID_ANY, 0);
819         if (r != NULL) {
820                 return -1;
821         }
822         return 0;
823 }
824
825 static int
826 test_lookup_null(void)
827 {
828         struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
829         if (rlp == NULL &&
830             rte_errno != ENOENT) {
831                 printf("test failed to set rte_errno to ENOENT on failed lookup\n");
832                 return -1;
833         }
834         return 0;
835 }
836
837 /*
838  * it tests some more basic ring operations
839  */
840 static int
841 test_ring_basic_ex(void)
842 {
843         int ret = -1;
844         unsigned i;
845         struct rte_ring * rp;
846         void **obj = NULL;
847
848         obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
849         if (obj == NULL) {
850                 printf("test_ring_basic_ex failed to allocate memory with rte_zmalloc\n");
851                 goto fail_test;
852         }
853
854         rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY, 0);
855         if (rp == NULL) {
856                 printf("test_ring_basic_ex failed to create ring\n");
857                 goto fail_test;
858         }
859
860         if (rte_ring_lookup("test_ring_basic_ex") != rp) {
861                 goto fail_test;
862         }
863
864         if (rte_ring_empty(rp) != 1) {
865                 printf("test_ring_basic_ex ring is not empty but it should be\n");
866                 goto fail_test;
867         }
868
869         printf("%u ring entries are now free\n", rte_ring_free_count(rp));
870
871         for (i = 0; i < RING_SIZE; i ++) {
872                 rte_ring_enqueue(rp, obj[i]);
873         }
874
875         if (rte_ring_full(rp) != 1) {
876                 printf("test_ring_basic_ex ring is not full but it should be\n");
877                 goto fail_test;
878         }
879
880         for (i = 0; i < RING_SIZE; i ++) {
881                 rte_ring_dequeue(rp, &obj[i]);
882         }
883
884         if (rte_ring_empty(rp) != 1) {
885                 printf("test_ring_basic_ex ring is not empty but it should be\n");
886                 goto fail_test;
887         }
888
889         ret = 0;
890 fail_test:
891         if (obj != NULL)
892                 rte_free(obj);
893
894         return ret;
895 }
896
897 int
898 test_ring(void)
899 {
900         unsigned enq_core_count, deq_core_count;
901
902         /* some more basic operations */
903         if (test_ring_basic_ex() < 0)
904                 return -1;
905
906         rte_atomic32_init(&synchro);
907
908         if (r == NULL)
909                 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
910         if (r == NULL)
911                 return -1;
912
913         /* retrieve the ring from its name */
914         if (rte_ring_lookup("test") != r) {
915                 printf("Cannot lookup ring from its name\n");
916                 return -1;
917         }
918
919         /* basic operations */
920         if (test_ring_basic() < 0)
921                 return -1;
922
923         /* quota and watermark tests */
924         if (test_quota_and_watermark() < 0)
925                 return -1;
926
927         if (test_set_watermark() < 0) {
928                 printf("Test failed to detect invalid parameter\n");
929                 return -1;
930         }
931         else
932                 printf("Test detected forced bad watermark values\n");
933
934         if (test_create_count_odd() < 0) {
935                 printf("Test failed to detect odd count\n");
936                 return -1;
937         }
938         else
939                 printf("Test detected odd count\n");
940
941         if (test_lookup_null() < 0) {
942                 printf("Test failed to detect NULL ring lookup\n");
943                 return -1;
944         }
945         else
946                 printf("Test detected NULL ring lookup\n");
947
948
949         printf("start performance tests\n");
950
951         /* one lcore for enqueue, one for dequeue */
952         enq_core_count = 1;
953         deq_core_count = 1;
954         if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
955                 return -1;
956
957         /* max cores for enqueue, one for dequeue */
958         enq_core_count = rte_lcore_count() - 1;
959         deq_core_count = 1;
960         if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
961                 return -1;
962
963         /* max cores for dequeue, one for enqueue */
964         enq_core_count = 1;
965         deq_core_count = rte_lcore_count() - 1;
966         if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
967                 return -1;
968
969         /* half for enqueue and half for dequeue */
970         enq_core_count = rte_lcore_count() / 2;
971         deq_core_count = rte_lcore_count() / 2;
972         if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
973                 return -1;
974
975         /* test ring creation with an invalid size */
976         if (test_ring_creation_with_wrong_size() < 0)
977                 return -1;
978
979         /* test ring creation with a name that is already in use */
980         if (test_ring_creation_with_an_used_name() < 0)
981                 return -1;
982
983         /* dump the ring status */
984         rte_ring_list_dump();
985
986         return 0;
987 }