4 * Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * * Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * * Neither the name of Intel Corporation nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
42 #include <sys/queue.h>
44 #include <rte_common.h>
46 #include <rte_memory.h>
47 #include <rte_memzone.h>
48 #include <rte_launch.h>
49 #include <rte_cycles.h>
50 #include <rte_tailq.h>
52 #include <rte_per_lcore.h>
53 #include <rte_lcore.h>
54 #include <rte_atomic.h>
55 #include <rte_branch_prediction.h>
56 #include <rte_malloc.h>
58 #include <rte_random.h>
59 #include <rte_common.h>
60 #include <rte_errno.h>
62 #include <cmdline_parse.h>
70 * #. Basic tests: done on one core:
72 * - Using single producer/single consumer functions:
74 * - Enqueue one object, two objects, MAX_BULK objects
75 * - Dequeue one object, two objects, MAX_BULK objects
76 * - Check that dequeued pointers are correct
78 * - Using multi producers/multi consumers functions:
80 * - Enqueue one object, two objects, MAX_BULK objects
81 * - Dequeue one object, two objects, MAX_BULK objects
82 * - Check that dequeued pointers are correct
84 * - Test watermark and default bulk enqueue/dequeue:
87 * - Set default bulk value
88 * - Enqueue objects, check that -EDQUOT is returned when
89 * watermark is exceeded
90 * - Check that dequeued pointers are correct
92 * #. Check live watermark change
94 * - Start a loop on another lcore that will enqueue and dequeue
95 * objects in a ring. It will monitor the value of watermark.
96 * - At the same time, change the watermark on the master lcore.
97 * - The slave lcore will check that watermark changes from 16 to 32.
99 * #. Performance tests.
101 * This test is done on the following configurations:
103 * - One core enqueuing, one core dequeuing
104 * - One core enqueuing, other cores dequeuing
105 * - One core dequeuing, other cores enqueuing
106 * - Half of the cores enqueuing, the other half dequeuing
108 * When only one core enqueues/dequeues, the test is done with the
109 * SP/SC functions in addition to the MP/MC functions.
111 * The test is done with different bulk size.
113 * On each core, the test enqueues or dequeues objects during
114 * TIME_S seconds. The number of successes and failures are stored on
115 * each core, then summed and displayed.
117 * The test checks that the number of enqueues is equal to the
118 * number of dequeues.
/* Number of slots in the ring under test (must be a power of two). */
121 #define RING_SIZE 4096
/* Start gate: slaves spin until the master sets this to 1. */
126 static rte_atomic32_t synchro;
/* The single global ring that every test in this file operates on. */
128 static struct rte_ring *r;
/* Per-lcore result counters, written by the enqueue/dequeue workers.
 * NOTE(review): the opening of the enclosing struct declaration is not
 * visible in this extract. */
131 unsigned enq_success ;
135 unsigned deq_success;
137 } __rte_cache_aligned;
/* One stats slot per lcore; summed by launch_cores() after a run. */
139 static struct test_stats test_stats[RTE_MAX_LCORE];
/*
 * Timed enqueue worker run on one lcore.
 *
 * que_func       - enqueue function to exercise (bulk or burst variant)
 * arg            - pointer to an unsigned: number of objects per call
 * bulk_or_burst  - 1 = bulk semantics (ret is 0 / -EDQUOT / -ENOBUFS),
 *                  0 = burst semantics (ret low bits = objects enqueued)
 *
 * Enqueues dummy pointers into the global ring r for TIME_S seconds,
 * then publishes success/quota/fail counters into test_stats[lcore_id].
 * NOTE(review): this listing is an extract; several body lines (braces,
 * some return-value branches) are not visible here.
 */
