# SPDX-License-Identifier: BSD-3-Clause
# Copyright(c) 2010-2014 Intel Corporation

# The main logic behind running autotests in parallel

from __future__ import print_function

import StringIO
import csv
import multiprocessing
import pexpect
import re
import subprocess
import sys
import time
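
# each test is described by a dict; the keys used below are "Name",
# "Command", "Func" and "Report". an illustrative entry is sketched here
# (the real definitions live outside this file, e.g. in autotest_data.py,
# and the function names are assumptions for illustration):
#
#   {
#       "Name": "Timer autotest",      # human-readable name
#       "Command": "timer_autotest",   # command typed at the RTE>> prompt
#       "Func": default_autotest,      # function(child, command) -> (result, result_str)
#       "Report": None,                # optional function(target, log) -> report text
#   }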


# wait for the test app's command prompt
def wait_prompt(child):
    try:
        child.sendline()
        result = child.expect(["RTE>>", pexpect.TIMEOUT, pexpect.EOF],
                              timeout=120)
    except:
        return False
    return result == 0
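
# a minimal usage sketch, assuming a built DPDK test binary (the path and
# EAL flags below are illustrative, not taken from this file):
#
#   child = pexpect.spawn("./app/test -c f -n 4")
#   if not wait_prompt(child):
#       raise RuntimeError("test app did not come up")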


# each result tuple in the results list consists of:
#   result value (0 or -1)
#   result string
#   test name
#   total test run time (double)
#   raw test log
#   test report (if not available, should be None)
#
# this function needs to be outside the AutotestRunner class
# because otherwise Pool won't work (or rather it would require
# quite a bit of effort to make it work).
def run_test_group(cmdline, prefix, target, test):
    start_time = time.time()

    # prepare logging of init
    startuplog = StringIO.StringIO()

    # run the test app and wait for it to come up
    try:
        print("\n%s %s\n" % ("=" * 20, prefix), file=startuplog)
        print("\ncmdline=%s" % cmdline, file=startuplog)

        child = pexpect.spawn(cmdline, logfile=startuplog)

        # wait for target to boot
        if not wait_prompt(child):
            child.close()

            return -1, "Fail [No prompt]", "Start %s" % prefix, \
                time.time() - start_time, startuplog.getvalue(), None
    except:
        return -1, "Fail [Can't run]", "Start %s" % prefix, \
            time.time() - start_time, startuplog.getvalue(), None

    # create a log buffer for each test
    # in a multiprocessing environment, the logging would be
    # interleaved and would create a mess, hence the buffering
    logfile = StringIO.StringIO()
    child.logfile = logfile

    # make a note of when the test started
    start_time = time.time()

    try:
        # print test name to log buffer
        print("\n%s %s\n" % ("-" * 20, test["Name"]), file=logfile)

        # run test function associated with the test
        result = test["Func"](child, test["Command"])

        # make a note of when the test finished
        end_time = time.time()

        log = logfile.getvalue()

        # append test data to the result tuple
        result += (test["Name"], end_time - start_time, log)

        # call the report function, if any is defined, and supply it with
        # the target and the complete log for the test run
        if test["Report"]:
            report = test["Report"](target, log)

            # append report to results tuple
            result += (report,)
        else:
            # no report function, so append None instead
            result += (None,)
    except:
        # make a note of when the test crashed
        end_time = time.time()

        # mark test as failed
        result = (-1, "Fail [Crash]", test["Name"],
                  end_time - start_time, logfile.getvalue(), None)

    # regardless of whether the test has crashed, try quitting it
    try:
        child.sendline("quit")
        child.close()
    # if the test crashed, just do nothing instead
    except:
        pass

    # return test results
    return result
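
# as an illustration, a passing run produces a tuple shaped like this
# (values made up):
#   (0, "Success", "Timer autotest", 12.3, "<captured test log>", None)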


# class representing an instance of autotests run
class AutotestRunner:
    parallel_test_groups = []
    non_parallel_test_groups = []

    def __init__(self, cmdline, target, blacklist, whitelist):
        self.cmdline = cmdline
        self.target = target
        self.binary = cmdline.split()[0]
        self.blacklist = blacklist
        self.whitelist = whitelist

        self.parallel_tests = []
        self.non_parallel_tests = []
        self.skipped = []
        self.log_buffers = []
        self.n_tests = 0
        self.fails = 0
        self.start = None

        # log file and CSV file are named after the target
        logfile = "%s.log" % target
        csvfile = "%s.csv" % target

        self.logfile = open(logfile, "w")
        csvfile = open(csvfile, "w")
        self.csvwriter = csv.writer(csvfile)

        # prepare results table
        self.csvwriter.writerow(["test_name", "test_result", "result_str"])

    # set up cmdline string
    def __get_cmdline(self):
        cmdline = self.cmdline

        # affinitize startup so that tests don't fail on i686
        cmdline = "taskset 1 " + cmdline

        return cmdline

    def __process_result(self, result):
        # unpack result tuple
        test_result, result_str, test_name, \
            test_time, log, report = result

        # get total run time
        cur_time = time.time()
        total_time = int(cur_time - self.start)

        # print results, test run time and total time since start
        result = ("%s:" % test_name).ljust(30)
        result += result_str.ljust(29)
        result += "[%02dm %02ds]" % (test_time / 60, test_time % 60)

        # don't print out total time every line, it's the same anyway
        print(result + "[%02dm %02ds]" % (total_time / 60, total_time % 60))

        # if test failed and it wasn't a "start" test
        if test_result < 0:
            self.fails += 1

        # collect logs
        self.log_buffers.append(log)

        # create report if it exists
        if report is not None:
            try:
                f = open("%s_%s_report.rst" %
                         (self.target, test_name), "w")
            except IOError:
                print("Report for %s could not be created!" % test_name)
            else:
                with f:
                    f.write(report)

        # write test result to CSV file
        self.csvwriter.writerow([test_name, test_result, result_str])
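
    # a CSV row written above might look like ["Timer autotest", 0, "Success"]
    # (illustrative values)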

    # this function checks an individual test and decides whether it should
    # be in the group, by comparing it against the whitelist and blacklist.
    # it also checks if the test is compiled into the binary, and marks it
    # as skipped if necessary
    def __filter_test(self, test):
        test_cmd = test["Command"]
        test_id = test_cmd

        # dump tests are specified in full e.g. "Dump_mempool"
        if "_autotest" in test_id:
            test_id = test_id[:-len("_autotest")]

        # filter out blacklisted/whitelisted tests
        if self.blacklist and test_id in self.blacklist:
            return False
        if self.whitelist and test_id not in self.whitelist:
            return False

        # if the test wasn't compiled in, remove it as well

        # parse the binary for available test commands
        stripped = 'not stripped' not in \
                   subprocess.check_output(['file', self.binary])
        if not stripped:
            symbols = subprocess.check_output(['nm',
                                               self.binary]).decode('utf-8')
            avail_cmds = re.findall(r'test_register_(\w+)', symbols)

            if test_cmd not in avail_cmds:
                # notify the user and mark the test as skipped
                result = 0, "Skipped [Not compiled]", test_id, 0, "", None
                self.skipped.append(tuple(result))
                return False

        return True
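
    # as an illustration (symbol address made up), an "nm" output line such as
    #   0000000000123456 t test_register_timer_autotest
    # makes "timer_autotest" appear in avail_cmds above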

    # iterate over test groups and run tests associated with them
    def run_all_tests(self):
        # filter out tests we're not going to run
        self.parallel_tests = list(
            filter(self.__filter_test,
                   self.parallel_tests))
        self.non_parallel_tests = list(
            filter(self.__filter_test,
                   self.non_parallel_tests))

        # create a pool of worker threads
        pool = multiprocessing.Pool(processes=1)
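        # note: with a single worker process, the "parallel" autotests still
        # run one at a time; presumably a safe default, since truly concurrent
        # test binaries would contend for hugepages and devices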

        results = []

        # whatever happens, try to save as much logs as possible
        try:
            # create table header
            print("Test name".ljust(30) + "Test result".ljust(29) +
                  "Test".center(9) + "Total".center(9))
            print("=" * 80)

            # print out skipped autotests if there were any
            if len(self.skipped):
                print("Skipped autotests:")

                # print out any skipped tests
                for result in self.skipped:
                    # unpack result tuple
                    test_result, result_str, test_name, _, _, _ = result
                    self.csvwriter.writerow([test_name, test_result,
                                             result_str])

                    t = ("%s:" % test_name).ljust(30)
                    t += result_str.ljust(29)
                    t += "[00m 00s]"

                    print(t)

            # make a note of tests start time
            self.start = time.time()

            if len(self.parallel_tests) > 0:
                print("Parallel autotests:")
                # assign worker threads to run test groups
                for test_group in self.parallel_tests:
                    result = pool.apply_async(run_test_group,
                                              [self.__get_cmdline(),
                                               "",
                                               self.target,
                                               test_group])
                    results.append(result)

                # iterate while we have group execution results to get
                while len(results) > 0:
                    # iterate over a copy to be able to safely delete results
                    # this iterates over a list of group results
                    for group_result in results[:]:
                        # if the thread hasn't finished yet, continue
                        if not group_result.ready():
                            continue

                        res = group_result.get()

                        self.__process_result(res)

                        # remove result from results list once we're done with it
                        results.remove(group_result)

            if len(self.non_parallel_tests) > 0:
                print("Non-parallel autotests:")
                # run non-parallel tests. they are run one by one, synchronously
                for test_group in self.non_parallel_tests:
                    group_result = run_test_group(
                        self.__get_cmdline(), "", self.target, test_group)

                    self.__process_result(group_result)

            # get total run time
            cur_time = time.time()
            total_time = int(cur_time - self.start)

            # print out summary
            print("=" * 80)
            print("Total run time: %02dm %02ds" % (total_time / 60,
                                                   total_time % 60))
            if self.fails != 0:
                print("Number of failed tests: %s" % str(self.fails))

            # write summary to logfile
            self.logfile.write("Summary\n")
            self.logfile.write("Target: ".ljust(15) + "%s\n" % self.target)
            self.logfile.write("Tests: ".ljust(15) + "%i\n" % self.n_tests)
            self.logfile.write("Failed tests: ".ljust(
                15) + "%i\n" % self.fails)
        except:
            print("Exception occurred")
            print(sys.exc_info())
            self.fails = 1

        # drop logs from all executions to a logfile
        for buf in self.log_buffers:
            self.logfile.write(buf.replace("\r", ""))

        return self.fails
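

# a minimal sketch of how this class is driven (normally the job of
# autotest.py; the cmdline, target and test definitions are illustrative):
#
#   runner = AutotestRunner("./app/test -c f -n 4",
#                           "x86_64-native-linuxapp-gcc",
#                           blacklist=[], whitelist=[])
#   runner.parallel_tests = [...]       # test dicts as described at the top
#   runner.non_parallel_tests = [...]
#   sys.exit(runner.run_all_tests())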