#!/usr/bin/python
# BSD LICENSE
-#
+#
# Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
# All rights reserved.
-#
+#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
-#
+#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# * Neither the name of Intel Corporation nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
-#
+#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# The main logic behind running autotests in parallel
-import multiprocessing, sys, pexpect, time, os, StringIO, csv
+import multiprocessing, subprocess, sys, pexpect, re, time, os, StringIO, csv
# wait for prompt
def wait_prompt(child):
startuplog = StringIO.StringIO()
print >>startuplog, "\n%s %s\n" % ("="*20, test_group["Prefix"])
+ print >>startuplog, "\ncmdline=%s" % cmdline
child = pexpect.spawn(cmdline, logfile=startuplog)
results.append((0, "Success", "Start %s" % test_group["Prefix"],
time.time() - start_time, startuplog.getvalue(), None))
+ # parse the binary for available test commands
+ binary = cmdline.split()[0]
+ stripped = 'not stripped' not in subprocess.check_output(['file', binary])
+ if not stripped:
+ symbols = subprocess.check_output(['nm', binary]).decode('utf-8')
+        avail_cmds = re.findall(r'test_register_(\w+)', symbols)
+
# run all tests in test group
for test in test_group["Tests"]:
print >>logfile, "\n%s %s\n" % ("-"*20, test["Name"])
# run test function associated with the test
- result = test["Func"](child, test["Command"])
+ if stripped or test["Command"] in avail_cmds:
+ result = test["Func"](child, test["Command"])
+ else:
+ result = (0, "Skipped [Not Available]")
# make a note when the test was finished
end_time = time.time()
results.append(result)
# regardless of whether test has crashed, try quitting it
- try:
+ try:
child.sendline("quit")
child.close()
# if the test crashed, just do nothing instead
self.logfile = open(logfile, "w")
csvfile = open(csvfile, "w")
self.csvwriter = csv.writer(csvfile)
-
+
# prepare results table
self.csvwriter.writerow(["test_name","test_result","result_str"])
def __get_cmdline(self, test):
cmdline = self.cmdline
- # perform additional linuxapp adjustments
- if not "baremetal" in self.target:
-
- # append memory limitations for each test
- # otherwise tests won't run in parallel
- if not "i686" in self.target:
- cmdline += " --socket-mem=%s"% test["Memory"]
- else:
- # affinitize startup so that tests don't fail on i686
- cmdline = "taskset 1 " + cmdline
- cmdline += " -m " + str(sum(map(int,test["Memory"].split(","))))
+ # append memory limitations for each test
+ # otherwise tests won't run in parallel
+ if not "i686" in self.target:
+ cmdline += " --socket-mem=%s"% test["Memory"]
+ else:
+ # affinitize startup so that tests don't fail on i686
+ cmdline = "taskset 1 " + cmdline
+ cmdline += " -m " + str(sum(map(int,test["Memory"].split(","))))
- # set group prefix for autotest group
- # otherwise they won't run in parallel
- cmdline += " --file-prefix=%s"% test["Prefix"]
+ # set group prefix for autotest group
+ # otherwise they won't run in parallel
+ cmdline += " --file-prefix=%s"% test["Prefix"]
- return cmdline
return cmdline
def add_non_parallel_test_group(self,test_group):
self.non_parallel_test_groups.append(test_group)
-
-
+
+
def __process_results(self, results):
# this iterates over individual test results
for i, result in enumerate(results):
-
+
# increase total number of tests that were run
# do not include "start" test
if i > 0:
# if test failed and it wasn't a "start" test
if test_result < 0 and not i == 0:
self.fails += 1
-
+
# collect logs
self.log_buffers.append(log)
-
+
# create report if it exists
if report:
try:
else:
with f:
f.write(report)
-
+
# write test result to CSV file
if i != 0:
self.csvwriter.writerow([test_name, test_result, result_str])
# dump tests are specified in full e.g. "Dump_mempool"
if "_autotest" in test_id:
test_id = test_id[:-len("_autotest")]
-
+
# filter out blacklisted/whitelisted tests
if self.blacklist and test_id in self.blacklist:
test_group["Tests"].remove(test)
# put the numbers backwards so that we start
# deleting from the end, not from the beginning
groups_to_remove.insert(0, i)
-
+
# remove test groups that need to be removed
for i in groups_to_remove:
del test_groups[i]
-
+
return test_groups
-
+
# iterate over test groups and run tests associated with them
self.__filter_groups(self.parallel_test_groups)
self.non_parallel_test_groups = \
self.__filter_groups(self.non_parallel_test_groups)
-
+
# create a pool of worker threads
- if not "baremetal" in self.target:
- pool = multiprocessing.Pool(processes=1)
- else:
- # we can't be sure running baremetal tests in parallel
- # will work, so let's stay on the safe side
- pool = multiprocessing.Pool(processes=1)
-
+ pool = multiprocessing.Pool(processes=1)
+
results = []
-
+
# whatever happens, try to save as much logs as possible
try:
continue
res = group_result.get()
-
+
self.__process_results(res)
# remove result from results list once we're done with it
# run non_parallel tests. they are run one by one, synchronously
for test_group in self.non_parallel_test_groups:
group_result = run_test_group(self.__get_cmdline(test_group), test_group)
-
+
self.__process_results(group_result)
-
+
# get total run time
- cur_time = time.time()
+ cur_time = time.time()
total_time = int(cur_time - self.start)
# print out summary
except:
print "Exception occured"
print sys.exc_info()
+ self.fails = 1
# drop logs from all executions to a logfile
for buf in self.log_buffers:
self.logfile.write(buf.replace("\r",""))
-
+
log_buffers = []
+ return self.fails