142 ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned),
143 void* arg, unsigned bulk_or_burst)
145 unsigned success = 0;
149 unsigned long dummy_obj;
150 void *obj_table[MAX_BULK];
152 unsigned lcore_id = rte_lcore_id();
153 unsigned count = *((unsigned*)arg);
154 uint64_t start_cycles, end_cycles;
155 uint64_t time_diff = 0, hz = rte_get_hpet_hz();
157 /* init dummy object table */
/* Encode lcore id into each dummy pointer so values are per-core unique. */
158 for (i = 0; i< MAX_BULK; i++) {
159 dummy_obj = lcore_id + 0x1000 + i;
160 obj_table[i] = (void *)dummy_obj;
163 /* wait synchro for slaves */
/* Busy-wait on the start gate so all workers begin at the same time. */
164 if (lcore_id != rte_get_master_lcore())
165 while (rte_atomic32_read(&synchro) == 0);
167 start_cycles = rte_get_hpet_cycles();
169 /* enqueue as many object as possible */
/* time_diff is only refreshed after each inner batch of N calls. */
170 while (time_diff/hz < TIME_S) {
171 for (i = 0; likely(i < N); i++) {
172 ret = que_func(r, obj_table, count);
175 * 1: for bulk operation
176 * 0: for burst operation
179 /* The *count* objects enqueued, unless fail */
/* Bulk path: -EDQUOT means enqueued but watermark exceeded -> quota. */
182 else if (ret == -EDQUOT)
187 /* The actual objects enqueued */
/* Burst path: mask out flag bits to get the enqueued object count. */
189 success += (ret & RTE_RING_SZ_MASK);
194 end_cycles = rte_get_hpet_cycles();
195 time_diff = end_cycles - start_cycles;
198 /* write statistics in a shared structure */
199 test_stats[lcore_id].enq_success = success;
200 test_stats[lcore_id].enq_quota = quota;
201 test_stats[lcore_id].enq_fail = fail;
/*
 * Timed dequeue worker run on one lcore; mirror image of
 * ring_enqueue_test(): dequeues *arg objects per call from the global
 * ring r for TIME_S seconds using que_func, then publishes
 * success/fail counters into test_stats[lcore_id].
 * bulk_or_burst: 1 = bulk return semantics, 0 = burst.
 * NOTE(review): extract — the per-call return-value handling between
 * the burst/bulk comment and the timing code is not visible here.
 */
207 ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned),
208 void* arg, unsigned bulk_or_burst)
210 unsigned success = 0;
213 void *obj_table[MAX_BULK];
215 unsigned lcore_id = rte_lcore_id();
216 unsigned count = *((unsigned*)arg);
217 uint64_t start_cycles, end_cycles;
218 uint64_t time_diff = 0, hz = rte_get_hpet_hz();
220 /* wait synchro for slaves */
/* Busy-wait on the start gate; master proceeds immediately. */
221 if (lcore_id != rte_get_master_lcore())
222 while (rte_atomic32_read(&synchro) == 0);
224 start_cycles = rte_get_hpet_cycles();
226 /* dequeue as many object as possible */
227 while (time_diff/hz < TIME_S) {
228 for (i = 0; likely(i < N); i++) {
229 ret = que_func(r, obj_table, count);
232 * 1: for bulk operation
233 * 0: for burst operation
247 end_cycles = rte_get_hpet_cycles();
248 time_diff = end_cycles - start_cycles;
251 /* write statistics in a shared structure */
252 test_stats[lcore_id].deq_success = success;
253 test_stats[lcore_id].deq_fail = fail;
/*
 * Thin per-lcore entry points for rte_eal_remote_launch(): each binds one
 * concrete ring API (SP/MP enqueue, SC/MC dequeue; bulk or burst) to the
 * generic timed workers above. The trailing literal selects return-value
 * interpretation: 1 = bulk, 0 = burst.
 */
259 test_ring_per_core_sp_enqueue(void *arg)
261 return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
265 test_ring_per_core_mp_enqueue(void *arg)
267 return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
271 test_ring_per_core_mc_dequeue(void *arg)
273 return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
277 test_ring_per_core_sc_dequeue(void *arg)
279 return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
283 test_ring_per_core_sp_enqueue_burst(void *arg)
285 return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
289 test_ring_per_core_mp_enqueue_burst(void *arg)
291 return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
295 test_ring_per_core_mc_dequeue_burst(void *arg)
297 return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
301 test_ring_per_core_sc_dequeue_burst(void *arg)
303 return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
/* Assert-style check macro: on failure, prints the failing condition with
 * function and line, then (in the non-visible part of the expansion)
 * aborts the current test. */
306 #define TEST_RING_VERIFY(exp) \
308 printf("error at %s:%d\tcondition " #exp " failed\n", \
309 __func__, __LINE__); \
/* Iterations of the full/empty stress loop.
 * NOTE(review): "EMTPY" is a typo for "EMPTY", but the identifier is used
 * elsewhere in this file, so renaming it must be done in one sweep. */
314 #define TEST_RING_FULL_EMTPY_ITER 8
/*
 * Run one performance configuration: launch enqueue workers on
 * enq_core_count slave lcores and dequeue workers on deq_core_count-1
 * slaves, run a dequeue worker on the master itself, then collect the
 * per-lcore stats, drain any leftover objects, and verify that the total
 * enqueued equals the total dequeued. Prints the dequeue rate per second.
 * sp/sc select single- vs multi- producer/consumer variants;
 * bulk_not_burst selects the bulk or burst API family.
 * Returns 0 on success (return paths not visible in this extract).
 */
318 launch_cores(unsigned enq_core_count, unsigned deq_core_count,
319 unsigned n_enq_bulk, unsigned n_deq_bulk,
320 int sp, int sc, int bulk_not_burst)
324 unsigned rate, deq_remain = 0;
325 unsigned enq_total, deq_total;
326 struct test_stats sum;
327 int (*enq_f)(void *);
328 int (*deq_f)(void *);
329 unsigned cores = enq_core_count + deq_core_count;
/* Close the start gate before launching workers. */
332 rte_atomic32_set(&synchro, 0);
334 printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
335 enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
336 printf("sp=%d sc=%d ", sp, sc);
338 if (bulk_not_burst) {
339 /* set enqueue function to be used */
341 enq_f = test_ring_per_core_sp_enqueue;
343 enq_f = test_ring_per_core_mp_enqueue;
345 /* set dequeue function to be used */
347 deq_f = test_ring_per_core_sc_dequeue;
349 deq_f = test_ring_per_core_mc_dequeue;
352 /* set enqueue function to be used */
354 enq_f = test_ring_per_core_sp_enqueue_burst;
356 enq_f = test_ring_per_core_mp_enqueue_burst;
358 /* set dequeue function to be used */
360 deq_f = test_ring_per_core_sc_dequeue_burst;
362 deq_f = test_ring_per_core_mc_dequeue_burst;
/* Distribute workers over slave lcores.
 * NOTE(review): the asymmetric tests "!= 0" (enqueue) vs "!= 1"
 * (dequeue) suggest the missing lines decrement these counters as cores
 * are consumed, reserving one dequeue slot for the master — confirm
 * against the full source. */
365 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
366 if (enq_core_count != 0) {
368 rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
370 if (deq_core_count != 1) {
372 rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
/* Safe to clear after launch: slaves spin on synchro before writing. */
376 memset(test_stats, 0, sizeof(test_stats));
378 /* start synchro and launch test on master */
379 rte_atomic32_set(&synchro, 1);
380 ret = deq_f(&n_deq_bulk);
/* Wait for every slave worker to finish. */
383 RTE_LCORE_FOREACH_SLAVE(lcore_id) {
387 if (rte_eal_wait_lcore(lcore_id) < 0)
/* Sum the per-lcore counters. */
391 memset(&sum, 0, sizeof(sum));
392 for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
393 sum.enq_success += test_stats[lcore_id].enq_success;
394 sum.enq_quota += test_stats[lcore_id].enq_quota;
395 sum.enq_fail += test_stats[lcore_id].enq_fail;
396 sum.deq_success += test_stats[lcore_id].deq_success;
397 sum.deq_fail += test_stats[lcore_id].deq_fail;
/* Drain objects left in the ring so the balance check is exact. */
401 while (rte_ring_sc_dequeue(r, &obj) == 0)
405 printf("per-lcore test returned -1\n");
/* Quota (-EDQUOT) enqueues still entered the ring, so count them. */
409 enq_total = sum.enq_success + sum.enq_quota;
410 deq_total = sum.deq_success + deq_remain;
412 rate = deq_total/TIME_S;
414 printf("rate_persec=%u\n", rate);
/* Conservation check: everything enqueued must be accounted for. */
416 if (enq_total != deq_total) {
417 printf("invalid enq/deq_success counter: %u %u\n",
418 enq_total, deq_total);
/*
 * Run launch_cores() over every applicable SP/SC combination for the
 * given core counts and bulk sizes. The single-producer (single-consumer)
 * variant is only exercised when exactly one core enqueues (dequeues).
 */
426 do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
427 unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
433 do_sp = (enq_core_count == 1) ? 1 : 0;
434 do_sc = (deq_core_count == 1) ? 1 : 0;
/* sp/sc iterate 0..do_sp / 0..do_sc, so MP/MC always runs and SP/SC
 * runs additionally when legal. */
436 for (sp = 0; sp <= do_sp; sp ++) {
437 for (sc = 0; sc <= do_sc; sc ++) {
438 ret = launch_cores(enq_core_count, deq_core_count,
439 n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
/*
 * Sweep all enqueue/dequeue bulk-size combinations (tables are
 * zero-terminated) for a fixed core-count split, delegating each
 * combination to do_one_ring_test2().
 */
448 do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
449 unsigned bulk_or_burst)
451 unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
452 unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
453 unsigned *bulk_enqueue_ptr;
454 unsigned *bulk_dequeue_ptr;
/* Loop termination conditions (*ptr != 0) are not visible in this
 * extract but are implied by the 0 sentinel in the tables. */
457 for (bulk_enqueue_ptr = bulk_enqueue_tab;
459 bulk_enqueue_ptr++) {
461 for (bulk_dequeue_ptr = bulk_dequeue_tab;
463 bulk_dequeue_ptr++) {
465 ret = do_one_ring_test2(enq_core_count, deq_core_count,
/*
 * Slave-side half of the live watermark test: for ~2 seconds, enqueue 16
 * objects until -EDQUOT reports the watermark was hit, read the current
 * watermark directly from the ring structure, and verify that the only
 * transition ever observed is 16 -> 32 (performed concurrently by the
 * master in test_live_watermark_change()). Finally checks that the
 * change to 32 was actually seen.
 */
477 check_live_watermark_change(__attribute__((unused)) void *dummy)
479 uint64_t hz = rte_get_hpet_hz();
480 void *obj_table[MAX_BULK];
481 unsigned watermark, watermark_old = 16;
482 uint64_t cur_time, end_time;
487 /* init the object table */
488 memset(obj_table, 0, sizeof(obj_table));
/* Run the loop for two seconds of HPET time. */
489 end_time = rte_get_hpet_cycles() + (hz * 2);
491 /* check that bulk and watermark are 4 and 32 (respectively) */
494 /* add in ring until we reach watermark */
496 for (i = 0; i < 16; i ++) {
499 ret = rte_ring_enqueue_bulk(r, obj_table, count);
502 if (ret != -EDQUOT) {
503 printf("Cannot enqueue objects, or watermark not "
504 "reached (ret=%d)\n", ret);
508 /* read watermark, the only change allowed is from 16 to 32 */
509 watermark = r->prod.watermark;
510 if (watermark != watermark_old &&
511 (watermark_old != 16 || watermark != 32)) {
512 printf("Bad watermark change %u -> %u\n", watermark_old,
516 watermark_old = watermark;
518 /* dequeue objects from ring */
/* Empty the ring again so the next iteration starts from a known state. */
520 ret = rte_ring_dequeue_bulk(r, obj_table, count);
522 printf("Cannot dequeue (ret=%d)\n", ret);
527 cur_time = rte_get_hpet_cycles();
528 diff = end_time - cur_time;
/* The master must have raised the watermark to 32 by now. */
531 if (watermark_old != 32 ) {
532 printf(" watermark was not updated (wm=%u)\n",
/*
 * Master-side half of the live watermark test: set the watermark to 16,
 * launch check_live_watermark_change() on the next available lcore,
 * then (after a delay not visible in this extract) raise the watermark
 * to 32 while the slave is actively enqueueing/dequeueing, and wait for
 * the slave to report success.
 */
541 test_live_watermark_change(void)
543 unsigned lcore_id = rte_lcore_id();
544 unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
546 printf("Test watermark live modification\n");
547 rte_ring_set_water_mark(r, 16);
549 /* launch a thread that will enqueue and dequeue, checking
550 * watermark and quota */
551 rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
/* Concurrent update the slave must observe as a 16 -> 32 transition. */
554 rte_ring_set_water_mark(r, 32);
557 if (rte_eal_wait_lcore(lcore_id2) < 0)
563 /* Test for catch on invalid watermark values */
/*
 * Verify rte_ring_set_water_mark() rejects a count larger than the ring
 * size with -EINVAL, and (second half) that a count equal to the ring
 * size is treated as "watermark disabled" (watermark == prod.size).
 * Operates on the separately created "test_ring_basic_ex" ring, not the
 * global r.
 */
565 test_set_watermark( void ){
569 struct rte_ring *r = rte_ring_lookup("test_ring_basic_ex");
571 printf( " ring lookup failed\n" );
/* Deliberately out of range: twice the ring size. */
574 count = r->prod.size*2;
575 setwm = rte_ring_set_water_mark(r, count);
576 if (setwm != -EINVAL){
577 printf("Test failed to detect invalid watermark count value\n");
/* The value assigned to count between these calls is not visible in
 * this extract; the check below expects watermark == prod.size. */
582 rte_ring_set_water_mark(r, count);
583 if (r->prod.watermark != r->prod.size) {
584 printf("Test failed to detect invalid watermark count value\n");
594 * helper routine for test_ring_basic
/*
 * Stress the full/empty boundary: for several iterations, rotate the
 * ring head/tail by a random amount, fill the ring completely (capacity
 * is RING_SIZE - 1), verify the full-state accessors, drain it, verify
 * the empty-state accessors, and check the dequeued data matches.
 */
597 test_ring_basic_full_empty(void * const src[], void *dst[])
600 const unsigned rsz = RING_SIZE - 1;
602 printf("Basic full/empty test\n");
604 for (i = 0; TEST_RING_FULL_EMTPY_ITER != i; i++) {
606 /* random shift in the ring */
/* Clamp to at least 1 so the shift always moves the ring. */
607 rand = RTE_MAX(rte_rand() % RING_SIZE, 1UL);
608 printf("%s: iteration %u, random shift: %u;\n",
610 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
612 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rand));
/* Fill the ring to capacity and check the full-state invariants. */
615 TEST_RING_VERIFY(-ENOBUFS != rte_ring_enqueue_bulk(r, src,
617 TEST_RING_VERIFY(0 == rte_ring_free_count(r));
618 TEST_RING_VERIFY(rsz == rte_ring_count(r));
619 TEST_RING_VERIFY(rte_ring_full(r));
620 TEST_RING_VERIFY(0 == rte_ring_empty(r));
/* Drain completely and check the empty-state invariants. */
623 TEST_RING_VERIFY(0 == rte_ring_dequeue_bulk(r, dst, rsz));
624 TEST_RING_VERIFY(rsz == rte_ring_free_count(r));
625 TEST_RING_VERIFY(0 == rte_ring_count(r));
626 TEST_RING_VERIFY(0 == rte_ring_full(r));
627 TEST_RING_VERIFY(rte_ring_empty(r));
/* NOTE(review): rsz here is an element count, but memcmp takes bytes —
 * this compares only rsz bytes, not rsz pointers. Likely should be
 * rsz * sizeof(src[0]); confirm against the full source. */
630 TEST_RING_VERIFY(0 == memcmp(src, dst, rsz));
/*
 * Functional test of the bulk enqueue/dequeue API on the global ring r:
 * 1/2/MAX_BULK-object round trips through the SP/SC variants, then the
 * MP/MC variants, then a full fill/drain cycle, the full/empty helper,
 * and finally the watermark + default enqueue/dequeue behavior.
 * After each phase the dequeued data is compared against the source.
 * Cleanup/return lines are not visible in this extract.
 */
637 test_ring_basic(void)
639 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
641 unsigned i, num_elems;
643 /* alloc dummy object pointers */
644 src = malloc(RING_SIZE*2*sizeof(void *));
/* Fill the source with distinct pointer-sized values. */
648 for (i = 0; i < RING_SIZE*2 ; i++) {
649 src[i] = (void *)(unsigned long)i;
653 /* alloc some room for copied objects */
654 dst = malloc(RING_SIZE*2*sizeof(void *));
658 memset(dst, 0, RING_SIZE*2*sizeof(void *));
/* --- SP/SC bulk round trip: 1, 2, MAX_BULK objects --- */
661 printf("enqueue 1 obj\n");
662 ret = rte_ring_sp_enqueue_bulk(r, cur_src, 1);
667 printf("enqueue 2 objs\n");
668 ret = rte_ring_sp_enqueue_bulk(r, cur_src, 2);
673 printf("enqueue MAX_BULK objs\n");
674 ret = rte_ring_sp_enqueue_bulk(r, cur_src, MAX_BULK);
679 printf("dequeue 1 obj\n");
680 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 1);
685 printf("dequeue 2 objs\n");
686 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, 2);
691 printf("dequeue MAX_BULK objs\n");
692 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, MAX_BULK);
/* NOTE(review): cur_dst - dst is an element count but memcmp takes a
 * byte count — likely missing a * sizeof(void *); same pattern repeats
 * below. Confirm against the full source. */
698 if (memcmp(src, dst, cur_dst - dst)) {
699 test_hexdump("src", src, cur_src - src);
700 test_hexdump("dst", dst, cur_dst - dst);
701 printf("data after dequeue is not the same\n");
/* --- MP/MC bulk round trip: 1, 2, MAX_BULK objects --- */
707 printf("enqueue 1 obj\n");
708 ret = rte_ring_mp_enqueue_bulk(r, cur_src, 1);
713 printf("enqueue 2 objs\n");
714 ret = rte_ring_mp_enqueue_bulk(r, cur_src, 2);
719 printf("enqueue MAX_BULK objs\n");
720 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
725 printf("dequeue 1 obj\n");
726 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 1);
731 printf("dequeue 2 objs\n");
732 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, 2);
737 printf("dequeue MAX_BULK objs\n");
738 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
744 if (memcmp(src, dst, cur_dst - dst)) {
745 test_hexdump("src", src, cur_src - src);
746 test_hexdump("dst", dst, cur_dst - dst);
747 printf("data after dequeue is not the same\n");
/* --- Interleaved fill/drain of the whole ring in MAX_BULK chunks --- */
753 printf("fill and empty the ring\n");
754 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
755 ret = rte_ring_mp_enqueue_bulk(r, cur_src, MAX_BULK);
759 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, MAX_BULK);
766 if (memcmp(src, dst, cur_dst - dst)) {
767 test_hexdump("src", src, cur_src - src);
768 test_hexdump("dst", dst, cur_dst - dst);
769 printf("data after dequeue is not the same\n");
773 if (test_ring_basic_full_empty(src, dst) != 0)
/* --- Watermark behavior with the default (non-SP/SC) API --- */
779 printf("test watermark and default bulk enqueue / dequeue\n");
780 rte_ring_set_water_mark(r, 20);
/* First enqueue stays below the watermark and must succeed cleanly. */
786 ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
787 cur_src += num_elems;
789 printf("Cannot enqueue\n");
/* Second enqueue crosses the watermark: objects still enter the ring
 * but -EDQUOT must be returned. */
792 ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
793 cur_src += num_elems;
794 if (ret != -EDQUOT) {
795 printf("Watermark not exceeded\n");
798 ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
799 cur_dst += num_elems;
801 printf("Cannot dequeue\n");
804 ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
805 cur_dst += num_elems;
807 printf("Cannot dequeue2\n");
812 if (memcmp(src, dst, cur_dst - dst)) {
813 test_hexdump("src", src, cur_src - src);
814 test_hexdump("dst", dst, cur_dst - dst);
815 printf("data after dequeue is not the same\n");
/* --- Single-object convenience API --- */
822 ret = rte_ring_mp_enqueue(r, cur_src);
826 ret = rte_ring_mc_dequeue(r, cur_dst);
/*
 * Functional test of the burst enqueue/dequeue API: unlike bulk, burst
 * calls may transfer fewer objects than requested, returning the actual
 * count in the low bits (masked with RTE_RING_SZ_MASK). Exercises the
 * SP/SC then MP/MC variants: simple round trips, partial transfers at
 * the full and empty boundaries (capacity is RING_SIZE - 1, hence the
 * recurring MAX_BULK - 3 partial counts), full fill/drain cycles, and
 * finally the default rte_ring_enqueue/dequeue_burst wrappers.
 * Error-exit and cleanup lines are not visible in this extract.
 */
845 test_ring_burst_basic(void)
847 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
851 /* alloc dummy object pointers */
852 src = malloc(RING_SIZE*2*sizeof(void *));
856 for (i = 0; i < RING_SIZE*2 ; i++) {
857 src[i] = (void *)(unsigned long)i;
861 /* alloc some room for copied objects */
862 dst = malloc(RING_SIZE*2*sizeof(void *));
866 memset(dst, 0, RING_SIZE*2*sizeof(void *));
/* --- SP/SC burst round trip: 1, 2, MAX_BULK objects --- */
869 printf("Test SP & SC basic functions \n");
870 printf("enqueue 1 obj\n");
871 ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
873 if ((ret & RTE_RING_SZ_MASK) != 1)
876 printf("enqueue 2 objs\n");
877 ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
879 if ((ret & RTE_RING_SZ_MASK) != 2)
882 printf("enqueue MAX_BULK objs\n");
883 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ;
885 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
888 printf("dequeue 1 obj\n");
889 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ;
891 if ((ret & RTE_RING_SZ_MASK) != 1)
894 printf("dequeue 2 objs\n");
895 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
897 if ((ret & RTE_RING_SZ_MASK) != 2)
900 printf("dequeue MAX_BULK objs\n");
901 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
903 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
/* NOTE(review): cur_dst - dst is an element count; memcmp takes bytes —
 * likely missing * sizeof(void *). Pattern repeats below. */
907 if (memcmp(src, dst, cur_dst - dst)) {
908 test_hexdump("src", src, cur_src - src);
909 test_hexdump("dst", dst, cur_dst - dst);
910 printf("data after dequeue is not the same\n");
/* --- SP enqueue at the full boundary: expect partial transfers --- */
917 printf("Test enqueue without enough memory space \n");
918 for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
919 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
921 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
926 printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n");
927 ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
929 if ((ret & RTE_RING_SZ_MASK) != 2)
932 printf("Enqueue the remaining entries = MAX_BULK - 2 \n");
933 /* Always one free entry left */
/* Requesting MAX_BULK with only MAX_BULK-3 usable slots: burst must
 * transfer exactly MAX_BULK-3 (one slot always stays free). */
934 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
935 cur_src += MAX_BULK - 3;
936 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
939 printf("Test if ring is full \n");
940 if (rte_ring_full(r) != 1)
943 printf("Test enqueue for a full entry \n");
/* Enqueue into a full ring must transfer zero objects. */
944 ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
945 if ((ret & RTE_RING_SZ_MASK) != 0)
/* --- SC dequeue at the empty boundary: mirror of the above --- */
948 printf("Test dequeue without enough objects \n");
949 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
950 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
952 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
956 /* Available memory space for the exact MAX_BULK entries */
957 ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
959 if ((ret & RTE_RING_SZ_MASK) != 2)
962 ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
963 cur_dst += MAX_BULK - 3;
964 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
967 printf("Test if ring is empty \n");
968 /* Check if ring is empty */
969 if (1 != rte_ring_empty(r))
973 if (memcmp(src, dst, cur_dst - dst)) {
974 test_hexdump("src", src, cur_src - src);
975 test_hexdump("dst", dst, cur_dst - dst);
976 printf("data after dequeue is not the same\n");
/* --- MP/MC burst round trip: 1, 2, MAX_BULK objects --- */
983 printf("Test MP & MC basic functions \n");
985 printf("enqueue 1 obj\n");
986 ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
988 if ((ret & RTE_RING_SZ_MASK) != 1)
991 printf("enqueue 2 objs\n");
992 ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
994 if ((ret & RTE_RING_SZ_MASK) != 2)
997 printf("enqueue MAX_BULK objs\n");
998 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1000 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1003 printf("dequeue 1 obj\n");
1004 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
1006 if ((ret & RTE_RING_SZ_MASK) != 1)
1009 printf("dequeue 2 objs\n");
1010 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1012 if ((ret & RTE_RING_SZ_MASK) != 2)
1015 printf("dequeue MAX_BULK objs\n");
1016 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1017 cur_dst += MAX_BULK;
1018 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1022 if (memcmp(src, dst, cur_dst - dst)) {
1023 test_hexdump("src", src, cur_src - src);
1024 test_hexdump("dst", dst, cur_dst - dst);
1025 printf("data after dequeue is not the same\n");
/* --- Interleaved MP/MC fill/drain of the whole ring --- */
1032 printf("fill and empty the ring\n");
1033 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1034 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1035 cur_src += MAX_BULK;
1036 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1038 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1039 cur_dst += MAX_BULK;
1040 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1045 if (memcmp(src, dst, cur_dst - dst)) {
1046 test_hexdump("src", src, cur_src - src);
1047 test_hexdump("dst", dst, cur_dst - dst);
1048 printf("data after dequeue is not the same\n");
/* --- MP enqueue / MC dequeue at the full/empty boundary --- */
1055 printf("Test enqueue without enough memory space \n");
1056 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1057 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1058 cur_src += MAX_BULK;
1059 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1063 /* Available memory space for the exact MAX_BULK objects */
1064 ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
1066 if ((ret & RTE_RING_SZ_MASK) != 2)
1069 ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
1070 cur_src += MAX_BULK - 3;
1071 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1075 printf("Test dequeue without enough objects \n");
1076 for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
1077 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1078 cur_dst += MAX_BULK;
1079 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
1083 /* Available objects - the exact MAX_BULK */
1084 ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
1086 if ((ret & RTE_RING_SZ_MASK) != 2)
1089 ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
1090 cur_dst += MAX_BULK - 3;
1091 if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
1095 if (memcmp(src, dst, cur_dst - dst)) {
1096 test_hexdump("src", src, cur_src - src);
1097 test_hexdump("dst", dst, cur_dst - dst);
1098 printf("data after dequeue is not the same\n");
/* --- Default (non-SP/SC) burst wrappers, for coverage --- */
1105 printf("Covering rte_ring_enqueue_burst functions \n");
1107 ret = rte_ring_enqueue_burst(r, cur_src, 2);
1109 if ((ret & RTE_RING_SZ_MASK) != 2)
1112 ret = rte_ring_dequeue_burst(r, cur_dst, 2);
1117 /* Free memory before test completed */
1133 test_ring_stats(void)
1136 #ifndef RTE_LIBRTE_RING_DEBUG
1137 printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
1140 void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
1143 unsigned num_items = 0;
1144 unsigned failed_enqueue_ops = 0;
1145 unsigned failed_enqueue_items = 0;
1146 unsigned failed_dequeue_ops = 0;
1147 unsigned failed_dequeue_items = 0;
1148 unsigned last_enqueue_ops = 0;
1149 unsigned last_enqueue_items = 0;
1150 unsigned last_quota_ops = 0;
1151 unsigned last_quota_items = 0;
1152 unsigned lcore_id = rte_lcore_id();
1153 struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
1155 printf("Test the ring stats.\n");
1157 /* Reset the watermark in case it was set in another test. */
1158 rte_ring_set_water_mark(r, 0);
1160 /* Reset the ring stats. */
1161 memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1163 /* Allocate some dummy object pointers. */
1164 src = malloc(RING_SIZE*2*sizeof(void *));
1168 for (i = 0; i < RING_SIZE*2 ; i++) {
1169 src[i] = (void *)(unsigned long)i;
1172 /* Allocate some memory for copied objects. */
1173 dst = malloc(RING_SIZE*2*sizeof(void *));
1177 memset(dst, 0, RING_SIZE*2*sizeof(void *));
1179 /* Set the head and tail pointers. */
1183 /* Do Enqueue tests. */
1184 printf("Test the dequeue stats.\n");
1186 /* Fill the ring up to RING_SIZE -1. */
1187 printf("Fill the ring.\n");
1188 for (i = 0; i< (RING_SIZE/MAX_BULK); i++) {
1189 rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
1190 cur_src += MAX_BULK;
1193 /* Adjust for final enqueue = MAX_BULK -1. */
1196 printf("Verify that the ring is full.\n");
1197 if (rte_ring_full(r) != 1)
1201 printf("Verify the enqueue success stats.\n");
1202 /* Stats should match above enqueue operations to fill the ring. */
1203 if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
1206 /* Current max objects is RING_SIZE -1. */
1207 if (ring_stats->enq_success_objs != RING_SIZE -1)
1210 /* Shouldn't have any failures yet. */
1211 if (ring_stats->enq_fail_bulk != 0)
1213 if (ring_stats->enq_fail_objs != 0)
1217 printf("Test stats for SP burst enqueue to a full ring.\n");
1219 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1220 if ((ret & RTE_RING_SZ_MASK) != 0)
1223 failed_enqueue_ops += 1;
1224 failed_enqueue_items += num_items;
1226 /* The enqueue should have failed. */
1227 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1229 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1233 printf("Test stats for SP bulk enqueue to a full ring.\n");
1235 ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1236 if (ret != -ENOBUFS)
1239 failed_enqueue_ops += 1;
1240 failed_enqueue_items += num_items;
1242 /* The enqueue should have failed. */
1243 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1245 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1249 printf("Test stats for MP burst enqueue to a full ring.\n");
1251 ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1252 if ((ret & RTE_RING_SZ_MASK) != 0)
1255 failed_enqueue_ops += 1;
1256 failed_enqueue_items += num_items;
1258 /* The enqueue should have failed. */
1259 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1261 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1265 printf("Test stats for MP bulk enqueue to a full ring.\n");
1267 ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1268 if (ret != -ENOBUFS)
1271 failed_enqueue_ops += 1;
1272 failed_enqueue_items += num_items;
1274 /* The enqueue should have failed. */
1275 if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
1277 if (ring_stats->enq_fail_objs != failed_enqueue_items)
1281 /* Do Dequeue tests. */
1282 printf("Test the dequeue stats.\n");
1284 printf("Empty the ring.\n");
1285 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1286 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1287 cur_dst += MAX_BULK;
1290 /* There was only RING_SIZE -1 objects to dequeue. */
1293 printf("Verify ring is empty.\n");
1294 if (1 != rte_ring_empty(r))
1297 printf("Verify the dequeue success stats.\n");
1298 /* Stats should match above dequeue operations. */
1299 if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
1302 /* Objects dequeued is RING_SIZE -1. */
1303 if (ring_stats->deq_success_objs != RING_SIZE -1)
1306 /* Shouldn't have any dequeue failure stats yet. */
1307 if (ring_stats->deq_fail_bulk != 0)
1310 printf("Test stats for SC burst dequeue with an empty ring.\n");
1312 ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
1313 if ((ret & RTE_RING_SZ_MASK) != 0)
1316 failed_dequeue_ops += 1;
1317 failed_dequeue_items += num_items;
1319 /* The dequeue should have failed. */
1320 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1322 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1326 printf("Test stats for SC bulk dequeue with an empty ring.\n");
1328 ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
1332 failed_dequeue_ops += 1;
1333 failed_dequeue_items += num_items;
1335 /* The dequeue should have failed. */
1336 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1338 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1342 printf("Test stats for MC burst dequeue with an empty ring.\n");
1344 ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
1345 if ((ret & RTE_RING_SZ_MASK) != 0)
1347 failed_dequeue_ops += 1;
1348 failed_dequeue_items += num_items;
1350 /* The dequeue should have failed. */
1351 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1353 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1357 printf("Test stats for MC bulk dequeue with an empty ring.\n");
1359 ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
1363 failed_dequeue_ops += 1;
1364 failed_dequeue_items += num_items;
1366 /* The dequeue should have failed. */
1367 if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
1369 if (ring_stats->deq_fail_objs != failed_dequeue_items)
1373 printf("Test total enqueue/dequeue stats.\n");
1374 /* At this point the enqueue and dequeue stats should be the same. */
1375 if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
1377 if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
1379 if (ring_stats->enq_fail_bulk != ring_stats->deq_fail_bulk)
1381 if (ring_stats->enq_fail_objs != ring_stats->deq_fail_objs)
1385 /* Watermark Tests. */
1386 printf("Test the watermark/quota stats.\n");
1388 printf("Verify the initial watermark stats.\n");
1389 /* Watermark stats should be 0 since there is no watermark. */
1390 if (ring_stats->enq_quota_bulk != 0)
1392 if (ring_stats->enq_quota_objs != 0)
1395 /* Set a watermark. */
1396 rte_ring_set_water_mark(r, 16);
1398 /* Reset pointers. */
1402 last_enqueue_ops = ring_stats->enq_success_bulk;
1403 last_enqueue_items = ring_stats->enq_success_objs;
1406 printf("Test stats for SP burst enqueue below watermark.\n");
1408 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1409 if ((ret & RTE_RING_SZ_MASK) != num_items)
1412 /* Watermark stats should still be 0. */
1413 if (ring_stats->enq_quota_bulk != 0)
1415 if (ring_stats->enq_quota_objs != 0)
1418 /* Success stats should have increased. */
1419 if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
1421 if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
1424 last_enqueue_ops = ring_stats->enq_success_bulk;
1425 last_enqueue_items = ring_stats->enq_success_objs;
1428 printf("Test stats for SP burst enqueue at watermark.\n");
1430 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1431 if ((ret & RTE_RING_SZ_MASK) != num_items)
1434 /* Watermark stats should have changed. */
1435 if (ring_stats->enq_quota_bulk != 1)
1437 if (ring_stats->enq_quota_objs != num_items)
1440 last_quota_ops = ring_stats->enq_quota_bulk;
1441 last_quota_items = ring_stats->enq_quota_objs;
1444 printf("Test stats for SP burst enqueue above watermark.\n");
1446 ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
1447 if ((ret & RTE_RING_SZ_MASK) != num_items)
1450 /* Watermark stats should have changed. */
1451 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1453 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1456 last_quota_ops = ring_stats->enq_quota_bulk;
1457 last_quota_items = ring_stats->enq_quota_objs;
1460 printf("Test stats for MP burst enqueue above watermark.\n");
1462 ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
1463 if ((ret & RTE_RING_SZ_MASK) != num_items)
1466 /* Watermark stats should have changed. */
1467 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1469 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1472 last_quota_ops = ring_stats->enq_quota_bulk;
1473 last_quota_items = ring_stats->enq_quota_objs;
1476 printf("Test stats for SP bulk enqueue above watermark.\n");
1478 ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
1482 /* Watermark stats should have changed. */
1483 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1485 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1488 last_quota_ops = ring_stats->enq_quota_bulk;
1489 last_quota_items = ring_stats->enq_quota_objs;
1492 printf("Test stats for MP bulk enqueue above watermark.\n");
1494 ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
1498 /* Watermark stats should have changed. */
1499 if (ring_stats->enq_quota_bulk != last_quota_ops +1)
1501 if (ring_stats->enq_quota_objs != last_quota_items + num_items)
1504 printf("Test watermark success stats.\n");
1505 /* Success stats should be same as last non-watermarked enqueue. */
1506 if (ring_stats->enq_success_bulk != last_enqueue_ops)
1508 if (ring_stats->enq_success_objs != last_enqueue_items)
1514 /* Empty the ring. */
1515 for (i = 0; i<RING_SIZE/MAX_BULK; i++) {
1516 rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
1517 cur_dst += MAX_BULK;
1520 /* Reset the watermark. */
1521 rte_ring_set_water_mark(r, 0);
1523 /* Reset the ring stats. */
1524 memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
1526 /* Free allocated memory before the test completes */
 * it will always fail to create a ring with a wrong ring size in this
 * function; every rte_ring_create() call below is expected to fail
1546 test_ring_creation_with_wrong_size(void)
1548 	struct rte_ring * rp = NULL;
1550 	/* Test if ring size is not power of 2: RING_SIZE + 1 cannot be one */
1551 	rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
1556 	/* Test if ring size is exceeding the limit (larger than RTE_RING_SZ_MASK) */
1557 	rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
 * it tests that it will always fail to create a ring with an
 * already-used ring name
1568 test_ring_creation_with_an_used_name(void)
1570 	struct rte_ring * rp;
	/* a ring named "test" is created elsewhere in this file, so creating
	 * another ring under the same name is expected to fail */
1572 	rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
 * Test whether a non-power-of-2 count causes the create
 * function to fail correctly
1584 test_create_count_odd(void)
	/* 4097 is not a power of 2, so this creation is expected to fail */
1586 	struct rte_ring *r = rte_ring_create("test_ring_count",
1587 		4097, SOCKET_ID_ANY, 0 );
1595 test_lookup_null(void)
	/* no ring named "ring_not_found" exists, so the lookup should fail */
1597 	struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
	/* a failed lookup is expected to set rte_errno to ENOENT */
1599 	if (rte_errno != ENOENT){
	/* NOTE(review): "returnn" in the message below is a typo for "return" —
	 * left untouched here because it is a runtime string, not a comment */
1600 	printf( "test failed to returnn error on null pointer\n");
 * it tests some more basic ring operations: create/lookup, empty/full
 * state checks, per-object enqueue/dequeue, and burst enqueue/dequeue
1610 test_ring_basic_ex(void)
1614 	struct rte_ring * rp;
	/* zero-initialized scratch array of RING_SIZE object pointers used as
	 * the payload for all the transfers below */
1617 	obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
1619 		printf("test_ring_basic_ex fail to rte_malloc\n");
	/* dedicated single-producer / single-consumer ring for this test */
1623 	rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
1624 			RING_F_SP_ENQ | RING_F_SC_DEQ);
1626 		printf("test_ring_basic_ex fail to create ring\n");
	/* the ring must be findable under the name it was created with */
1630 	if (rte_ring_lookup("test_ring_basic_ex") != rp) {
	/* a freshly created ring starts out empty */
1634 	if (rte_ring_empty(rp) != 1) {
1635 		printf("test_ring_basic_ex ring is not empty but it should be\n");
1639 	printf("%u ring entries are now free\n", rte_ring_free_count(rp));
	/* enqueue objects one at a time ... */
1641 	for (i = 0; i < RING_SIZE; i ++) {
1642 		rte_ring_enqueue(rp, obj[i]);
	/* ... after which the ring must report full */
1645 	if (rte_ring_full(rp) != 1) {
1646 		printf("test_ring_basic_ex ring is not full but it should be\n");
	/* drain it again one object at a time ... */
1650 	for (i = 0; i < RING_SIZE; i ++) {
1651 		rte_ring_dequeue(rp, &obj[i]);
	/* ... and it must be empty once more */
1654 	if (rte_ring_empty(rp) != 1) {
1655 		printf("test_ring_basic_ex ring is not empty but it should be\n");
1659 	/* Covering the ring burst operations: the low bits of the return
	 * value (masked by RTE_RING_SZ_MASK) hold the object count moved */
1660 	ret = rte_ring_enqueue_burst(rp, obj, 2);
1661 	if ((ret & RTE_RING_SZ_MASK) != 2) {
1662 		printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
1666 	ret = rte_ring_dequeue_burst(rp, obj, 2);
1668 		printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
1683 unsigned enq_core_count, deq_core_count;
1685 /* some more basic operations */
1686 if (test_ring_basic_ex() < 0)
1689 rte_atomic32_init(&synchro);
1692 r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
1696 /* retrieve the ring from its name */
1697 if (rte_ring_lookup("test") != r) {
1698 printf("Cannot lookup ring from its name\n");
1702 /* burst operations */
1703 if (test_ring_burst_basic() < 0)
1706 /* basic operations */
1707 if (test_ring_basic() < 0)
1711 if (test_ring_stats() < 0)
1714 /* basic operations */
1715 if (test_live_watermark_change() < 0)
1718 if ( test_set_watermark() < 0){
1719 printf ("Test failed to detect invalid parameter\n");
1723 printf ( "Test detected forced bad watermark values\n");
1725 if ( test_create_count_odd() < 0){
1726 printf ("Test failed to detect odd count\n");
1730 printf ( "Test detected odd count\n");
1732 if ( test_lookup_null() < 0){
1733 printf ("Test failed to detect NULL ring lookup\n");
1737 printf ( "Test detected NULL ring lookup \n");
1739 printf("start performance tests \n");
1741 /* one lcore for enqueue, one for dequeue */
1744 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1747 /* max cores for enqueue, one for dequeue */
1748 enq_core_count = rte_lcore_count() - 1;
1750 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1753 /* max cores for dequeue, one for enqueue */
1755 deq_core_count = rte_lcore_count() - 1;
1756 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1759 /* half for enqueue and half for dequeue */
1760 enq_core_count = rte_lcore_count() / 2;
1761 deq_core_count = rte_lcore_count() / 2;
1762 if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
1765 printf("start performance tests - burst operations \n");
1767 /* one lcore for enqueue, one for dequeue */
1770 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1773 /* max cores for enqueue, one for dequeue */
1774 enq_core_count = rte_lcore_count() - 1;
1776 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1779 /* max cores for dequeue, one for enqueue */
1781 deq_core_count = rte_lcore_count() - 1;
1782 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1785 /* half for enqueue and half for dequeue */
1786 enq_core_count = rte_lcore_count() / 2;
1787 deq_core_count = rte_lcore_count() / 2;
1788 if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
1791 /* test of creating ring with wrong size */
1792 if (test_ring_creation_with_wrong_size() < 0)
1795 /* test of creation ring with an used name */
1796 if (test_ring_creation_with_an_used_name() < 0)
1799 /* dump the ring status */
1800 rte_ring_list_dump();