/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 * Copyright(c) 2019 Arm Limited
 */

#include <stdio.h>
#include <inttypes.h>
#include <rte_ring.h>
#include <rte_cycles.h>
#include <rte_launch.h>
#include <rte_pause.h>
#include <string.h>

#include "test.h"

/*
 * Ring
 * ====
 *
 * Measures the performance of various ring operations using rdtsc:
 *  * Dequeue from an empty ring
 *  * Enqueue/dequeue of bursts on one thread
 *  * Enqueue/dequeue of bursts on two threads
 *  * Enqueue/dequeue of bursts on all available threads
 */
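
/*
 * This file registers the "ring_perf_autotest" command (see
 * REGISTER_TEST_COMMAND at the bottom), so the test is typically run from
 * the DPDK test binary's interactive prompt:
 *
 *   RTE>> ring_perf_autotest
 *
 * All results are averages in CPU cycles, computed from rte_rdtsc() deltas.
 */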

#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

struct lcore_pair {
	unsigned c1, c2;
};

static volatile unsigned lcore_count = 0;

/**** Functions to analyse our core mask to get cores for different tests ***/

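/*
 * The three helpers below classify lcore pairs by topology: hyperthread
 * siblings share a physical core id and a socket id, distinct physical
 * cores differ in core id but share a socket id, and a cross-socket pair
 * spans two NUMA nodes.
 */
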
static int
get_two_hyperthreads(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		/* inner loop just re-reads all ids. We could skip the first
		 * few elements, but since the number of cores is small there
		 * is little point
		 */
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 == c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_cores(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned c1, c2, s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;

			c1 = rte_lcore_to_cpu_id(id1);
			c2 = rte_lcore_to_cpu_id(id2);
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if ((c1 != c2) && (s1 == s2)) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

static int
get_two_sockets(struct lcore_pair *lcp)
{
	unsigned id1, id2;
	unsigned s1, s2;
	RTE_LCORE_FOREACH(id1) {
		RTE_LCORE_FOREACH(id2) {
			if (id1 == id2)
				continue;
			s1 = rte_lcore_to_socket_id(id1);
			s2 = rte_lcore_to_socket_id(id2);
			if (s1 != s2) {
				lcp->c1 = id1;
				lcp->c2 = id2;
				return 0;
			}
		}
	}
	return 1;
}

/* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
static void
test_empty_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 26;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
	void *burst[MAX_BURST];

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0], NULL);
	const uint64_t mc_end = rte_rdtsc();

	printf("SC empty dequeue: %.2f\n",
			(double)(sc_end-sc_start) / iterations);
	printf("MC empty dequeue: %.2f\n",
			(double)(mc_end-mc_start) / iterations);
}

/*
 * The separate enqueue and dequeue threads take in one parameter and return
 * two: input = burst size, output = cycle averages for sp/sc and mp/mc.
 */
struct thread_params {
	struct rte_ring *r;
	unsigned size;        /* input value, the burst size */
	double spsc, mpmc;    /* output value, the single or multi timings */
};

/*
 * Function that uses rdtsc to measure timing for ring enqueue. Needs a
 * paired thread running the dequeue_bulk function.
 */
static int
enqueue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

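	/*
	 * Two-thread rendezvous: each thread atomically increments
	 * lcore_count, and whichever thread arrives first spins until the
	 * count reaches 2, so both loops start timing at roughly the same
	 * moment.
	 */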
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sp_end = rte_rdtsc();

	const uint64_t mp_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mp_enqueue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mp_end = rte_rdtsc();

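	/*
	 * Report average cycles per element: each of the `iterations` loop
	 * passes enqueues `size` elements.
	 */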
	params->spsc = ((double)(sp_end - sp_start)) / (iterations * size);
	params->mpmc = ((double)(mp_end - mp_start)) / (iterations * size);
	return 0;
}

/*
 * Function that uses rdtsc to measure timing for ring dequeue. Needs a
 * paired thread running the enqueue_bulk function.
 */
static int
dequeue_bulk(void *p)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	struct thread_params *params = p;
	struct rte_ring *r = params->r;
	const unsigned size = params->size;
	unsigned i;
	void *burst[MAX_BURST] = {0};

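	/* Same two-thread rendezvous as in enqueue_bulk() above. */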
#ifdef RTE_USE_C11_MEM_MODEL
	if (__atomic_add_fetch(&lcore_count, 1, __ATOMIC_RELAXED) != 2)
#else
	if (__sync_add_and_fetch(&lcore_count, 1) != 2)
#endif
		while (lcore_count != 2)
			rte_pause();

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_sc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++)
		while (rte_ring_mc_dequeue_bulk(r, burst, size, NULL) == 0)
			rte_pause();
	const uint64_t mc_end = rte_rdtsc();

	params->spsc = ((double)(sc_end - sc_start)) / (iterations * size);
	params->mpmc = ((double)(mc_end - mc_start)) / (iterations * size);
	return 0;
}

/*
 * Function that calls the enqueue and dequeue bulk functions on pairs of
 * cores. Used to measure ring performance between hyperthreads, cores and
 * sockets.
 */
static void
run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r,
		lcore_function_t f1, lcore_function_t f2)
{
	struct thread_params param1 = {0}, param2 = {0};
	unsigned i;
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		lcore_count = 0;
		param1.size = param2.size = bulk_sizes[i];
		param1.r = param2.r = r;
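		/*
		 * rte_eal_remote_launch() cannot target the master lcore;
		 * if c1 is the master, run f1 on it directly instead.
		 */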
		if (cores->c1 == rte_get_master_lcore()) {
			rte_eal_remote_launch(f2, &param2, cores->c2);
			f1(&param1);
			rte_eal_wait_lcore(cores->c2);
		} else {
			rte_eal_remote_launch(f1, &param1, cores->c1);
			rte_eal_remote_launch(f2, &param2, cores->c2);
			rte_eal_wait_lcore(cores->c1);
			rte_eal_wait_lcore(cores->c2);
		}
		printf("SP/SC bulk enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[i], param1.spsc + param2.spsc);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[i], param1.mpmc + param2.mpmc);
	}
}

static rte_atomic32_t synchro;
static uint64_t queue_count[RTE_MAX_LCORE];

#define TIME_MS 100

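/*
 * Worker body for the all-cores test: each lcore runs mp-enqueue/mc-dequeue
 * pairs on the shared ring for TIME_MS milliseconds and records the number
 * of completed pairs in queue_count[].
 */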
static int
load_loop_fn(void *p)
{
	uint64_t time_diff = 0;
	uint64_t begin = 0;
	uint64_t hz = rte_get_timer_hz();
	uint64_t lcount = 0;
	const unsigned int lcore = rte_lcore_id();
	struct thread_params *params = p;
	void *burst[MAX_BURST] = {0};

	/* slaves wait for the master to signal the start via synchro */
	if (lcore != rte_get_master_lcore())
		while (rte_atomic32_read(&synchro) == 0)
			rte_pause();

	begin = rte_get_timer_cycles();
	while (time_diff < hz * TIME_MS / 1000) {
		rte_ring_mp_enqueue_bulk(params->r, burst, params->size, NULL);
		rte_ring_mc_dequeue_bulk(params->r, burst, params->size, NULL);
		lcount++;
		time_diff = rte_get_timer_cycles() - begin;
	}
	queue_count[lcore] = lcount;
	return 0;
}

static int
run_on_all_cores(struct rte_ring *r)
{
	uint64_t total;
	struct thread_params param;
	unsigned int i, c;

	memset(&param, 0, sizeof(struct thread_params));
	for (i = 0; i < RTE_DIM(bulk_sizes); i++) {
		printf("\nBulk enq/dequeue count on size %u\n", bulk_sizes[i]);
		param.size = bulk_sizes[i];
		param.r = r;

		/* clear synchro and start slaves */
		rte_atomic32_set(&synchro, 0);
		if (rte_eal_mp_remote_launch(load_loop_fn, &param,
			SKIP_MASTER) < 0)
			return -1;

		/* start synchro and launch test on master */
		rte_atomic32_set(&synchro, 1);
		load_loop_fn(&param);

		rte_eal_mp_wait_lcore();

		/* reset the total for each burst size, so the per-size
		 * totals do not accumulate across iterations
		 */
		total = 0;
		RTE_LCORE_FOREACH(c) {
			printf("Core [%u] count = %"PRIu64"\n",
					c, queue_count[c]);
			total += queue_count[c];
		}

		printf("Total count (size: %u): %"PRIu64"\n",
				bulk_sizes[i], total);
	}

	return 0;
}

/*
 * Test function that determines how long an enqueue + dequeue of a single
 * item takes on a single lcore. Result is for comparison with the bulk
 * enq+deq.
 */
static void
test_single_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1<<iter_shift;
	unsigned i = 0;
	void *burst = NULL;

	const uint64_t sc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_sp_enqueue(r, burst);
		rte_ring_sc_dequeue(r, &burst);
	}
	const uint64_t sc_end = rte_rdtsc();

	const uint64_t mc_start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_mp_enqueue(r, burst);
		rte_ring_mc_dequeue(r, &burst);
	}
	const uint64_t mc_end = rte_rdtsc();

	printf("SP/SC single enq/dequeue: %.2f\n",
			((double)(sc_end-sc_start)) / iterations);
	printf("MP/MC single enq/dequeue: %.2f\n",
			((double)(mc_end-mc_start)) / iterations);
}

/*
 * Test that does both enqueue and dequeue on a core using the burst() API
 * calls instead of the bulk() calls used in other tests. Results should be
 * the same as for the bulk function called on a single lcore.
 */
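/*
 * Note: the burst() variants move as many of the requested elements as
 * possible and return that count, whereas the bulk() variants are
 * all-or-nothing. With a single lcore alternating enqueue and dequeue the
 * ring never lacks space or elements, so both variants transfer the full
 * burst on every call, hence the comparable results.
 */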
static void
test_burst_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_burst(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_burst(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		double mc_avg = ((double)(mc_end-mc_start) / iterations) /
					bulk_sizes[sz];
		double sc_avg = ((double)(sc_end-sc_start) / iterations) /
					bulk_sizes[sz];

		printf("SP/SC burst enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC burst enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[sz], mc_avg);
	}
}

/* Times enqueue and dequeue on a single lcore */
static void
test_bulk_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 23;
	const unsigned iterations = 1<<iter_shift;
	unsigned sz, i = 0;
	void *burst[MAX_BURST] = {0};

	for (sz = 0; sz < RTE_DIM(bulk_sizes); sz++) {
		const uint64_t sc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_sp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_sc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t sc_end = rte_rdtsc();

		const uint64_t mc_start = rte_rdtsc();
		for (i = 0; i < iterations; i++) {
			rte_ring_mp_enqueue_bulk(r, burst,
					bulk_sizes[sz], NULL);
			rte_ring_mc_dequeue_bulk(r, burst,
					bulk_sizes[sz], NULL);
		}
		const uint64_t mc_end = rte_rdtsc();

		double sc_avg = ((double)(sc_end-sc_start) /
				(iterations * bulk_sizes[sz]));
		double mc_avg = ((double)(mc_end-mc_start) /
				(iterations * bulk_sizes[sz]));

		printf("SP/SC bulk enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[sz], sc_avg);
		printf("MP/MC bulk enq/dequeue (size: %u): %.2f\n",
				bulk_sizes[sz], mc_avg);
	}
}

static int
test_ring_perf(void)
{
	struct lcore_pair cores;
	struct rte_ring *r = NULL;

	r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
	if (r == NULL)
		return -1;

	printf("### Testing single element and burst enq/deq ###\n");
	test_single_enqueue_dequeue(r);
	test_burst_enqueue_dequeue(r);

	printf("\n### Testing empty dequeue ###\n");
	test_empty_dequeue(r);

	printf("\n### Testing using a single lcore ###\n");
	test_bulk_enqueue_dequeue(r);

	if (get_two_hyperthreads(&cores) == 0) {
		printf("\n### Testing using two hyperthreads ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_cores(&cores) == 0) {
		printf("\n### Testing using two physical cores ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}
	if (get_two_sockets(&cores) == 0) {
		printf("\n### Testing using two NUMA nodes ###\n");
		run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
	}

	printf("\n### Testing using all available lcores ###\n");
	run_on_all_cores(r);

	rte_ring_free(r);
	return 0;
}

REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);