* watermark is exceeded
* - Check that dequeued pointers are correct
*
- * #. Check quota and watermark
+ * #. Check live watermark change
*
* - Start a loop on another lcore that will enqueue and dequeue
- * objects in a ring. It will monitor the value of quota (default
- * bulk count) and watermark.
- * - At the same time, change the quota and the watermark on the
- * master lcore.
- * - The slave lcore will check that bulk count changes from 4 to
- * 8, and watermark changes from 16 to 32.
+ * objects in a ring. It will monitor the value of the watermark.
+ * - At the same time, change the watermark on the master lcore.
+ * - The slave lcore will check that the watermark changes from 16 to 32.
*
* #. Performance tests.
*
static rte_atomic32_t synchro;
-static unsigned bulk_enqueue;
-static unsigned bulk_dequeue;
static struct rte_ring *r;
struct test_stats {
static struct test_stats test_stats[RTE_MAX_LCORE];
-#define DEFINE_ENQUEUE_FUNCTION(name, enq_code) \
-static int \
-name(__attribute__((unused)) void *arg) \
-{ \
- unsigned success = 0; \
- unsigned quota = 0; \
- unsigned fail = 0; \
- unsigned i; \
- unsigned long dummy_obj; \
- void *obj_table[MAX_BULK]; \
- int ret; \
- unsigned lcore_id = rte_lcore_id(); \
- uint64_t start_cycles, end_cycles; \
- uint64_t time_diff = 0, hz = rte_get_hpet_hz(); \
- \
- /* init dummy object table */ \
- for (i = 0; i< MAX_BULK; i++) { \
- dummy_obj = lcore_id + 0x1000 + i; \
- obj_table[i] = (void *)dummy_obj; \
- } \
- \
- /* wait synchro for slaves */ \
- if (lcore_id != rte_get_master_lcore()) \
- while (rte_atomic32_read(&synchro) == 0); \
- \
- start_cycles = rte_get_hpet_cycles(); \
- \
- /* enqueue as many object as possible */ \
- while (time_diff/hz < TIME_S) { \
- for (i = 0; likely(i < N); i++) { \
- ret = enq_code; \
- if (ret == 0) \
- success++; \
- else if (ret == -EDQUOT) \
- quota++; \
- else \
- fail++; \
- } \
- end_cycles = rte_get_hpet_cycles(); \
- time_diff = end_cycles - start_cycles; \
- } \
- \
- /* write statistics in a shared structure */ \
- test_stats[lcore_id].enq_success = success; \
- test_stats[lcore_id].enq_quota = quota; \
- test_stats[lcore_id].enq_fail = fail; \
- \
- return 0; \
+static int
+ring_enqueue_test(int (*que_func)(struct rte_ring *, void * const *, unsigned),
+ void *arg, unsigned bulk_or_burst)
+{
+ unsigned success = 0;
+ unsigned quota = 0;
+ unsigned fail = 0;
+ unsigned i;
+ unsigned long dummy_obj;
+ void *obj_table[MAX_BULK];
+ int ret;
+ unsigned lcore_id = rte_lcore_id();
+ unsigned count = *((unsigned*)arg);
+ uint64_t start_cycles, end_cycles;
+ uint64_t time_diff = 0, hz = rte_get_hpet_hz();
+
+ /* init dummy object table */
+ for (i = 0; i < MAX_BULK; i++) {
+ dummy_obj = lcore_id + 0x1000 + i;
+ obj_table[i] = (void *)dummy_obj;
+ }
+
+ /* wait synchro for slaves */
+ if (lcore_id != rte_get_master_lcore())
+ while (rte_atomic32_read(&synchro) == 0);
+
+ start_cycles = rte_get_hpet_cycles();
+
+ /* enqueue as many objects as possible */
+ while (time_diff/hz < TIME_S) {
+ for (i = 0; likely(i < N); i++) {
+ ret = que_func(r, obj_table, count);
+ /*
+ * bulk_or_burst
+ * 1: for bulk operation
+ * 0: for burst operation
+ */
+ if (bulk_or_burst) {
+ /* A bulk call enqueues all *count* objects, unless it fails */
+ if (ret == 0)
+ success += count;
+ else if (ret == -EDQUOT)
+ quota += count;
+ else
+ fail++;
+ } else {
+ /* A burst call reports the number of objects actually enqueued */
+ if (ret != 0)
+ success += (ret & RTE_RING_SZ_MASK);
+ else
+ fail++;
+ }
+ }
+ end_cycles = rte_get_hpet_cycles();
+ time_diff = end_cycles - start_cycles;
+ }
+
+ /* write statistics in a shared structure */
+ test_stats[lcore_id].enq_success = success;
+ test_stats[lcore_id].enq_quota = quota;
+ test_stats[lcore_id].enq_fail = fail;
+
+ return 0;
+}
+
+static int
+ring_dequeue_test(int (*que_func)(struct rte_ring *, void **, unsigned),
+ void *arg, unsigned bulk_or_burst)
+{
+ unsigned success = 0;
+ unsigned fail = 0;
+ unsigned i;
+ void *obj_table[MAX_BULK];
+ int ret;
+ unsigned lcore_id = rte_lcore_id();
+ unsigned count = *((unsigned*)arg);
+ uint64_t start_cycles, end_cycles;
+ uint64_t time_diff = 0, hz = rte_get_hpet_hz();
+
+ /* wait synchro for slaves */
+ if (lcore_id != rte_get_master_lcore())
+ while (rte_atomic32_read(&synchro) == 0);
+
+ start_cycles = rte_get_hpet_cycles();
+
+ /* dequeue as many objects as possible */
+ while (time_diff/hz < TIME_S) {
+ for (i = 0; likely(i < N); i++) {
+ ret = que_func(r, obj_table, count);
+ /*
+ * bulk_or_burst
+ * 1: for bulk operation
+ * 0: for burst operation
+ */
+ if (bulk_or_burst) {
+ if (ret == 0)
+ success += count;
+ else
+ fail++;
+ } else {
+ if (ret != 0)
+ success += ret;
+ else
+ fail++;
+ }
+ }
+ end_cycles = rte_get_hpet_cycles();
+ time_diff = end_cycles - start_cycles;
+ }
+
+ /* write statistics in a shared structure */
+ test_stats[lcore_id].deq_success = success;
+ test_stats[lcore_id].deq_fail = fail;
+
+ return 0;
+}
+
+static int
+test_ring_per_core_sp_enqueue(void *arg)
+{
+ return ring_enqueue_test(&rte_ring_sp_enqueue_bulk, arg, 1);
}
-#define DEFINE_DEQUEUE_FUNCTION(name, deq_code) \
-static int \
-name(__attribute__((unused)) void *arg) \
-{ \
- unsigned success = 0; \
- unsigned fail = 0; \
- unsigned i; \
- void *obj_table[MAX_BULK]; \
- int ret; \
- unsigned lcore_id = rte_lcore_id(); \
- uint64_t start_cycles, end_cycles; \
- uint64_t time_diff = 0, hz = rte_get_hpet_hz(); \
- \
- /* wait synchro for slaves */ \
- if (lcore_id != rte_get_master_lcore()) \
- while (rte_atomic32_read(&synchro) == 0); \
- \
- start_cycles = rte_get_hpet_cycles(); \
- \
- /* dequeue as many object as possible */ \
- while (time_diff/hz < TIME_S) { \
- for (i = 0; likely(i < N); i++) { \
- ret = deq_code; \
- if (ret == 0) \
- success++; \
- else \
- fail++; \
- } \
- end_cycles = rte_get_hpet_cycles(); \
- time_diff = end_cycles - start_cycles; \
- } \
- \
- /* write statistics in a shared structure */ \
- test_stats[lcore_id].deq_success = success; \
- test_stats[lcore_id].deq_fail = fail; \
- \
- return 0; \
+static int
+test_ring_per_core_mp_enqueue(void *arg)
+{
+ return ring_enqueue_test(&rte_ring_mp_enqueue_bulk, arg, 1);
+}
+
+static int
+test_ring_per_core_mc_dequeue(void *arg)
+{
+ return ring_dequeue_test(&rte_ring_mc_dequeue_bulk, arg, 1);
+}
+
+static int
+test_ring_per_core_sc_dequeue(void *arg)
+{
+ return ring_dequeue_test(&rte_ring_sc_dequeue_bulk, arg, 1);
}
-DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_sp_enqueue,
- rte_ring_sp_enqueue_bulk(r, obj_table, bulk_enqueue))
+static int
+test_ring_per_core_sp_enqueue_burst(void *arg)
+{
+ return ring_enqueue_test(&rte_ring_sp_enqueue_burst, arg, 0);
+}
-DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_sc_dequeue,
- rte_ring_sc_dequeue_bulk(r, obj_table, bulk_dequeue))
+static int
+test_ring_per_core_mp_enqueue_burst(void *arg)
+{
+ return ring_enqueue_test(&rte_ring_mp_enqueue_burst, arg, 0);
+}
-DEFINE_ENQUEUE_FUNCTION(test_ring_per_core_mp_enqueue,
- rte_ring_mp_enqueue_bulk(r, obj_table, bulk_enqueue))
+static int
+test_ring_per_core_mc_dequeue_burst(void *arg)
+{
+ return ring_dequeue_test(&rte_ring_mc_dequeue_burst, arg, 0);
+}
-DEFINE_DEQUEUE_FUNCTION(test_ring_per_core_mc_dequeue,
- rte_ring_mc_dequeue_bulk(r, obj_table, bulk_dequeue))
+static int
+test_ring_per_core_sc_dequeue_burst(void *arg)
+{
+ return ring_dequeue_test(&rte_ring_sc_dequeue_burst, arg, 0);
+}
#define TEST_RING_VERIFY(exp) \
if (!(exp)) { \
static int
-launch_cores(unsigned enq_core_count, unsigned deq_core_count, int sp, int sc)
+launch_cores(unsigned enq_core_count, unsigned deq_core_count,
+ unsigned n_enq_bulk, unsigned n_deq_bulk,
+ int sp, int sc, int bulk_not_burst)
{
void *obj;
unsigned lcore_id;
rte_atomic32_set(&synchro, 0);
printf("ring_autotest e/d_core=%u,%u e/d_bulk=%u,%u ",
- enq_core_count, deq_core_count, bulk_enqueue, bulk_dequeue);
+ enq_core_count, deq_core_count, n_enq_bulk, n_deq_bulk);
printf("sp=%d sc=%d ", sp, sc);
- /* set enqueue function to be used */
- if (sp)
- enq_f = test_ring_per_core_sp_enqueue;
- else
- enq_f = test_ring_per_core_mp_enqueue;
+ if (bulk_not_burst) {
+ /* set enqueue function to be used */
+ if (sp)
+ enq_f = test_ring_per_core_sp_enqueue;
+ else
+ enq_f = test_ring_per_core_mp_enqueue;
- /* set dequeue function to be used */
- if (sc)
- deq_f = test_ring_per_core_sc_dequeue;
- else
- deq_f = test_ring_per_core_mc_dequeue;
+ /* set dequeue function to be used */
+ if (sc)
+ deq_f = test_ring_per_core_sc_dequeue;
+ else
+ deq_f = test_ring_per_core_mc_dequeue;
+
+ } else {
+ /* set enqueue function to be used */
+ if (sp)
+ enq_f = test_ring_per_core_sp_enqueue_burst;
+ else
+ enq_f = test_ring_per_core_mp_enqueue_burst;
+
+ /* set dequeue function to be used */
+ if (sc)
+ deq_f = test_ring_per_core_sc_dequeue_burst;
+ else
+ deq_f = test_ring_per_core_mc_dequeue_burst;
+ }
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
if (enq_core_count != 0) {
enq_core_count--;
- rte_eal_remote_launch(enq_f, NULL, lcore_id);
+ rte_eal_remote_launch(enq_f, &n_enq_bulk, lcore_id);
}
if (deq_core_count != 1) {
deq_core_count--;
- rte_eal_remote_launch(deq_f, NULL, lcore_id);
+ rte_eal_remote_launch(deq_f, &n_deq_bulk, lcore_id);
}
}
/* start synchro and launch test on master */
rte_atomic32_set(&synchro, 1);
- ret = deq_f(NULL);
+ ret = deq_f(&n_deq_bulk);
/* wait all cores */
RTE_LCORE_FOREACH_SLAVE(lcore_id) {
return -1;
}
- enq_total = (sum.enq_success * bulk_enqueue) +
- (sum.enq_quota * bulk_enqueue);
- deq_total = (sum.deq_success * bulk_dequeue) + deq_remain;
+ enq_total = sum.enq_success + sum.enq_quota;
+ deq_total = sum.deq_success + deq_remain;
rate = deq_total/TIME_S;
static int
do_one_ring_test2(unsigned enq_core_count, unsigned deq_core_count,
- unsigned n_enq_bulk, unsigned n_deq_bulk)
+ unsigned n_enq_bulk, unsigned n_deq_bulk, unsigned bulk_or_burst)
{
int sp, sc;
int do_sp, do_sc;
int ret;
- bulk_enqueue = n_enq_bulk;
- bulk_dequeue = n_deq_bulk;
-
do_sp = (enq_core_count == 1) ? 1 : 0;
do_sc = (deq_core_count == 1) ? 1 : 0;
for (sp = 0; sp <= do_sp; sp ++) {
for (sc = 0; sc <= do_sc; sc ++) {
- ret = launch_cores(enq_core_count,
- deq_core_count,
- sp, sc);
+ ret = launch_cores(enq_core_count, deq_core_count,
+ n_enq_bulk, n_deq_bulk, sp, sc, bulk_or_burst);
if (ret < 0)
return -1;
}
}
static int
-do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count)
+do_one_ring_test(unsigned enq_core_count, unsigned deq_core_count,
+ unsigned bulk_or_burst)
{
unsigned bulk_enqueue_tab[] = { 1, 2, 4, 32, 0 };
unsigned bulk_dequeue_tab[] = { 1, 2, 4, 32, 0 };
ret = do_one_ring_test2(enq_core_count, deq_core_count,
*bulk_enqueue_ptr,
- *bulk_dequeue_ptr);
+ *bulk_dequeue_ptr,
+ bulk_or_burst);
if (ret < 0)
return -1;
}
}
static int
-check_quota_and_watermark(__attribute__((unused)) void *dummy)
+check_live_watermark_change(__attribute__((unused)) void *dummy)
{
uint64_t hz = rte_get_hpet_hz();
void *obj_table[MAX_BULK];
uint64_t cur_time, end_time;
int64_t diff = 0;
int i, ret;
- unsigned quota, quota_old = 4;
+ unsigned count = 4;
/* init the object table */
memset(obj_table, 0, sizeof(obj_table));
- /* check that bulk and watermark are 4 and 32 (respectively) */
+ /* check that the watermark changes only from 16 to 32 */
while (diff >= 0) {
- /* read quota, the only change allowed is from 4 to 8 */
- quota = rte_ring_get_bulk_count(r);
- if (quota != quota_old && (quota_old != 4 || quota != 8)) {
- printf("Bad quota change %u -> %u\n", quota_old,
- quota);
- return -1;
- }
- quota_old = quota;
-
/* add in ring until we reach watermark */
ret = 0;
for (i = 0; i < 16; i ++) {
if (ret != 0)
break;
- ret = rte_ring_enqueue_bulk(r, obj_table, quota);
+ ret = rte_ring_enqueue_bulk(r, obj_table, count);
}
if (ret != -EDQUOT) {
}
/* read watermark, the only change allowed is from 16 to 32 */
- watermark = i * quota;
+ watermark = r->prod.watermark;
if (watermark != watermark_old &&
(watermark_old != 16 || watermark != 32)) {
printf("Bad watermark change %u -> %u\n", watermark_old,
/* dequeue objects from ring */
while (i--) {
- ret = rte_ring_dequeue_bulk(r, obj_table, quota);
+ ret = rte_ring_dequeue_bulk(r, obj_table, count);
if (ret != 0) {
printf("Cannot dequeue (ret=%d)\n", ret);
return -1;
diff = end_time - cur_time;
}
- if (watermark_old != 32 || quota_old != 8) {
- printf("quota or watermark was not updated (q=%u wm=%u)\n",
- quota_old, watermark_old);
+ if (watermark_old != 32) {
+ printf("watermark was not updated (wm=%u)\n",
+ watermark_old);
return -1;
}
}
static int
-test_quota_and_watermark(void)
+test_live_watermark_change(void)
{
unsigned lcore_id = rte_lcore_id();
unsigned lcore_id2 = rte_get_next_lcore(lcore_id, 0, 1);
- printf("Test quota and watermark live modification\n");
-
- rte_ring_set_bulk_count(r, 4);
+ printf("Test watermark live modification\n");
rte_ring_set_water_mark(r, 16);
/* launch a thread that will enqueue and dequeue, checking
- * watermark and quota */
+ * watermark */
- rte_eal_remote_launch(check_quota_and_watermark, NULL, lcore_id2);
+ rte_eal_remote_launch(check_live_watermark_change, NULL, lcore_id2);
rte_delay_ms(1000);
- rte_ring_set_bulk_count(r, 8);
rte_ring_set_water_mark(r, 32);
rte_delay_ms(1000);
return 0;
}
+
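+/*
+ * Sketch of the watermark mechanics exercised above (illustrative
+ * only): once an enqueue leaves more than the watermark number of
+ * objects in the ring, bulk enqueues still succeed but report
+ * -EDQUOT. With the watermark at 16 and 16 objects already enqueued:
+ *
+ *   ret = rte_ring_enqueue_bulk(r, obj_table, 4);
+ *   (ret is now -EDQUOT; the 4 objects were enqueued anyway)
+ */
+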
/* Test for catch on invalid watermark values */
static int
test_set_watermark( void ){
{
void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
int ret;
- unsigned i, n;
+ unsigned i, num_elems;
/* alloc dummy object pointers */
src = malloc(RING_SIZE*2*sizeof(void *));
cur_dst = dst;
printf("test watermark and default bulk enqueue / dequeue\n");
- rte_ring_set_bulk_count(r, 16);
rte_ring_set_water_mark(r, 20);
- n = rte_ring_get_bulk_count(r);
- if (n != 16) {
- printf("rte_ring_get_bulk_count() returned %u instead "
- "of 16\n", n);
- goto fail;
- }
+ num_elems = 16;
cur_src = src;
cur_dst = dst;
- ret = rte_ring_enqueue_bulk(r, cur_src, n);
- cur_src += 16;
+
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ cur_src += num_elems;
if (ret != 0) {
printf("Cannot enqueue\n");
goto fail;
}
- ret = rte_ring_enqueue_bulk(r, cur_src, n);
- cur_src += 16;
+ ret = rte_ring_enqueue_bulk(r, cur_src, num_elems);
+ cur_src += num_elems;
if (ret != -EDQUOT) {
printf("Watermark not exceeded\n");
goto fail;
}
- ret = rte_ring_dequeue_bulk(r, cur_dst, n);
- cur_dst += 16;
+ ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+ cur_dst += num_elems;
if (ret != 0) {
printf("Cannot dequeue\n");
goto fail;
}
- ret = rte_ring_dequeue_bulk(r, cur_dst, n);
- cur_dst += 16;
+ ret = rte_ring_dequeue_bulk(r, cur_dst, num_elems);
+ cur_dst += num_elems;
if (ret != 0) {
printf("Cannot dequeue2\n");
goto fail;
printf("data after dequeue is not the same\n");
goto fail;
}
+
+ cur_src = src;
+ cur_dst = dst;
+
+ ret = rte_ring_mp_enqueue(r, cur_src);
+ cur_src += 1;
+ if (ret != 0)
+ goto fail;
+
+ ret = rte_ring_mc_dequeue(r, cur_dst);
+ cur_dst += 1;
+ if (ret != 0)
+ goto fail;
+
cur_src = src;
cur_dst = dst;
return -1;
}
-/*
- * it will always fail to create ring with a wrong ring size number in this function
- */
static int
-test_ring_creation_with_wrong_size(void)
+test_ring_burst_basic(void)
{
- struct rte_ring * rp = NULL;
+ void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
+ int ret;
+ unsigned i;
- rp = rte_ring_create("test_bad_ring_size", RING_SIZE+1, SOCKET_ID_ANY, 0);
- if (NULL != rp) {
- return -1;
+ /* alloc dummy object pointers */
+ src = malloc(RING_SIZE*2*sizeof(void *));
+ if (src == NULL)
+ goto fail;
+
+ for (i = 0; i < RING_SIZE*2; i++) {
+ src[i] = (void *)(unsigned long)i;
}
+ cur_src = src;
- return 0;
-}
+ /* alloc some room for copied objects */
+ dst = malloc(RING_SIZE*2*sizeof(void *));
+ if (dst == NULL)
+ goto fail;
-/*
- * it tests if it would always fail to create ring with an used ring name
- */
-static int
-test_ring_creation_with_an_used_name(void)
-{
- struct rte_ring * rp;
+ memset(dst, 0, RING_SIZE*2*sizeof(void *));
+ cur_dst = dst;
- rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
- if (NULL != rp)
- return -1;
+ printf("Test SP & SC basic functions \n");
+ printf("enqueue 1 obj\n");
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 1);
+ cur_src += 1;
+ if ((ret & RTE_RING_SZ_MASK) != 1)
+ goto fail;
- return 0;
-}
+ printf("enqueue 2 objs\n");
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ cur_src += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
-/*
- * Test to if a non-power of 2 count causes the create
- * function to fail correctly
- */
-static int
-test_create_count_odd(void)
-{
- struct rte_ring *r = rte_ring_create("test_ring_count",
- 4097, SOCKET_ID_ANY, 0 );
- if(r != NULL){
- return -1;
- }
- return 0;
-}
+ printf("enqueue MAX_BULK objs\n");
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
-static int
-test_lookup_null(void)
-{
- struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
- if (rlp ==NULL)
- if (rte_errno != ENOENT){
- printf( "test failed to returnn error on null pointer\n");
- return -1;
- }
- return 0;
-}
+ printf("dequeue 1 obj\n");
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, 1);
+ cur_dst += 1;
+ if ((ret & RTE_RING_SZ_MASK) != 1)
+ goto fail;
-/*
- * it tests some more basic ring operations
- */
-static int
-test_ring_basic_ex(void)
-{
- int ret = -1;
- unsigned i;
- struct rte_ring * rp;
- void **obj = NULL;
+ printf("dequeue 2 objs\n");
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
+ cur_dst += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
- obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
- if (obj == NULL) {
- printf("test_ring_basic_ex fail to rte_malloc\n");
- goto fail_test;
- }
+ printf("dequeue MAX_BULK objs\n");
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
- rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY, 0);
- if (rp == NULL) {
- printf("test_ring_basic_ex fail to create ring\n");
- goto fail_test;
+ /* check data */
+ if (memcmp(src, dst, cur_dst - dst)) {
+ test_hexdump("src", src, cur_src - src);
+ test_hexdump("dst", dst, cur_dst - dst);
+ printf("data after dequeue is not the same\n");
+ goto fail;
}
- if (rte_ring_lookup("test_ring_basic_ex") != rp) {
- goto fail_test;
- }
+ cur_src = src;
+ cur_dst = dst;
- if (rte_ring_empty(rp) != 1) {
- printf("test_ring_basic_ex ring is not empty but it should be\n");
- goto fail_test;
+ printf("Test enqueue without enough memory space \n");
+ for (i = 0; i< (RING_SIZE/MAX_BULK - 1); i++) {
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK) {
+ goto fail;
+ }
}
- printf("%u ring entries are now free\n", rte_ring_free_count(rp));
+ printf("Enqueue 2 objects, free entries = MAX_BULK - 2 \n");
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, 2);
+ cur_src += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
- for (i = 0; i < RING_SIZE; i ++) {
- rte_ring_enqueue(rp, obj[i]);
- }
+ printf("Enqueue the remaining entries = MAX_BULK - 2 \n");
+ /* Always one free entry left */
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK - 3;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
+ goto fail;
- if (rte_ring_full(rp) != 1) {
- printf("test_ring_basic_ex ring is not full but it should be\n");
- goto fail_test;
- }
+ printf("Test if ring is full \n");
+ if (rte_ring_full(r) != 1)
+ goto fail;
- for (i = 0; i < RING_SIZE; i ++) {
- rte_ring_dequeue(rp, &obj[i]);
- }
+ printf("Test enqueue for a full entry \n");
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ if ((ret & RTE_RING_SZ_MASK) != 0)
+ goto fail;
- if (rte_ring_empty(rp) != 1) {
- printf("test_ring_basic_ex ring is not empty but it should be\n");
- goto fail_test;
+ printf("Test dequeue without enough objects \n");
+ for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
}
- ret = 0;
-fail_test:
- if (obj != NULL)
- rte_free(obj);
+ /* MAX_BULK - 1 objects remain; dequeue 2, then drain the rest */
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, 2);
+ cur_dst += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
- return ret;
-}
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK - 3;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
+ goto fail;
-int
-test_ring(void)
-{
- unsigned enq_core_count, deq_core_count;
+ printf("Test if ring is empty \n");
+ /* Check if ring is empty */
+ if (1 != rte_ring_empty(r))
+ goto fail;
- /* some more basic operations */
- if (test_ring_basic_ex() < 0)
- return -1;
+ /* check data */
+ if (memcmp(src, dst, cur_dst - dst)) {
+ test_hexdump("src", src, cur_src - src);
+ test_hexdump("dst", dst, cur_dst - dst);
+ printf("data after dequeue is not the same\n");
+ goto fail;
+ }
- rte_atomic32_init(&synchro);
+ cur_src = src;
+ cur_dst = dst;
- if (r == NULL)
- r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
- if (r == NULL)
- return -1;
+ printf("Test MP & MC basic functions \n");
- /* retrieve the ring from its name */
- if (rte_ring_lookup("test") != r) {
- printf("Cannot lookup ring from its name\n");
- return -1;
+ printf("enqueue 1 obj\n");
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 1);
+ cur_src += 1;
+ if ((ret & RTE_RING_SZ_MASK) != 1)
+ goto fail;
+
+ printf("enqueue 2 objs\n");
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ cur_src += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
+
+ printf("enqueue MAX_BULK objs\n");
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+
+ printf("dequeue 1 obj\n");
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, 1);
+ cur_dst += 1;
+ if ((ret & RTE_RING_SZ_MASK) != 1)
+ goto fail;
+
+ printf("dequeue 2 objs\n");
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
+ cur_dst += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
+
+ printf("dequeue MAX_BULK objs\n");
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+
+ /* check data */
+ if (memcmp(src, dst, cur_dst - dst)) {
+ test_hexdump("src", src, cur_src - src);
+ test_hexdump("dst", dst, cur_dst - dst);
+ printf("data after dequeue is not the same\n");
+ goto fail;
+ }
+
+ cur_src = src;
+ cur_dst = dst;
+
+ printf("fill and empty the ring\n");
+ for (i = 0; i < RING_SIZE/MAX_BULK; i++) {
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+ }
+
+ /* check data */
+ if (memcmp(src, dst, cur_dst - dst)) {
+ test_hexdump("src", src, cur_src - src);
+ test_hexdump("dst", dst, cur_dst - dst);
+ printf("data after dequeue is not the same\n");
+ goto fail;
+ }
+
+ cur_src = src;
+ cur_dst = dst;
+
+ printf("Test enqueue without enough memory space \n");
+ for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+ }
+
+ /* MAX_BULK - 1 free entries remain; enqueue 2, then fill the rest */
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, 2);
+ cur_src += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
+
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK - 3;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
+ goto fail;
+
+
+ printf("Test dequeue without enough objects \n");
+ for (i = 0; i<RING_SIZE/MAX_BULK - 1; i++) {
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK)
+ goto fail;
+ }
+
+ /* MAX_BULK - 1 objects remain; dequeue 2, then drain the rest */
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, 2);
+ cur_dst += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
+
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK - 3;
+ if ((ret & RTE_RING_SZ_MASK) != MAX_BULK - 3)
+ goto fail;
+
+ /* check data */
+ if (memcmp(src, dst, cur_dst - dst)) {
+ test_hexdump("src", src, cur_src - src);
+ test_hexdump("dst", dst, cur_dst - dst);
+ printf("data after dequeue is not the same\n");
+ goto fail;
+ }
+
+ cur_src = src;
+ cur_dst = dst;
+
+ printf("Covering rte_ring_enqueue_burst functions \n");
+
+ ret = rte_ring_enqueue_burst(r, cur_src, 2);
+ cur_src += 2;
+ if ((ret & RTE_RING_SZ_MASK) != 2)
+ goto fail;
+
+ ret = rte_ring_dequeue_burst(r, cur_dst, 2);
+ cur_dst += 2;
+ if (ret != 2)
+ goto fail;
+
+ /* Free memory before the test completes */
+ if (src)
+ free(src);
+ if (dst)
+ free(dst);
+ return 0;
+
+ fail:
+ if (src)
+ free(src);
+ if (dst)
+ free(dst);
+ return -1;
+}
+
+static int
+test_ring_stats(void)
+{
+
+#ifndef RTE_LIBRTE_RING_DEBUG
+ printf("Enable RTE_LIBRTE_RING_DEBUG to test ring stats.\n");
+ return 0;
+#else
+ void **src = NULL, **cur_src = NULL, **dst = NULL, **cur_dst = NULL;
+ int ret;
+ unsigned i;
+ unsigned num_items = 0;
+ unsigned failed_enqueue_ops = 0;
+ unsigned failed_enqueue_items = 0;
+ unsigned failed_dequeue_ops = 0;
+ unsigned failed_dequeue_items = 0;
+ unsigned last_enqueue_ops = 0;
+ unsigned last_enqueue_items = 0;
+ unsigned last_quota_ops = 0;
+ unsigned last_quota_items = 0;
+ unsigned lcore_id = rte_lcore_id();
+ struct rte_ring_debug_stats *ring_stats = &r->stats[lcore_id];
+
+ printf("Test the ring stats.\n");
+
+ /* Reset the watermark in case it was set in another test. */
+ rte_ring_set_water_mark(r, 0);
+
+ /* Reset the ring stats. */
+ memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
+
+ /* Allocate some dummy object pointers. */
+ src = malloc(RING_SIZE*2*sizeof(void *));
+ if (src == NULL)
+ goto fail;
+
+ for (i = 0; i < RING_SIZE*2; i++) {
+ src[i] = (void *)(unsigned long)i;
+ }
+
+ /* Allocate some memory for copied objects. */
+ dst = malloc(RING_SIZE*2*sizeof(void *));
+ if (dst == NULL)
+ goto fail;
+
+ memset(dst, 0, RING_SIZE*2*sizeof(void *));
+
+ /* Set the head and tail pointers. */
+ cur_src = src;
+ cur_dst = dst;
+
+ /* Do Enqueue tests. */
+ printf("Test the enqueue stats.\n");
+
+ /* Fill the ring up to RING_SIZE - 1. */
+ printf("Fill the ring.\n");
+ for (i = 0; i < (RING_SIZE/MAX_BULK); i++) {
+ rte_ring_sp_enqueue_burst(r, cur_src, MAX_BULK);
+ cur_src += MAX_BULK;
+ }
+
+ /* The final burst fit only MAX_BULK - 1 objects; adjust the pointer. */
+ cur_src--;
+
+ printf("Verify that the ring is full.\n");
+ if (rte_ring_full(r) != 1)
+ goto fail;
+
+
+ printf("Verify the enqueue success stats.\n");
+ /* Stats should match above enqueue operations to fill the ring. */
+ if (ring_stats->enq_success_bulk != (RING_SIZE/MAX_BULK))
+ goto fail;
+
+ /* Current max objects is RING_SIZE - 1. */
+ if (ring_stats->enq_success_objs != RING_SIZE - 1)
+ goto fail;
+
+ /* Shouldn't have any failures yet. */
+ if (ring_stats->enq_fail_bulk != 0)
+ goto fail;
+ if (ring_stats->enq_fail_objs != 0)
+ goto fail;
+
+
+ printf("Test stats for SP burst enqueue to a full ring.\n");
+ num_items = 2;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != 0)
+ goto fail;
+
+ failed_enqueue_ops += 1;
+ failed_enqueue_items += num_items;
+
+ /* The enqueue should have failed. */
+ if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
+ goto fail;
+ if (ring_stats->enq_fail_objs != failed_enqueue_items)
+ goto fail;
+
+
+ printf("Test stats for SP bulk enqueue to a full ring.\n");
+ num_items = 4;
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
+ if (ret != -ENOBUFS)
+ goto fail;
+
+ failed_enqueue_ops += 1;
+ failed_enqueue_items += num_items;
+
+ /* The enqueue should have failed. */
+ if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
+ goto fail;
+ if (ring_stats->enq_fail_objs != failed_enqueue_items)
+ goto fail;
+
+
+ printf("Test stats for MP burst enqueue to a full ring.\n");
+ num_items = 8;
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != 0)
+ goto fail;
+
+ failed_enqueue_ops += 1;
+ failed_enqueue_items += num_items;
+
+ /* The enqueue should have failed. */
+ if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
+ goto fail;
+ if (ring_stats->enq_fail_objs != failed_enqueue_items)
+ goto fail;
+
+
+ printf("Test stats for MP bulk enqueue to a full ring.\n");
+ num_items = 16;
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
+ if (ret != -ENOBUFS)
+ goto fail;
+
+ failed_enqueue_ops += 1;
+ failed_enqueue_items += num_items;
+
+ /* The enqueue should have failed. */
+ if (ring_stats->enq_fail_bulk != failed_enqueue_ops)
+ goto fail;
+ if (ring_stats->enq_fail_objs != failed_enqueue_items)
+ goto fail;
+
+
+ /* Do Dequeue tests. */
+ printf("Test the dequeue stats.\n");
+
+ printf("Empty the ring.\n");
+ for (i = 0; i < RING_SIZE/MAX_BULK; i++) {
+ rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ }
+
+ /* There were only RING_SIZE - 1 objects to dequeue. */
+ cur_dst++;
+
+ printf("Verify ring is empty.\n");
+ if (1 != rte_ring_empty(r))
+ goto fail;
+
+ printf("Verify the dequeue success stats.\n");
+ /* Stats should match above dequeue operations. */
+ if (ring_stats->deq_success_bulk != (RING_SIZE/MAX_BULK))
+ goto fail;
+
+ /* Objects dequeued = RING_SIZE - 1. */
+ if (ring_stats->deq_success_objs != RING_SIZE - 1)
+ goto fail;
+
+ /* Shouldn't have any dequeue failure stats yet. */
+ if (ring_stats->deq_fail_bulk != 0)
+ goto fail;
+
+ printf("Test stats for SC burst dequeue with an empty ring.\n");
+ num_items = 2;
+ ret = rte_ring_sc_dequeue_burst(r, cur_dst, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != 0)
+ goto fail;
+
+ failed_dequeue_ops += 1;
+ failed_dequeue_items += num_items;
+
+ /* The dequeue should have failed. */
+ if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
+ goto fail;
+ if (ring_stats->deq_fail_objs != failed_dequeue_items)
+ goto fail;
+
+
+ printf("Test stats for SC bulk dequeue with an empty ring.\n");
+ num_items = 4;
+ ret = rte_ring_sc_dequeue_bulk(r, cur_dst, num_items);
+ if (ret != -ENOENT)
+ goto fail;
+
+ failed_dequeue_ops += 1;
+ failed_dequeue_items += num_items;
+
+ /* The dequeue should have failed. */
+ if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
+ goto fail;
+ if (ring_stats->deq_fail_objs != failed_dequeue_items)
+ goto fail;
+
+
+ printf("Test stats for MC burst dequeue with an empty ring.\n");
+ num_items = 8;
+ ret = rte_ring_mc_dequeue_burst(r, cur_dst, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != 0)
+ goto fail;
+ failed_dequeue_ops += 1;
+ failed_dequeue_items += num_items;
+
+ /* The dequeue should have failed. */
+ if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
+ goto fail;
+ if (ring_stats->deq_fail_objs != failed_dequeue_items)
+ goto fail;
+
+
+ printf("Test stats for MC bulk dequeue with an empty ring.\n");
+ num_items = 16;
+ ret = rte_ring_mc_dequeue_bulk(r, cur_dst, num_items);
+ if (ret != -ENOENT)
+ goto fail;
+
+ failed_dequeue_ops += 1;
+ failed_dequeue_items += num_items;
+
+ /* The dequeue should have failed. */
+ if (ring_stats->deq_fail_bulk != failed_dequeue_ops)
+ goto fail;
+ if (ring_stats->deq_fail_objs != failed_dequeue_items)
+ goto fail;
+
+
+ printf("Test total enqueue/dequeue stats.\n");
+ /* At this point the enqueue and dequeue stats should be the same. */
+ if (ring_stats->enq_success_bulk != ring_stats->deq_success_bulk)
+ goto fail;
+ if (ring_stats->enq_success_objs != ring_stats->deq_success_objs)
+ goto fail;
+ if (ring_stats->enq_fail_bulk != ring_stats->deq_fail_bulk)
+ goto fail;
+ if (ring_stats->enq_fail_objs != ring_stats->deq_fail_objs)
+ goto fail;
+
+
+ /* Watermark Tests. */
+ printf("Test the watermark/quota stats.\n");
+
+ printf("Verify the initial watermark stats.\n");
+ /* Watermark stats should be 0 since there is no watermark. */
+ if (ring_stats->enq_quota_bulk != 0)
+ goto fail;
+ if (ring_stats->enq_quota_objs != 0)
+ goto fail;
+
+ /* Set a watermark. */
+ rte_ring_set_water_mark(r, 16);
+
+ /* Reset pointers. */
+ cur_src = src;
+ cur_dst = dst;
+
+ last_enqueue_ops = ring_stats->enq_success_bulk;
+ last_enqueue_items = ring_stats->enq_success_objs;
+
+
+ printf("Test stats for SP burst enqueue below watermark.\n");
+ num_items = 8;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != num_items)
+ goto fail;
+
+ /* Watermark stats should still be 0. */
+ if (ring_stats->enq_quota_bulk != 0)
+ goto fail;
+ if (ring_stats->enq_quota_objs != 0)
+ goto fail;
+
+ /* Success stats should have increased. */
+ if (ring_stats->enq_success_bulk != last_enqueue_ops + 1)
+ goto fail;
+ if (ring_stats->enq_success_objs != last_enqueue_items + num_items)
+ goto fail;
+
+ last_enqueue_ops = ring_stats->enq_success_bulk;
+ last_enqueue_items = ring_stats->enq_success_objs;
+
+
+ printf("Test stats for SP burst enqueue at watermark.\n");
+ num_items = 8;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != num_items)
+ goto fail;
+
+ /* Watermark stats should have changed. */
+ if (ring_stats->enq_quota_bulk != 1)
+ goto fail;
+ if (ring_stats->enq_quota_objs != num_items)
+ goto fail;
+
+ last_quota_ops = ring_stats->enq_quota_bulk;
+ last_quota_items = ring_stats->enq_quota_objs;
+
+
+ printf("Test stats for SP burst enqueue above watermark.\n");
+ num_items = 1;
+ ret = rte_ring_sp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != num_items)
+ goto fail;
+
+ /* Watermark stats should have changed. */
+ if (ring_stats->enq_quota_bulk != last_quota_ops + 1)
+ goto fail;
+ if (ring_stats->enq_quota_objs != last_quota_items + num_items)
+ goto fail;
+
+ last_quota_ops = ring_stats->enq_quota_bulk;
+ last_quota_items = ring_stats->enq_quota_objs;
+
+
+ printf("Test stats for MP burst enqueue above watermark.\n");
+ num_items = 2;
+ ret = rte_ring_mp_enqueue_burst(r, cur_src, num_items);
+ if ((ret & RTE_RING_SZ_MASK) != num_items)
+ goto fail;
+
+ /* Watermark stats should have changed. */
+ if (ring_stats->enq_quota_bulk != last_quota_ops + 1)
+ goto fail;
+ if (ring_stats->enq_quota_objs != last_quota_items + num_items)
+ goto fail;
+
+ last_quota_ops = ring_stats->enq_quota_bulk;
+ last_quota_items = ring_stats->enq_quota_objs;
+
+
+ printf("Test stats for SP bulk enqueue above watermark.\n");
+ num_items = 4;
+ ret = rte_ring_sp_enqueue_bulk(r, cur_src, num_items);
+ if (ret != -EDQUOT)
+ goto fail;
+
+ /* Watermark stats should have changed. */
+ if (ring_stats->enq_quota_bulk != last_quota_ops + 1)
+ goto fail;
+ if (ring_stats->enq_quota_objs != last_quota_items + num_items)
+ goto fail;
+
+ last_quota_ops = ring_stats->enq_quota_bulk;
+ last_quota_items = ring_stats->enq_quota_objs;
+
+
+ printf("Test stats for MP bulk enqueue above watermark.\n");
+ num_items = 8;
+ ret = rte_ring_mp_enqueue_bulk(r, cur_src, num_items);
+ if (ret != -EDQUOT)
+ goto fail;
+
+ /* Watermark stats should have changed. */
+ if (ring_stats->enq_quota_bulk != last_quota_ops + 1)
+ goto fail;
+ if (ring_stats->enq_quota_objs != last_quota_items + num_items)
+ goto fail;
+
+ printf("Test watermark success stats.\n");
+ /* Success stats should be same as last non-watermarked enqueue. */
+ if (ring_stats->enq_success_bulk != last_enqueue_ops)
+ goto fail;
+ if (ring_stats->enq_success_objs != last_enqueue_items)
+ goto fail;
+
+
+ /* Cleanup. */
+
+ /* Empty the ring. */
+ for (i = 0; i < RING_SIZE/MAX_BULK; i++) {
+ rte_ring_sc_dequeue_burst(r, cur_dst, MAX_BULK);
+ cur_dst += MAX_BULK;
+ }
+
+ /* Reset the watermark. */
+ rte_ring_set_water_mark(r, 0);
+
+ /* Reset the ring stats. */
+ memset(&r->stats[lcore_id], 0, sizeof(r->stats[lcore_id]));
+
+ /* Free memory before the test completes */
+ if (src)
+ free(src);
+ if (dst)
+ free(dst);
+ return 0;
+
+fail:
+ if (src)
+ free(src);
+ if (dst)
+ free(dst);
+ return -1;
+#endif
+}
+
+/*
+ * creating a ring with an invalid ring size must always fail
+ */
+static int
+test_ring_creation_with_wrong_size(void)
+{
+ struct rte_ring * rp = NULL;
+
+ /* Test a ring size that is not a power of 2 */
+ rp = rte_ring_create("test_bad_ring_size", RING_SIZE + 1, SOCKET_ID_ANY, 0);
+ if (NULL != rp) {
+ return -1;
+ }
+
+ /* Test a ring size that exceeds the limit */
+ rp = rte_ring_create("test_bad_ring_size", (RTE_RING_SZ_MASK + 1), SOCKET_ID_ANY, 0);
+ if (NULL != rp) {
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * creating a ring with a name that is already in use must always fail
+ */
+static int
+test_ring_creation_with_an_used_name(void)
+{
+ struct rte_ring * rp;
+
+ rp = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
+ if (NULL != rp)
+ return -1;
+
+ return 0;
+}
+
+/*
+ * Test that a non-power-of-2 count causes the create
+ * function to fail correctly
+ */
+static int
+test_create_count_odd(void)
+{
+ struct rte_ring *r = rte_ring_create("test_ring_count",
+ 4097, SOCKET_ID_ANY, 0);
+ if (r != NULL) {
+ return -1;
+ }
+ return 0;
+}
+
+static int
+test_lookup_null(void)
+{
+ struct rte_ring *rlp = rte_ring_lookup("ring_not_found");
+ if (rlp == NULL)
+ if (rte_errno != ENOENT) {
+ printf("test failed to return an error on null pointer\n");
+ return -1;
+ }
+ return 0;
+}
+
+/*
+ * it tests some more basic ring operations
+ */
+static int
+test_ring_basic_ex(void)
+{
+ int ret = -1;
+ unsigned i;
+ struct rte_ring * rp;
+ void **obj = NULL;
+
+ obj = (void **)rte_zmalloc("test_ring_basic_ex_malloc", (RING_SIZE * sizeof(void *)), 0);
+ if (obj == NULL) {
+ printf("test_ring_basic_ex fail to rte_malloc\n");
+ goto fail_test;
+ }
+
+ rp = rte_ring_create("test_ring_basic_ex", RING_SIZE, SOCKET_ID_ANY,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
+ if (rp == NULL) {
+ printf("test_ring_basic_ex fail to create ring\n");
+ goto fail_test;
+ }
+
+ if (rte_ring_lookup("test_ring_basic_ex") != rp) {
+ goto fail_test;
+ }
+
+ if (rte_ring_empty(rp) != 1) {
+ printf("test_ring_basic_ex ring is not empty but it should be\n");
+ goto fail_test;
+ }
+
+ printf("%u ring entries are now free\n", rte_ring_free_count(rp));
+
+ for (i = 0; i < RING_SIZE; i++) {
+ rte_ring_enqueue(rp, obj[i]);
+ }
+
+ if (rte_ring_full(rp) != 1) {
+ printf("test_ring_basic_ex ring is not full but it should be\n");
+ goto fail_test;
+ }
+
+ for (i = 0; i < RING_SIZE; i++) {
+ rte_ring_dequeue(rp, &obj[i]);
+ }
+
+ if (rte_ring_empty(rp) != 1) {
+ printf("test_ring_basic_ex ring is not empty but it should be\n");
+ goto fail_test;
+ }
+
+ /* Covering the ring burst operation */
+ ret = rte_ring_enqueue_burst(rp, obj, 2);
+ if ((ret & RTE_RING_SZ_MASK) != 2) {
+ printf("test_ring_basic_ex: rte_ring_enqueue_burst fails \n");
+ goto fail_test;
+ }
+
+ ret = rte_ring_dequeue_burst(rp, obj, 2);
+ if (ret != 2) {
+ printf("test_ring_basic_ex: rte_ring_dequeue_burst fails \n");
+ goto fail_test;
}
+ ret = 0;
+fail_test:
+ if (obj != NULL)
+ rte_free(obj);
+
+ return ret;
+}
+
+int
+test_ring(void)
+{
+ unsigned enq_core_count, deq_core_count;
+
+ /* some more basic operations */
+ if (test_ring_basic_ex() < 0)
+ return -1;
+
+ rte_atomic32_init(&synchro);
+
+ if (r == NULL)
+ r = rte_ring_create("test", RING_SIZE, SOCKET_ID_ANY, 0);
+ if (r == NULL)
+ return -1;
+
+ /* retrieve the ring from its name */
+ if (rte_ring_lookup("test") != r) {
+ printf("Cannot lookup ring from its name\n");
+ return -1;
+ }
+
+ /* burst operations */
+ if (test_ring_burst_basic() < 0)
+ return -1;
+
/* basic operations */
if (test_ring_basic() < 0)
return -1;
+ /* ring stats */
+ if (test_ring_stats() < 0)
+ return -1;
+
/* basic operations */
- if (test_quota_and_watermark() < 0)
+ if (test_live_watermark_change() < 0)
return -1;
if ( test_set_watermark() < 0){
else
printf ( "Test detected NULL ring lookup \n");
+ printf("start performance tests \n");
+
+ /* one lcore for enqueue, one for dequeue */
+ enq_core_count = 1;
+ deq_core_count = 1;
+ if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
+ return -1;
+
+ /* max cores for enqueue, one for dequeue */
+ enq_core_count = rte_lcore_count() - 1;
+ deq_core_count = 1;
+ if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
+ return -1;
+
+ /* max cores for dequeue, one for enqueue */
+ enq_core_count = 1;
+ deq_core_count = rte_lcore_count() - 1;
+ if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
+ return -1;
+
+ /* half for enqueue and half for dequeue */
+ enq_core_count = rte_lcore_count() / 2;
+ deq_core_count = rte_lcore_count() / 2;
+ if (do_one_ring_test(enq_core_count, deq_core_count, 1) < 0)
+ return -1;
- printf("start performance tests\n");
+ printf("start performance tests - burst operations \n");
/* one lcore for enqueue, one for dequeue */
enq_core_count = 1;
deq_core_count = 1;
- if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
+ if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
return -1;
/* max cores for enqueue, one for dequeue */
enq_core_count = rte_lcore_count() - 1;
deq_core_count = 1;
- if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
+ if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
return -1;
/* max cores for dequeue, one for enqueue */
enq_core_count = 1;
deq_core_count = rte_lcore_count() - 1;
- if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
+ if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
return -1;
/* half for enqueue and half for dequeue */
enq_core_count = rte_lcore_count() / 2;
deq_core_count = rte_lcore_count() / 2;
- if (do_one_ring_test(enq_core_count, deq_core_count) < 0)
+ if (do_one_ring_test(enq_core_count, deq_core_count, 0) < 0)
return -1;
/* test of creating ring with wrong size */
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
+enum rte_ring_queue_behavior {
+ RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
+ RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from the ring */
+};
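+
+/*
+ * Illustrative sketch of the two behaviors (assuming a ring with
+ * three free entries, no watermark set, and n == 8):
+ *
+ *   rte_ring_sp_enqueue_bulk(r, obj_table, 8);
+ *   (RTE_RING_QUEUE_FIXED: returns -ENOBUFS, nothing enqueued)
+ *
+ *   rte_ring_sp_enqueue_burst(r, obj_table, 8);
+ *   (RTE_RING_QUEUE_VARIABLE: returns 3, the three objects that fit)
+ */
+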
#ifdef RTE_LIBRTE_RING_DEBUG
/**
/** Ring producer status. */
struct prod {
- volatile uint32_t bulk_default; /**< Default bulk count. */
uint32_t watermark; /**< Maximum items before EDQUOT. */
uint32_t sp_enqueue; /**< True, if single producer. */
uint32_t size; /**< Size of ring. */
/** Ring consumer status. */
struct cons {
- volatile uint32_t bulk_default; /**< Default bulk count. */
uint32_t sc_dequeue; /**< True, if single consumer. */
uint32_t size; /**< Size of the ring. */
uint32_t mask; /**< Mask (size-1) of ring. */
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
+#define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceeded for burst ops */
+#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */
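+
+/*
+ * Decoding a burst enqueue return value (illustrative sketch): the
+ * low bits carry the number of objects actually enqueued, and
+ * RTE_RING_QUOT_EXCEED flags that the operation crossed the watermark.
+ *
+ *   int ret = rte_ring_sp_enqueue_burst(r, obj_table, n);
+ *   unsigned done = ret & RTE_RING_SZ_MASK;
+ *   if (ret & RTE_RING_QUOT_EXCEED)
+ *       (watermark exceeded; the *done* objects were enqueued anyway)
+ */
+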
/**
- * When debug is enabled, store ring statistics.
+ * @internal When debug is enabled, store ring statistics.
* @param r
* A pointer to the ring.
* @param name
*
* This function uses ``memzone_reserve()`` to allocate memory. Its size is
* set to *count*, which must be a power of two. Water marking is
- * disabled by default. The default bulk count is initialized to 1.
+ * disabled by default.
* Note that the real usable ring size is *count-1* instead of
* *count*.
*
struct rte_ring *rte_ring_create(const char *name, unsigned count,
int socket_id, unsigned flags);
-/**
- * Set the default bulk count for enqueue/dequeue.
- *
- * The parameter *count* is the default number of bulk elements to
- * get/put when using ``rte_ring_*_{en,de}queue_bulk()``. It must be
- * greater than 0 and less than half of the ring size.
- *
- * @param r
- * A pointer to the ring structure.
- * @param count
- * A new water mark value.
- * @return
- * - 0: Success; default_bulk_count changed.
- * - -EINVAL: Invalid count value.
- */
-static inline int
-rte_ring_set_bulk_count(struct rte_ring *r, unsigned count)
-{
- if (unlikely(count == 0 || count >= r->prod.size))
- return -EINVAL;
-
- r->prod.bulk_default = r->cons.bulk_default = count;
- return 0;
-}
-
-/**
- * Get the default bulk count for enqueue/dequeue.
- *
- * @param r
- * A pointer to the ring structure.
- * @return
- * The default bulk count for enqueue/dequeue.
- */
-static inline unsigned
-rte_ring_get_bulk_count(struct rte_ring *r)
-{
- return r->prod.bulk_default;
-}
-
/**
* Change the high water mark.
*
* *count* value. The *count* value must be greater than 0 and less
* than the ring size.
*
- * This function can be called at any time (not necessarilly at
+ * This function can be called at any time (not necessarily at
* initialization).
*
* @param r
void rte_ring_dump(const struct rte_ring *r);
/**
- * Enqueue several objects on the ring (multi-producers safe).
+ * @internal Enqueue several objects on the ring (multi-producers safe).
*
* This function uses a "compare and set" instruction to move the
* producer index atomically.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
- * The number of objects to add in the ring from the obj_table. The
- * value must be strictly positive.
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
* @return
+ * Depends on the *behavior* value
+ * if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects enqueue.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ * if behavior = RTE_RING_QUEUE_VARIABLE
+ * - n: Actual number of objects enqueued.
*/
static inline int
-rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next;
uint32_t cons_tail, free_entries;
+ const unsigned max = n;
int success;
unsigned i;
uint32_t mask = r->prod.mask;
/* move prod.head atomically */
do {
+ /* Reset n to the initial burst count */
+ n = max;
+
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
- __RING_STAT_ADD(r, enq_fail, n);
- return -ENOBUFS;
+ if (behavior == RTE_RING_QUEUE_FIXED) {
+ __RING_STAT_ADD(r, enq_fail, n);
+ return -ENOBUFS;
+ }
+ else {
+ /* No free entry available */
+ if (unlikely(free_entries == 0)) {
+ __RING_STAT_ADD(r, enq_fail, n);
+ return 0;
+ }
+
+ n = free_entries;
+ }
}
prod_next = prod_head + n;
r->ring[(prod_head + i) & mask] = obj_table[i];
rte_wmb();
- /* return -EDQUOT if we exceed the watermark */
+ /* if we exceed the watermark */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
- ret = -EDQUOT;
+ ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
+ (int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
}
else {
- ret = 0;
+ ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
__RING_STAT_ADD(r, enq_success, n);
}
}
/**
- * Enqueue several objects on a ring (NOT multi-producers safe).
+ * @internal Enqueue several objects on a ring (NOT multi-producers safe).
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
- * The number of objects to add in the ring from the obj_table. The
- * value must be strictly positive.
+ * The number of objects to add in the ring from the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items to the ring
+ * RTE_RING_QUEUE_VARIABLE: Enqueue as many items as possible to the ring
* @return
- * - 0: Success; objects enqueued.
+ * Depends on the *behavior* value
+ * if behavior = RTE_RING_QUEUE_FIXED
+ * - 0: Success; objects enqueued.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
- * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ * if behavior = RTE_RING_QUEUE_VARIABLE
+ * - n: Actual number of objects enqueued.
*/
static inline int
-rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
- unsigned n)
+__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
+ unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, cons_tail;
uint32_t prod_next, free_entries;
/* check that we have enough room in ring */
if (unlikely(n > free_entries)) {
- __RING_STAT_ADD(r, enq_fail, n);
- return -ENOBUFS;
+ if (behavior == RTE_RING_QUEUE_FIXED) {
+ __RING_STAT_ADD(r, enq_fail, n);
+ return -ENOBUFS;
+ }
+ else {
+ /* No free entry available */
+ if (unlikely(free_entries == 0)) {
+ __RING_STAT_ADD(r, enq_fail, n);
+ return 0;
+ }
+
+ n = free_entries;
+ }
}
prod_next = prod_head + n;
r->ring[(prod_head + i) & mask] = obj_table[i];
rte_wmb();
- /* return -EDQUOT if we exceed the watermark */
+ /* if we exceed the watermark */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
- ret = -EDQUOT;
+ ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
+ (int)(n | RTE_RING_QUOT_EXCEED);
__RING_STAT_ADD(r, enq_quota, n);
}
else {
- ret = 0;
+ ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
__RING_STAT_ADD(r, enq_success, n);
}
return ret;
}
+/**
+ * @internal Dequeue several objects from a ring (multi-consumers safe). When
+ * more objects are requested than are available, only the available objects
+ * are dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @return
+ * Depends on the *behavior* value
+ * if behavior = RTE_RING_QUEUE_FIXED
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ * dequeued.
+ * if behavior = RTE_RING_QUEUE_VARIABLE
+ * - n: Actual number of objects dequeued.
+ */
+static inline int
+__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned n, enum rte_ring_queue_behavior behavior)
+{
+ uint32_t cons_head, prod_tail;
+ uint32_t cons_next, entries;
+ const unsigned max = n;
+ int success;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+
+ /* move cons.head atomically */
+ do {
+ /* Restore n as it may change every loop */
+ n = max;
+
+ cons_head = r->cons.head;
+ prod_tail = r->prod.tail;
+ /* The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * cons_head > prod_tail). So 'entries' is always between 0
+ * and size(ring)-1. */
+ entries = (prod_tail - cons_head);
+
+ /* Set the actual entries for dequeue */
+ if (unlikely(n > entries)) {
+ if (behavior == RTE_RING_QUEUE_FIXED) {
+ __RING_STAT_ADD(r, deq_fail, n);
+ return -ENOENT;
+ }
+ else {
+ if (unlikely(entries == 0)) {
+ __RING_STAT_ADD(r, deq_fail, n);
+ return 0;
+ }
+
+ n = entries;
+ }
+ }
+
+ cons_next = cons_head + n;
+ success = rte_atomic32_cmpset(&r->cons.head, cons_head,
+ cons_next);
+ } while (unlikely(success == 0));
+
+ /* copy in table */
+ rte_rmb();
+ for (i = 0; likely(i < n); i++) {
+ obj_table[i] = r->ring[(cons_head + i) & mask];
+ }
+
+ /*
+ * If there are other dequeues in progress that preceded us,
+ * we need to wait for them to complete
+ */
+ while (unlikely(r->cons.tail != cons_head))
+ rte_pause();
+
+ __RING_STAT_ADD(r, deq_success, n);
+ r->cons.tail = cons_next;
+
+ return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+}
+
+/**
+ * @internal Dequeue several objects from a ring (NOT multi-consumers safe).
+ * When more objects are requested than are available, only the available
+ * objects are dequeued.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @param behavior
+ * RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
+ * RTE_RING_QUEUE_VARIABLE: Dequeue as many items as possible from the ring
+ * @return
+ * Depends on the *behavior* value
+ * if behavior = RTE_RING_QUEUE_FIXED
+ * - 0: Success; objects dequeued.
+ * - -ENOENT: Not enough entries in the ring to dequeue; no object is
+ * dequeued.
+ * if behavior = RTE_RING_QUEUE_VARIABLE
+ * - n: Actual number of objects dequeued.
+ */
+static inline int
+__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
+ unsigned n, enum rte_ring_queue_behavior behavior)
+{
+ uint32_t cons_head, prod_tail;
+ uint32_t cons_next, entries;
+ unsigned i;
+ uint32_t mask = r->prod.mask;
+
+ cons_head = r->cons.head;
+ prod_tail = r->prod.tail;
+ /* The subtraction is done between two unsigned 32bits value
+ * (the result is always modulo 32 bits even if we have
+ * cons_head > prod_tail). So 'entries' is always between 0
+ * and size(ring)-1. */
+ entries = prod_tail - cons_head;
+
+ if (unlikely(n > entries)) {
+ if (behavior == RTE_RING_QUEUE_FIXED) {
+ __RING_STAT_ADD(r, deq_fail, n);
+ return -ENOENT;
+ }
+ else {
+ if (unlikely(entries == 0)) {
+ __RING_STAT_ADD(r, deq_fail, n);
+ return 0;
+ }
+
+ n = entries;
+ }
+ }
+
+ cons_next = cons_head + n;
+ r->cons.head = cons_next;
+
+ /* copy in table */
+ rte_rmb();
+ for (i = 0; likely(i < n); i++) {
+ obj_table[i] = r->ring[(cons_head + i) & mask];
+ }
+
+ __RING_STAT_ADD(r, deq_success, n);
+ r->cons.tail = cons_next;
+ return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
+}
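+
+/*
+ * Illustrative sketch of the two dequeue behaviors (assuming a ring
+ * holding five objects and n == 8):
+ *
+ *   rte_ring_sc_dequeue_bulk(r, obj_table, 8);
+ *   (RTE_RING_QUEUE_FIXED: returns -ENOENT, nothing dequeued)
+ *
+ *   rte_ring_sc_dequeue_burst(r, obj_table, 8);
+ *   (RTE_RING_QUEUE_VARIABLE: returns 5)
+ */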
+
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ * high water mark is exceeded.
+ * - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
+ */
+static inline int
+rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned n)
+{
+ return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @return
+ * - 0: Success; objects enqueued.
+ * - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
+ * high water mark is exceeded.
+ * - -ENOBUFS: Not enough room in the ring to enqueue; no object is enqueued.
+ */
+static inline int
+rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
+ unsigned n)
+{
+ return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
+}
+
/**
* Enqueue several objects on a ring.
*
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
- * The number of objects to dequeue from the ring to the obj_table,
- * must be strictly positive
+ * The number of objects to dequeue from the ring to the obj_table.
* @return
* - 0: Success; objects dequeued.
* - -ENOENT: Not enough entries in the ring to dequeue; no object is
static inline int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
- uint32_t cons_head, prod_tail;
- uint32_t cons_next, entries;
- int success;
- unsigned i;
- uint32_t mask = r->prod.mask;
-
- /* move cons.head atomically */
- do {
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1. */
- entries = (prod_tail - cons_head);
-
- /* check that we have enough entries in ring */
- if (unlikely(n > entries)) {
- __RING_STAT_ADD(r, deq_fail, n);
- return -ENOENT;
- }
-
- cons_next = cons_head + n;
- success = rte_atomic32_cmpset(&r->cons.head, cons_head,
- cons_next);
- } while (unlikely(success == 0));
-
- /* copy in table */
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
-
- /*
- * If there are other dequeues in progress that preceeded us,
- * we need to wait for them to complete
- */
- while (unlikely(r->cons.tail != cons_head))
- rte_pause();
-
- __RING_STAT_ADD(r, deq_success, n);
- r->cons.tail = cons_next;
- return 0;
+ return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
static inline int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
- uint32_t cons_head, prod_tail;
- uint32_t cons_next, entries;
- unsigned i;
- uint32_t mask = r->prod.mask;
-
- cons_head = r->cons.head;
- prod_tail = r->prod.tail;
- /* The subtraction is done between two unsigned 32bits value
- * (the result is always modulo 32 bits even if we have
- * cons_head > prod_tail). So 'entries' is always between 0
- * and size(ring)-1. */
- entries = prod_tail - cons_head;
-
- /* check that we have enough entries in ring */
- if (unlikely(n > entries)) {
- __RING_STAT_ADD(r, deq_fail, n);
- return -ENOENT;
- }
-
- cons_next = cons_head + n;
- r->cons.head = cons_next;
-
- /* copy in table */
- rte_rmb();
- for (i = 0; likely(i < n); i++) {
- obj_table[i] = r->ring[(cons_head + i) & mask];
- }
-
- __RING_STAT_ADD(r, deq_success, n);
- r->cons.tail = cons_next;
- return 0;
+ return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
*/
struct rte_ring *rte_ring_lookup(const char *name);
+/**
+ * Enqueue several objects on the ring (multi-producers safe).
+ *
+ * This function uses a "compare and set" instruction to move the
+ * producer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+static inline int
+rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned n)
+{
+ return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring (NOT multi-producers safe).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+static inline int
+rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned n)
+{
+ return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Enqueue several objects on a ring.
+ *
+ * This function calls the multi-producer or the single-producer
+ * version depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects).
+ * @param n
+ * The number of objects to add in the ring from the obj_table.
+ * @return
+ * - n: Actual number of objects enqueued.
+ */
+static inline int
+rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
+ unsigned n)
+{
+ if (r->prod.sp_enqueue)
+ return rte_ring_sp_enqueue_burst(r, obj_table, n);
+ else
+ return rte_ring_mp_enqueue_burst(r, obj_table, n);
+}
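+
+/*
+ * Usage sketch (illustrative only; r, obj_table and n stand for a
+ * caller's ring, object table and batch size): push a whole batch,
+ * retrying the tail that did not fit on the first attempt.
+ *
+ *   unsigned sent = 0;
+ *   while (sent < n)
+ *       sent += rte_ring_enqueue_burst(r, &obj_table[sent],
+ *                                      n - sent) & RTE_RING_SZ_MASK;
+ */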
+
+/**
+ * Dequeue several objects from a ring (multi-consumers safe). When more
+ * objects are requested than are available, only the available objects are
+ * dequeued.
+ *
+ * This function uses a "compare and set" instruction to move the
+ * consumer index atomically.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static inline int
+rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+{
+ return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue several objects from a ring (NOT multi-consumers safe). When more
+ * objects are requested than are available, only the available objects are
+ * dequeued.
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static inline int
+rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+{
+ return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
+}
+
+/**
+ * Dequeue multiple objects from a ring up to a maximum number.
+ *
+ * This function calls the multi-consumers or the single-consumer
+ * version, depending on the default behavior that was specified at
+ * ring creation time (see flags).
+ *
+ * @param r
+ * A pointer to the ring structure.
+ * @param obj_table
+ * A pointer to a table of void * pointers (objects) that will be filled.
+ * @param n
+ * The number of objects to dequeue from the ring to the obj_table.
+ * @return
+ * - n: Actual number of objects dequeued, 0 if ring is empty
+ */
+static inline int
+rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
+{
+ if (r->cons.sc_dequeue)
+ return rte_ring_sc_dequeue_burst(r, obj_table, n);
+ else
+ return rte_ring_mc_dequeue_burst(r, obj_table, n);
+}
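+
+/*
+ * Usage sketch (illustrative only; BURST_SZ and process() are
+ * hypothetical): drain the ring in bursts until it is empty.
+ *
+ *   void *objs[BURST_SZ];
+ *   int nb;
+ *   while ((nb = rte_ring_dequeue_burst(r, objs, BURST_SZ)) > 0)
+ *       process(objs, nb);
+ */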
+
#ifdef __cplusplus
}
#endif