4 * Copyright(c) 2010-2012 Intel Corporation. All rights reserved.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 #include <sys/queue.h>
44 #include <rte_common.h>
46 #include <rte_memory.h>
47 #include <rte_memzone.h>
48 #include <rte_launch.h>
49 #include <rte_cycles.h>
50 #include <rte_tailq.h>
52 #include <rte_per_lcore.h>
53 #include <rte_lcore.h>
54 #include <rte_atomic.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_malloc.h>
58 #include <rte_random.h>
59 #include <rte_common.h>
60 #include <rte_errno.h>
62 #include <cmdline_parse.h>
70 * #. Basic tests: done on one core:
72 * - Using single producer/single consumer functions:
74 * - Enqueue one object, two objects, MAX_BULK objects
75 * - Dequeue one object, two objects, MAX_BULK objects
76 * - Check that dequeued pointers are correct
78 * - Using multi producers/multi consumers functions:
80 * - Enqueue one object, two objects, MAX_BULK objects
81 * - Dequeue one object, two objects, MAX_BULK objects
82 * - Check that dequeued pointers are correct
84 * - Test watermark and default bulk enqueue/dequeue:
87 * - Set default bulk value
88 * - Enqueue objects, check that -EDQUOT is returned when
89 * watermark is exceeded
90 * - Check that dequeued pointers are correct
92 * #. Check live watermark change
94 * - Start a loop on another lcore that will enqueue and dequeue
95 * objects in a ring. It will monitor the value of watermark.
96 * - At the same time, change the watermark on the master lcore.
97 * - The slave lcore will check that watermark changes from 16 to 32.
99 * #. Performance tests.
101 * This test is done on the following configurations:
103 * - One core enqueuing, one core dequeuing
104 * - One core enqueuing, other cores dequeuing
105 * - One core dequeuing, other cores enqueuing
106 * - Half of the cores enqueuing, the other half dequeuing
108 * When only one core enqueues/dequeues, the test is done with the
109 * SP/SC functions in addition to the MP/MC functions.
111 * The test is done with different bulk size.
113 * On each core, the test enqueues or dequeues objects during
114 * TIME_S seconds. The number of successes and failures are stored on
115 * each core, then summed and displayed.
117 * The test checks that the number of enqueues is equal to the
118 * number of dequeues.
/* Shared test state for the whole file: the ring size, a start-gate flag,
 * the ring under test, and a per-lcore statistics record.
 * NOTE(review): this excerpt is missing several original lines (the
 * 'struct test_stats' opening and some counter fields) — confirm against
 * the full file before relying on the struct layout. */
121 #define RING_SIZE 4096
/* Start-gate: master sets this to 1 to release the slave lcores. */
126 static rte_atomic32_t synchro;
/* The single ring instance exercised by all tests in this file. */
128 static struct rte_ring *r;
131 unsigned enq_success ;
135 unsigned deq_success;
137 } __rte_cache_aligned;
/* One stats slot per lcore; written by the per-core test loops, summed
 * by launch_cores(). */
139 static struct test_stats test_stats[RTE_MAX_LCORE];
/**
 * Timed enqueue loop: repeatedly enqueue a dummy object table into the
 * global ring 'r' with the supplied function for TIME_S seconds, then
 * store per-lcore success/quota/fail counters into test_stats[].
 *
 * que_func      - the ring enqueue function to exercise (SP/MP, bulk/burst)
 * arg           - pointer to an unsigned holding the per-call object count
 * bulk_or_burst - 1 = bulk return semantics (0/-EDQUOT/-ENOBUFS),
 *                 0 = burst return semantics (count of objects enqueued)
 *
 * NOTE(review): several original lines are missing from this excerpt
 * (declarations of i/ret/quota/fail, some branches, closing braces).
 */
142 ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned),
143 void* arg, unsigned bulk_or_burst)
145 unsigned success = 0;
149 unsigned long dummy_obj;
150 void *obj_table[MAX_BULK];
152 unsigned lcore_id = rte_lcore_id();
153 unsigned count = *((unsigned*)arg);
154 uint64_t start_cycles, end_cycles;
155 uint64_t time_diff = 0, hz = rte_get_hpet_hz();
157 /* init dummy object table */
/* Each lcore tags its objects with lcore_id + 0x1000 + index so dequeued
 * pointers are distinguishable per core. */
