test/service: fix race condition on stopping lcore
[dpdk.git] / app / test / test_rcu_qsbr.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2019-2020 Arm Limited
3  */
4
5 #include <stdio.h>
6 #include <string.h>
7 #include <rte_pause.h>
8 #include <rte_rcu_qsbr.h>
9 #include <rte_hash.h>
10 #include <rte_hash_crc.h>
11 #include <rte_malloc.h>
12 #include <rte_cycles.h>
13 #include <rte_random.h>
14 #include <unistd.h>
15
16 #include "test.h"
17
18 /* Check condition and return an error if true. */
19 #define TEST_RCU_QSBR_RETURN_IF_ERROR(cond, str, ...) \
20 do { \
21         if (cond) { \
22                 printf("ERROR file %s, line %d: " str "\n", __FILE__, \
23                         __LINE__, ##__VA_ARGS__); \
24                 return -1; \
25         } \
26 } while (0)
27
28 /* Check condition and go to label if true. */
29 #define TEST_RCU_QSBR_GOTO_IF_ERROR(label, cond, str, ...) \
30 do { \
31         if (cond) { \
32                 printf("ERROR file %s, line %d: " str "\n", __FILE__, \
33                         __LINE__, ##__VA_ARGS__); \
34                 goto label; \
35         } \
36 } while (0)
37
38 /* Make sure that this has the same value as __RTE_QSBR_CNT_INIT */
39 #define TEST_RCU_QSBR_CNT_INIT 1
40
41 static uint16_t enabled_core_ids[RTE_MAX_LCORE];
42 static unsigned int num_cores;
43
44 static uint32_t *keys;
45 #define TOTAL_ENTRY (1024 * 8)
46 #define COUNTER_VALUE 4096
47 static uint32_t *hash_data[RTE_MAX_LCORE][TOTAL_ENTRY];
48 static uint8_t writer_done;
49 static uint8_t cb_failed;
50
51 static struct rte_rcu_qsbr *t[RTE_MAX_LCORE];
52 static struct rte_hash *h[RTE_MAX_LCORE];
53 static char hash_name[RTE_MAX_LCORE][8];
54
55 struct test_rcu_thread_info {
56         /* Index in RCU array */
57         int ir;
58         /* Index in hash array */
59         int ih;
60         /* lcore IDs registered on the RCU variable */
61         uint16_t r_core_ids[2];
62 };
63 static struct test_rcu_thread_info thread_info[RTE_MAX_LCORE/4];
64
65 static int
66 alloc_rcu(void)
67 {
68         int i;
69         size_t sz;
70
71         sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
72
73         for (i = 0; i < RTE_MAX_LCORE; i++)
74                 t[i] = (struct rte_rcu_qsbr *)rte_zmalloc(NULL, sz,
75                                                 RTE_CACHE_LINE_SIZE);
76
77         return 0;
78 }
79
80 static int
81 free_rcu(void)
82 {
83         int i;
84
85         for (i = 0; i < RTE_MAX_LCORE; i++)
86                 rte_free(t[i]);
87
88         return 0;
89 }
90
91 /*
92  * rte_rcu_qsbr_thread_register: Add a reader thread, to the list of threads
93  * reporting their quiescent state on a QS variable.
94  */
95 static int
96 test_rcu_qsbr_get_memsize(void)
97 {
98         size_t sz;
99
100         printf("\nTest rte_rcu_qsbr_thread_register()\n");
101
102         sz = rte_rcu_qsbr_get_memsize(0);
103         TEST_RCU_QSBR_RETURN_IF_ERROR((sz != 1), "Get Memsize for 0 threads");
104
105         sz = rte_rcu_qsbr_get_memsize(128);
106         /* For 128 threads,
107          * for machines with cache line size of 64B - 8384
108          * for machines with cache line size of 128 - 16768
109          */
110         if (RTE_CACHE_LINE_SIZE == 64)
111                 TEST_RCU_QSBR_RETURN_IF_ERROR((sz != 8384),
112                         "Get Memsize for 128 threads");
113         else if (RTE_CACHE_LINE_SIZE == 128)
114                 TEST_RCU_QSBR_RETURN_IF_ERROR((sz != 16768),
115                         "Get Memsize for 128 threads");
116
117         return 0;
118 }
119
120 /*
121  * rte_rcu_qsbr_init: Initialize a QSBR variable.
122  */
123 static int
124 test_rcu_qsbr_init(void)
125 {
126         int r;
127
128         printf("\nTest rte_rcu_qsbr_init()\n");
129
130         r = rte_rcu_qsbr_init(NULL, RTE_MAX_LCORE);
131         TEST_RCU_QSBR_RETURN_IF_ERROR((r != 1), "NULL variable");
132
133         return 0;
134 }
135
136 /*
137  * rte_rcu_qsbr_thread_register: Add a reader thread, to the list of threads
138  * reporting their quiescent state on a QS variable.
139  */
140 static int
141 test_rcu_qsbr_thread_register(void)
142 {
143         int ret;
144
145         printf("\nTest rte_rcu_qsbr_thread_register()\n");
146
147         ret = rte_rcu_qsbr_thread_register(NULL, enabled_core_ids[0]);
148         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "NULL variable check");
149
150         ret = rte_rcu_qsbr_thread_register(NULL, 100000);
151         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
152                 "NULL variable, invalid thread id");
153
154         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
155
156         /* Register valid thread id */
157         ret = rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
158         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "Valid thread id");
159
160         /* Re-registering should not return error */
161         ret = rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
162         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
163                 "Already registered thread id");
164
165         /* Register valid thread id - max allowed thread id */
166         ret = rte_rcu_qsbr_thread_register(t[0], RTE_MAX_LCORE - 1);
167         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "Max thread id");
168
169         ret = rte_rcu_qsbr_thread_register(t[0], 100000);
170         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
171                 "NULL variable, invalid thread id");
172
173         return 0;
174 }
175
176 /*
177  * rte_rcu_qsbr_thread_unregister: Remove a reader thread, from the list of
178  * threads reporting their quiescent state on a QS variable.
179  */
180 static int
181 test_rcu_qsbr_thread_unregister(void)
182 {
183         unsigned int num_threads[3] = {1, RTE_MAX_LCORE, 1};
184         unsigned int i, j;
185         unsigned int skip_thread_id;
186         uint64_t token;
187         int ret;
188
189         printf("\nTest rte_rcu_qsbr_thread_unregister()\n");
190
191         ret = rte_rcu_qsbr_thread_unregister(NULL, enabled_core_ids[0]);
192         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "NULL variable check");
193
194         ret = rte_rcu_qsbr_thread_unregister(NULL, 100000);
195         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
196                 "NULL variable, invalid thread id");
197
198         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
199
200         rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
201
202         ret = rte_rcu_qsbr_thread_unregister(t[0], 100000);
203         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
204                 "NULL variable, invalid thread id");
205
206         /* Find first disabled core */
207         for (i = 0; i < RTE_MAX_LCORE; i++) {
208                 if (enabled_core_ids[i] == 0)
209                         break;
210         }
211         /* Test with disabled lcore */
212         ret = rte_rcu_qsbr_thread_unregister(t[0], i);
213         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
214                 "disabled thread id");
215         /* Unregister already unregistered core */
216         ret = rte_rcu_qsbr_thread_unregister(t[0], i);
217         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
218                 "Already unregistered core");
219
220         /* Test with enabled lcore */
221         ret = rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[0]);
222         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
223                 "enabled thread id");
224         /* Unregister already unregistered core */
225         ret = rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[0]);
226         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1),
227                 "Already unregistered core");
228
229         /*
230          * Test with different thread_ids:
231          * 1 - thread_id = 0
232          * 2 - All possible thread_ids, from 0 to RTE_MAX_LCORE
233          * 3 - thread_id = RTE_MAX_LCORE - 1
234          */
235         for (j = 0; j < 3; j++) {
236                 rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
237
238                 for (i = 0; i < num_threads[j]; i++)
239                         rte_rcu_qsbr_thread_register(t[0],
240                                 (j == 2) ? (RTE_MAX_LCORE - 1) : i);
241
242                 token = rte_rcu_qsbr_start(t[0]);
243                 TEST_RCU_QSBR_RETURN_IF_ERROR(
244                         (token != (TEST_RCU_QSBR_CNT_INIT + 1)), "QSBR Start");
245                 skip_thread_id = rte_rand() % RTE_MAX_LCORE;
246                 /* Update quiescent state counter */
247                 for (i = 0; i < num_threads[j]; i++) {
248                         /* Skip one update */
249                         if ((j == 1) && (i == skip_thread_id))
250                                 continue;
251                         rte_rcu_qsbr_quiescent(t[0],
252                                 (j == 2) ? (RTE_MAX_LCORE - 1) : i);
253                 }
254
255                 if (j == 1) {
256                         /* Validate the updates */
257                         ret = rte_rcu_qsbr_check(t[0], token, false);
258                         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
259                                                 "Non-blocking QSBR check");
260                         /* Update the previously skipped thread */
261                         rte_rcu_qsbr_quiescent(t[0], skip_thread_id);
262                 }
263
264                 /* Validate the updates */
265                 ret = rte_rcu_qsbr_check(t[0], token, false);
266                 TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
267                                                 "Non-blocking QSBR check");
268
269                 for (i = 0; i < num_threads[j]; i++)
270                         rte_rcu_qsbr_thread_unregister(t[0],
271                                 (j == 2) ? (RTE_MAX_LCORE - 1) : i);
272
273                 /* Check with no thread registered */
274                 ret = rte_rcu_qsbr_check(t[0], token, true);
275                 TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0),
276                                                 "Blocking QSBR check");
277         }
278         return 0;
279 }
280
281 /*
282  * rte_rcu_qsbr_start: Ask the worker threads to report the quiescent state
283  * status.
284  */
285 static int
286 test_rcu_qsbr_start(void)
287 {
288         uint64_t token;
289         int i;
290
291         printf("\nTest rte_rcu_qsbr_start()\n");
292
293         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
294
295         for (i = 0; i < 3; i++)
296                 rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
297
298         token = rte_rcu_qsbr_start(t[0]);
299         TEST_RCU_QSBR_RETURN_IF_ERROR(
300                 (token != (TEST_RCU_QSBR_CNT_INIT + 1)), "QSBR Start");
301         return 0;
302 }
303
304 static int
305 test_rcu_qsbr_check_reader(void *arg)
306 {
307         struct rte_rcu_qsbr *temp;
308         uint8_t read_type = (uint8_t)((uintptr_t)arg);
309
310         temp = t[read_type];
311
312         /* Update quiescent state counter */
313         rte_rcu_qsbr_quiescent(temp, enabled_core_ids[0]);
314         rte_rcu_qsbr_quiescent(temp, enabled_core_ids[1]);
315         rte_rcu_qsbr_thread_unregister(temp, enabled_core_ids[2]);
316         rte_rcu_qsbr_quiescent(temp, enabled_core_ids[3]);
317         return 0;
318 }
319
320 /*
321  * rte_rcu_qsbr_check: Checks if all the worker threads have entered the queis-
322  * cent state 'n' number of times. 'n' is provided in rte_rcu_qsbr_start API.
323  */
324 static int
325 test_rcu_qsbr_check(void)
326 {
327         int i, ret;
328         uint64_t token;
329
330         printf("\nTest rte_rcu_qsbr_check()\n");
331
332         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
333
334         token = rte_rcu_qsbr_start(t[0]);
335         TEST_RCU_QSBR_RETURN_IF_ERROR(
336                 (token != (TEST_RCU_QSBR_CNT_INIT + 1)), "QSBR Start");
337
338
339         ret = rte_rcu_qsbr_check(t[0], 0, false);
340         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Token = 0");
341
342         ret = rte_rcu_qsbr_check(t[0], token, true);
343         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Blocking QSBR check");
344
345         for (i = 0; i < 3; i++)
346                 rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
347
348         ret = rte_rcu_qsbr_check(t[0], token, false);
349         /* Threads are offline, hence this should pass */
350         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
351
352         token = rte_rcu_qsbr_start(t[0]);
353         TEST_RCU_QSBR_RETURN_IF_ERROR(
354                 (token != (TEST_RCU_QSBR_CNT_INIT + 2)), "QSBR Start");
355
356         ret = rte_rcu_qsbr_check(t[0], token, false);
357         /* Threads are offline, hence this should pass */
358         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Non-blocking QSBR check");
359
360         for (i = 0; i < 3; i++)
361                 rte_rcu_qsbr_thread_unregister(t[0], enabled_core_ids[i]);
362
363         ret = rte_rcu_qsbr_check(t[0], token, true);
364         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "Blocking QSBR check");
365
366         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
367
368         for (i = 0; i < 4; i++)
369                 rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[i]);
370
371         token = rte_rcu_qsbr_start(t[0]);
372         TEST_RCU_QSBR_RETURN_IF_ERROR(
373                 (token != (TEST_RCU_QSBR_CNT_INIT + 1)), "QSBR Start");
374
375         rte_eal_remote_launch(test_rcu_qsbr_check_reader, NULL,
376                                                         enabled_core_ids[0]);
377
378         rte_eal_mp_wait_lcore();
379         ret = rte_rcu_qsbr_check(t[0], token, true);
380         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "Blocking QSBR check");
381
382         return 0;
383 }
384
385 static int
386 test_rcu_qsbr_synchronize_reader(void *arg)
387 {
388         uint32_t lcore_id = rte_lcore_id();
389         (void)arg;
390
391         /* Register and become online */
392         rte_rcu_qsbr_thread_register(t[0], lcore_id);
393         rte_rcu_qsbr_thread_online(t[0], lcore_id);
394
395         while (!writer_done)
396                 rte_rcu_qsbr_quiescent(t[0], lcore_id);
397
398         rte_rcu_qsbr_thread_offline(t[0], lcore_id);
399         rte_rcu_qsbr_thread_unregister(t[0], lcore_id);
400
401         return 0;
402 }
403
404 /*
405  * rte_rcu_qsbr_synchronize: Wait till all the reader threads have entered
406  * the queiscent state.
407  */
408 static int
409 test_rcu_qsbr_synchronize(void)
410 {
411         unsigned int i;
412
413         printf("\nTest rte_rcu_qsbr_synchronize()\n");
414
415         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
416
417         /* Test if the API returns when there are no threads reporting
418          * QS on the variable.
419          */
420         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
421
422         /* Test if the API returns when there are threads registered
423          * but not online.
424          */
425         for (i = 0; i < RTE_MAX_LCORE; i++)
426                 rte_rcu_qsbr_thread_register(t[0], i);
427         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
428
429         /* Test if the API returns when the caller is also
430          * reporting the QS status.
431          */
432         rte_rcu_qsbr_thread_online(t[0], 0);
433         rte_rcu_qsbr_synchronize(t[0], 0);
434         rte_rcu_qsbr_thread_offline(t[0], 0);
435
436         /* Check the other boundary */
437         rte_rcu_qsbr_thread_online(t[0], RTE_MAX_LCORE - 1);
438         rte_rcu_qsbr_synchronize(t[0], RTE_MAX_LCORE - 1);
439         rte_rcu_qsbr_thread_offline(t[0], RTE_MAX_LCORE - 1);
440
441         /* Test if the API returns after unregisterng all the threads */
442         for (i = 0; i < RTE_MAX_LCORE; i++)
443                 rte_rcu_qsbr_thread_unregister(t[0], i);
444         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
445
446         /* Test if the API returns with the live threads */
447         writer_done = 0;
448         for (i = 0; i < num_cores; i++)
449                 rte_eal_remote_launch(test_rcu_qsbr_synchronize_reader,
450                         NULL, enabled_core_ids[i]);
451         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
452         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
453         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
454         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
455         rte_rcu_qsbr_synchronize(t[0], RTE_QSBR_THRID_INVALID);
456
457         writer_done = 1;
458         rte_eal_mp_wait_lcore();
459
460         return 0;
461 }
462
463 /*
464  * rte_rcu_qsbr_thread_online: Add a registered reader thread, to
465  * the list of threads reporting their quiescent state on a QS variable.
466  */
467 static int
468 test_rcu_qsbr_thread_online(void)
469 {
470         int i, ret;
471         uint64_t token;
472
473         printf("Test rte_rcu_qsbr_thread_online()\n");
474
475         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
476
477         /* Register 2 threads to validate that only the
478          * online thread is waited upon.
479          */
480         rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
481         rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[1]);
482
483         /* Use qsbr_start to verify that the thread_online API
484          * succeeded.
485          */
486         token = rte_rcu_qsbr_start(t[0]);
487
488         /* Make the thread online */
489         rte_rcu_qsbr_thread_online(t[0], enabled_core_ids[0]);
490
491         /* Check if the thread is online */
492         ret = rte_rcu_qsbr_check(t[0], token, true);
493         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
494
495         /* Check if the online thread, can report QS */
496         token = rte_rcu_qsbr_start(t[0]);
497         rte_rcu_qsbr_quiescent(t[0], enabled_core_ids[0]);
498         ret = rte_rcu_qsbr_check(t[0], token, true);
499         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread update");
500
501         /* Make all the threads online */
502         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
503         token = rte_rcu_qsbr_start(t[0]);
504         for (i = 0; i < RTE_MAX_LCORE; i++) {
505                 rte_rcu_qsbr_thread_register(t[0], i);
506                 rte_rcu_qsbr_thread_online(t[0], i);
507         }
508         /* Check if all the threads are online */
509         ret = rte_rcu_qsbr_check(t[0], token, true);
510         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
511         /* Check if all the online threads can report QS */
512         token = rte_rcu_qsbr_start(t[0]);
513         for (i = 0; i < RTE_MAX_LCORE; i++)
514                 rte_rcu_qsbr_quiescent(t[0], i);
515         ret = rte_rcu_qsbr_check(t[0], token, true);
516         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread update");
517
518         return 0;
519 }
520
521 /*
522  * rte_rcu_qsbr_thread_offline: Remove a registered reader thread, from
523  * the list of threads reporting their quiescent state on a QS variable.
524  */
525 static int
526 test_rcu_qsbr_thread_offline(void)
527 {
528         int i, ret;
529         uint64_t token;
530
531         printf("\nTest rte_rcu_qsbr_thread_offline()\n");
532
533         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
534
535         rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
536
537         /* Make the thread offline */
538         rte_rcu_qsbr_thread_offline(t[0], enabled_core_ids[0]);
539
540         /* Use qsbr_start to verify that the thread_offline API
541          * succeeded.
542          */
543         token = rte_rcu_qsbr_start(t[0]);
544         /* Check if the thread is offline */
545         ret = rte_rcu_qsbr_check(t[0], token, true);
546         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread offline");
547
548         /* Bring an offline thread online and check if it can
549          * report QS.
550          */
551         rte_rcu_qsbr_thread_online(t[0], enabled_core_ids[0]);
552         /* Check if the online thread, can report QS */
553         token = rte_rcu_qsbr_start(t[0]);
554         rte_rcu_qsbr_quiescent(t[0], enabled_core_ids[0]);
555         ret = rte_rcu_qsbr_check(t[0], token, true);
556         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "offline to online");
557
558         /*
559          * Check a sequence of online/status/offline/status/online/status
560          */
561         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
562         token = rte_rcu_qsbr_start(t[0]);
563         /* Make the threads online */
564         for (i = 0; i < RTE_MAX_LCORE; i++) {
565                 rte_rcu_qsbr_thread_register(t[0], i);
566                 rte_rcu_qsbr_thread_online(t[0], i);
567         }
568
569         /* Check if all the threads are online */
570         ret = rte_rcu_qsbr_check(t[0], token, true);
571         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "thread online");
572
573         /* Check if all the online threads can report QS */
574         token = rte_rcu_qsbr_start(t[0]);
575         for (i = 0; i < RTE_MAX_LCORE; i++)
576                 rte_rcu_qsbr_quiescent(t[0], i);
577         ret = rte_rcu_qsbr_check(t[0], token, true);
578         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "report QS");
579
580         /* Make all the threads offline */
581         for (i = 0; i < RTE_MAX_LCORE; i++)
582                 rte_rcu_qsbr_thread_offline(t[0], i);
583         /* Make sure these threads are not being waited on */
584         token = rte_rcu_qsbr_start(t[0]);
585         ret = rte_rcu_qsbr_check(t[0], token, true);
586         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "offline QS");
587
588         /* Make the threads online */
589         for (i = 0; i < RTE_MAX_LCORE; i++)
590                 rte_rcu_qsbr_thread_online(t[0], i);
591         /* Check if all the online threads can report QS */
592         token = rte_rcu_qsbr_start(t[0]);
593         for (i = 0; i < RTE_MAX_LCORE; i++)
594                 rte_rcu_qsbr_quiescent(t[0], i);
595         ret = rte_rcu_qsbr_check(t[0], token, true);
596         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "online again");
597
598         return 0;
599 }
600
601 static void
602 test_rcu_qsbr_free_resource1(void *p, void *e, unsigned int n)
603 {
604         if (p != NULL || e != NULL || n != 1) {
605                 printf("%s: Test failed\n", __func__);
606                 cb_failed = 1;
607         }
608 }
609
610 static void
611 test_rcu_qsbr_free_resource2(void *p, void *e, unsigned int n)
612 {
613         if (p != NULL || e == NULL || n != 1) {
614                 printf("%s: Test failed\n", __func__);
615                 cb_failed = 1;
616         }
617 }
618
619 /*
620  * rte_rcu_qsbr_dq_create: create a queue used to store the data structure
621  * elements that can be freed later. This queue is referred to as 'defer queue'.
622  */
623 static int
624 test_rcu_qsbr_dq_create(void)
625 {
626         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
627         struct rte_rcu_qsbr_dq_parameters params;
628         struct rte_rcu_qsbr_dq *dq;
629
630         printf("\nTest rte_rcu_qsbr_dq_create()\n");
631
632         /* Pass invalid parameters */
633         dq = rte_rcu_qsbr_dq_create(NULL);
634         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
635
636         memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
637         dq = rte_rcu_qsbr_dq_create(&params);
638         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
639
640         snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
641         params.name = rcu_dq_name;
642         dq = rte_rcu_qsbr_dq_create(&params);
643         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
644
645         params.free_fn = test_rcu_qsbr_free_resource1;
646         dq = rte_rcu_qsbr_dq_create(&params);
647         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
648
649         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
650         params.v = t[0];
651         dq = rte_rcu_qsbr_dq_create(&params);
652         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
653
654         params.size = 1;
655         dq = rte_rcu_qsbr_dq_create(&params);
656         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
657
658         params.esize = 3;
659         dq = rte_rcu_qsbr_dq_create(&params);
660         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
661
662         params.trigger_reclaim_limit = 0;
663         params.max_reclaim_size = 0;
664         dq = rte_rcu_qsbr_dq_create(&params);
665         TEST_RCU_QSBR_RETURN_IF_ERROR((dq != NULL), "dq create invalid params");
666
667         /* Pass all valid parameters */
668         params.esize = 16;
669         params.trigger_reclaim_limit = 0;
670         params.max_reclaim_size = params.size;
671         dq = rte_rcu_qsbr_dq_create(&params);
672         TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
673         rte_rcu_qsbr_dq_delete(dq);
674
675         params.esize = 16;
676         params.flags = RTE_RCU_QSBR_DQ_MT_UNSAFE;
677         dq = rte_rcu_qsbr_dq_create(&params);
678         TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
679         rte_rcu_qsbr_dq_delete(dq);
680
681         return 0;
682 }
683
684 /*
685  * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
686  * to be freed later after at least one grace period is over.
687  */
688 static int
689 test_rcu_qsbr_dq_enqueue(void)
690 {
691         int ret;
692         uint64_t r;
693         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
694         struct rte_rcu_qsbr_dq_parameters params;
695         struct rte_rcu_qsbr_dq *dq;
696
697         printf("\nTest rte_rcu_qsbr_dq_enqueue()\n");
698
699         /* Create a queue with simple parameters */
700         memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
701         snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
702         params.name = rcu_dq_name;
703         params.free_fn = test_rcu_qsbr_free_resource1;
704         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
705         params.v = t[0];
706         params.size = 1;
707         params.esize = 16;
708         params.trigger_reclaim_limit = 0;
709         params.max_reclaim_size = params.size;
710         dq = rte_rcu_qsbr_dq_create(&params);
711         TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
712
713         /* Pass invalid parameters */
714         ret = rte_rcu_qsbr_dq_enqueue(NULL, NULL);
715         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
716
717         ret = rte_rcu_qsbr_dq_enqueue(dq, NULL);
718         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
719
720         ret = rte_rcu_qsbr_dq_enqueue(NULL, &r);
721         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq enqueue invalid params");
722
723         ret = rte_rcu_qsbr_dq_delete(dq);
724         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 1), "dq delete valid params");
725
726         return 0;
727 }
728
729 /*
730  * rte_rcu_qsbr_dq_reclaim: Reclaim resources from the defer queue.
731  */
732 static int
733 test_rcu_qsbr_dq_reclaim(void)
734 {
735         int ret;
736         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
737         struct rte_rcu_qsbr_dq_parameters params;
738         struct rte_rcu_qsbr_dq *dq;
739
740         printf("\nTest rte_rcu_qsbr_dq_reclaim()\n");
741
742         /* Pass invalid parameters */
743         ret = rte_rcu_qsbr_dq_reclaim(NULL, 10, NULL, NULL, NULL);
744         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
745
746         /* Pass invalid parameters */
747         memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
748         snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
749         params.name = rcu_dq_name;
750         params.free_fn = test_rcu_qsbr_free_resource1;
751         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
752         params.v = t[0];
753         params.size = 1;
754         params.esize = 3;
755         params.trigger_reclaim_limit = 0;
756         params.max_reclaim_size = params.size;
757         dq = rte_rcu_qsbr_dq_create(&params);
758         ret = rte_rcu_qsbr_dq_reclaim(dq, 0, NULL, NULL, NULL);
759         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 1), "dq reclaim invalid params");
760
761         return 0;
762 }
763
764 /*
765  * rte_rcu_qsbr_dq_delete: Delete a defer queue.
766  */
767 static int
768 test_rcu_qsbr_dq_delete(void)
769 {
770         int ret;
771         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
772         struct rte_rcu_qsbr_dq_parameters params;
773         struct rte_rcu_qsbr_dq *dq;
774
775         printf("\nTest rte_rcu_qsbr_dq_delete()\n");
776
777         /* Pass invalid parameters */
778         ret = rte_rcu_qsbr_dq_delete(NULL);
779         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete invalid params");
780
781         memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
782         snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
783         params.name = rcu_dq_name;
784         params.free_fn = test_rcu_qsbr_free_resource1;
785         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
786         params.v = t[0];
787         params.size = 1;
788         params.esize = 16;
789         params.trigger_reclaim_limit = 0;
790         params.max_reclaim_size = params.size;
791         dq = rte_rcu_qsbr_dq_create(&params);
792         TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
793         ret = rte_rcu_qsbr_dq_delete(dq);
794         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
795
796         return 0;
797 }
798
799 /*
800  * rte_rcu_qsbr_dq_enqueue: enqueue one resource to the defer queue,
801  * to be freed later after at least one grace period is over.
802  */
803 static int
804 test_rcu_qsbr_dq_functional(int32_t size, int32_t esize, uint32_t flags)
805 {
806         int i, j, ret;
807         char rcu_dq_name[RTE_RCU_QSBR_DQ_NAMESIZE];
808         struct rte_rcu_qsbr_dq_parameters params;
809         struct rte_rcu_qsbr_dq *dq;
810         uint64_t *e;
811         uint64_t sc = 200;
812         int max_entries;
813
814         printf("\nTest rte_rcu_qsbr_dq_xxx functional tests()\n");
815         printf("Size = %d, esize = %d, flags = 0x%x\n", size, esize, flags);
816
817         e = (uint64_t *)rte_zmalloc(NULL, esize, RTE_CACHE_LINE_SIZE);
818         if (e == NULL)
819                 return 0;
820         cb_failed = 0;
821
822         /* Initialize the RCU variable. No threads are registered */
823         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
824
825         /* Create a queue with simple parameters */
826         memset(&params, 0, sizeof(struct rte_rcu_qsbr_dq_parameters));
827         snprintf(rcu_dq_name, sizeof(rcu_dq_name), "TEST_RCU");
828         params.name = rcu_dq_name;
829         params.flags = flags;
830         params.free_fn = test_rcu_qsbr_free_resource2;
831         params.v = t[0];
832         params.size = size;
833         params.esize = esize;
834         params.trigger_reclaim_limit = size >> 3;
835         params.max_reclaim_size = (size >> 4)?(size >> 4):1;
836         dq = rte_rcu_qsbr_dq_create(&params);
837         TEST_RCU_QSBR_RETURN_IF_ERROR((dq == NULL), "dq create valid params");
838
839         /* Given the size calculate the maximum number of entries
840          * that can be stored on the defer queue (look at the logic used
841          * in capacity calculation of rte_ring).
842          */
843         max_entries = rte_align32pow2(size + 1) - 1;
844         printf("max_entries = %d\n", max_entries);
845
846         /* Enqueue few counters starting with the value 'sc' */
847         /* The queue size will be rounded up to 2. The enqueue API also
848          * reclaims if the queue size is above certain limit. Since, there
849          * are no threads registered, reclamation succeeds. Hence, it should
850          * be possible to enqueue more than the provided queue size.
851          */
852         for (i = 0; i < 10; i++) {
853                 ret = rte_rcu_qsbr_dq_enqueue(dq, e);
854                 TEST_RCU_QSBR_GOTO_IF_ERROR(end, (ret != 0),
855                         "dq enqueue functional, i = %d", i);
856                 for (j = 0; j < esize/8; j++)
857                         e[j] = sc++;
858         }
859
860         /* Validate that call back function did not return any error */
861         TEST_RCU_QSBR_GOTO_IF_ERROR(end, (cb_failed == 1), "CB failed");
862
863         /* Register a thread on the RCU QSBR variable. Reclamation will not
864          * succeed. It should not be possible to enqueue more than the size
865          * number of resources.
866          */
867         rte_rcu_qsbr_thread_register(t[0], 1);
868         rte_rcu_qsbr_thread_online(t[0], 1);
869
870         for (i = 0; i < max_entries; i++) {
871                 ret = rte_rcu_qsbr_dq_enqueue(dq, e);
872                 TEST_RCU_QSBR_GOTO_IF_ERROR(end, (ret != 0),
873                         "dq enqueue functional, max_entries = %d, i = %d",
874                         max_entries, i);
875                 for (j = 0; j < esize/8; j++)
876                         e[j] = sc++;
877         }
878
879         /* Enqueue fails as queue is full */
880         ret = rte_rcu_qsbr_dq_enqueue(dq, e);
881         TEST_RCU_QSBR_GOTO_IF_ERROR(end, (ret == 0), "defer queue is not full");
882
883         /* Delete should fail as there are elements in defer queue which
884          * cannot be reclaimed.
885          */
886         ret = rte_rcu_qsbr_dq_delete(dq);
887         TEST_RCU_QSBR_RETURN_IF_ERROR((ret == 0), "dq delete valid params");
888
889         /* Report quiescent state, enqueue should succeed */
890         rte_rcu_qsbr_quiescent(t[0], 1);
891         for (i = 0; i < max_entries; i++) {
892                 ret = rte_rcu_qsbr_dq_enqueue(dq, e);
893                 TEST_RCU_QSBR_GOTO_IF_ERROR(end, (ret != 0),
894                         "dq enqueue functional");
895                 for (j = 0; j < esize/8; j++)
896                         e[j] = sc++;
897         }
898
899         /* Validate that call back function did not return any error */
900         TEST_RCU_QSBR_GOTO_IF_ERROR(end, (cb_failed == 1), "CB failed");
901
902         /* Queue is full */
903         ret = rte_rcu_qsbr_dq_enqueue(dq, e);
904         TEST_RCU_QSBR_GOTO_IF_ERROR(end, (ret == 0), "defer queue is not full");
905
906         /* Report quiescent state, delete should succeed */
907         rte_rcu_qsbr_quiescent(t[0], 1);
908         ret = rte_rcu_qsbr_dq_delete(dq);
909         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
910
911         rte_free(e);
912
913         /* Validate that call back function did not return any error */
914         TEST_RCU_QSBR_RETURN_IF_ERROR((cb_failed == 1), "CB failed");
915
916         return 0;
917
918 end:
919         rte_free(e);
920         ret = rte_rcu_qsbr_dq_delete(dq);
921         TEST_RCU_QSBR_RETURN_IF_ERROR((ret != 0), "dq delete valid params");
922         return -1;
923 }
924
925 /*
926  * rte_rcu_qsbr_dump: Dump status of a single QS variable to a file
927  */
928 static int
929 test_rcu_qsbr_dump(void)
930 {
931         int i;
932
933         printf("\nTest rte_rcu_qsbr_dump()\n");
934
935         /* Negative tests */
936         rte_rcu_qsbr_dump(NULL, t[0]);
937         rte_rcu_qsbr_dump(stdout, NULL);
938         rte_rcu_qsbr_dump(NULL, NULL);
939
940         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
941         rte_rcu_qsbr_init(t[1], RTE_MAX_LCORE);
942
943         /* QS variable with 0 core mask */
944         rte_rcu_qsbr_dump(stdout, t[0]);
945
946         rte_rcu_qsbr_thread_register(t[0], enabled_core_ids[0]);
947
948         for (i = 1; i < 3; i++)
949                 rte_rcu_qsbr_thread_register(t[1], enabled_core_ids[i]);
950
951         rte_rcu_qsbr_dump(stdout, t[0]);
952         rte_rcu_qsbr_dump(stdout, t[1]);
953         printf("\n");
954         return 0;
955 }
956
957 static int
958 test_rcu_qsbr_reader(void *arg)
959 {
960         struct rte_rcu_qsbr *temp;
961         struct rte_hash *hash = NULL;
962         int i;
963         uint32_t lcore_id = rte_lcore_id();
964         struct test_rcu_thread_info *ti;
965         uint32_t *pdata;
966
967         ti = (struct test_rcu_thread_info *)arg;
968         temp = t[ti->ir];
969         hash = h[ti->ih];
970
971         do {
972                 rte_rcu_qsbr_thread_register(temp, lcore_id);
973                 rte_rcu_qsbr_thread_online(temp, lcore_id);
974                 for (i = 0; i < TOTAL_ENTRY; i++) {
975                         rte_rcu_qsbr_lock(temp, lcore_id);
976                         if (rte_hash_lookup_data(hash, keys+i,
977                                         (void **)&pdata) != -ENOENT) {
978                                 pdata[lcore_id] = 0;
979                                 while (pdata[lcore_id] < COUNTER_VALUE)
980                                         pdata[lcore_id]++;
981                         }
982                         rte_rcu_qsbr_unlock(temp, lcore_id);
983                 }
984                 /* Update quiescent state counter */
985                 rte_rcu_qsbr_quiescent(temp, lcore_id);
986                 rte_rcu_qsbr_thread_offline(temp, lcore_id);
987                 rte_rcu_qsbr_thread_unregister(temp, lcore_id);
988         } while (!writer_done);
989
990         return 0;
991 }
992
993 static int
994 test_rcu_qsbr_writer(void *arg)
995 {
996         uint64_t token;
997         int32_t i, pos, del;
998         uint32_t c;
999         struct rte_rcu_qsbr *temp;
1000         struct rte_hash *hash = NULL;
1001         struct test_rcu_thread_info *ti;
1002
1003         ti = (struct test_rcu_thread_info *)arg;
1004         temp = t[ti->ir];
1005         hash = h[ti->ih];
1006
1007         /* Delete element from the shared data structure */
1008         del = rte_lcore_id() % TOTAL_ENTRY;
1009         pos = rte_hash_del_key(hash, keys + del);
1010         if (pos < 0) {
1011                 printf("Delete key failed #%d\n", keys[del]);
1012                 return -1;
1013         }
1014         /* Start the quiescent state query process */
1015         token = rte_rcu_qsbr_start(temp);
1016         /* Check the quiescent state status */
1017         rte_rcu_qsbr_check(temp, token, true);
1018         for (i = 0; i < 2; i++) {
1019                 c = hash_data[ti->ih][del][ti->r_core_ids[i]];
1020                 if (c != COUNTER_VALUE && c != 0) {
1021                         printf("Reader lcore id %u did not complete = %u\t",
1022                                 rte_lcore_id(), c);
1023                         return -1;
1024                 }
1025         }
1026
1027         if (rte_hash_free_key_with_position(hash, pos) < 0) {
1028                 printf("Failed to free the key #%d\n", keys[del]);
1029                 return -1;
1030         }
1031         rte_free(hash_data[ti->ih][del]);
1032         hash_data[ti->ih][del] = NULL;
1033
1034         return 0;
1035 }
1036
1037 static struct rte_hash *
1038 init_hash(int hash_id)
1039 {
1040         int i;
1041         struct rte_hash *h = NULL;
1042
1043         sprintf(hash_name[hash_id], "hash%d", hash_id);
1044         struct rte_hash_parameters hash_params = {
1045                 .entries = TOTAL_ENTRY,
1046                 .key_len = sizeof(uint32_t),
1047                 .hash_func_init_val = 0,
1048                 .socket_id = rte_socket_id(),
1049                 .hash_func = rte_hash_crc,
1050                 .extra_flag =
1051                         RTE_HASH_EXTRA_FLAGS_RW_CONCURRENCY_LF,
1052                 .name = hash_name[hash_id],
1053         };
1054
1055         h = rte_hash_create(&hash_params);
1056         if (h == NULL) {
1057                 printf("Hash create Failed\n");
1058                 return NULL;
1059         }
1060
1061         for (i = 0; i < TOTAL_ENTRY; i++) {
1062                 hash_data[hash_id][i] =
1063                         rte_zmalloc(NULL, sizeof(uint32_t) * RTE_MAX_LCORE, 0);
1064                 if (hash_data[hash_id][i] == NULL) {
1065                         printf("No memory\n");
1066                         return NULL;
1067                 }
1068         }
1069         keys = rte_malloc(NULL, sizeof(uint32_t) * TOTAL_ENTRY, 0);
1070         if (keys == NULL) {
1071                 printf("No memory\n");
1072                 return NULL;
1073         }
1074
1075         for (i = 0; i < TOTAL_ENTRY; i++)
1076                 keys[i] = i;
1077
1078         for (i = 0; i < TOTAL_ENTRY; i++) {
1079                 if (rte_hash_add_key_data(h, keys + i,
1080                                 (void *)((uintptr_t)hash_data[hash_id][i]))
1081                                 < 0) {
1082                         printf("Hash key add Failed #%d\n", i);
1083                         return NULL;
1084                 }
1085         }
1086         return h;
1087 }
1088
1089 /*
1090  * Functional test:
1091  * Single writer, Single QS variable, simultaneous QSBR Queries
1092  */
1093 static int
1094 test_rcu_qsbr_sw_sv_3qs(void)
1095 {
1096         uint64_t token[3];
1097         uint32_t c;
1098         int i;
1099         int32_t pos[3];
1100
1101         writer_done = 0;
1102
1103         printf("Test: 1 writer, 1 QSBR variable, simultaneous QSBR queries\n");
1104
1105         rte_rcu_qsbr_init(t[0], RTE_MAX_LCORE);
1106
1107         /* Shared data structure created */
1108         h[0] = init_hash(0);
1109         if (h[0] == NULL) {
1110                 printf("Hash init failed\n");
1111                 goto error;
1112         }
1113
1114         /* No need to fill the registered core IDs as the writer
1115          * thread is not launched.
1116          */
1117         thread_info[0].ir = 0;
1118         thread_info[0].ih = 0;
1119
1120         /* Reader threads are launched */
1121         for (i = 0; i < 4; i++)
1122                 rte_eal_remote_launch(test_rcu_qsbr_reader, &thread_info[0],
1123                                         enabled_core_ids[i]);
1124
1125         /* Delete element from the shared data structure */
1126         pos[0] = rte_hash_del_key(h[0], keys + 0);
1127         if (pos[0] < 0) {
1128                 printf("Delete key failed #%d\n", keys[0]);
1129                 goto error;
1130         }
1131         /* Start the quiescent state query process */
1132         token[0] = rte_rcu_qsbr_start(t[0]);
1133
1134         /* Delete element from the shared data structure */
1135         pos[1] = rte_hash_del_key(h[0], keys + 3);
1136         if (pos[1] < 0) {
1137                 printf("Delete key failed #%d\n", keys[3]);
1138                 goto error;
1139         }
1140         /* Start the quiescent state query process */
1141         token[1] = rte_rcu_qsbr_start(t[0]);
1142
1143         /* Delete element from the shared data structure */
1144         pos[2] = rte_hash_del_key(h[0], keys + 6);
1145         if (pos[2] < 0) {
1146                 printf("Delete key failed #%d\n", keys[6]);
1147                 goto error;
1148         }
1149         /* Start the quiescent state query process */
1150         token[2] = rte_rcu_qsbr_start(t[0]);
1151
1152         /* Check the quiescent state status */
1153         rte_rcu_qsbr_check(t[0], token[0], true);
1154         for (i = 0; i < 4; i++) {
1155                 c = hash_data[0][0][enabled_core_ids[i]];
1156                 if (c != COUNTER_VALUE && c != 0) {
1157                         printf("Reader lcore %d did not complete #0 = %d\n",
1158                                 enabled_core_ids[i], c);
1159                         goto error;
1160                 }
1161         }
1162
1163         if (rte_hash_free_key_with_position(h[0], pos[0]) < 0) {
1164                 printf("Failed to free the key #%d\n", keys[0]);
1165                 goto error;
1166         }
1167         rte_free(hash_data[0][0]);
1168         hash_data[0][0] = NULL;
1169
1170         /* Check the quiescent state status */
1171         rte_rcu_qsbr_check(t[0], token[1], true);
1172         for (i = 0; i < 4; i++) {
1173                 c = hash_data[0][3][enabled_core_ids[i]];
1174                 if (c != COUNTER_VALUE && c != 0) {
1175                         printf("Reader lcore %d did not complete #3 = %d\n",
1176                                 enabled_core_ids[i], c);
1177                         goto error;
1178                 }
1179         }
1180
1181         if (rte_hash_free_key_with_position(h[0], pos[1]) < 0) {
1182                 printf("Failed to free the key #%d\n", keys[3]);
1183                 goto error;
1184         }
1185         rte_free(hash_data[0][3]);
1186         hash_data[0][3] = NULL;
1187
1188         /* Check the quiescent state status */
1189         rte_rcu_qsbr_check(t[0], token[2], true);
1190         for (i = 0; i < 4; i++) {
1191                 c = hash_data[0][6][enabled_core_ids[i]];
1192                 if (c != COUNTER_VALUE && c != 0) {
1193                         printf("Reader lcore %d did not complete #6 = %d\n",
1194                                 enabled_core_ids[i], c);
1195                         goto error;
1196                 }
1197         }
1198
1199         if (rte_hash_free_key_with_position(h[0], pos[2]) < 0) {
1200                 printf("Failed to free the key #%d\n", keys[6]);
1201                 goto error;
1202         }
1203         rte_free(hash_data[0][6]);
1204         hash_data[0][6] = NULL;
1205
1206         writer_done = 1;
1207
1208         /* Wait and check return value from reader threads */
1209         for (i = 0; i < 4; i++)
1210                 if (rte_eal_wait_lcore(enabled_core_ids[i]) < 0)
1211                         goto error;
1212         rte_hash_free(h[0]);
1213         rte_free(keys);
1214
1215         return 0;
1216
1217 error:
1218         writer_done = 1;
1219         /* Wait until all readers have exited */
1220         rte_eal_mp_wait_lcore();
1221
1222         rte_hash_free(h[0]);
1223         rte_free(keys);
1224         for (i = 0; i < TOTAL_ENTRY; i++)
1225                 rte_free(hash_data[0][i]);
1226
1227         return -1;
1228 }
1229
1230 /*
1231  * Multi writer, Multiple QS variable, simultaneous QSBR queries
1232  */
1233 static int
1234 test_rcu_qsbr_mw_mv_mqs(void)
1235 {
1236         unsigned int i, j;
1237         unsigned int test_cores;
1238
1239         writer_done = 0;
1240         test_cores = num_cores / 4;
1241         test_cores = test_cores * 4;
1242
1243         printf("Test: %d writers, %d QSBR variable, simultaneous QSBR queries\n",
1244                test_cores / 2, test_cores / 4);
1245
1246         for (i = 0; i < test_cores / 4; i++) {
1247                 j = i * 4;
1248                 rte_rcu_qsbr_init(t[i], RTE_MAX_LCORE);
1249                 h[i] = init_hash(i);
1250                 if (h[i] == NULL) {
1251                         printf("Hash init failed\n");
1252                         goto error;
1253                 }
1254                 thread_info[i].ir = i;
1255                 thread_info[i].ih = i;
1256                 thread_info[i].r_core_ids[0] = enabled_core_ids[j];
1257                 thread_info[i].r_core_ids[1] = enabled_core_ids[j + 1];
1258
1259                 /* Reader threads are launched */
1260                 rte_eal_remote_launch(test_rcu_qsbr_reader,
1261                                         (void *)&thread_info[i],
1262                                         enabled_core_ids[j]);
1263                 rte_eal_remote_launch(test_rcu_qsbr_reader,
1264                                         (void *)&thread_info[i],
1265                                         enabled_core_ids[j + 1]);
1266
1267                 /* Writer threads are launched */
1268                 rte_eal_remote_launch(test_rcu_qsbr_writer,
1269                                         (void *)&thread_info[i],
1270                                         enabled_core_ids[j + 2]);
1271                 rte_eal_remote_launch(test_rcu_qsbr_writer,
1272                                         (void *)&thread_info[i],
1273                                         enabled_core_ids[j + 3]);
1274         }
1275
1276         /* Wait and check return value from writer threads */
1277         for (i = 0; i < test_cores / 4; i++) {
1278                 j = i * 4;
1279                 if (rte_eal_wait_lcore(enabled_core_ids[j + 2]) < 0)
1280                         goto error;
1281
1282                 if (rte_eal_wait_lcore(enabled_core_ids[j + 3]) < 0)
1283                         goto error;
1284         }
1285         writer_done = 1;
1286
1287         /* Wait and check return value from reader threads */
1288         for (i = 0; i < test_cores / 4; i++) {
1289                 j = i * 4;
1290                 if (rte_eal_wait_lcore(enabled_core_ids[j]) < 0)
1291                         goto error;
1292
1293                 if (rte_eal_wait_lcore(enabled_core_ids[j + 1]) < 0)
1294                         goto error;
1295         }
1296
1297         for (i = 0; i < test_cores / 4; i++)
1298                 rte_hash_free(h[i]);
1299
1300         rte_free(keys);
1301
1302         return 0;
1303
1304 error:
1305         writer_done = 1;
1306         /* Wait until all readers and writers have exited */
1307         rte_eal_mp_wait_lcore();
1308
1309         for (i = 0; i < test_cores / 4; i++)
1310                 rte_hash_free(h[i]);
1311         rte_free(keys);
1312         for (j = 0; j < test_cores / 4; j++)
1313                 for (i = 0; i < TOTAL_ENTRY; i++)
1314                         rte_free(hash_data[j][i]);
1315
1316         return -1;
1317 }
1318
1319 static int
1320 test_rcu_qsbr_main(void)
1321 {
1322         uint16_t core_id;
1323
1324         if (rte_lcore_count() < 5) {
1325                 printf("Not enough cores for rcu_qsbr_autotest, expecting at least 5\n");
1326                 return TEST_SKIPPED;
1327         }
1328
1329         num_cores = 0;
1330         RTE_LCORE_FOREACH_SLAVE(core_id) {
1331                 enabled_core_ids[num_cores] = core_id;
1332                 num_cores++;
1333         }
1334
1335         /* Error-checking test cases */
1336         if (test_rcu_qsbr_get_memsize() < 0)
1337                 goto test_fail;
1338
1339         if (test_rcu_qsbr_init() < 0)
1340                 goto test_fail;
1341
1342         alloc_rcu();
1343
1344         if (test_rcu_qsbr_thread_register() < 0)
1345                 goto test_fail;
1346
1347         if (test_rcu_qsbr_thread_unregister() < 0)
1348                 goto test_fail;
1349
1350         if (test_rcu_qsbr_start() < 0)
1351                 goto test_fail;
1352
1353         if (test_rcu_qsbr_check() < 0)
1354                 goto test_fail;
1355
1356         if (test_rcu_qsbr_synchronize() < 0)
1357                 goto test_fail;
1358
1359         if (test_rcu_qsbr_dump() < 0)
1360                 goto test_fail;
1361
1362         if (test_rcu_qsbr_thread_online() < 0)
1363                 goto test_fail;
1364
1365         if (test_rcu_qsbr_thread_offline() < 0)
1366                 goto test_fail;
1367
1368         if (test_rcu_qsbr_dq_create() < 0)
1369                 goto test_fail;
1370
1371         if (test_rcu_qsbr_dq_reclaim() < 0)
1372                 goto test_fail;
1373
1374         if (test_rcu_qsbr_dq_delete() < 0)
1375                 goto test_fail;
1376
1377         if (test_rcu_qsbr_dq_enqueue() < 0)
1378                 goto test_fail;
1379
1380         printf("\nFunctional tests\n");
1381
1382         if (test_rcu_qsbr_sw_sv_3qs() < 0)
1383                 goto test_fail;
1384
1385         if (test_rcu_qsbr_mw_mv_mqs() < 0)
1386                 goto test_fail;
1387
1388         if (test_rcu_qsbr_dq_functional(1, 8, 0) < 0)
1389                 goto test_fail;
1390
1391         if (test_rcu_qsbr_dq_functional(2, 8, RTE_RCU_QSBR_DQ_MT_UNSAFE) < 0)
1392                 goto test_fail;
1393
1394         if (test_rcu_qsbr_dq_functional(303, 16, 0) < 0)
1395                 goto test_fail;
1396
1397         if (test_rcu_qsbr_dq_functional(7, 128, RTE_RCU_QSBR_DQ_MT_UNSAFE) < 0)
1398                 goto test_fail;
1399
1400         free_rcu();
1401
1402         printf("\n");
1403         return 0;
1404
1405 test_fail:
1406         free_rcu();
1407
1408         return -1;
1409 }
1410
1411 REGISTER_TEST_COMMAND(rcu_qsbr_autotest, test_rcu_qsbr_main);