app/test/test_ring_perf.c
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */


#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"
#include "test_ring.h"

/*
 * Ring performance test cases: measure the performance of various ring
 * operations using rdtsc, covering both the legacy APIs and the element
 * APIs with 16B elements.
 */

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
        unsigned c1, c2;
};

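/*
 * number of lcores that have entered the paired enqueue/dequeue test;
 * used so that the two threads start their timed loops together
 */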
static volatile unsigned lcore_count = 0;

static void
test_ring_print_test_string(unsigned int api_type, int esize,
        unsigned int bsz, double value)
{
        if (esize == -1)
                printf("legacy APIs");
        else
                printf("elem APIs: element size %dB", esize);

        if (api_type == TEST_RING_IGNORE_API_TYPE)
                return;

        if ((api_type & TEST_RING_THREAD_DEF) == TEST_RING_THREAD_DEF)
                printf(": default enqueue/dequeue: ");
        else if ((api_type & TEST_RING_THREAD_SPSC) == TEST_RING_THREAD_SPSC)
                printf(": SP/SC: ");
        else if ((api_type & TEST_RING_THREAD_MPMC) == TEST_RING_THREAD_MPMC)
                printf(": MP/MC: ");

        if ((api_type & TEST_RING_ELEM_SINGLE) == TEST_RING_ELEM_SINGLE)
                printf("single: ");
        else if ((api_type & TEST_RING_ELEM_BULK) == TEST_RING_ELEM_BULK)
                printf("bulk (size: %u): ", bsz);
        else if ((api_type & TEST_RING_ELEM_BURST) == TEST_RING_ELEM_BURST)
                printf("burst (size: %u): ", bsz);

        printf("%.2F\n", value);
}

/**** Functions to analyse our core mask to get cores for different tests ***/

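/* Find two lcores that are hyperthread siblings on the same physical core */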
static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
        unsigned id1, id2;
        unsigned c1, c2, s1, s2;
        RTE_LCORE_FOREACH(id1) {
                /* inner loop just re-reads all id's. We could skip the first few
                 * elements, but since number of cores is small there is little point
                 */
                RTE_LCORE_FOREACH(id2) {
                        if (id1 == id2)
                                continue;

                        c1 = rte_lcore_to_cpu_id(id1);
                        c2 = rte_lcore_to_cpu_id(id2);
                        s1 = rte_lcore_to_socket_id(id1);
                        s2 = rte_lcore_to_socket_id(id2);
                        if ((c1 == c2) && (s1 == s2)) {
                                lcp->c1 = id1;
                                lcp->c2 = id2;
                                return 0;
                        }
                }
        }
        return 1;
}

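/* Find two lcores on different physical cores of the same socket */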
static int
get_two_cores(struct lcore_pair *lcp)
{
        unsigned id1, id2;
        unsigned c1, c2, s1, s2;
        RTE_LCORE_FOREACH(id1) {
                RTE_LCORE_FOREACH(id2) {
                        if (id1 == id2)
                                continue;

                        c1 = rte_lcore_to_cpu_id(id1);
                        c2 = rte_lcore_to_cpu_id(id2);
                        s1 = rte_lcore_to_socket_id(id1);
                        s2 = rte_lcore_to_socket_id(id2);
                        if ((c1 != c2) && (s1 == s2)) {
                                lcp->c1 = id1;
                                lcp->c2 = id2;
                                return 0;
                        }
                }
        }
        return 1;
}

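/* Find two lcores on different sockets (NUMA nodes) */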
static int
get_two_sockets(struct lcore_pair *lcp)
{
        unsigned id1, id2;
        unsigned s1, s2;
        RTE_LCORE_FOREACH(id1) {
                RTE_LCORE_FOREACH(id2) {
                        if (id1 == id2)
                                continue;
                        s1 = rte_lcore_to_socket_id(id1);
                        s2 = rte_lcore_to_socket_id(id2);
                        if (s1 != s2) {
                                lcp->c1 = id1;
                                lcp->c2 = id2;
                                return 0;
                        }
                }
        }
        return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r, const int esize,
                        const unsigned int api_type)
{
        const unsigned int iter_shift = 26;
        const unsigned int iterations = 1 << iter_shift;
        unsigned int i = 0;
        void *burst[MAX_BURST];

        const uint64_t start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
                test_ring_dequeue(r, burst, esize, bulk_sizes[0], api_type);
        const uint64_t end = rte_rdtsc();

        test_ring_print_test_string(api_type, esize, bulk_sizes[0],
                                        ((double)(end - start)) / iterations);
}

/*
 * Parameters for the separate enqueue and dequeue threads: each thread takes
 * one value in and returns two. Input = burst size, output = cycle averages
 * for SP/SC and MP/MC.
 */
struct thread_params {
        struct rte_ring *r;
        unsigned size;        /* input value, the burst size */
        double spsc, mpmc;    /* output value, the single or multi timings */
};

/*
 * Helper function to call the bulk SP/SC or MP/MC enqueue/dequeue functions.
 * flag == 0 -> enqueue
 * flag == 1 -> dequeue
 */
static __rte_always_inline int
enqueue_dequeue_bulk_helper(const unsigned int flag, const int esize,
        struct thread_params *p)
{
        int ret;
        const unsigned int iter_shift = 23;
        const unsigned int iterations = 1 << iter_shift;
        struct rte_ring *r = p->r;
        unsigned int bsize = p->size;
        unsigned int i;
        void *burst = NULL;

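        /* wait until both lcores of the pair have arrived, so that the
         * timed loops below start at roughly the same time
         */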
#ifdef RTE_USE_C11_MEM_MODEL
        if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
        if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
                while (lcore_count != 2)
                        rte_pause();

        burst = test_ring_calloc(MAX_BURST, esize);
        if (burst == NULL)
                return -1;

        const uint64_t sp_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
                do {
                        if (flag == 0)
                                ret = test_ring_enqueue(r, burst, esize, bsize,
                                                TEST_RING_THREAD_SPSC |
                                                TEST_RING_ELEM_BULK);
                        else if (flag == 1)
                                ret = test_ring_dequeue(r, burst, esize, bsize,
                                                TEST_RING_THREAD_SPSC |
                                                TEST_RING_ELEM_BULK);
                        if (ret == 0)
                                rte_pause();
                } while (!ret);
        const uint64_t sp_end = rte_rdtsc();

        const uint64_t mp_start = rte_rdtsc();
        for (i = 0; i < iterations; i++)
                do {
                        if (flag == 0)
                                ret = test_ring_enqueue(r, burst, esize, bsize,
                                                TEST_RING_THREAD_MPMC |
                                                TEST_RING_ELEM_BULK);
                        else if (flag == 1)
                                ret = test_ring_dequeue(r, burst, esize, bsize,
                                                TEST_RING_THREAD_MPMC |
                                                TEST_RING_ELEM_BULK);
                        if (ret == 0)
                                rte_pause();
                } while (!ret);
        const uint64_t mp_end = rte_rdtsc();

        p->spsc = ((double)(sp_end - sp_start))/(iterations * bsize);
        p->mpmc = ((double)(mp_end - mp_start))/(iterations * bsize);

        rte_free(burst);

        return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a paired
 * thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
        struct thread_params *params = p;

        return enqueue_dequeue_bulk_helper(0, -1, params);
}

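/* Same as enqueue_bulk, but for rings with 16B elements (the _elem APIs) */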
static int
enqueue_bulk_16B(void *p)
{
        struct thread_params *params = p;

        return enqueue_dequeue_bulk_helper(0, 16, params);
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a paired
 * thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
        struct thread_params *params = p;

        return enqueue_dequeue_bulk_helper(1, -1, params);
}

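/* Same as dequeue_bulk, but for rings with 16B elements (the _elem APIs) */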
static int
dequeue_bulk_16B(void *p)
{
        struct thread_params *params = p;

        return enqueue_dequeue_bulk_helper(1, 16, params);
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of cores.
 * Used to measure ring perf between hyperthreads, cores and sockets.
 */
static int
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r, const int esize)
{
        lcore_function_t *f1, *f2;
        struct thread_params param1 = {0}, param2 = {0};
        unsigned i;

        if (esize == -1) {
                f1 = enqueue_bulk;
                f2 = dequeue_bulk;
        } else {
                f1 = enqueue_bulk_16B;
                f2 = dequeue_bulk_16B;
        }

        for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
                lcore_count = 0;
                param1.size = param2.size = bulk_sizes[i];
                param1.r = param2.r = r;
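                /* the master lcore cannot be remote-launched, so when it is
                 * part of the pair run its half of the test inline
                 */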
                if (cores->c1 == rte_get_master_lcore()) {
                        rte_eal_remote_launch(f2, &param2, cores->c2);
                        f1(&param1);
                        rte_eal_wait_lcore(cores->c2);
                } else {
                        rte_eal_remote_launch(f1, &param1, cores->c1);
                        rte_eal_remote_launch(f2, &param2, cores->c2);
                        if (rte_eal_wait_lcore(cores->c1) < 0)
                                return -1;
                        if (rte_eal_wait_lcore(cores->c2) < 0)
                                return -1;
                }
                test_ring_print_test_string(
                        TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK,
                        esize, bulk_sizes[i], param1.spsc + param2.spsc);
                test_ring_print_test_string(
                        TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK,
                        esize, bulk_sizes[i], param1.mpmc + param2.mpmc);
        }

        return 0;
}

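/* start flag and per-lcore iteration counts for the all-cores load test */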
static rte_atomic32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

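/* duration of the load test loop on each lcore, in milliseconds */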
#define TIME_MS 100

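/*
 * Body of the load test: repeatedly enqueue and dequeue a burst of
 * params->size elements for TIME_MS milliseconds and record the number of
 * completed iterations in queue_count[] for this lcore.
 */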
static int
load_loop_fn_helper(struct thread_params *p, const int esize)
{
        uint64_t time_diff = 0;
        uint64_t begin = 0;
        uint64_t hz = rte_get_timer_hz();
        uint64_t lcount = 0;
        const unsigned int lcore = rte_lcore_id();
        struct thread_params *params = p;
        void *burst = NULL;

        burst = test_ring_calloc(MAX_BURST, esize);
        if (burst == NULL)
                return -1;

        /* slave lcores wait for the master to signal the start via synchro */
        if (lcore != rte_get_master_lcore())
                while (rte_atomic32_read(&synchro) == 0)
                        rte_pause();

        begin = rte_get_timer_cycles();
        while (time_diff < hz * TIME_MS / 1000) {
                test_ring_enqueue(params->r, burst, esize, params->size,
                                TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
                test_ring_dequeue(params->r, burst, esize, params->size,
                                TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);
                lcount++;
                time_diff = rte_get_timer_cycles() - begin;
        }
        queue_count[lcore] = lcount;

        rte_free(burst);

        return 0;
}

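/* Wrappers passing the element size (legacy pointers or 16B) to the load loop */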
static int
load_loop_fn(void *p)
{
        struct thread_params *params = p;

        return load_loop_fn_helper(params, -1);
}

static int
load_loop_fn_16B(void *p)
{
        struct thread_params *params = p;

        return load_loop_fn_helper(params, 16);
}

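/*
 * Run the load test on the master lcore and all slave lcores simultaneously,
 * once per bulk size, and report per-lcore and total iteration counts.
 */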
static int
run_on_all_cores(struct rte_ring *r, const int esize)
{
        uint64_t total;
        struct thread_params param;
        lcore_function_t *lcore_f;
        unsigned int i, c;

        if (esize == -1)
                lcore_f = load_loop_fn;
        else
                lcore_f = load_loop_fn_16B;

        memset(&param, 0, sizeof(struct thread_params));
        for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
                total = 0;
                printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
                param.size = bulk_sizes[i];
                param.r = r;

                /* clear synchro and start slaves */
                rte_atomic32_set(&synchro, 0);
                if (rte_eal_mp_remote_launch(lcore_f, &param, SKIP_MASTER) < 0)
                        return -1;

                /* start synchro and launch test on master */
                rte_atomic32_set(&synchro, 1);
                lcore_f(&param);

                rte_eal_mp_wait_lcore();

                RTE_LCORE_FOREACH(c) {
                        printf("Core [%u] count = %"PRIu64"\n",
                                        c, queue_count[c]);
                        total += queue_count[c];
                }

                printf("Total count (size: %u): %"PRIu64"\n",
                                bulk_sizes[i], total);
        }

        return 0;
}

/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static int
test_single_enqueue_dequeue(struct rte_ring *r, const int esize,
        const unsigned int api_type)
{
        const unsigned int iter_shift = 24;
        const unsigned int iterations = 1 << iter_shift;
        unsigned int i = 0;
        void *burst = NULL;

        /* alloc dummy object pointers */
        burst = test_ring_calloc(1, esize);
        if (burst == NULL)
                return -1;

        const uint64_t start = rte_rdtsc();
        for (i = 0; i < iterations; i++) {
                test_ring_enqueue(r, burst, esize, 1, api_type);
                test_ring_dequeue(r, burst, esize, 1, api_type);
        }
        const uint64_t end = rte_rdtsc();

        test_ring_print_test_string(api_type, esize, 1,
                                        ((double)(end - start)) / iterations);

        rte_free(burst);

        return 0;
}

/*
 * Test that does both enqueue and dequeue on a core using the burst/bulk API
 * calls. Results should be the same as for the bulk function called on a
 * single lcore.
 */
static int
test_burst_bulk_enqueue_dequeue(struct rte_ring *r, const int esize,
        const unsigned int api_type)
{
        const unsigned int iter_shift = 23;
        const unsigned int iterations = 1 << iter_shift;
        unsigned int sz, i = 0;
        void **burst = NULL;

        burst = test_ring_calloc(MAX_BURST, esize);
        if (burst == NULL)
                return -1;

        for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
                const uint64_t start = rte_rdtsc();
                for (i = 0; i < iterations; i++) {
                        test_ring_enqueue(r, burst, esize, bulk_sizes[sz],
                                                api_type);
                        test_ring_dequeue(r, burst, esize, bulk_sizes[sz],
                                                api_type);
                }
                const uint64_t end = rte_rdtsc();

                test_ring_print_test_string(api_type, esize, bulk_sizes[sz],
                                        ((double)(end - start)) / iterations);
        }

        rte_free(burst);

        return 0;
}

/* Run all tests for a given element size */
static __rte_always_inline int
test_ring_perf_esize(const int esize)
{
        struct lcore_pair cores;
        struct rte_ring *r = NULL;

        /*
         * Performance test for legacy/_elem APIs
         * SP-SC/MP-MC, single
         */
        r = test_ring_create(RING_NAME, esize, RING_SIZE, rte_socket_id(), 0);
        if (r == NULL)
                goto test_fail;

        printf("\n### Testing single element enq/deq ###\n");
        if (test_single_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_SPSC | TEST_RING_ELEM_SINGLE) < 0)
                goto test_fail;
        if (test_single_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_MPMC | TEST_RING_ELEM_SINGLE) < 0)
                goto test_fail;

        printf("\n### Testing burst enq/deq ###\n");
        if (test_burst_bulk_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BURST) < 0)
                goto test_fail;
        if (test_burst_bulk_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BURST) < 0)
                goto test_fail;

        printf("\n### Testing bulk enq/deq ###\n");
        if (test_burst_bulk_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK) < 0)
                goto test_fail;
        if (test_burst_bulk_enqueue_dequeue(r, esize,
                        TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK) < 0)
                goto test_fail;

        printf("\n### Testing empty bulk deq ###\n");
        test_empty_dequeue(r, esize,
                        TEST_RING_THREAD_SPSC | TEST_RING_ELEM_BULK);
        test_empty_dequeue(r, esize,
                        TEST_RING_THREAD_MPMC | TEST_RING_ELEM_BULK);

        if (get_two_hyperthreads(&cores) == 0) {
                printf("\n### Testing using two hyperthreads ###\n");
                if (run_on_core_pair(&cores, r, esize) < 0)
                        goto test_fail;
        }

        if (get_two_cores(&cores) == 0) {
                printf("\n### Testing using two physical cores ###\n");
                if (run_on_core_pair(&cores, r, esize) < 0)
                        goto test_fail;
        }
        if (get_two_sockets(&cores) == 0) {
                printf("\n### Testing using two NUMA nodes ###\n");
                if (run_on_core_pair(&cores, r, esize) < 0)
                        goto test_fail;
        }

        printf("\n### Testing using all slave lcores ###\n");
        if (run_on_all_cores(r, esize) < 0)
                goto test_fail;

        rte_ring_free(r);

        return 0;

test_fail:
        rte_ring_free(r);

        return -1;
}

static int
test_ring_perf(void)
{
        /* Run all the tests for different element sizes */
        if (test_ring_perf_esize(-1) == -1)
                return -1;

        if (test_ring_perf_esize(16) == -1)
                return -1;

        return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);