2825a9dce617d3101be85da632e3f235a72a4656
[dpdk.git] / app / test / test_ring_stress_impl.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2020 Intel Corporation
3  */
4
5 #include "test_ring_stress.h"
6
7 /**
8  * Stress test for ring enqueue/dequeue operations.
9  * Performs the following pattern on each worker:
10  * dequeue/read-write data from the dequeued objects/enqueue.
11  * Serves as both functional and performance test of ring
12  * enqueue/dequeue operations under high contention
13  * (for both over committed and non-over committed scenarios).
14  */
15
16 #define RING_NAME       "RING_STRESS"
17 #define BULK_NUM        32
18 #define RING_SIZE       (2 * BULK_NUM * RTE_MAX_LCORE)
19
20 enum {
21         WRK_CMD_STOP,
22         WRK_CMD_RUN,
23 };
24
25 static uint32_t wrk_cmd __rte_cache_aligned = WRK_CMD_STOP;
26
27 /* test run-time in seconds */
28 static const uint32_t run_time = 60;
29 static const uint32_t verbose;
30
31 struct lcore_stat {
32         uint64_t nb_cycle;
33         struct {
34                 uint64_t nb_call;
35                 uint64_t nb_obj;
36                 uint64_t nb_cycle;
37                 uint64_t max_cycle;
38                 uint64_t min_cycle;
39         } op;
40 };
41
42 struct lcore_arg {
43         struct rte_ring *rng;
44         struct lcore_stat stats;
45 } __rte_cache_aligned;
46
47 struct ring_elem {
48         uint32_t cnt[RTE_CACHE_LINE_SIZE / sizeof(uint32_t)];
49 } __rte_cache_aligned;
50
51 /*
52  * redefinable functions
53  */
54 static uint32_t
55 _st_ring_dequeue_bulk(struct rte_ring *r, void **obj, uint32_t n,
56         uint32_t *avail);
57
58 static uint32_t
59 _st_ring_enqueue_bulk(struct rte_ring *r, void * const *obj, uint32_t n,
60         uint32_t *free);
61
62 static int
63 _st_ring_init(struct rte_ring *r, const char *name, uint32_t num);
64
65
66 static void
67 lcore_stat_update(struct lcore_stat *ls, uint64_t call, uint64_t obj,
68         uint64_t tm, int32_t prcs)
69 {
70         ls->op.nb_call += call;
71         ls->op.nb_obj += obj;
72         ls->op.nb_cycle += tm;
73         if (prcs) {
74                 ls->op.max_cycle = RTE_MAX(ls->op.max_cycle, tm);
75                 ls->op.min_cycle = RTE_MIN(ls->op.min_cycle, tm);
76         }
77 }
78
79 static void
80 lcore_op_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
81 {
82
83         ms->op.nb_call += ls->op.nb_call;
84         ms->op.nb_obj += ls->op.nb_obj;
85         ms->op.nb_cycle += ls->op.nb_cycle;
86         ms->op.max_cycle = RTE_MAX(ms->op.max_cycle, ls->op.max_cycle);
87         ms->op.min_cycle = RTE_MIN(ms->op.min_cycle, ls->op.min_cycle);
88 }
89
90 static void
91 lcore_stat_aggr(struct lcore_stat *ms, const struct lcore_stat *ls)
92 {
93         ms->nb_cycle = RTE_MAX(ms->nb_cycle, ls->nb_cycle);
94         lcore_op_stat_aggr(ms, ls);
95 }
96
97 static void
98 lcore_stat_dump(FILE *f, uint32_t lc, const struct lcore_stat *ls)
99 {
100         long double st;
101
102         st = (long double)rte_get_timer_hz() / US_PER_S;
103
104         if (lc == UINT32_MAX)
105                 fprintf(f, "%s(AGGREGATE)={\n", __func__);
106         else
107                 fprintf(f, "%s(lcore=%u)={\n", __func__, lc);
108
109         fprintf(f, "\tnb_cycle=%" PRIu64 "(%.2Lf usec),\n",
110                 ls->nb_cycle, (long double)ls->nb_cycle / st);
111
112         fprintf(f, "\tDEQ+ENQ={\n");
113
114         fprintf(f, "\t\tnb_call=%" PRIu64 ",\n", ls->op.nb_call);
115         fprintf(f, "\t\tnb_obj=%" PRIu64 ",\n", ls->op.nb_obj);
116         fprintf(f, "\t\tnb_cycle=%" PRIu64 ",\n", ls->op.nb_cycle);
117         fprintf(f, "\t\tobj/call(avg): %.2Lf\n",
118                 (long double)ls->op.nb_obj / ls->op.nb_call);
119         fprintf(f, "\t\tcycles/obj(avg): %.2Lf\n",
120                 (long double)ls->op.nb_cycle / ls->op.nb_obj);
121         fprintf(f, "\t\tcycles/call(avg): %.2Lf\n",
122                 (long double)ls->op.nb_cycle / ls->op.nb_call);
123
124         /* if min/max cycles per call stats was collected */
125         if (ls->op.min_cycle != UINT64_MAX) {
126                 fprintf(f, "\t\tmax cycles/call=%" PRIu64 "(%.2Lf usec),\n",
127                         ls->op.max_cycle,
128                         (long double)ls->op.max_cycle / st);
129                 fprintf(f, "\t\tmin cycles/call=%" PRIu64 "(%.2Lf usec),\n",
130                         ls->op.min_cycle,
131                         (long double)ls->op.min_cycle / st);
132         }
133
134         fprintf(f, "\t},\n");
135         fprintf(f, "};\n");
136 }
137
138 static void
139 fill_ring_elm(struct ring_elem *elm, uint32_t fill)
140 {
141         uint32_t i;
142
143         for (i = 0; i != RTE_DIM(elm->cnt); i++)
144                 elm->cnt[i] = fill;
145 }
146
147 static int32_t
148 check_updt_elem(struct ring_elem *elm[], uint32_t num,
149         const struct ring_elem *check, const struct ring_elem *fill)
150 {
151         uint32_t i;
152
153         static rte_spinlock_t dump_lock;
154
155         for (i = 0; i != num; i++) {
156                 if (memcmp(check, elm[i], sizeof(*check)) != 0) {
157                         rte_spinlock_lock(&dump_lock);
158                         printf("%s(lc=%u, num=%u) failed at %u-th iter, "
159                                 "offending object: %p\n",
160                                 __func__, rte_lcore_id(), num, i, elm[i]);
161                         rte_memdump(stdout, "expected", check, sizeof(*check));
162                         rte_memdump(stdout, "result", elm[i], sizeof(*elm[i]));
163                         rte_spinlock_unlock(&dump_lock);
164                         return -EINVAL;
165                 }
166                 memcpy(elm[i], fill, sizeof(*elm[i]));
167         }
168
169         return 0;
170 }
171
172 static int
173 check_ring_op(uint32_t exp, uint32_t res, uint32_t lc,
174         const char *fname, const char *opname)
175 {
176         if (exp != res) {
177                 printf("%s(lc=%u) failure: %s expected: %u, returned %u\n",
178                         fname, lc, opname, exp, res);
179                 return -ENOSPC;
180         }
181         return 0;
182 }
183
184 static int
185 test_worker(void *arg, const char *fname, int32_t prcs)
186 {
187         int32_t rc;
188         uint32_t lc, n, num;
189         uint64_t cl, tm0, tm1;
190         struct lcore_arg *la;
191         struct ring_elem def_elm, loc_elm;
192         struct ring_elem *obj[2 * BULK_NUM];
193
194         la = arg;
195         lc = rte_lcore_id();
196
197         fill_ring_elm(&def_elm, UINT32_MAX);
198         fill_ring_elm(&loc_elm, lc);
199
200         /* Acquire ordering is not required as the main is not
201          * really releasing any data through 'wrk_cmd' to
202          * the worker.
203          */
204         while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) != WRK_CMD_RUN)
205                 rte_pause();
206
207         cl = rte_rdtsc_precise();
208
209         do {
210                 /* num in interval [7/8, 11/8] of BULK_NUM */
211                 num = 7 * BULK_NUM / 8 + rte_rand() % (BULK_NUM / 2);
212
213                 /* reset all pointer values */
214                 memset(obj, 0, sizeof(obj));
215
216                 /* dequeue num elems */
217                 tm0 = (prcs != 0) ? rte_rdtsc_precise() : 0;
218                 n = _st_ring_dequeue_bulk(la->rng, (void **)obj, num, NULL);
219                 tm0 = (prcs != 0) ? rte_rdtsc_precise() - tm0 : 0;
220
221                 /* check return value and objects */
222                 rc = check_ring_op(num, n, lc, fname,
223                         RTE_STR(_st_ring_dequeue_bulk));
224                 if (rc == 0)
225                         rc = check_updt_elem(obj, num, &def_elm, &loc_elm);
226                 if (rc != 0)
227                         break;
228
229                 /* enqueue num elems */
230                 rte_compiler_barrier();
231                 rc = check_updt_elem(obj, num, &loc_elm, &def_elm);
232                 if (rc != 0)
233                         break;
234
235                 tm1 = (prcs != 0) ? rte_rdtsc_precise() : 0;
236                 n = _st_ring_enqueue_bulk(la->rng, (void **)obj, num, NULL);
237                 tm1 = (prcs != 0) ? rte_rdtsc_precise() - tm1 : 0;
238
239                 /* check return value */
240                 rc = check_ring_op(num, n, lc, fname,
241                         RTE_STR(_st_ring_enqueue_bulk));
242                 if (rc != 0)
243                         break;
244
245                 lcore_stat_update(&la->stats, 1, num, tm0 + tm1, prcs);
246
247         } while (__atomic_load_n(&wrk_cmd, __ATOMIC_RELAXED) == WRK_CMD_RUN);
248
249         cl = rte_rdtsc_precise() - cl;
250         if (prcs == 0)
251                 lcore_stat_update(&la->stats, 0, 0, cl, 0);
252         la->stats.nb_cycle = cl;
253         return rc;
254 }
255 static int
256 test_worker_prcs(void *arg)
257 {
258         return test_worker(arg, __func__, 1);
259 }
260
261 static int
262 test_worker_avg(void *arg)
263 {
264         return test_worker(arg, __func__, 0);
265 }
266
267 static void
268 mt1_fini(struct rte_ring *rng, void *data)
269 {
270         rte_free(rng);
271         rte_free(data);
272 }
273
274 static int
275 mt1_init(struct rte_ring **rng, void **data, uint32_t num)
276 {
277         int32_t rc;
278         size_t sz;
279         uint32_t i, nr;
280         struct rte_ring *r;
281         struct ring_elem *elm;
282         void *p;
283
284         *rng = NULL;
285         *data = NULL;
286
287         sz = num * sizeof(*elm);
288         elm = rte_zmalloc(NULL, sz, __alignof__(*elm));
289         if (elm == NULL) {
290                 printf("%s: alloc(%zu) for %u elems data failed",
291                         __func__, sz, num);
292                 return -ENOMEM;
293         }
294
295         *data = elm;
296
297         /* alloc ring */
298         nr = 2 * num;
299         sz = rte_ring_get_memsize(nr);
300         r = rte_zmalloc(NULL, sz, __alignof__(*r));
301         if (r == NULL) {
302                 printf("%s: alloc(%zu) for FIFO with %u elems failed",
303                         __func__, sz, nr);
304                 return -ENOMEM;
305         }
306
307         *rng = r;
308
309         rc = _st_ring_init(r, RING_NAME, nr);
310         if (rc != 0) {
311                 printf("%s: _st_ring_init(%p, %u) failed, error: %d(%s)\n",
312                         __func__, r, nr, rc, strerror(-rc));
313                 return rc;
314         }
315
316         for (i = 0; i != num; i++) {
317                 fill_ring_elm(elm + i, UINT32_MAX);
318                 p = elm + i;
319                 if (_st_ring_enqueue_bulk(r, &p, 1, NULL) != 1)
320                         break;
321         }
322
323         if (i != num) {
324                 printf("%s: _st_ring_enqueue(%p, %u) returned %u\n",
325                         __func__, r, num, i);
326                 return -ENOSPC;
327         }
328
329         return 0;
330 }
331
332 static int
333 test_mt1(int (*test)(void *))
334 {
335         int32_t rc;
336         uint32_t lc, mc;
337         struct rte_ring *r;
338         void *data;
339         struct lcore_arg arg[RTE_MAX_LCORE];
340
341         static const struct lcore_stat init_stat = {
342                 .op.min_cycle = UINT64_MAX,
343         };
344
345         rc = mt1_init(&r, &data, RING_SIZE);
346         if (rc != 0) {
347                 mt1_fini(r, data);
348                 return rc;
349         }
350
351         memset(arg, 0, sizeof(arg));
352
353         /* launch on all workers */
354         RTE_LCORE_FOREACH_WORKER(lc) {
355                 arg[lc].rng = r;
356                 arg[lc].stats = init_stat;
357                 rte_eal_remote_launch(test, &arg[lc], lc);
358         }
359
360         /* signal worker to start test */
361         __atomic_store_n(&wrk_cmd, WRK_CMD_RUN, __ATOMIC_RELEASE);
362
363         usleep(run_time * US_PER_S);
364
365         /* signal worker to start test */
366         __atomic_store_n(&wrk_cmd, WRK_CMD_STOP, __ATOMIC_RELEASE);
367
368         /* wait for workers and collect stats. */
369         mc = rte_lcore_id();
370         arg[mc].stats = init_stat;
371
372         rc = 0;
373         RTE_LCORE_FOREACH_WORKER(lc) {
374                 rc |= rte_eal_wait_lcore(lc);
375                 lcore_stat_aggr(&arg[mc].stats, &arg[lc].stats);
376                 if (verbose != 0)
377                         lcore_stat_dump(stdout, lc, &arg[lc].stats);
378         }
379
380         lcore_stat_dump(stdout, UINT32_MAX, &arg[mc].stats);
381         mt1_fini(r, data);
382         return rc;
383 }
384
385 static const struct test_case tests[] = {
386         {
387                 .name = "MT-WRK_ENQ_DEQ-MST_NONE-PRCS",
388                 .func = test_mt1,
389                 .wfunc = test_worker_prcs,
390         },
391         {
392                 .name = "MT-WRK_ENQ_DEQ-MST_NONE-AVG",
393                 .func = test_mt1,
394                 .wfunc = test_worker_avg,
395         },
396 };