test/rib: speed up rib6 autotests
[dpdk.git] / app / test / autotest_runner.py
1 # SPDX-License-Identifier: BSD-3-Clause
2 # Copyright(c) 2010-2014 Intel Corporation
3
4 # The main logic behind running autotests in parallel
5
6 from __future__ import print_function
7 import StringIO
8 import csv
9 from multiprocessing import Pool, Queue
10 import pexpect
11 import re
12 import subprocess
13 import sys
14 import time
15 import glob
16 import os
17
18 # wait for prompt
19 def wait_prompt(child):
20     try:
21         child.sendline()
22         result = child.expect(["RTE>>", pexpect.TIMEOUT, pexpect.EOF],
23                               timeout=120)
24     except:
25         return False
26     if result == 0:
27         return True
28     else:
29         return False
30
31
32 # get all valid NUMA nodes
33 def get_numa_nodes():
34     return [
35         int(
36             re.match(r"node(\d+)", os.path.basename(node))
37             .group(1)
38         )
39         for node in glob.glob("/sys/devices/system/node/node*")
40     ]
41
42
43 # find first (or any, really) CPU on a particular node, will be used to spread
44 # processes around NUMA nodes to avoid exhausting memory on particular node
45 def first_cpu_on_node(node_nr):
46     cpu_path = glob.glob("/sys/devices/system/node/node%d/cpu*" % node_nr)
47     r = re.compile(r"cpu(\d+)")
48     cpu_name = filter(None,
49             map(r.match,
50                 map(os.path.basename, cpu_path)
51             )
52     )
53     # for compatibility between python 3 and 2 we need to make interable out
54     # of filter return as it returns list in python 2 and a generator in 3
55     m = next(iter(cpu_name))
56     return int(m.group(1))
57
58
59 pool_child = None  # per-process child
60
61
62 # we initialize each worker with a queue because we need per-pool unique
63 # command-line arguments, but we cannot do different arguments in an initializer
64 # because the API doesn't allow per-worker initializer arguments. so, instead,
65 # we will initialize with a shared queue, and dequeue command-line arguments
66 # from this queue
67 def pool_init(queue, result_queue):
68     global pool_child
69
70     cmdline, prefix = queue.get()
71     start_time = time.time()
72     name = ("Start %s" % prefix) if prefix != "" else "Start"
73
74     # use default prefix if no prefix was specified
75     prefix_cmdline = "--file-prefix=%s" % prefix if prefix != "" else ""
76
77     # append prefix to cmdline
78     cmdline = "%s %s" % (cmdline, prefix_cmdline)
79
80     # prepare logging of init
81     startuplog = StringIO.StringIO()
82
83     # run test app
84     try:
85
86         print("\n%s %s\n" % ("=" * 20, prefix), file=startuplog)
87         print("\ncmdline=%s" % cmdline, file=startuplog)
88
89         pool_child = pexpect.spawn(cmdline, logfile=startuplog)
90
91         # wait for target to boot
92         if not wait_prompt(pool_child):
93             pool_child.close()
94
95             result = tuple((-1,
96                             "Fail [No prompt]",
97                             name,
98                             time.time() - start_time,
99                             startuplog.getvalue(),
100                             None))
101             pool_child = None
102         else:
103             result = tuple((0,
104                             "Success",
105                             name,
106                             time.time() - start_time,
107                             startuplog.getvalue(),
108                             None))
109     except:
110         result = tuple((-1,
111                         "Fail [Can't run]",
112                         name,
113                         time.time() - start_time,
114                         startuplog.getvalue(),
115                         None))
116         pool_child = None
117
118     result_queue.put(result)
119
120
121 # run a test
122 # each result tuple in results list consists of:
123 #   result value (0 or -1)
124 #   result string
125 #   test name
126 #   total test run time (double)
127 #   raw test log
128 #   test report (if not available, should be None)
129 #
130 # this function needs to be outside AutotestRunner class because otherwise Pool
131 # won't work (or rather it will require quite a bit of effort to make it work).
132 def run_test(target, test):
133     global pool_child
134
135     if pool_child is None:
136         return -1, "Fail [No test process]", test["Name"], 0, "", None
137
138     # create log buffer for each test
139     # in multiprocessing environment, the logging would be
140     # interleaved and will create a mess, hence the buffering
141     logfile = StringIO.StringIO()
142     pool_child.logfile = logfile
143
144     # make a note when the test started
145     start_time = time.time()
146
147     try:
148         # print test name to log buffer
149         print("\n%s %s\n" % ("-" * 20, test["Name"]), file=logfile)
150
151         # run test function associated with the test
152         result = test["Func"](pool_child, test["Command"])
153
154         # make a note when the test was finished
155         end_time = time.time()
156
157         log = logfile.getvalue()
158
159         # append test data to the result tuple
160         result += (test["Name"], end_time - start_time, log)
161
162         # call report function, if any defined, and supply it with
163         # target and complete log for test run
164         if test["Report"]:
165             report = test["Report"](target, log)
166
167             # append report to results tuple
168             result += (report,)
169         else:
170             # report is None
171             result += (None,)
172     except:
173         # make a note when the test crashed
174         end_time = time.time()
175
176         # mark test as failed
177         result = (-1, "Fail [Crash]", test["Name"],
178                   end_time - start_time, logfile.getvalue(), None)
179
180     # return test results
181     return result
182
183
184 # class representing an instance of autotests run
185 class AutotestRunner:
186     cmdline = ""
187     parallel_test_groups = []
188     non_parallel_test_groups = []
189     logfile = None
190     csvwriter = None
191     target = ""
192     start = None
193     n_tests = 0
194     fails = 0
195     log_buffers = []
196     blacklist = []
197     whitelist = []
198
199     def __init__(self, cmdline, target, blacklist, whitelist, n_processes):
200         self.cmdline = cmdline
201         self.target = target
202         self.blacklist = blacklist
203         self.whitelist = whitelist
204         self.skipped = []
205         self.parallel_tests = []
206         self.non_parallel_tests = []
207         self.n_processes = n_processes
208         self.active_processes = 0
209
210         # parse the binary for available test commands
211         binary = cmdline.split()[0]
212         stripped = 'not stripped' not in \
213                    subprocess.check_output(['file', binary])
214         if not stripped:
215             symbols = subprocess.check_output(['nm', binary]).decode('utf-8')
216             self.avail_cmds = re.findall('test_register_(\w+)', symbols)
217         else:
218             self.avail_cmds = None
219
220         # log file filename
221         logfile = "%s.log" % target
222         csvfile = "%s.csv" % target
223
224         self.logfile = open(logfile, "w")
225         csvfile = open(csvfile, "w")
226         self.csvwriter = csv.writer(csvfile)
227
228         # prepare results table
229         self.csvwriter.writerow(["test_name", "test_result", "result_str"])
230
231     # set up cmdline string
232     def __get_cmdline(self, cpu_nr):
233         cmdline = ("taskset -c %i " % cpu_nr) + self.cmdline
234
235         return cmdline
236
237     def __process_result(self, result):
238
239         # unpack result tuple
240         test_result, result_str, test_name, \
241             test_time, log, report = result
242
243         # get total run time
244         cur_time = time.time()
245         total_time = int(cur_time - self.start)
246
247         # print results, test run time and total time since start
248         result = ("%s:" % test_name).ljust(30)
249         result += result_str.ljust(29)
250         result += "[%02dm %02ds]" % (test_time / 60, test_time % 60)
251
252         # don't print out total time every line, it's the same anyway
253         print(result + "[%02dm %02ds]" % (total_time / 60, total_time % 60))
254
255         # if test failed and it wasn't a "start" test
256         if test_result < 0:
257             self.fails += 1
258
259         # collect logs
260         self.log_buffers.append(log)
261
262         # create report if it exists
263         if report:
264             try:
265                 f = open("%s_%s_report.rst" %
266                          (self.target, test_name), "w")
267             except IOError:
268                 print("Report for %s could not be created!" % test_name)
269             else:
270                 with f:
271                     f.write(report)
272
273         # write test result to CSV file
274         self.csvwriter.writerow([test_name, test_result, result_str])
275
276     # this function checks individual test and decides if this test should be in
277     # the group by comparing it against  whitelist/blacklist. it also checks if
278     # the test is compiled into the binary, and marks it as skipped if necessary
279     def __filter_test(self, test):
280         test_cmd = test["Command"]
281         test_id = test_cmd
282
283         # dump tests are specified in full e.g. "Dump_mempool"
284         if "_autotest" in test_id:
285             test_id = test_id[:-len("_autotest")]
286
287         # filter out blacklisted/whitelisted tests
288         if self.blacklist and test_id in self.blacklist:
289             return False
290         if self.whitelist and test_id not in self.whitelist:
291             return False
292
293         # if test wasn't compiled in, remove it as well
294         if self.avail_cmds and test_cmd not in self.avail_cmds:
295             result = 0, "Skipped [Not compiled]", test_id, 0, "", None
296             self.skipped.append(tuple(result))
297             return False
298
299         return True
300
301     def __run_test_group(self, test_group, worker_cmdlines):
302         group_queue = Queue()
303         init_result_queue = Queue()
304         for proc, cmdline in enumerate(worker_cmdlines):
305             prefix = "test%i" % proc if len(worker_cmdlines) > 1 else ""
306             group_queue.put(tuple((cmdline, prefix)))
307
308         # create a pool of worker threads
309         # we will initialize child in the initializer, and we don't need to
310         # close the child because when the pool worker gets destroyed, child
311         # closes the process
312         pool = Pool(processes=len(worker_cmdlines),
313                     initializer=pool_init,
314                     initargs=(group_queue, init_result_queue))
315
316         results = []
317
318         # process all initialization results
319         for _ in range(len(worker_cmdlines)):
320             self.__process_result(init_result_queue.get())
321
322         # run all tests asynchronously
323         for test in test_group:
324             result = pool.apply_async(run_test, (self.target, test))
325             results.append(result)
326
327         # tell the pool to stop all processes once done
328         pool.close()
329
330         # iterate while we have group execution results to get
331         while len(results) > 0:
332             # iterate over a copy to be able to safely delete results
333             # this iterates over a list of group results
334             for async_result in results[:]:
335                 # if the thread hasn't finished yet, continue
336                 if not async_result.ready():
337                     continue
338
339                 res = async_result.get()
340
341                 self.__process_result(res)
342
343                 # remove result from results list once we're done with it
344                 results.remove(async_result)
345
346     # iterate over test groups and run tests associated with them
347     def run_all_tests(self):
348         # filter groups
349         self.parallel_tests = list(
350             filter(self.__filter_test,
351                    self.parallel_tests)
352         )
353         self.non_parallel_tests = list(
354             filter(self.__filter_test,
355                    self.non_parallel_tests)
356         )
357
358         parallel_cmdlines = []
359         # FreeBSD doesn't have NUMA support
360         numa_nodes = get_numa_nodes()
361         if len(numa_nodes) > 0:
362             for proc in range(self.n_processes):
363                 # spread cpu affinity between NUMA nodes to have less chance of
364                 # running out of memory while running multiple test apps in
365                 # parallel. to do that, alternate between NUMA nodes in a round
366                 # robin fashion, and pick an arbitrary CPU from that node to
367                 # taskset our execution to
368                 numa_node = numa_nodes[self.active_processes % len(numa_nodes)]
369                 cpu_nr = first_cpu_on_node(numa_node)
370                 parallel_cmdlines += [self.__get_cmdline(cpu_nr)]
371                 # increase number of active processes so that the next cmdline
372                 # gets a different NUMA node
373                 self.active_processes += 1
374         else:
375             parallel_cmdlines = [self.cmdline] * self.n_processes
376
377         print("Running tests with %d workers" % self.n_processes)
378
379         # create table header
380         print("")
381         print("Test name".ljust(30) + "Test result".ljust(29) +
382               "Test".center(9) + "Total".center(9))
383         print("=" * 80)
384
385         if len(self.skipped):
386             print("Skipped autotests:")
387
388             # print out any skipped tests
389             for result in self.skipped:
390                 # unpack result tuple
391                 test_result, result_str, test_name, _, _, _ = result
392                 self.csvwriter.writerow([test_name, test_result, result_str])
393
394                 t = ("%s:" % test_name).ljust(30)
395                 t += result_str.ljust(29)
396                 t += "[00m 00s]"
397
398                 print(t)
399
400         # make a note of tests start time
401         self.start = time.time()
402
403         # whatever happens, try to save as much logs as possible
404         try:
405             if len(self.parallel_tests) > 0:
406                 print("Parallel autotests:")
407                 self.__run_test_group(self.parallel_tests, parallel_cmdlines)
408
409             if len(self.non_parallel_tests) > 0:
410                 print("Non-parallel autotests:")
411                 self.__run_test_group(self.non_parallel_tests, [self.cmdline])
412
413             # get total run time
414             cur_time = time.time()
415             total_time = int(cur_time - self.start)
416
417             # print out summary
418             print("=" * 80)
419             print("Total run time: %02dm %02ds" % (total_time / 60,
420                                                    total_time % 60))
421             if self.fails != 0:
422                 print("Number of failed tests: %s" % str(self.fails))
423
424             # write summary to logfile
425             self.logfile.write("Summary\n")
426             self.logfile.write("Target: ".ljust(15) + "%s\n" % self.target)
427             self.logfile.write("Tests: ".ljust(15) + "%i\n" % self.n_tests)
428             self.logfile.write("Failed tests: ".ljust(
429                 15) + "%i\n" % self.fails)
430         except:
431             print("Exception occurred")
432             print(sys.exc_info())
433             self.fails = 1
434
435         # drop logs from all executions to a logfile
436         for buf in self.log_buffers:
437             self.logfile.write(buf.replace("\r", ""))
438
439         return self.fails