158 for (i = 0; i< MAX_BULK; i++) {
159 dummy_obj = lcore_id + 0x1000 + i;
160 obj_table[i] = (void *)dummy_obj;
163 /* wait synchro for slaves */
/* Slaves spin until the master releases the start gate, so all cores
 * begin the timed section together. */
164 if (lcore_id != rte_get_master_lcore())
165 while (rte_atomic32_read(&synchro) == 0);
167 start_cycles = rte_get_hpet_cycles();
169 /* enqueue as many object as possible */
/* Time check happens once per N inner iterations to keep HPET reads off
 * the hot path. */
170 while (time_diff/hz < TIME_S) {
171 for (i = 0; likely(i < N); i++) {
172 ret = que_func(r, obj_table, count);
175 * 1: for bulk operation
176 * 0: for burst operation
179 /* The *count* objects enqueued, unless fail */
/* -EDQUOT: objects enqueued but watermark exceeded — counts as quota. */
182 else if (ret == -EDQUOT)
187 /* The actual objects enqueued */
/* Burst mode: low bits of ret carry the number actually enqueued. */
189 success += (ret & RTE_RING_SZ_MASK);
194 end_cycles = rte_get_hpet_cycles();
195 time_diff = end_cycles - start_cycles;
198 /* write statistics in a shared structure */
199 test_stats[lcore_id].enq_success = success;
200 test_stats[lcore_id].enq_quota = quota;
201 test_stats[lcore_id].enq_fail = fail;
/**
 * Timed dequeue loop: mirror of ring_enqueue_test(). Repeatedly dequeue
 * 'count' objects from the global ring 'r' for TIME_S seconds and store
 * per-lcore success/fail counters into test_stats[].
 *
 * que_func      - the ring dequeue function to exercise (SC/MC, bulk/burst)
 * arg           - pointer to an unsigned holding the per-call object count
 * bulk_or_burst - 1 = bulk return semantics, 0 = burst return semantics
 *
 * NOTE(review): interior lines (ret/success accounting branches, closing
 * braces, return) are missing from this excerpt.
 */
207 ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned),
208 void* arg, unsigned bulk_or_burst)
210 unsigned success = 0;
213 void *obj_table[MAX_BULK];
215 unsigned lcore_id = rte_lcore_id();
216 unsigned count = *((unsigned*)arg);
217 uint64_t start_cycles, end_cycles;
218 uint64_t time_diff = 0, hz = rte_get_hpet_hz();
220 /* wait synchro for slaves */
/* Same start gate as the enqueue side: slaves spin until master sets it. */
221 if (lcore_id != rte_get_master_lcore())
222 while (rte_atomic32_read(&synchro) == 0);
224 start_cycles = rte_get_hpet_cycles();
226 /* dequeue as many object as possible */
227 while (time_diff/hz < TIME_S) {
228 for (i = 0; likely(i < N); i++) {
229 ret = que_func(r, obj_table, count);
232 * 1: for bulk operation
233 * 0: for burst operation
247 end_cycles = rte_get_hpet_cycles();
248 time_diff = end_cycles - start_cycles;
251 /* write statistics in a shared structure */
252 test_stats[lcore_id].deq_success = success;
253 test_stats[lcore_id].deq_fail = fail;
/* Per-lcore launch wrappers: rte_eal_remote_launch() needs an
 * int (*)(void *) entry point, so each wrapper binds one ring variant
 * (SP/MP producer x SC/MC consumer, bulk vs burst) to the generic timed
 * loop above. 'arg' points to the per-call object count; the trailing
 * 1/0 selects bulk vs burst return semantics. */
259 test_ring_per_core_sp_enqueue(void *arg)
261 return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
265 test_ring_per_core_mp_enqueue(void *arg)
267 return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
271 test_ring_per_core_mc_dequeue(void *arg)
273 return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
277 test_ring_per_core_sc_dequeue(void *arg)
279 return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
283 test_ring_per_core_sp_enqueue_burst(void *arg)
285 return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
289 test_ring_per_core_mp_enqueue_burst(void *arg)
291 return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
295 test_ring_per_core_mc_dequeue_burst(void *arg)
297 return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
301 test_ring_per_core_sc_dequeue_burst(void *arg)
303 return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
/* Assert-style check used by the full/empty test: on failure, print the
 * failing expression with function and line.
 * NOTE(review): the macro body is truncated in this excerpt (the if/else
 * and do-while wrapper lines are missing). */
306 #define TEST_RING_VERIFY(exp) \
308 printf("error at %s:%d\tcondition " #exp " failed\n", \
309 __func__, __LINE__); \
/* Iteration count for the random-shift full/empty test.
 * ("EMTPY" is a historical typo in the macro name, kept as-is because it
 * is referenced below.) */
314 #define TEST_RING_FULL_EMTPY_ITER 8
/**
 * Run one performance configuration: pick bulk/burst x SP/MP x SC/MC
 * function pointers, launch enqueue workers on 'enq_core_count' slave
 * lcores and dequeue workers on the remaining ones (master runs a dequeue
 * worker itself), wait for completion, sum the per-lcore stats, drain any
 * leftover objects, and verify total enqueues == total dequeues.
 *
 * Returns 0 on success (presumably; the return lines are missing from
 * this excerpt), non-zero on mismatch or worker failure.
 */
318 launch_cores(unsigned enq_core_count, unsigned deq_core_count,
319 unsigned n_enq_bulk, unsigned n_deq_bulk,
320 int sp, int sc, int bulk_not_burst)
324 unsigned rate, deq_remain = 0;
325 unsigned enq_total, deq_total;
326 struct test_stats sum;
327 int (*enq_f)(void *);
328 int (*deq_f)(void *);
329 unsigned cores = enq_core_count + deq_core_count;
/* Close the start gate before launching workers. */
332 rte_atomic32_set(&synchro, 0);
334 printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
335 enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
336 printf("sp=%d sc=%d ", sp, sc);
338 if (bulk_not_burst) {
339 /* set enqueue function to be used */
341 enq_f = test_ring_per_core_sp_enqueue;
343 enq_f = test_ring_per_core_mp_enqueue;
345 /* set dequeue function to be used */
347 deq_f = test_ring_per_core_sc_dequeue;
349 deq_f = test_ring_per_core_mc_dequeue;
352 /* set enqueue function to be used */
354 enq_f = test_ring_per_core_sp_enqueue_burst;
356 enq_f = test_ring_per_core_mp_enqueue_burst;
358 /* set dequeue function to be used */
360 deq_f = test_ring_per_core_sc_dequeue_burst;
362 deq_f = test_ring_per_core_mc_dequeue_burst;
/* Distribute workers over slave lcores; the counting/decrement lines are
 * missing from this excerpt — TODO confirm the assignment logic. */
365 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
366 if (enq_core_count != 0) {
368 rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
370 if (deq_core_count != 1) {
372 rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
376 memset(test_stats, 0, sizeof(test_stats));
378 /* start synchro and launch test on master */
379 rte_atomic32_set(&synchro, 1);
380 ret = deq_f(&n_deq_bulk);
/* Join all slaves before reading the shared stats. */
383 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
387 if (rte_eal_wait_lcore(lcore_id) < 0)
391 memset(&sum, 0, sizeof(sum));
392 for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
393 sum.enq_success += test_stats[lcore_id].enq_success;
394 sum.enq_quota += test_stats[lcore_id].enq_quota;
395 sum.enq_fail += test_stats[lcore_id].enq_fail;
396 sum.deq_success += test_stats[lcore_id].deq_success;
397 sum.deq_fail += test_stats[lcore_id].deq_fail;
/* Drain whatever the dequeuers did not consume so the balance check and
 * the next test start from an empty ring. */
401 while (rte_ring_sc_dequeue(r, &obj) == 0)
405 printf("per-lcore test returned -1\n");
409 enq_total = sum.enq_success + sum.enq_quota;
410 deq_total = sum.deq_success + deq_remain;
412 rate = deq_total/TIME_S;
414 printf("rate_persec=%u\n", rate);
/* Every enqueued object must have been dequeued (or drained above). */
416 if (enq_total != deq_total) {
417 printf("invalid enq/deq_success counter: %u %u\n",
418 enq_total, deq_total);
/**
 * Run launch_cores() for every applicable SP/SC combination: the
 * single-producer (sp) / single-consumer (sc) fast paths are only legal
 * when exactly one core enqueues / dequeues, so each flag iterates over
 * {0} or {0,1} accordingly.
 */
426 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
427 unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
433 do_sp = (enq_core_count == 1) ? 1 : 0;
434 do_sc = (deq_core_count == 1) ? 1 : 0;
436 for (sp = 0; sp <= do_sp; sp ++) {
437 for (sc = 0; sc <= do_sc; sc ++) {
438 ret = launch_cores(enq_core_count, deq_core_count,
439 n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
/**
 * Sweep every enqueue-bulk x dequeue-bulk size combination (1, 2, 4, 32;
 * tables are 0-terminated) for a given core split, delegating each
 * combination to do_one_ring_test2().
 */
448 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
449 unsigned bulk_or_burst)
/* 0 terminates each table; the loop conditions (missing from this
 * excerpt) presumably stop on it. */
451 unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
452 unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
453 unsigned *bulk_enqueue_ptr;
454 unsigned *bulk_dequeue_ptr;
457 for (bulk_enqueue_ptr = bulk_enqueue_tab;
459 bulk_enqueue_ptr++) {
461 for (bulk_dequeue_ptr = bulk_dequeue_tab;
463 bulk_dequeue_ptr++) {
465 ret = do_one_ring_test2(enq_core_count, deq_core_count,
/**
 * Slave-side half of the live-watermark test: for ~2 seconds, enqueue
 * until the watermark is hit (-EDQUOT) and dequeue again, while checking
 * that r->prod.watermark only ever changes from 16 to 32 (the master
 * performs that change concurrently). Fails if the watermark never
 * reached 32 by the end.
 */
477 check_live_watermark_change(__attribute__((unused)) void *dummy)
479 uint64_t hz = rte_get_hpet_hz();
480 void *obj_table[MAX_BULK];
481 unsigned watermark, watermark_old = 16;
482 uint64_t cur_time, end_time;
487 /* init the object table */
488 memset(obj_table, 0, sizeof(obj_table));
/* Run the monitor loop for roughly 2 seconds of HPET time. */
489 end_time = rte_get_hpet_cycles() + (hz * 2);
491 /* check that bulk and watermark are 4 and 32 (respectively) */
494 /* add in ring until we reach watermark */
496 for (i = 0; i < 16; i ++) {
499 ret = rte_ring_enqueue_bulk(r, obj_table, count);
/* The last enqueue is expected to trip the watermark (-EDQUOT). */
502 if (ret != -EDQUOT) {
503 printf("Cannot enqueue objects, or watermark not "
504 "reached (ret=%d)\n", ret);
508 /* read watermark, the only change allowed is from 16 to 32 */
509 watermark = r->prod.watermark;
510 if (watermark != watermark_old &&
511 (watermark_old != 16 || watermark != 32)) {
512 printf("Bad watermark change %u -> %u\n", watermark_old,
516 watermark_old = watermark;
518 /* dequeue objects from ring */
520 ret = rte_ring_dequeue_bulk(r, obj_table, count);
522 printf("Cannot dequeue (ret=%d)\n", ret);
527 cur_time = rte_get_hpet_cycles();
528 diff = end_time - cur_time;
/* The master must have bumped the watermark to 32 by now. */
531 if (watermark_old != 32 ) {
532 printf(" watermark was not updated (wm=%u)\n",
/**
 * Master-side half of the live-watermark test: set watermark to 16,
 * launch check_live_watermark_change() on the next available lcore,
 * then change the watermark to 32 while the slave is running, and wait
 * for the slave's verdict.
 */
541 test_live_watermark_change(void)
543 unsigned lcore_id = rte_lcore_id();
544 unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
546 printf("Test watermark live modification\n");
547 rte_ring_set_water_mark(r, 16);
549 /* launch a thread that will enqueue and dequeue, checking
550 * watermark and quota */
551 rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
/* Concurrent change the slave must observe as exactly 16 -> 32.
 * NOTE(review): a delay between launch and this call appears to be
 * missing from this excerpt — confirm against the full file. */
554 rte_ring_set_water_mark(r, 32);
557 if (rte_eal_wait_lcore(lcore_id2) < 0)
563 /* Test for catch on invalid watermark values */
565 test_set_watermark( void ){
569 struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
571 printf( " ring lookup failed\n" );
574 count = r->prod.size*2;
575 setwm = rte_ring_set_water_mark(r, count);
576 if (setwm != -EINVAL){
577 printf("Test failed to detect invalid watermark count value\n");
582 setwm = rte_ring_set_water_mark(r, count);
583 if (r->prod.watermark != r->prod.size) {
584 printf("Test failed to detect invalid watermark count value\n");
594 * helper routine for test_ring_basic
/* Repeatedly (TEST_RING_FULL_EMTPY_ITER times): apply a random shift to
 * the ring's head/tail, fill it completely (capacity RING_SIZE - 1),
 * verify the full-state invariants, empty it, verify the empty-state
 * invariants, and check the data round-tripped intact. */
597 test_ring_basic_full_empty(void * const src[], void *dst[])
/* Usable capacity of a ring is size - 1 (one slot kept free). */
600 const unsigned rsz = RING_SIZE - 1;
602 printf("Basic full/empty test\n");
604 for (i = 0; TEST_RING_FULL_EMTPY_ITER != i; i++) {
606 /* random shift in the ring */
/* Shift by at least 1 so head/tail move each iteration. */
607 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
608 printf("%s: iteration %u, random shift: %u;\n",
610 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
612 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
/* Fill to capacity and check the "full" invariants. */
615 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
617 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
618 TEST_RING_VERIFY(rsz == rte_ring_count(r));
619 TEST_RING_VERIFY(rte_ring_full(r));
620 TEST_RING_VERIFY(0 == rte_ring_empty(r));
/* Drain and check the "empty" invariants. */
623 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
624 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
625 TEST_RING_VERIFY(0 == rte_ring_count(r));
626 TEST_RING_VERIFY(0 == rte_ring_full(r));
627 TEST_RING_VERIFY(rte_ring_empty(r));
/* NOTE(review): memcmp compares 'rsz' BYTES, not rsz pointers — this
 * only covers a prefix of the data; likely intended rsz * sizeof(void*).
 * Kept as-is; flagging for the maintainer. */
630 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz));
/**
 * Basic single-core functional test: exercise SP/SC then MP/MC
 * enqueue/dequeue of 1, 2, and MAX_BULK objects, a full fill/drain cycle,
 * the full/empty helper, watermark + default bulk behavior, and the
 * single-object enqueue/dequeue API — checking after each phase that the
 * dequeued data matches what was enqueued.
 *
 * NOTE(review): many interior lines (error checks, frees, returns) are
 * missing from this excerpt.
 */
637 test_ring_basic(void)
639 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
641 unsigned i, num_elems;
643 /* alloc dummy object pointers */
644 src = malloc(RING_SIZE*2*sizeof(void *));
/* Objects are just their own index cast to a pointer, so content checks
 * can use memcmp on the tables. */
648 for (i = 0; i < RING_SIZE*2 ; i++) {
649 src[i] = (void *)(unsigned long)i;
653 /* alloc some room for copied objects */
654 dst = malloc(RING_SIZE*2*sizeof(void *));
658 memset(dst, 0, RING_SIZE*2*sizeof(void *));
/* --- SP/SC bulk phase --- */
661 printf("enqueue 1 obj\n");
662 ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
667 printf("enqueue 2 objs\n");
668 ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
673 printf("enqueue MAX_BULK objs\n");
674 ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
679 printf("dequeue 1 obj\n");
680 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
685 printf("dequeue 2 objs\n");
686 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
691 printf("dequeue MAX_BULK objs\n");
692 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
698 if (memcmp(src, dst, cur_dst - dst)) {
699 test_hexdump("src", src, cur_src - src);
700 test_hexdump("dst", dst, cur_dst - dst);
701 printf("data after dequeue is not the same\n");
/* --- MP/MC bulk phase (same pattern) --- */
707 printf("enqueue 1 obj\n");
708 ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
713 printf("enqueue 2 objs\n");
714 ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
719 printf("enqueue MAX_BULK objs\n");
720 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
725 printf("dequeue 1 obj\n");
726 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
731 printf("dequeue 2 objs\n");
732 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
737 printf("dequeue MAX_BULK objs\n");
738 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
744 if (memcmp(src, dst, cur_dst - dst)) {
745 test_hexdump("src", src, cur_src - src);
746 test_hexdump("dst", dst, cur_dst - dst);
747 printf("data after dequeue is not the same\n");
/* --- fill-and-drain phase: MAX_BULK at a time --- */
753 printf("fill and empty the ring\n");
754 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
755 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
759 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
766 if (memcmp(src, dst, cur_dst - dst)) {
767 test_hexdump("src", src, cur_src - src);
768 test_hexdump("dst", dst, cur_dst - dst);
769 printf("data after dequeue is not the same\n");
773 if (test_ring_basic_full_empty(src, dst) != 0)
/* --- watermark phase: second enqueue must return -EDQUOT --- */
779 printf("test watermark and default bulk enqueue / dequeue\n");
780 rte_ring_set_water_mark(r, 20);
786 ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
787 cur_src += num_elems;
789 printf("Cannot enqueue\n");
792 ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
793 cur_src += num_elems;
794 if (ret != -EDQUOT) {
795 printf("Watermark not exceeded\n");
798 ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
799 cur_dst += num_elems;
801 printf("Cannot dequeue\n");
804 ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
805 cur_dst += num_elems;
807 printf("Cannot dequeue2\n");
812 if (memcmp(src, dst, cur_dst - dst)) {
813 test_hexdump("src", src, cur_src - src);
814 test_hexdump("dst", dst, cur_dst - dst);
815 printf("data after dequeue is not the same\n");
/* --- single-object API phase --- */
822 ret = rte_ring_mp_enqueue(r, cur_src);
827 ret = rte_ring_mc_dequeue(r, cur_dst);
/**
 * Burst-API functional test, mirroring test_ring_basic() for the burst
 * variants: SP/SC and MP/MC bursts of 1, 2 and MAX_BULK objects, partial
 * enqueue into a nearly-full ring, partial dequeue from a nearly-empty
 * ring, fill/drain, and the default (rte_ring_enqueue_burst) wrappers.
 * Burst calls return the number of objects actually transferred in the
 * low RTE_RING_SZ_MASK bits.
 *
 * NOTE(review): many interior lines (error labels, cur_src/cur_dst
 * advances, frees, returns) are missing from this excerpt.
 */
852 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
856 /* alloc dummy object pointers */
857 src = malloc(RING_SIZE*2*sizeof(void *));
861 for (i = 0; i < RING_SIZE*2 ; i++) {
862 src[i] = (void *)(unsigned long)i;
866 /* alloc some room for copied objects */
867 dst = malloc(RING_SIZE*2*sizeof(void *));
871 memset(dst, 0, RING_SIZE*2*sizeof(void *));
874 printf("Test SP & SC basic functions \n");
875 printf("enqueue 1 obj\n");
876 ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
878 if ((ret & RTE_RING_SZ_MASK) != 1)
881 printf("enqueue 2 objs\n");
882 ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
884 if ((ret & RTE_RING_SZ_MASK) != 2)
887 printf("enqueue MAX_BULK objs\n");
888 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
890 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
893 printf("dequeue 1 obj\n");
894 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
896 if ((ret & RTE_RING_SZ_MASK) != 1)
899 printf("dequeue 2 objs\n");
900 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
902 if ((ret & RTE_RING_SZ_MASK) != 2)
905 printf("dequeue MAX_BULK objs\n");
906 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
908 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
912 if (memcmp(src, dst, cur_dst - dst)) {
913 test_hexdump("src", src, cur_src - src);
914 test_hexdump("dst", dst, cur_dst - dst);
915 printf("data after dequeue is not the same\n");
/* Nearly fill the ring, then verify a burst enqueue transfers only the
 * space that remains (capacity is RING_SIZE - 1, hence MAX_BULK - 3). */
922 printf("Test enqueue without enough memory space \n");
923 for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
924 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
926 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
931 printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n");
932 ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
934 if ((ret & RTE_RING_SZ_MASK) != 2)
937 printf("Enqueue the remaining entries = MAX_BULK - 2 \n");
938 /* Always one free entry left */
939 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
940 cur_src += MAX_BULK - 3;
941 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
944 printf("Test if ring is full \n");
945 if (rte_ring_full(r) != 1)
948 printf("Test enqueue for a full entry \n");
/* A burst enqueue on a full ring transfers 0 objects. */
949 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
950 if ((ret & RTE_RING_SZ_MASK) != 0)
953 printf("Test dequeue without enough objects \n");
954 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
955 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
957 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
961 /* Available memory space for the exact MAX_BULK entries */
962 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
964 if ((ret & RTE_RING_SZ_MASK) != 2)
/* Partial dequeue: only MAX_BULK - 3 objects remain. */
967 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
968 cur_dst += MAX_BULK - 3;
969 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
972 printf("Test if ring is empty \n");
973 /* Check if ring is empty */
974 if (1 != rte_ring_empty(r))
978 if (memcmp(src, dst, cur_dst - dst)) {
979 test_hexdump("src", src, cur_src - src);
980 test_hexdump("dst", dst, cur_dst - dst);
981 printf("data after dequeue is not the same\n");
/* --- same sequence with the MP/MC burst variants --- */
988 printf("Test MP & MC basic functions \n");
990 printf("enqueue 1 obj\n");
991 ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
993 if ((ret & RTE_RING_SZ_MASK) != 1)
996 printf("enqueue 2 objs\n");
997 ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
999 if ((ret & RTE_RING_SZ_MASK) != 2)
1002 printf("enqueue MAX_BULK objs\n");
1003 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1004 cur_src += MAX_BULK;
1005 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1008 printf("dequeue 1 obj\n");
1009 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
1011 if ((ret & RTE_RING_SZ_MASK) != 1)
1014 printf("dequeue 2 objs\n");
1015 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1017 if ((ret & RTE_RING_SZ_MASK) != 2)
1020 printf("dequeue MAX_BULK objs\n");
1021 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1022 cur_dst += MAX_BULK;
1023 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1027 if (memcmp(src, dst, cur_dst - dst)) {
1028 test_hexdump("src", src, cur_src - src);
1029 test_hexdump("dst", dst, cur_dst - dst);
1030 printf("data after dequeue is not the same\n");
1037 printf("fill and empty the ring\n");
1038 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1039 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1040 cur_src += MAX_BULK;
1041 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1043 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1044 cur_dst += MAX_BULK;
1045 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1050 if (memcmp(src, dst, cur_dst - dst)) {
1051 test_hexdump("src", src, cur_src - src);
1052 test_hexdump("dst", dst, cur_dst - dst);
1053 printf("data after dequeue is not the same\n");
1060 printf("Test enqueue without enough memory space \n");
1061 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1062 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1063 cur_src += MAX_BULK;
1064 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1068 /* Available memory space for the exact MAX_BULK objects */
1069 ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
1071 if ((ret & RTE_RING_SZ_MASK) != 2)
1074 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1075 cur_src += MAX_BULK - 3;
1076 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1080 printf("Test dequeue without enough objects \n");
1081 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1082 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1083 cur_dst += MAX_BULK;
1084 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1088 /* Available objects - the exact MAX_BULK */
1089 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1091 if ((ret & RTE_RING_SZ_MASK) != 2)
1094 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1095 cur_dst += MAX_BULK - 3;
1096 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1100 if (memcmp(src, dst, cur_dst - dst)) {
1101 test_hexdump("src", src, cur_src - src);
1102 test_hexdump("dst", dst, cur_dst - dst);
1103 printf("data after dequeue is not the same\n");
/* --- default (MP/MC by default) burst wrappers --- */
1110 printf("Covering rte_ring_enqueue_burst functions \n");
1112 ret = rte_ring_enqueue_burst(r, cur_src, 2);
1114 if ((ret & RTE_RING_SZ_MASK) != 2)
1117 ret = rte_ring_dequeue_burst(r, cur_dst, 2);
1122 /* Free memory before test completed */
/**
 * Exercise the per-lcore debug statistics (r->stats[]) maintained when
 * RTE_LIBRTE_RING_DEBUG is enabled: verify enqueue/dequeue success and
 * failure counters across SP/MP/SC/MC bulk and burst calls, and the
 * watermark ("quota") counters below, at, and above the watermark.
 * Compiled to a stub message when the debug option is off.
 *
 * NOTE(review): many interior lines (error labels, some increments,
 * frees, returns) are missing from this excerpt.
 */
1138 test_ring_stats(void)
1141 #ifndef RTE_LIBRTE_RING_DEBUG
1142 printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
1145 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
1148 unsigned num_items = 0;
1149 unsigned failed_enqueue_ops = 0;
1150 unsigned failed_enqueue_items = 0;
1151 unsigned failed_dequeue_ops = 0;
1152 unsigned failed_dequeue_items = 0;
1153 unsigned last_enqueue_ops = 0;
1154 unsigned last_enqueue_items = 0;
1155 unsigned last_quota_ops = 0;
1156 unsigned last_quota_items = 0;
1157 unsigned lcore_id = rte_lcore_id();
/* Single-core test: only this lcore's stats slot is inspected. */
1158 struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
1160 printf("Test the ring stats.\n");
1162 /* Reset the watermark in case it was set in another test. */
1163 rte_ring_set_water_mark(r, 0);
1165 /* Reset the ring stats. */
1166 memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1168 /* Allocate some dummy object pointers. */
1169 src = malloc(RING_SIZE*2*sizeof(void *));
1173 for (i = 0; i < RING_SIZE*2 ; i++) {
1174 src[i] = (void *)(unsigned long)i;
1177 /* Allocate some memory for copied objects. */
1178 dst = malloc(RING_SIZE*2*sizeof(void *));
1182 memset(dst, 0, RING_SIZE*2*sizeof(void *));
1184 /* Set the head and tail pointers. */
1188 /* Do Enqueue tests. */
1189 printf("Test the dequeue stats.\n")  ;
1191 /* Fill the ring up to RING_SIZE -1. */
1192 printf("Fill the ring.\n");
1193 for (i = 0; i< (RING_SIZE/MAX_BULK); i++) {
1194 rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
1195 cur_src += MAX_BULK;
1198 /* Adjust for final enqueue = MAX_BULK -1. */
1201 printf("Verify that the ring is full.\n");
1202 if (rte_ring_full(r) != 1)
1206 printf("Verify the enqueue success stats.\n");
1207 /* Stats should match above enqueue operations to fill the ring. */
1208 if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
1211 /* Current max objects is RING_SIZE -1. */
1212 if (ring_stats->enq_success_objs != RING_SIZE -1)
1215 /* Shouldn't have any failures yet. */
1216 if (ring_stats->enq_fail_bulk != 0)
1218 if (ring_stats->enq_fail_objs != 0)
/* --- failed-enqueue counters on a full ring, all four variants --- */
1222 printf("Test stats for SP burst enqueue to a full ring.\n");
1224 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1225 if ((ret & RTE_RING_SZ_MASK) != 0)
1228 failed_enqueue_ops += 1;
1229 failed_enqueue_items += num_items;
1231 /* The enqueue should have failed. */
1232 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1234 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1238 printf("Test stats for SP bulk enqueue to a full ring.\n");
1240 ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1241 if (ret != -ENOBUFS)
1244 failed_enqueue_ops += 1;
1245 failed_enqueue_items += num_items;
1247 /* The enqueue should have failed. */
1248 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1250 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1254 printf("Test stats for MP burst enqueue to a full ring.\n");
1256 ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1257 if ((ret & RTE_RING_SZ_MASK) != 0)
1260 failed_enqueue_ops += 1;
1261 failed_enqueue_items += num_items;
1263 /* The enqueue should have failed. */
1264 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1266 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1270 printf("Test stats for MP bulk enqueue to a full ring.\n");
1272 ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1273 if (ret != -ENOBUFS)
1276 failed_enqueue_ops += 1;
1277 failed_enqueue_items += num_items;
1279 /* The enqueue should have failed. */
1280 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1282 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1286 /* Do Dequeue tests. */
1287 printf("Test the dequeue stats.\n");
1289 printf("Empty the ring.\n");
1290 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1291 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1292 cur_dst += MAX_BULK;
1295 /* There was only RING_SIZE -1 objects to dequeue. */
1298 printf("Verify ring is empty.\n");
1299 if (1 != rte_ring_empty(r))
1302 printf("Verify the dequeue success stats.\n");
1303 /* Stats should match above dequeue operations. */
1304 if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
1307 /* Objects dequeued is RING_SIZE -1. */
1308 if (ring_stats->deq_success_objs != RING_SIZE -1)
1311 /* Shouldn't have any dequeue failure stats yet. */
1312 if (ring_stats->deq_fail_bulk != 0)
/* --- failed-dequeue counters on an empty ring, all four variants --- */
1315 printf("Test stats for SC burst dequeue with an empty ring.\n");
1317 ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
1318 if ((ret & RTE_RING_SZ_MASK) != 0)
1321 failed_dequeue_ops += 1;
1322 failed_dequeue_items += num_items;
1324 /* The dequeue should have failed. */
1325 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1327 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1331 printf("Test stats for SC bulk dequeue with an empty ring.\n");
1333 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
1337 failed_dequeue_ops += 1;
1338 failed_dequeue_items += num_items;
1340 /* The dequeue should have failed. */
1341 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1343 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1347 printf("Test stats for MC burst dequeue with an empty ring.\n");
1349 ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
1350 if ((ret & RTE_RING_SZ_MASK) != 0)
1352 failed_dequeue_ops += 1;
1353 failed_dequeue_items += num_items;
1355 /* The dequeue should have failed. */
1356 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1358 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1362 printf("Test stats for MC bulk dequeue with an empty ring.\n");
1364 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
1368 failed_dequeue_ops += 1;
1369 failed_dequeue_items += num_items;
1371 /* The dequeue should have failed. */
1372 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1374 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1378 printf("Test total enqueue/dequeue stats.\n");
1379 /* At this point the enqueue and dequeue stats should be the same. */
1380 if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
1382 if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
1384 if (ring_stats->enq_fail_bulk != ring_stats->deq_fail_bulk)
1386 if (ring_stats->enq_fail_objs != ring_stats->deq_fail_objs)
1390 /* Watermark Tests. */
1391 printf("Test the watermark/quota stats.\n");
1393 printf("Verify the initial watermark stats.\n");
1394 /* Watermark stats should be 0 since there is no watermark. */
1395 if (ring_stats->enq_quota_bulk != 0)
1397 if (ring_stats->enq_quota_objs != 0)
1400 /* Set a watermark. */
1401 rte_ring_set_water_mark(r, 16);
1403 /* Reset pointers. */
1407 last_enqueue_ops = ring_stats->enq_success_bulk;
1408 last_enqueue_items = ring_stats->enq_success_objs;
/* Below the watermark: success counters move, quota counters don't. */
1411 printf("Test stats for SP burst enqueue below watermark.\n");
1413 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1414 if ((ret & RTE_RING_SZ_MASK) != num_items)
1417 /* Watermark stats should still be 0. */
1418 if (ring_stats->enq_quota_bulk != 0)
1420 if (ring_stats->enq_quota_objs != 0)
1423 /* Success stats should have increased. */
1424 if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
1426 if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
1429 last_enqueue_ops = ring_stats->enq_success_bulk;
1430 last_enqueue_items = ring_stats->enq_success_objs;
/* At/above the watermark: only the quota counters advance. */
1433 printf("Test stats for SP burst enqueue at watermark.\n");
1435 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1436 if ((ret & RTE_RING_SZ_MASK) != num_items)
1439 /* Watermark stats should have changed. */
1440 if (ring_stats->enq_quota_bulk != 1)
1442 if (ring_stats->enq_quota_objs != num_items)
1445 last_quota_ops = ring_stats->enq_quota_bulk;
1446 last_quota_items = ring_stats->enq_quota_objs;
1449 printf("Test stats for SP burst enqueue above watermark.\n");
1451 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1452 if ((ret & RTE_RING_SZ_MASK) != num_items)
1455 /* Watermark stats should have changed. */
1456 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1458 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1461 last_quota_ops = ring_stats->enq_quota_bulk;
1462 last_quota_items = ring_stats->enq_quota_objs;
1465 printf("Test stats for MP burst enqueue above watermark.\n");
1467 ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1468 if ((ret & RTE_RING_SZ_MASK) != num_items)
1471 /* Watermark stats should have changed. */
1472 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1474 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1477 last_quota_ops = ring_stats->enq_quota_bulk;
1478 last_quota_items = ring_stats->enq_quota_objs;
1481 printf("Test stats for SP bulk enqueue above watermark.\n");
1483 ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1487 /* Watermark stats should have changed. */
1488 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1490 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1493 last_quota_ops = ring_stats->enq_quota_bulk;
1494 last_quota_items = ring_stats->enq_quota_objs;
1497 printf("Test stats for MP bulk enqueue above watermark.\n");
1499 ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1503 /* Watermark stats should have changed. */
1504 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1506 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1509 printf("Test watermark success stats.\n");
1510 /* Success stats should be same as last non-watermarked enqueue. */
1511 if (ring_stats->enq_success_bulk != last_enqueue_ops)
1513 if (ring_stats->enq_success_objs != last_enqueue_items)
1519 /* Empty the ring. */
1520 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1521 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1522 cur_dst += MAX_BULK;
1525 /* Reset the watermark. */
1526 rte_ring_set_water_mark(r, 0);
1528 /* Reset the ring stats. */
1529 memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1531 /* Free memory before test completed */
1548 * it will always fail to create ring with a wrong ring size number in this function
1551 test_ring_creation_with_wrong_size(void)
1553 struct rte_ring * rp = NULL;
1555 /* Test if ring size is not power of 2 */
1556 rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
1561 /* Test if ring size is exceeding the limit */
1562 rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
1570 * it tests if it would always fail to create ring with an used ring name
1573 test_ring_creation_with_an_used_name(void)
1575 struct rte_ring * rp;
1577 rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1585 * Test to if a non-power of 2 count causes the create
1586 * function to fail correctly
1589 test_create_count_odd(void)
1591 struct rte_ring *r = rte_ring_create("test_ring_count",
1592 4097, SOCKET_ID_ANY, 0 );
1600 test_lookup_null(void)
1602 struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
1604 if (rte_errno != ENOENT){
1605 printf( "test failed to returnn error on null pointer\n");
1612 * it tests some more basic ring operations
1615 test_ring_basic_ex(void)
1619 struct rte_ring * rp;
1622 obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
1624 printf("test_ring_basic_ex fail to rte_malloc\n");
1628 rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
1629 RING_F_SP_ENQ | RING_F_SC_DEQ);
1631 printf("test_ring_basic_ex fail to create ring\n");
1635 if (rte_ring_lookup("test_ring_basic_ex") != rp) {
1639 if (rte_ring_empty(rp) != 1) {
1640 printf("test_ring_basic_ex ring is not empty but it should be\n");
1644 printf("%u ring entries are now free\n", rte_ring_free_count(rp));
1646 for (i = 0; i < RING_SIZE; i ++) {
1647 rte_ring_enqueue(rp, obj[i]);
1650 if (rte_ring_full(rp) != 1) {
1651 printf("test_ring_basic_ex ring is not full but it should be\n");
1655 for (i = 0; i < RING_SIZE; i ++) {
1656 rte_ring_dequeue(rp, &obj[i]);
1659 if (rte_ring_empty(rp) != 1) {
1660 printf("test_ring_basic_ex ring is not empty but it should be\n");
1664 /* Covering the ring burst operation */
1665 ret = rte_ring_enqueue_burst(rp, obj, 2);
1666 if ((ret & RTE_RING_SZ_MASK) != 2) {
1667 printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
1671 ret = rte_ring_dequeue_burst(rp, obj, 2);
1673 printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
1688 unsigned enq_core_count, deq_core_count;
1690 /* some more basic operations */
1691 if (test_ring_basic_ex() < 0)
1694 rte_atomic32_init(&synchro);
1697 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1701 /* retrieve the ring from its name */
1702 if (rte_ring_lookup("test") != r) {
1703 printf("Cannot lookup ring from its name\n");
1707 /* burst operations */
1708 if (test_ring_burst_basic() < 0)
1711 /* basic operations */
1712 if (test_ring_basic() < 0)
1716 if (test_ring_stats() < 0)
1719 /* basic operations */
1720 if (test_live_watermark_change() < 0)
1723 if ( test_set_watermark() < 0){
1724 printf ("Test failed to detect invalid parameter\n");
1728 printf ( "Test detected forced bad watermark values\n");
1730 if ( test_create_count_odd() < 0){
1731 printf ("Test failed to detect odd count\n");
1735 printf ( "Test detected odd count\n");
1737 if ( test_lookup_null() < 0){
1738 printf ("Test failed to detect NULL ring lookup\n");
1742 printf ( "Test detected NULL ring lookup \n");
1744 printf("start performance tests \n");
1746 /* one lcore for enqueue, one for dequeue */
1749 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1752 /* max cores for enqueue, one for dequeue */
1753 enq_core_count = rte_lcore_count() - 1;
1755 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1758 /* max cores for dequeue, one for enqueue */
1760 deq_core_count = rte_lcore_count() - 1;
1761 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1764 /* half for enqueue and half for dequeue */
1765 enq_core_count = rte_lcore_count() / 2;
1766 deq_core_count = rte_lcore_count() / 2;
1767 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1770 printf("start performance tests - burst operations \n");
1772 /* one lcore for enqueue, one for dequeue */
1775 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1778 /* max cores for enqueue, one for dequeue */
1779 enq_core_count = rte_lcore_count() - 1;
1781 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1784 /* max cores for dequeue, one for enqueue */
1786 deq_core_count = rte_lcore_count() - 1;
1787 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1790 /* half for enqueue and half for dequeue */
1791 enq_core_count = rte_lcore_count() / 2;
1792 deq_core_count = rte_lcore_count() / 2;
1793 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1796 /* test of creating ring with wrong size */
1797 if (test_ring_creation_with_wrong_size() < 0)
1800 /* test of creation ring with an used name */
1801 if (test_ring_creation_with_an_used_name() < 0)
1804 /* dump the ring status */
1805 rte_ring_list_dump();