/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <string.h>

#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <rte_atomic.h>
#include <rte_malloc.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases, measures performance of various operations
 * using rdtsc for legacy and 16B size ring elements.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

static volatile unsigned lcore_count = 0;

static void
test_ring_print_test_string(unsigned int api_type, int esize,
	unsigned int bsz, double value)
{
	if (esize == -1)
		printf("legacy APIs");
	else
		printf("elem APIs: element size %dB", esize);

	if (api_type == TEST_RING_IGNORE_API_TYPE)
		return;

	if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
		printf(": default enqueue/dequeue: ");
	else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
		printf(": SP/SC: ");
	else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
		printf(": MP/MC: ");

	if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
		printf("single: ");
	else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
		printf("bulk (size: %u): ", bsz);
	else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
		printf("burst (size: %u): ", bsz);

	printf("%.2F\n", value);
}

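/*
 * For illustration only: with api_type == (TEST_RING_THREAD_SPSC |
 * TEST_RING_ELEM_BULK), esize == 16 and bsz == 8, the function above
 * prints a line of the form (the cycle count here is a made-up value):
 *   elem APIs: element size 16B: SP/SC: bulk (size: 8): 12.34
 */
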
/**** Functions to analyse our core mask to get cores for different tests ***/

static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all id's. We could skip the first few
		 * elements, but since number of cores is small there is little point
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* hyperthread pair: same physical core, same socket */
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* two distinct physical cores on the same socket */
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			/* lcores on two different sockets */
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
			const unsigned int api_type)
{
	const unsigned int iter_shift = 26;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst[MAX_BURST];

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
	const uint64_t end = rte_rdtsc();

	test_ring_print_test_string(api_type, esize, bulk_sizes[0],
					((double)(end - start)) / iterations);
}

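/*
 * Every dequeue above fails immediately because the ring stays empty, so
 * the figure reported is essentially the fixed cost of a failed dequeue
 * call rather than the cost of moving any data.
 */
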
/*
 * For the separate enqueue and dequeue threads: they take in one param
 * and return two. Input = burst size, output = cycle average for sp/sc & mp/mc
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size;		/* input value, the burst size */
	double spsc, mpmc;	/* output value, the single or multi timings */
};

/*
 * Helper function to call bulk SP/MP enqueue functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize,
	struct thread_params *p)
{
	int ret;
	const unsigned int iter_shift = 15;
	const unsigned int iterations = 1 << iter_shift;
	struct rte_ring *r = p->r;
	unsigned int bsize = p->size;
	unsigned int i;
	void *burst = NULL;

#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

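	/*
	 * The check above is a simple rendezvous: both lcores of the pair
	 * increment lcore_count, and whichever arrives first spins until
	 * its peer shows up, so the timed loops below start together.
	 */
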
	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_SPSC |
						TEST_RING_ELEM_BULK);
			/* retry while the ring is full (enq) or empty (deq) */
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		do {
			if (flag == 0)
				ret = test_ring_enqueue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			else if (flag == 1)
				ret = test_ring_dequeue(r, burst, esize, bsize,
						TEST_RING_THREAD_MPMC |
						TEST_RING_ELEM_BULK);
			/* retry while the ring is full (enq) or empty (deq) */
			if (ret == 0)
				rte_pause();
		} while (!ret);
	const uint64_t mp_end = rte_rdtsc();

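	/*
	 * Each timed loop moved iterations bursts of bsize elements, so the
	 * divisor below is the element count; e.g. for iter_shift == 15 and
	 * bsize == 8 that is 2^15 * 8 = 262144 elements.
	 */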
	p->spsc = ((double)(sp_end - sp_start))/(iterations * bsize);
	p->mpmc = ((double)(mp_end - mp_start))/(iterations * bsize);

	rte_free(burst);

	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs pair
 * thread running dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, -1, params);
}

static int
enqueue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(0, 16, params);
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs pair
 * thread running enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, -1, params);
}

static int
dequeue_bulk_16B(void *p)
{
	struct thread_params *params = p;

	return enqueue_dequeue_bulk_helper(1, 16, params);
}

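/*
 * In the wrappers above, an esize of -1 selects the legacy rte_ring APIs
 * while 16 exercises the rte_ring_elem APIs with 16B elements; the
 * dispatch on esize is done by test_ring_enqueue/test_ring_dequeue in
 * test_ring.h.
 */
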
/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of cores.
 * Used to measure ring perf between hyperthreads, cores and sockets.
 */
static int
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize)
{
	unsigned int i;
	lcore_function_t *f1, *f2;
	struct thread_params param1 = {0}, param2 = {0};

	if (esize == -1) {
		f1 = enqueue_bulk;
		f2 = dequeue_bulk;
	} else {
		f1 = enqueue_bulk_16B;
		f2 = dequeue_bulk_16B;
	}

	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
		if (cores->c1 == rte_get_main_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			if (rte_eal_wait_lcore(cores->c1) < 0)
				return -1;
			if (rte_eal_wait_lcore(cores->c2) < 0)
				return -1;
		}
		/*
		 * param1 carries the enqueue-side cost and param2 the
		 * dequeue-side cost, so each printed value is the combined
		 * enqueue+dequeue cycles per element.
		 */
		test_ring_print_test_string(
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.spsc + param2.spsc);
		test_ring_print_test_string(
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK,
			esize, bulk_sizes[i], param1.mpmc + param2.mpmc);
	}

	return 0;
}

static rte_atomic32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

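/*
 * synchro gates the worker lcores: they spin until the main lcore sets it
 * to 1, so every lcore enters the timed loop at roughly the same moment;
 * each lcore then reports its enqueue+dequeue pair count via queue_count[].
 */
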
static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	/* wait synchro for workers */
	if (lcore != rte_get_main_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		test_ring_enqueue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		test_ring_dequeue(params->r, burst, esize, params->size,
				TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;

	rte_free(burst);

	return 0;
}

static int
load_loop_fn(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
	struct thread_params *params = p;

	return load_loop_fn_helper(params, 16);
}

static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
	uint64_t total;
	struct thread_params param;
	lcore_function_t *lcore_f;
	unsigned int i, c;

	if (esize == -1)
		lcore_f = load_loop_fn;
	else
		lcore_f = load_loop_fn_16B;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		total = 0;
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;

		/* clear synchro and start workers */
		rte_atomic32_set(&synchro, 0);
		if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MAIN) < 0)
			return -1;

		/* start synchro and launch test on main */
		rte_atomic32_set(&synchro, 1);
		if (lcore_f(&param) < 0)
			return -1;

		rte_eal_mp_wait_lcore();

		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}

/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 24;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int i = 0;
	void *burst = NULL;

	/* alloc dummy object pointers */
	burst = test_ring_calloc(1, esize);
	if (burst == NULL)
		return -1;

	const uint64_t start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		test_ring_enqueue(r, burst, esize, 1, api_type);
		test_ring_dequeue(r, burst, esize, 1, api_type);
	}
	const uint64_t end = rte_rdtsc();

	/* each iteration does one enqueue plus one dequeue */
	test_ring_print_test_string(api_type, esize, 1,
					((double)(end - start)) / iterations);

	rte_free(burst);

	return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk API
 * calls. Results should be the same as for the bulk function called on a
 * single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
	const unsigned int api_type)
{
	const unsigned int iter_shift = 23;
	const unsigned int iterations = 1 << iter_shift;
	unsigned int sz, i = 0;
	void **burst = NULL;

	burst = test_ring_calloc(MAX_BURST, esize);
	if (burst == NULL)
		return -1;

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
						api_type);
			test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
						api_type);
		}
		const uint64_t end = rte_rdtsc();

		test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
					((double)(end - start)) / iterations);
	}

	rte_free(burst);

	return 0;
}

/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	/*
	 * Performance test for legacy/_elem APIs
	 * SP-SC/MP-MC, single
	 */
	r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		goto test_fail;

	printf("\n### Testing single element enq/deq ###\n");
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;
	if (test_single_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
		goto test_fail;

	printf("\n### Testing burst enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
		goto test_fail;

	printf("\n### Testing bulk enq/deq ###\n");
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;
	if (test_burst_bulk_enqueue_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
		goto test_fail;

	printf("\n### Testing empty bulk deq ###\n");
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
	test_empty_dequeue(r, esize,
			TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		if (run_on_core_pair(&cores, r, esize) < 0)
			goto test_fail;
	}

	printf("\n### Testing using all worker lcores ###\n");
	if (run_on_all_cores(r, esize) < 0)
		goto test_fail;

	rte_ring_free(r);

	return 0;

test_fail:
	rte_ring_free(r);

	return -1;
}

static int
test_ring_perf(void)
{
	/* Run all the tests for different element sizes */
	if (test_ring_perf_esize(-1) == -1)
		return -1;

	if (test_ring_perf_esize(16) == -1)
		return -1;

	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);
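
/*
 * Usage note (assumes the standard dpdk-test runner): start the dpdk-test
 * binary and enter "ring_perf_autotest" at the RTE>> prompt to run this
 * benchmark.
 */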