From 50d769054872d8c43e7cb66119e5387ca53deb90 Mon Sep 17 00:00:00 2001 From: Intel Date: Thu, 20 Dec 2012 00:00:00 +0100 Subject: [PATCH] ring: add burst API Signed-off-by: Intel --- app/test/test_ring.c | 1386 ++++++++++++++++++++++++++++-------- lib/librte_ring/rte_ring.c | 6 +- lib/librte_ring/rte_ring.h | 558 +++++++++++---- 3 files changed, 1523 insertions(+), 427 deletions(-) diff --git a/app/test/test_ring.c b/app/test/test_ring.c index b522964eb3..9a07479dbe 100644 --- a/app/test/test_ring.c +++ b/app/test/test_ring.c @@ -89,15 +89,12 @@ * watermark is exceeded * - Check that dequeued pointers are correct * - * #. Check quota and watermark + * #. Check live watermark change * * - Start a loop on another lcore that will enqueue and dequeue - * objects in a ring. It will monitor the value of quota (default - * bulk count) and watermark. - * - At the same time, change the quota and the watermark on the - * master lcore. - * - The slave lcore will check that bulk count changes from 4 to - * 8, and watermark changes from 16 to 32. + * objects in a ring. It will monitor the value of watermark. + * - At the same time, change the watermark on the master lcore. + * - The slave lcore will check that watermark changes from 16 to 32. * * #. Performance tests. * @@ -128,8 +125,6 @@ static rte_atomic32_t synchro; -static unsigned bulk_enqueue; -static unsigned bulk_dequeue; static struct rte_ring *r; struct test_stats { @@ -143,106 +138,170 @@ struct test_stats { static struct test_stats test_stats[RTE_MAX_LCORE]; -#define DEFINE_ENQUEUE_FUNCTION(name, enq_code) \ -static int \ -name(__attribute__((unused)) void *arg) \ -{ \ - unsigned success = 0; \ - unsigned quota = 0; \ - unsigned fail = 0; \ - unsigned i; \ - unsigned long dummy_obj; \ - void *obj_table[MAX_BULK]; \ - int ret; \ - unsigned lcore_id = rte_lcore_id(); \ - uint64_t start_cycles, end_cycles; \ - uint64_t time_diff = 0, hz = rte_get_hpet_hz(); \ - \ - /* init dummy object table */ \ - for (i = 0; i< MAX_BULK; i++) { \ - dummy_obj = lcore_id + 0x1000 + i; \ - obj_table[i] = (void *)dummy_obj; \ - } \ - \ - /* wait synchro for slaves */ \ - if (lcore_id != rte_get_master_lcore()) \ - while (rte_atomic32_read(&synchro) == 0); \ - \ - start_cycles = rte_get_hpet_cycles(); \ - \ - /* enqueue as many object as possible */ \ - while (time_diff/hz < TIME_S) { \ - for (i = 0; likely(i < N); i++) { \ - ret = enq_code; \ - if (ret == 0) \ - success++; \ - else if (ret == -EDQUOT) \ - quota++; \ - else \ - fail++; \ - } \ - end_cycles = rte_get_hpet_cycles(); \ - time_diff = end_cycles - start_cycles; \ - } \ - \ - /* write statistics in a shared structure */ \ - test_stats[lcore_id].enq_success = success; \ - test_stats[lcore_id].enq_quota = quota; \ - test_stats[lcore_id].enq_fail = fail; \ - \ - return 0; \ +static int +ring_enqueue_test(int (que_func)(struct rte_ring*, void * const *, unsigned), + void* arg, unsigned bulk_or_burst) +{ + unsigned success = 0; + unsigned quota = 0; + unsigned fail = 0; + unsigned i; + unsigned long dummy_obj; + void *obj_table[MAX_BULK]; + int ret; + unsigned lcore_id = rte_lcore_id(); + unsigned count = *((unsigned*)arg); + uint64_t start_cycles, end_cycles; + uint64_t time_diff = 0, hz = rte_get_hpet_hz(); + + /* init dummy object table */ + for (i = 0; i< MAX_BULK; i++) { + dummy_obj = lcore_id + 0x1000 + i; + obj_table[i] = (void *)dummy_obj; + } + + /* wait synchro for slaves */ + if (lcore_id != rte_get_master_lcore()) + while (rte_atomic32_read(&synchro) == 0); + + start_cycles = rte_get_hpet_cycles(); + + /* enqueue as many object as possible */ + while (time_diff/hz < TIME_S) { + for (i = 0; likely(i < N); i++) { + ret = que_func(r, obj_table, count); + /* + * bulk_or_burst + * 1: for bulk operation + * 0: for burst operation + */ + if (bulk_or_burst) { + /* The *count* objects enqueued, unless fail */ + if (ret == 0) + success += count; + else if (ret == -EDQUOT) + quota += count; + else + fail++; + } else { + /* The actual objects enqueued */ + if (ret != 0) + success += (ret & RTE_RING_SZ_MASK); + else + fail++; + } + } + end_cycles = rte_get_hpet_cycles(); + time_diff = end_cycles - start_cycles; + } + + /* write statistics in a shared structure */ + test_stats[lcore_id].enq_success = success; + test_stats[lcore_id].enq_quota = quota; + test_stats[lcore_id].enq_fail = fail; + + return 0; +} + +static int +ring_dequeue_test(int (que_func)(struct rte_ring*, void **, unsigned), + void* arg, unsigned bulk_or_burst) +{ + unsigned success = 0; + unsigned fail = 0; + unsigned i; + void *obj_table[MAX_BULK]; + int ret; + unsigned lcore_id = rte_lcore_id(); + unsigned count = *((unsigned*)arg); + uint64_t start_cycles, end_cycles; + uint64_t time_diff = 0, hz = rte_get_hpet_hz(); + + /* wait synchro for slaves */ + if (lcore_id != rte_get_master_lcore()) + while (rte_atomic32_read(&synchro) == 0); + + start_cycles = rte_get_hpet_cycles(); + + /* dequeue as many object as possible */ + while (time_diff/hz < TIME_S) { + for (i = 0; likely(i < N); i++) { + ret = que_func(r, obj_table, count); + /* + * bulk_or_burst + * 1: for bulk operation + * 0: for burst operation + */ + if (bulk_or_burst) { + if (ret == 0) + success += count; + else + fail++; + } else { + if (ret != 0) + success += ret; + else + fail++; + } + } + end_cycles = rte_get_hpet_cycles(); + time_diff = end_cycles - start_cycles; + } + + /* write statistics in a shared structure */ + test_stats[lcore_id].deq_success = success; + test_stats[lcore_id].deq_fail = fail; + + return 0; +} + +static int +test_ring_per_core_sp_enqueue(void *arg) +{ + return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1); } -#define DEFINE_DEQUEUE_FUNCTION(name, deq_code) \ -static int \ -name(__attribute__((unused)) void *arg) \ -{ \ - unsigned success = 0; \ - unsigned fail = 0; \ - unsigned i; \ - void *obj_table[MAX_BULK]; \ - int ret; \ - unsigned lcore_id = rte_lcore_id(); \ - uint64_t start_cycles, end_cycles; \ - uint64_t time_diff = 0, hz = rte_get_hpet_hz(); \ - \ - /* wait synchro for slaves */ \ - if (lcore_id != rte_get_master_lcore()) \ - while (rte_atomic32_read(&synchro) == 0); \ - \ - start_cycles = rte_get_hpet_cycles(); \ - \ - /* dequeue as many object as possible */ \ - while (time_diff/hz < TIME_S) { \ - for (i = 0; likely(i < N); i++) { \ - ret = deq_code; \ - if (ret == 0) \ - success++; \ - else \ - fail++; \ - } \ - end_cycles = rte_get_hpet_cycles(); \ - time_diff = end_cycles - start_cycles; \ - } \ - \ - /* write statistics in a shared structure */ \ - test_stats[lcore_id].deq_success = success; \ - test_stats[lcore_id].deq_fail = fail; \ - \ - return 0; \ +static int +test_ring_per_core_mp_enqueue(void *arg) +{ + return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1); +} + +static int +test_ring_per_core_mc_dequeue(void *arg) +{ + return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1); +} + +static int +test_ring_per_core_sc_dequeue(void *arg) +{ + return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1); } -DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_sp_enqueue, - rte_ring_sp_enqueue_bulk(r, obj_table, bulk_enqueue)) +static int +test_ring_per_core_sp_enqueue_burst(void *arg) +{ + return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0); +} -DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_sc_dequeue, - rte_ring_sc_dequeue_bulk(r, obj_table, bulk_dequeue)) +static int +test_ring_per_core_mp_enqueue_burst(void *arg) +{ + return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0); +} -DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_mp_enqueue, - rte_ring_mp_enqueue_bulk(r, obj_table, bulk_enqueue)) +static int +test_ring_per_core_mc_dequeue_burst(void *arg) +{ + return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0); +} -DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_mc_dequeue, - rte_ring_mc_dequeue_bulk(r, obj_table, bulk_dequeue)) +static int +test_ring_per_core_sc_dequeue_burst(void *arg) +{ + return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0); +} #define TEST_RING_VERIFY(exp) \ if (!(exp)) { \ @@ -256,7 +315,9 @@ DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_mc_dequeue, static int -launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc) +launch_cores(unsigned enq_core_count, unsigned deq_core_count, + unsigned n_enq_bulk, unsigned n_deq_bulk, + int sp, int sc, int bulk_not_burst) { void *obj; unsigned lcore_id; @@ -271,29 +332,44 @@ launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc) rte_atomic32_set(&synchro, 0); printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ", - enq_core_count, deq_core_count, bulk_enqueue, bulk_dequeue); + enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk); printf("sp=%d sc=%d ", sp, sc); - /* set enqueue function to be used */ - if (sp) - enq_f = test_ring_per_core_sp_enqueue; - else - enq_f = test_ring_per_core_mp_enqueue; + if (bulk_not_burst) { + /* set enqueue function to be used */ + if (sp) + enq_f = test_ring_per_core_sp_enqueue; + else + enq_f = test_ring_per_core_mp_enqueue; - /* set dequeue function to be used */ - if (sc) - deq_f = test_ring_per_core_sc_dequeue; - else - deq_f = test_ring_per_core_mc_dequeue; + /* set dequeue function to be used */ + if (sc) + deq_f = test_ring_per_core_sc_dequeue; + else + deq_f = test_ring_per_core_mc_dequeue; + + } else { + /* set enqueue function to be used */ + if (sp) + enq_f = test_ring_per_core_sp_enqueue_burst; + else + enq_f = test_ring_per_core_mp_enqueue_burst; + + /* set dequeue function to be used */ + if (sc) + deq_f = test_ring_per_core_sc_dequeue_burst; + else + deq_f = test_ring_per_core_mc_dequeue_burst; + } RTE_LCORE_FOREACH_SLAVE(lcore_id) { if (enq_core_count != 0) { enq_core_count--; - rte_eal_remote_launch(enq_f, NULL, lcore_id); + rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id); } if (deq_core_count != 1) { deq_core_count--; - rte_eal_remote_launch(deq_f, NULL, lcore_id); + rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id); } } @@ -301,7 +377,7 @@ launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc) /* start synchro and launch test on master */ rte_atomic32_set(&synchro, 1); - ret = deq_f(NULL); + ret = deq_f(&n_deq_bulk); /* wait all cores */ RTE_LCORE_FOREACH_SLAVE(lcore_id) { @@ -330,9 +406,8 @@ launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc) return -1; } - enq_total = (sum.enq_success * bulk_enqueue) + - (sum.enq_quota * bulk_enqueue); - deq_total = (sum.deq_success * bulk_dequeue) + deq_remain; + enq_total = sum.enq_success + sum.enq_quota; + deq_total = sum.deq_success + deq_remain; rate = deq_total/TIME_S; @@ -349,23 +424,19 @@ launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc) static int do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count, - unsigned n_enq_bulk, unsigned n_deq_bulk) + unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst) { int sp, sc; int do_sp, do_sc; int ret; - bulk_enqueue = n_enq_bulk; - bulk_dequeue = n_deq_bulk; - do_sp = (enq_core_count == 1) ? 1 : 0; do_sc = (deq_core_count == 1) ? 1 : 0; for (sp = 0; sp <= do_sp; sp ++) { for (sc = 0; sc <= do_sc; sc ++) { - ret = launch_cores(enq_core_count, - deq_core_count, - sp, sc); + ret = launch_cores(enq_core_count, deq_core_count, + n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst); if (ret < 0) return -1; } @@ -374,7 +445,8 @@ do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count, } static int -do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count) +do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count, + unsigned bulk_or_burst) { unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 }; unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 }; @@ -392,7 +464,8 @@ do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count) ret = do_one_ring_test2(enq_core_count, deq_core_count, *bulk_enqueue_ptr, - *bulk_dequeue_ptr); + *bulk_dequeue_ptr, + bulk_or_burst); if (ret < 0) return -1; } @@ -401,7 +474,7 @@ do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count) } static int -check_quota_and_watermark(__attribute__((unused)) void *dummy) +check_live_watermark_change(__attribute__((unused)) void *dummy) { uint64_t hz = rte_get_hpet_hz(); void *obj_table[MAX_BULK]; @@ -409,7 +482,7 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) uint64_t cur_time, end_time; int64_t diff = 0; int i, ret; - unsigned quota, quota_old = 4; + unsigned count = 4; /* init the object table */ memset(obj_table, 0, sizeof(obj_table)); @@ -418,21 +491,12 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) /* check that bulk and watermark are 4 and 32 (respectively) */ while (diff >= 0) { - /* read quota, the only change allowed is from 4 to 8 */ - quota = rte_ring_get_bulk_count(r); - if (quota != quota_old && (quota_old != 4 || quota != 8)) { - printf("Bad quota change %u -> %u\n", quota_old, - quota); - return -1; - } - quota_old = quota; - /* add in ring until we reach watermark */ ret = 0; for (i = 0; i < 16; i ++) { if (ret != 0) break; - ret = rte_ring_enqueue_bulk(r, obj_table, quota); + ret = rte_ring_enqueue_bulk(r, obj_table, count); } if (ret != -EDQUOT) { @@ -442,7 +506,7 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) } /* read watermark, the only change allowed is from 16 to 32 */ - watermark = i * quota; + watermark = r->prod.watermark; if (watermark != watermark_old && (watermark_old != 16 || watermark != 32)) { printf("Bad watermark change %u -> %u\n", watermark_old, @@ -453,7 +517,7 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) /* dequeue objects from ring */ while (i--) { - ret = rte_ring_dequeue_bulk(r, obj_table, quota); + ret = rte_ring_dequeue_bulk(r, obj_table, count); if (ret != 0) { printf("Cannot dequeue (ret=%d)\n", ret); return -1; @@ -464,9 +528,9 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) diff = end_time - cur_time; } - if (watermark_old != 32 || quota_old != 8) { - printf("quota or watermark was not updated (q=%u wm=%u)\n", - quota_old, watermark_old); + if (watermark_old != 32 ) { + printf(" watermark was not updated (wm=%u)\n", + watermark_old); return -1; } @@ -474,22 +538,19 @@ check_quota_and_watermark(__attribute__((unused)) void *dummy) } static int -test_quota_and_watermark(void) +test_live_watermark_change(void) { unsigned lcore_id = rte_lcore_id(); unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1); - printf("Test quota and watermark live modification\n"); - - rte_ring_set_bulk_count(r, 4); + printf("Test watermark live modification\n"); rte_ring_set_water_mark(r, 16); /* launch a thread that will enqueue and dequeue, checking * watermark and quota */ - rte_eal_remote_launch(check_quota_and_watermark, NULL, lcore_id2); + rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2); rte_delay_ms(1000); - rte_ring_set_bulk_count(r, 8); rte_ring_set_water_mark(r, 32); rte_delay_ms(1000); @@ -498,6 +559,7 @@ test_quota_and_watermark(void) return 0; } + /* Test for catch on invalid watermark values */ static int test_set_watermark( void ){ @@ -576,7 +638,7 @@ test_ring_basic(void) { void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; int ret; - unsigned i, n; + unsigned i, num_elems; /* alloc dummy object pointers */ src = malloc(RING_SIZE*2*sizeof(void *)); @@ -715,37 +777,32 @@ test_ring_basic(void) cur_dst = dst; printf("test watermark and default bulk enqueue / dequeue\n"); - rte_ring_set_bulk_count(r, 16); rte_ring_set_water_mark(r, 20); - n = rte_ring_get_bulk_count(r); - if (n != 16) { - printf("rte_ring_get_bulk_count() returned %u instead " - "of 16\n", n); - goto fail; - } + num_elems = 16; cur_src = src; cur_dst = dst; - ret = rte_ring_enqueue_bulk(r, cur_src, n); - cur_src += 16; + + ret = rte_ring_enqueue_bulk(r, cur_src, num_elems); + cur_src += num_elems; if (ret != 0) { printf("Cannot enqueue\n"); goto fail; } - ret = rte_ring_enqueue_bulk(r, cur_src, n); - cur_src += 16; + ret = rte_ring_enqueue_bulk(r, cur_src, num_elems); + cur_src += num_elems; if (ret != -EDQUOT) { printf("Watermark not exceeded\n"); goto fail; } - ret = rte_ring_dequeue_bulk(r, cur_dst, n); - cur_dst += 16; + ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems); + cur_dst += num_elems; if (ret != 0) { printf("Cannot dequeue\n"); goto fail; } - ret = rte_ring_dequeue_bulk(r, cur_dst, n); - cur_dst += 16; + ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems); + cur_dst += num_elems; if (ret != 0) { printf("Cannot dequeue2\n"); goto fail; @@ -758,6 +815,20 @@ test_ring_basic(void) printf("data after dequeue is not the same\n"); goto fail; } + + cur_src = src; + cur_dst = dst; + + ret = rte_ring_mp_enqueue(r, cur_src); + cur_src += 1; + if (ret != 0) + goto fail; + + ret = rte_ring_mc_dequeue(r, cur_dst); + cur_dst += 1; + if (ret != 0) + goto fail; + cur_src = src; cur_dst = dst; @@ -775,152 +846,878 @@ test_ring_basic(void) return -1; } -/* - * it will always fail to create ring with a wrong ring size number in this function - */ static int -test_ring_creation_with_wrong_size(void) +test_ring_burst_basic(void) { - struct rte_ring * rp = NULL; + void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL; + int ret; + unsigned i; - rp = rte_ring_create("test_bad_ring_size", RING_SIZE+1, SOCKET_ID_ANY, 0); - if (NULL != rp) { - return -1; + /* alloc dummy object pointers */ + src = malloc(RING_SIZE*2*sizeof(void *)); + if (src == NULL) + goto fail; + + for (i = 0; i < RING_SIZE*2 ; i++) { + src[i] = (void *)(unsigned long)i; } + cur_src = src; - return 0; -} + /* alloc some room for copied objects */ + dst = malloc(RING_SIZE*2*sizeof(void *)); + if (dst == NULL) + goto fail; -/* - * it tests if it would always fail to create ring with an used ring name - */ -static int -test_ring_creation_with_an_used_name(void) -{ - struct rte_ring * rp; + memset(dst, 0, RING_SIZE*2*sizeof(void *)); + cur_dst = dst; - rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0); - if (NULL != rp) - return -1; + printf("Test SP & SC basic functions \n"); + printf("enqueue 1 obj\n"); + ret = rte_ring_sp_enqueue_burst(r, cur_src, 1); + cur_src += 1; + if ((ret & RTE_RING_SZ_MASK) != 1) + goto fail; - return 0; -} + printf("enqueue 2 objs\n"); + ret = rte_ring_sp_enqueue_burst(r, cur_src, 2); + cur_src += 2; + if ((ret & RTE_RING_SZ_MASK) != 2) + goto fail; -/* - * Test to if a non-power of 2 count causes the create - * function to fail correctly - */ -static int -test_create_count_odd(void) -{ - struct rte_ring *r = rte_ring_create("test_ring_count", - 4097, SOCKET_ID_ANY, 0 ); - if(r != NULL){ - return -1; - } - return 0; -} + printf("enqueue MAX_BULK objs\n"); + ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK) ; + cur_src += MAX_BULK; + if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) + goto fail; -static int -test_lookup_null(void) -{ - struct rte_ring *rlp = rte_ring_lookup("ring_not_found"); - if (rlp ==NULL) - if (rte_errno != ENOENT){ - printf( "test failed to returnn error on null pointer\n"); - return -1; - } - return 0; -} + printf("dequeue 1 obj\n"); + ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1) ; + cur_dst += 1; + if ((ret & RTE_RING_SZ_MASK) != 1) + goto fail; -/* - * it tests some more basic ring operations - */ -static int -test_ring_basic_ex(void) -{ - int ret = -1; - unsigned i; - struct rte_ring * rp; - void **obj = NULL; + printf("dequeue 2 objs\n"); + ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2); + cur_dst += 2; + if ((ret & RTE_RING_SZ_MASK) != 2) + goto fail; - obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0); - if (obj == NULL) { - printf("test_ring_basic_ex fail to rte_malloc\n"); - goto fail_test; - } + printf("dequeue MAX_BULK objs\n"); + ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK); + cur_dst += MAX_BULK; + if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) + goto fail; - rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY, 0); - if (rp == NULL) { - printf("test_ring_basic_ex fail to create ring\n"); - goto fail_test; + /* check data */ + if (memcmp(src, dst, cur_dst - dst)) { + test_hexdump("src", src, cur_src - src); + test_hexdump("dst", dst, cur_dst - dst); + printf("data after dequeue is not the same\n"); + goto fail; } - if (rte_ring_lookup("test_ring_basic_ex") != rp) { - goto fail_test; - } + cur_src = src; + cur_dst = dst; - if (rte_ring_empty(rp) != 1) { - printf("test_ring_basic_ex ring is not empty but it should be\n"); - goto fail_test; + printf("Test enqueue without enough memory space \n"); + for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) { + ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK); + cur_src += MAX_BULK; + if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) { + goto fail; + } } - printf("%u ring entries are now free\n", rte_ring_free_count(rp)); + printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n"); + ret = rte_ring_sp_enqueue_burst(r, cur_src, 2); + cur_src += 2; + if ((ret & RTE_RING_SZ_MASK) != 2) + goto fail; - for (i = 0; i < RING_SIZE; i ++) { - rte_ring_enqueue(rp, obj[i]); - } + printf("Enqueue the remaining entries = MAX_BULK - 2 \n"); + /* Always one free entry left */ + ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK); + cur_src += MAX_BULK - 3; + if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3) + goto fail; - if (rte_ring_full(rp) != 1) { - printf("test_ring_basic_ex ring is not full but it should be\n"); - goto fail_test; - } + printf("Test if ring is full \n"); + if (rte_ring_full(r) != 1) + goto fail; - for (i = 0; i < RING_SIZE; i ++) { - rte_ring_dequeue(rp, &obj[i]); - } + printf("Test enqueue for a full entry \n"); + ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK); + if ((ret & RTE_RING_SZ_MASK) != 0) + goto fail; - if (rte_ring_empty(rp) != 1) { - printf("test_ring_basic_ex ring is not empty but it should be\n"); - goto fail_test; + printf("Test dequeue without enough objects \n"); + for (i = 0; istats[lcore_id]; + + printf("Test the ring stats.\n"); + + /* Reset the watermark in case it was set in another test. */ + rte_ring_set_water_mark(r, 0); + + /* Reset the ring stats. */ + memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id])); + + /* Allocate some dummy object pointers. */ + src = malloc(RING_SIZE*2*sizeof(void *)); + if (src == NULL) + goto fail; + + for (i = 0; i < RING_SIZE*2 ; i++) { + src[i] = (void *)(unsigned long)i; + } + + /* Allocate some memory for copied objects. */ + dst = malloc(RING_SIZE*2*sizeof(void *)); + if (dst == NULL) + goto fail; + + memset(dst, 0, RING_SIZE*2*sizeof(void *)); + + /* Set the head and tail pointers. */ + cur_src = src; + cur_dst = dst; + + /* Do Enqueue tests. */ + printf("Test the dequeue stats.\n"); + + /* Fill the ring up to RING_SIZE -1. */ + printf("Fill the ring.\n"); + for (i = 0; i< (RING_SIZE/MAX_BULK); i++) { + rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK); + cur_src += MAX_BULK; + } + + /* Adjust for final enqueue = MAX_BULK -1. */ + cur_src--; + + printf("Verify that the ring is full.\n"); + if (rte_ring_full(r) != 1) + goto fail; + + + printf("Verify the enqueue success stats.\n"); + /* Stats should match above enqueue operations to fill the ring. */ + if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK)) + goto fail; + + /* Current max objects is RING_SIZE -1. */ + if (ring_stats->enq_success_objs != RING_SIZE -1) + goto fail; + + /* Shouldn't have any failures yet. */ + if (ring_stats->enq_fail_bulk != 0) + goto fail; + if (ring_stats->enq_fail_objs != 0) + goto fail; + + + printf("Test stats for SP burst enqueue to a full ring.\n"); + num_items = 2; + ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != 0) + goto fail; + + failed_enqueue_ops += 1; + failed_enqueue_items += num_items; + + /* The enqueue should have failed. */ + if (ring_stats->enq_fail_bulk != failed_enqueue_ops) + goto fail; + if (ring_stats->enq_fail_objs != failed_enqueue_items) + goto fail; + + + printf("Test stats for SP bulk enqueue to a full ring.\n"); + num_items = 4; + ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items); + if (ret != -ENOBUFS) + goto fail; + + failed_enqueue_ops += 1; + failed_enqueue_items += num_items; + + /* The enqueue should have failed. */ + if (ring_stats->enq_fail_bulk != failed_enqueue_ops) + goto fail; + if (ring_stats->enq_fail_objs != failed_enqueue_items) + goto fail; + + + printf("Test stats for MP burst enqueue to a full ring.\n"); + num_items = 8; + ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != 0) + goto fail; + + failed_enqueue_ops += 1; + failed_enqueue_items += num_items; + + /* The enqueue should have failed. */ + if (ring_stats->enq_fail_bulk != failed_enqueue_ops) + goto fail; + if (ring_stats->enq_fail_objs != failed_enqueue_items) + goto fail; + + + printf("Test stats for MP bulk enqueue to a full ring.\n"); + num_items = 16; + ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items); + if (ret != -ENOBUFS) + goto fail; + + failed_enqueue_ops += 1; + failed_enqueue_items += num_items; + + /* The enqueue should have failed. */ + if (ring_stats->enq_fail_bulk != failed_enqueue_ops) + goto fail; + if (ring_stats->enq_fail_objs != failed_enqueue_items) + goto fail; + + + /* Do Dequeue tests. */ + printf("Test the dequeue stats.\n"); + + printf("Empty the ring.\n"); + for (i = 0; ideq_success_bulk != (RING_SIZE/MAX_BULK)) + goto fail; + + /* Objects dequeued is RING_SIZE -1. */ + if (ring_stats->deq_success_objs != RING_SIZE -1) + goto fail; + + /* Shouldn't have any dequeue failure stats yet. */ + if (ring_stats->deq_fail_bulk != 0) + goto fail; + + printf("Test stats for SC burst dequeue with an empty ring.\n"); + num_items = 2; + ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items); + if ((ret & RTE_RING_SZ_MASK) != 0) + goto fail; + + failed_dequeue_ops += 1; + failed_dequeue_items += num_items; + + /* The dequeue should have failed. */ + if (ring_stats->deq_fail_bulk != failed_dequeue_ops) + goto fail; + if (ring_stats->deq_fail_objs != failed_dequeue_items) + goto fail; + + + printf("Test stats for SC bulk dequeue with an empty ring.\n"); + num_items = 4; + ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items); + if (ret != -ENOENT) + goto fail; + + failed_dequeue_ops += 1; + failed_dequeue_items += num_items; + + /* The dequeue should have failed. */ + if (ring_stats->deq_fail_bulk != failed_dequeue_ops) + goto fail; + if (ring_stats->deq_fail_objs != failed_dequeue_items) + goto fail; + + + printf("Test stats for MC burst dequeue with an empty ring.\n"); + num_items = 8; + ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items); + if ((ret & RTE_RING_SZ_MASK) != 0) + goto fail; + failed_dequeue_ops += 1; + failed_dequeue_items += num_items; + + /* The dequeue should have failed. */ + if (ring_stats->deq_fail_bulk != failed_dequeue_ops) + goto fail; + if (ring_stats->deq_fail_objs != failed_dequeue_items) + goto fail; + + + printf("Test stats for MC bulk dequeue with an empty ring.\n"); + num_items = 16; + ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items); + if (ret != -ENOENT) + goto fail; + + failed_dequeue_ops += 1; + failed_dequeue_items += num_items; + + /* The dequeue should have failed. */ + if (ring_stats->deq_fail_bulk != failed_dequeue_ops) + goto fail; + if (ring_stats->deq_fail_objs != failed_dequeue_items) + goto fail; + + + printf("Test total enqueue/dequeue stats.\n"); + /* At this point the enqueue and dequeue stats should be the same. */ + if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk) + goto fail; + if (ring_stats->enq_success_objs != ring_stats->deq_success_objs) + goto fail; + if (ring_stats->enq_fail_bulk != ring_stats->deq_fail_bulk) + goto fail; + if (ring_stats->enq_fail_objs != ring_stats->deq_fail_objs) + goto fail; + + + /* Watermark Tests. */ + printf("Test the watermark/quota stats.\n"); + + printf("Verify the initial watermark stats.\n"); + /* Watermark stats should be 0 since there is no watermark. */ + if (ring_stats->enq_quota_bulk != 0) + goto fail; + if (ring_stats->enq_quota_objs != 0) + goto fail; + + /* Set a watermark. */ + rte_ring_set_water_mark(r, 16); + + /* Reset pointers. */ + cur_src = src; + cur_dst = dst; + + last_enqueue_ops = ring_stats->enq_success_bulk; + last_enqueue_items = ring_stats->enq_success_objs; + + + printf("Test stats for SP burst enqueue below watermark.\n"); + num_items = 8; + ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != num_items) + goto fail; + + /* Watermark stats should still be 0. */ + if (ring_stats->enq_quota_bulk != 0) + goto fail; + if (ring_stats->enq_quota_objs != 0) + goto fail; + + /* Success stats should have increased. */ + if (ring_stats->enq_success_bulk != last_enqueue_ops + 1) + goto fail; + if (ring_stats->enq_success_objs != last_enqueue_items + num_items) + goto fail; + + last_enqueue_ops = ring_stats->enq_success_bulk; + last_enqueue_items = ring_stats->enq_success_objs; + + + printf("Test stats for SP burst enqueue at watermark.\n"); + num_items = 8; + ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != num_items) + goto fail; + + /* Watermark stats should have changed. */ + if (ring_stats->enq_quota_bulk != 1) + goto fail; + if (ring_stats->enq_quota_objs != num_items) + goto fail; + + last_quota_ops = ring_stats->enq_quota_bulk; + last_quota_items = ring_stats->enq_quota_objs; + + + printf("Test stats for SP burst enqueue above watermark.\n"); + num_items = 1; + ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != num_items) + goto fail; + + /* Watermark stats should have changed. */ + if (ring_stats->enq_quota_bulk != last_quota_ops +1) + goto fail; + if (ring_stats->enq_quota_objs != last_quota_items + num_items) + goto fail; + + last_quota_ops = ring_stats->enq_quota_bulk; + last_quota_items = ring_stats->enq_quota_objs; + + + printf("Test stats for MP burst enqueue above watermark.\n"); + num_items = 2; + ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items); + if ((ret & RTE_RING_SZ_MASK) != num_items) + goto fail; + + /* Watermark stats should have changed. */ + if (ring_stats->enq_quota_bulk != last_quota_ops +1) + goto fail; + if (ring_stats->enq_quota_objs != last_quota_items + num_items) + goto fail; + + last_quota_ops = ring_stats->enq_quota_bulk; + last_quota_items = ring_stats->enq_quota_objs; + + + printf("Test stats for SP bulk enqueue above watermark.\n"); + num_items = 4; + ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items); + if (ret != -EDQUOT) + goto fail; + + /* Watermark stats should have changed. */ + if (ring_stats->enq_quota_bulk != last_quota_ops +1) + goto fail; + if (ring_stats->enq_quota_objs != last_quota_items + num_items) + goto fail; + + last_quota_ops = ring_stats->enq_quota_bulk; + last_quota_items = ring_stats->enq_quota_objs; + + + printf("Test stats for MP bulk enqueue above watermark.\n"); + num_items = 8; + ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items); + if (ret != -EDQUOT) + goto fail; + + /* Watermark stats should have changed. */ + if (ring_stats->enq_quota_bulk != last_quota_ops +1) + goto fail; + if (ring_stats->enq_quota_objs != last_quota_items + num_items) + goto fail; + + printf("Test watermark success stats.\n"); + /* Success stats should be same as last non-watermarked enqueue. */ + if (ring_stats->enq_success_bulk != last_enqueue_ops) + goto fail; + if (ring_stats->enq_success_objs != last_enqueue_items) + goto fail; + + + /* Cleanup. */ + + /* Empty the ring. */ + for (i = 0; istats[lcore_id], 0, sizeof(r->stats[lcore_id])); + + /* Free memory before test completed */ + if (src) + free(src); + if (dst) + free(dst); + return 0; + +fail: + if (src) + free(src); + if (dst) + free(dst); + return -1; +#endif +} + +/* + * it will always fail to create ring with a wrong ring size number in this function + */ +static int +test_ring_creation_with_wrong_size(void) +{ + struct rte_ring * rp = NULL; + + /* Test if ring size is not power of 2 */ + rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0); + if (NULL != rp) { + return -1; + } + + /* Test if ring size is exceeding the limit */ + rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0); + if (NULL != rp) { + return -1; + } + return 0; +} + +/* + * it tests if it would always fail to create ring with an used ring name + */ +static int +test_ring_creation_with_an_used_name(void) +{ + struct rte_ring * rp; + + rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0); + if (NULL != rp) + return -1; + + return 0; +} + +/* + * Test to if a non-power of 2 count causes the create + * function to fail correctly + */ +static int +test_create_count_odd(void) +{ + struct rte_ring *r = rte_ring_create("test_ring_count", + 4097, SOCKET_ID_ANY, 0 ); + if(r != NULL){ + return -1; + } + return 0; +} + +static int +test_lookup_null(void) +{ + struct rte_ring *rlp = rte_ring_lookup("ring_not_found"); + if (rlp ==NULL) + if (rte_errno != ENOENT){ + printf( "test failed to returnn error on null pointer\n"); + return -1; + } + return 0; +} + +/* + * it tests some more basic ring operations + */ +static int +test_ring_basic_ex(void) +{ + int ret = -1; + unsigned i; + struct rte_ring * rp; + void **obj = NULL; + + obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0); + if (obj == NULL) { + printf("test_ring_basic_ex fail to rte_malloc\n"); + goto fail_test; + } + + rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY, + RING_F_SP_ENQ | RING_F_SC_DEQ); + if (rp == NULL) { + printf("test_ring_basic_ex fail to create ring\n"); + goto fail_test; + } + + if (rte_ring_lookup("test_ring_basic_ex") != rp) { + goto fail_test; + } + + if (rte_ring_empty(rp) != 1) { + printf("test_ring_basic_ex ring is not empty but it should be\n"); + goto fail_test; + } + + printf("%u ring entries are now free\n", rte_ring_free_count(rp)); + + for (i = 0; i < RING_SIZE; i ++) { + rte_ring_enqueue(rp, obj[i]); + } + + if (rte_ring_full(rp) != 1) { + printf("test_ring_basic_ex ring is not full but it should be\n"); + goto fail_test; + } + + for (i = 0; i < RING_SIZE; i ++) { + rte_ring_dequeue(rp, &obj[i]); + } + + if (rte_ring_empty(rp) != 1) { + printf("test_ring_basic_ex ring is not empty but it should be\n"); + goto fail_test; + } + + /* Covering the ring burst operation */ + ret = rte_ring_enqueue_burst(rp, obj, 2); + if ((ret & RTE_RING_SZ_MASK) != 2) { + printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n"); + goto fail_test; + } + + ret = rte_ring_dequeue_burst(rp, obj, 2); + if (ret != 2) { + printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n"); + goto fail_test; } + ret = 0; +fail_test: + if (obj != NULL) + rte_free(obj); + + return ret; +} + +int +test_ring(void) +{ + unsigned enq_core_count, deq_core_count; + + /* some more basic operations */ + if (test_ring_basic_ex() < 0) + return -1; + + rte_atomic32_init(&synchro); + + if (r == NULL) + r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0); + if (r == NULL) + return -1; + + /* retrieve the ring from its name */ + if (rte_ring_lookup("test") != r) { + printf("Cannot lookup ring from its name\n"); + return -1; + } + + /* burst operations */ + if (test_ring_burst_basic() < 0) + return -1; + /* basic operations */ if (test_ring_basic() < 0) return -1; + /* ring stats */ + if (test_ring_stats() < 0) + return -1; + /* basic operations */ - if (test_quota_and_watermark() < 0) + if (test_live_watermark_change() < 0) return -1; if ( test_set_watermark() < 0){ @@ -944,31 +1741,56 @@ test_ring(void) else printf ( "Test detected NULL ring lookup \n"); + printf("start performance tests \n"); + + /* one lcore for enqueue, one for dequeue */ + enq_core_count = 1; + deq_core_count = 1; + if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0) + return -1; + + /* max cores for enqueue, one for dequeue */ + enq_core_count = rte_lcore_count() - 1; + deq_core_count = 1; + if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0) + return -1; + + /* max cores for dequeue, one for enqueue */ + enq_core_count = 1; + deq_core_count = rte_lcore_count() - 1; + if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0) + return -1; + + /* half for enqueue and half for dequeue */ + enq_core_count = rte_lcore_count() / 2; + deq_core_count = rte_lcore_count() / 2; + if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0) + return -1; - printf("start performance tests\n"); + printf("start performance tests - burst operations \n"); /* one lcore for enqueue, one for dequeue */ enq_core_count = 1; deq_core_count = 1; - if (do_one_ring_test(enq_core_count, deq_core_count) < 0) + if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0) return -1; /* max cores for enqueue, one for dequeue */ enq_core_count = rte_lcore_count() - 1; deq_core_count = 1; - if (do_one_ring_test(enq_core_count, deq_core_count) < 0) + if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0) return -1; /* max cores for dequeue, one for enqueue */ enq_core_count = 1; deq_core_count = rte_lcore_count() - 1; - if (do_one_ring_test(enq_core_count, deq_core_count) < 0) + if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0) return -1; /* half for enqueue and half for dequeue */ enq_core_count = rte_lcore_count() / 2; deq_core_count = rte_lcore_count() / 2; - if (do_one_ring_test(enq_core_count, deq_core_count) < 0) + if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0) return -1; /* test of creating ring with wrong size */ diff --git a/lib/librte_ring/rte_ring.c b/lib/librte_ring/rte_ring.c index 67e5223c67..2d6ed1f05d 100644 --- a/lib/librte_ring/rte_ring.c +++ b/lib/librte_ring/rte_ring.c @@ -128,9 +128,10 @@ rte_ring_create(const char *name, unsigned count, int socket_id, } /* count must be a power of 2 */ - if (!POWEROF2(count)) { + if ((!POWEROF2(count)) || (count > RTE_RING_SZ_MASK )) { rte_errno = EINVAL; - RTE_LOG(ERR, RING, "Requested size is not a power of 2\n"); + RTE_LOG(ERR, RING, "Requested size is invalid, must be power of 2, and " + "do not exceed the size limit %u\n", RTE_RING_SZ_MASK); return NULL; } @@ -205,7 +206,6 @@ rte_ring_dump(const struct rte_ring *r) printf(" watermark=0\n"); else printf(" watermark=%"PRIu32"\n", r->prod.watermark); - printf(" bulk_default=%"PRIu32"\n", r->prod.bulk_default); /* sum and dump statistics */ #ifdef RTE_LIBRTE_RING_DEBUG diff --git a/lib/librte_ring/rte_ring.h b/lib/librte_ring/rte_ring.h index 27bc60a4fb..b0f9860f0f 100644 --- a/lib/librte_ring/rte_ring.h +++ b/lib/librte_ring/rte_ring.h @@ -101,6 +101,10 @@ extern "C" { #include #include +enum rte_ring_queue_behavior { + RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */ + RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items a possible from ring */ +}; #ifdef RTE_LIBRTE_RING_DEBUG /** @@ -140,7 +144,6 @@ struct rte_ring { /** Ring producer status. */ struct prod { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t watermark; /**< Maximum items before EDQUOT. */ uint32_t sp_enqueue; /**< True, if single producer. */ uint32_t size; /**< Size of ring. */ @@ -151,7 +154,6 @@ struct rte_ring { /** Ring consumer status. */ struct cons { - volatile uint32_t bulk_default; /**< Default bulk count. */ uint32_t sc_dequeue; /**< True, if single consumer. */ uint32_t size; /**< Size of the ring. */ uint32_t mask; /**< Mask (size-1) of ring. */ @@ -170,9 +172,11 @@ struct rte_ring { #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */ #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */ +#define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceed for burst ops */ +#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */ /** - * When debug is enabled, store ring statistics. + * @internal When debug is enabled, store ring statistics. * @param r * A pointer to the ring. * @param name @@ -195,7 +199,7 @@ struct rte_ring { * * This function uses ``memzone_reserve()`` to allocate memory. Its size is * set to *count*, which must be a power of two. Water marking is - * disabled by default. The default bulk count is initialized to 1. + * disabled by default. * Note that the real usable ring size is *count-1* instead of * *count*. * @@ -229,45 +233,6 @@ struct rte_ring { struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags); -/** - * Set the default bulk count for enqueue/dequeue. - * - * The parameter *count* is the default number of bulk elements to - * get/put when using ``rte_ring_*_{en,de}queue_bulk()``. It must be - * greater than 0 and less than half of the ring size. - * - * @param r - * A pointer to the ring structure. - * @param count - * A new water mark value. - * @return - * - 0: Success; default_bulk_count changed. - * - -EINVAL: Invalid count value. - */ -static inline int -rte_ring_set_bulk_count(struct rte_ring *r, unsigned count) -{ - if (unlikely(count == 0 || count >= r->prod.size)) - return -EINVAL; - - r->prod.bulk_default = r->cons.bulk_default = count; - return 0; -} - -/** - * Get the default bulk count for enqueue/dequeue. - * - * @param r - * A pointer to the ring structure. - * @return - * The default bulk count for enqueue/dequeue. - */ -static inline unsigned -rte_ring_get_bulk_count(struct rte_ring *r) -{ - return r->prod.bulk_default; -} - /** * Change the high water mark. * @@ -275,7 +240,7 @@ rte_ring_get_bulk_count(struct rte_ring *r) * *count* value. The *count* value must be greater than 0 and less * than the ring size. * - * This function can be called at any time (not necessarilly at + * This function can be called at any time (not necessarily at * initialization). * * @param r @@ -297,7 +262,7 @@ int rte_ring_set_water_mark(struct rte_ring *r, unsigned count); void rte_ring_dump(const struct rte_ring *r); /** - * Enqueue several objects on the ring (multi-producers safe). + * @internal Enqueue several objects on the ring (multi-producers safe). * * This function uses a "compare and set" instruction to move the * producer index atomically. @@ -307,20 +272,27 @@ void rte_ring_dump(const struct rte_ring *r); * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ static inline int -rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, prod_next; uint32_t cons_tail, free_entries; + const unsigned max = n; int success; unsigned i; uint32_t mask = r->prod.mask; @@ -328,6 +300,9 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* move prod.head atomically */ do { + /* Reset n to the initial burst count */ + n = max; + prod_head = r->prod.head; cons_tail = r->cons.tail; /* The subtraction is done between two unsigned 32bits value @@ -338,8 +313,19 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -352,13 +338,14 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, r->ring[(prod_head + i) & mask] = obj_table[i]; rte_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -374,24 +361,30 @@ rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, } /** - * Enqueue several objects on a ring (NOT multi-producers safe). + * @internal Enqueue several objects on a ring (NOT multi-producers safe). * * @param r * A pointer to the ring structure. * @param obj_table * A pointer to a table of void * pointers (objects). * @param n - * The number of objects to add in the ring from the obj_table. The - * value must be strictly positive. + * The number of objects to add in the ring from the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring * @return - * - 0: Success; objects enqueued. + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects enqueue. * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the * high water mark is exceeded. - * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects enqueued. */ static inline int -rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, - unsigned n) +__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) { uint32_t prod_head, cons_tail; uint32_t prod_next, free_entries; @@ -409,8 +402,19 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, /* check that we have enough room in ring */ if (unlikely(n > free_entries)) { - __RING_STAT_ADD(r, enq_fail, n); - return -ENOBUFS; + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, enq_fail, n); + return -ENOBUFS; + } + else { + /* No free entry available */ + if (unlikely(free_entries == 0)) { + __RING_STAT_ADD(r, enq_fail, n); + return 0; + } + + n = free_entries; + } } prod_next = prod_head + n; @@ -421,13 +425,14 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, r->ring[(prod_head + i) & mask] = obj_table[i]; rte_wmb(); - /* return -EDQUOT if we exceed the watermark */ + /* if we exceed the watermark */ if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { - ret = -EDQUOT; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT : + (int)(n | RTE_RING_QUOT_EXCEED); __RING_STAT_ADD(r, enq_quota, n); } else { - ret = 0; + ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; __RING_STAT_ADD(r, enq_success, n); } @@ -435,6 +440,213 @@ rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, return ret; } +/** + * @internal Dequeue several objects from a ring (multi-consumers safe). When + * the request objects are more than the available objects, only dequeue the + * actual number of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ + +static inline int +__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + const unsigned max = n; + int success; + unsigned i; + uint32_t mask = r->prod.mask; + + /* move cons.head atomically */ + do { + /* Restore n as it may change every loop */ + n = max; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = (prod_tail - cons_head); + + /* Set the actual entries for dequeue */ + if (unlikely(n > entries)) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + success = rte_atomic32_cmpset(&r->cons.head, cons_head, + cons_next); + } while (unlikely(success == 0)); + + /* copy in table */ + rte_rmb(); + for (i = 0; likely(i < n); i++) { + obj_table[i] = r->ring[(cons_head + i) & mask]; + } + + /* + * If there are other dequeues in progress that preceded us, + * we need to wait for them to complete + */ + while (unlikely(r->cons.tail != cons_head)) + rte_pause(); + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * @internal Dequeue several objects from a ring (NOT multi-consumers safe). + * When the request objects are more than the available objects, only dequeue + * the actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @param behavior + * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring + * RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring + * @return + * Depend on the behavior value + * if behavior = RTE_RING_QUEUE_FIXED + * - 0: Success; objects dequeued. + * - -ENOENT: Not enough entries in the ring to dequeue; no object is + * dequeued. + * if behavior = RTE_RING_QUEUE_VARIABLE + * - n: Actual number of objects dequeued. + */ +static inline int +__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table, + unsigned n, enum rte_ring_queue_behavior behavior) +{ + uint32_t cons_head, prod_tail; + uint32_t cons_next, entries; + unsigned i; + uint32_t mask = r->prod.mask; + + cons_head = r->cons.head; + prod_tail = r->prod.tail; + /* The subtraction is done between two unsigned 32bits value + * (the result is always modulo 32 bits even if we have + * cons_head > prod_tail). So 'entries' is always between 0 + * and size(ring)-1. */ + entries = prod_tail - cons_head; + + if (unlikely(n > entries)) { + if (behavior == RTE_RING_QUEUE_FIXED) { + __RING_STAT_ADD(r, deq_fail, n); + return -ENOENT; + } + else { + if (unlikely(entries == 0)){ + __RING_STAT_ADD(r, deq_fail, n); + return 0; + } + + n = entries; + } + } + + cons_next = cons_head + n; + r->cons.head = cons_next; + + /* copy in table */ + rte_rmb(); + for (i = 0; likely(i < n); i++) { + obj_table[i] = r->ring[(cons_head + i) & mask]; + } + + __RING_STAT_ADD(r, deq_success, n); + r->cons.tail = cons_next; + return behavior == RTE_RING_QUEUE_FIXED ? 0 : n; +} + +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueue. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued. + */ +static inline int +rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - 0: Success; objects enqueued. + * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the + * high water mark is exceeded. + * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued. + */ +static inline int +rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED); +} + /** * Enqueue several objects on a ring. * @@ -542,8 +754,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) * @param obj_table * A pointer to a table of void * pointers (objects) that will be filled. * @param n - * The number of objects to dequeue from the ring to the obj_table, - * must be strictly positive + * The number of objects to dequeue from the ring to the obj_table. * @return * - 0: Success; objects dequeued. * - -ENOENT: Not enough entries in the ring to dequeue; no object is @@ -552,49 +763,7 @@ rte_ring_enqueue(struct rte_ring *r, void *obj) static inline int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - int success; - unsigned i; - uint32_t mask = r->prod.mask; - - /* move cons.head atomically */ - do { - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = (prod_tail - cons_head); - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - success = rte_atomic32_cmpset(&r->cons.head, cons_head, - cons_next); - } while (unlikely(success == 0)); - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - /* - * If there are other dequeues in progress that preceeded us, - * we need to wait for them to complete - */ - while (unlikely(r->cons.tail != cons_head)) - rte_pause(); - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -615,37 +784,7 @@ rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) static inline int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n) { - uint32_t cons_head, prod_tail; - uint32_t cons_next, entries; - unsigned i; - uint32_t mask = r->prod.mask; - - cons_head = r->cons.head; - prod_tail = r->prod.tail; - /* The subtraction is done between two unsigned 32bits value - * (the result is always modulo 32 bits even if we have - * cons_head > prod_tail). So 'entries' is always between 0 - * and size(ring)-1. */ - entries = prod_tail - cons_head; - - /* check that we have enough entries in ring */ - if (unlikely(n > entries)) { - __RING_STAT_ADD(r, deq_fail, n); - return -ENOENT; - } - - cons_next = cons_head + n; - r->cons.head = cons_next; - - /* copy in table */ - rte_rmb(); - for (i = 0; likely(i < n); i++) { - obj_table[i] = r->ring[(cons_head + i) & mask]; - } - - __RING_STAT_ADD(r, deq_success, n); - r->cons.tail = cons_next; - return 0; + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED); } /** @@ -822,6 +961,141 @@ void rte_ring_list_dump(void); */ struct rte_ring *rte_ring_lookup(const char *name); +/** + * Enqueue several objects on the ring (multi-producers safe). + * + * This function uses a "compare and set" instruction to move the + * producer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring (NOT multi-producers safe). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Enqueue several objects on a ring. + * + * This function calls the multi-producer or the single-producer + * version depending on the default behavior that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects). + * @param n + * The number of objects to add in the ring from the obj_table. + * @return + * - n: Actual number of objects enqueued. + */ +static inline int +rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table, + unsigned n) +{ + if (r->prod.sp_enqueue) + return rte_ring_sp_enqueue_burst(r, obj_table, n); + else + return rte_ring_mp_enqueue_burst(r, obj_table, n); +} + +/** + * Dequeue several objects from a ring (multi-consumers safe). When the request + * objects are more than the available objects, only dequeue the actual number + * of objects + * + * This function uses a "compare and set" instruction to move the + * consumer index atomically. + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int +rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue several objects from a ring (NOT multi-consumers safe).When the + * request objects are more than the available objects, only dequeue the + * actual number of objects + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - n: Actual number of objects dequeued, 0 if ring is empty + */ +static inline int +rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE); +} + +/** + * Dequeue multiple objects from a ring up to a maximum number. + * + * This function calls the multi-consumers or the single-consumer + * version, depending on the default behaviour that was specified at + * ring creation time (see flags). + * + * @param r + * A pointer to the ring structure. + * @param obj_table + * A pointer to a table of void * pointers (objects) that will be filled. + * @param n + * The number of objects to dequeue from the ring to the obj_table. + * @return + * - Number of objects dequeued, or a negative error code on error + */ +static inline int +rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n) +{ + if (r->cons.sc_dequeue) + return rte_ring_sc_dequeue_burst(r, obj_table, n); + else + return rte_ring_mc_dequeue_burst(r, obj_table, n); +} + #ifdef __cplusplus } #endif -- 2.20.1