X-Git-Url: http://git.droids-corp.org/?a=blobdiff_plain;f=test%2Ftest%2Fautotest_runner.py;h=36941a40a16cac1ae7401292a0bd0ea3ccedc62a;hb=c652031bf46709a3a9e7bda21722ad7627f7944e;hp=d6ae57e762ccb1e11af93b71dc6b9b70741c6b73;hpb=82dc85c4e63a61c9120e8790215a6ce65e496955;p=dpdk.git diff --git a/test/test/autotest_runner.py b/test/test/autotest_runner.py index d6ae57e762..36941a40a1 100644 --- a/test/test/autotest_runner.py +++ b/test/test/autotest_runner.py @@ -6,16 +6,16 @@ from __future__ import print_function import StringIO import csv -import multiprocessing +from multiprocessing import Pool, Queue import pexpect import re import subprocess import sys import time +import glob +import os # wait for prompt - - def wait_prompt(child): try: child.sendline() @@ -28,22 +28,47 @@ def wait_prompt(child): else: return False -# run a test group -# each result tuple in results list consists of: -# result value (0 or -1) -# result string -# test name -# total test run time (double) -# raw test log -# test report (if not available, should be None) -# -# this function needs to be outside AutotestRunner class -# because otherwise Pool won't work (or rather it will require -# quite a bit of effort to make it work). + +# get all valid NUMA nodes +def get_numa_nodes(): + return [ + int( + re.match(r"node(\d+)", os.path.basename(node)) + .group(1) + ) + for node in glob.glob("/sys/devices/system/node/node*") + ] + + +# find first (or any, really) CPU on a particular node, will be used to spread +# processes around NUMA nodes to avoid exhausting memory on particular node +def first_cpu_on_node(node_nr): + cpu_path = glob.glob("/sys/devices/system/node/node%d/cpu*" % node_nr)[0] + cpu_name = os.path.basename(cpu_path) + m = re.match(r"cpu(\d+)", cpu_name) + return int(m.group(1)) + + +pool_child = None # per-process child -def run_test_group(cmdline, prefix, target, test): +# we initialize each worker with a queue because we need per-pool unique +# command-line arguments, but we cannot do different arguments in an initializer +# because the API doesn't allow per-worker initializer arguments. so, instead, +# we will initialize with a shared queue, and dequeue command-line arguments +# from this queue +def pool_init(queue, result_queue): + global pool_child + + cmdline, prefix = queue.get() start_time = time.time() + name = ("Start %s" % prefix) if prefix != "" else "Start" + + # use default prefix if no prefix was specified + prefix_cmdline = "--file-prefix=%s" % prefix if prefix != "" else "" + + # append prefix to cmdline + cmdline = "%s %s" % (cmdline, prefix_cmdline) # prepare logging of init startuplog = StringIO.StringIO() @@ -54,24 +79,60 @@ def run_test_group(cmdline, prefix, target, test): print("\n%s %s\n" % ("=" * 20, prefix), file=startuplog) print("\ncmdline=%s" % cmdline, file=startuplog) - child = pexpect.spawn(cmdline, logfile=startuplog) + pool_child = pexpect.spawn(cmdline, logfile=startuplog) # wait for target to boot - if not wait_prompt(child): - child.close() + if not wait_prompt(pool_child): + pool_child.close() + + result = tuple((-1, + "Fail [No prompt]", + name, + time.time() - start_time, + startuplog.getvalue(), + None)) + pool_child = None + else: + result = tuple((0, + "Success", + name, + time.time() - start_time, + startuplog.getvalue(), + None)) + except: + result = tuple((-1, + "Fail [Can't run]", + name, + time.time() - start_time, + startuplog.getvalue(), + None)) + pool_child = None - return -1, "Fail [No prompt]", "Start %s" % prefix,\ - time.time() - start_time, startuplog.getvalue(), None + result_queue.put(result) - except: - return -1, "Fail [Can't run]", "Start %s" % prefix,\ - time.time() - start_time, startuplog.getvalue(), None + +# run a test +# each result tuple in results list consists of: +# result value (0 or -1) +# result string +# test name +# total test run time (double) +# raw test log +# test report (if not available, should be None) +# +# this function needs to be outside AutotestRunner class because otherwise Pool +# won't work (or rather it will require quite a bit of effort to make it work). +def run_test(target, test): + global pool_child + + if pool_child is None: + return -1, "Fail [No test process]", test["Name"], 0, "", None # create log buffer for each test # in multiprocessing environment, the logging would be # interleaved and will create a mess, hence the buffering logfile = StringIO.StringIO() - child.logfile = logfile + pool_child.logfile = logfile # make a note when the test started start_time = time.time() @@ -81,7 +142,7 @@ def run_test_group(cmdline, prefix, target, test): print("\n%s %s\n" % ("-" * 20, test["Name"]), file=logfile) # run test function associated with the test - result = test["Func"](child, test["Command"]) + result = test["Func"](pool_child, test["Command"]) # make a note when the test was finished end_time = time.time() @@ -109,15 +170,6 @@ def run_test_group(cmdline, prefix, target, test): result = (-1, "Fail [Crash]", test["Name"], end_time - start_time, logfile.getvalue(), None) - # regardless of whether test has crashed, try quitting it - try: - child.sendline("quit") - child.close() - # if the test crashed, just do nothing instead - except: - # nop - pass - # return test results return result @@ -137,7 +189,7 @@ class AutotestRunner: blacklist = [] whitelist = [] - def __init__(self, cmdline, target, blacklist, whitelist): + def __init__(self, cmdline, target, blacklist, whitelist, n_processes): self.cmdline = cmdline self.target = target self.binary = cmdline.split()[0] @@ -146,6 +198,8 @@ class AutotestRunner: self.skipped = [] self.parallel_tests = [] self.non_parallel_tests = [] + self.n_processes = n_processes + self.active_processes = 0 # log file filename logfile = "%s.log" % target @@ -159,11 +213,8 @@ class AutotestRunner: self.csvwriter.writerow(["test_name", "test_result", "result_str"]) # set up cmdline string - def __get_cmdline(self): - cmdline = self.cmdline - - # affinitize startup so that tests don't fail on i686 - cmdline = "taskset 1 " + cmdline + def __get_cmdline(self, cpu_nr): + cmdline = ("taskset -c %i " % cpu_nr) + self.cmdline return cmdline @@ -241,6 +292,51 @@ class AutotestRunner: return True + def __run_test_group(self, test_group, worker_cmdlines): + group_queue = Queue() + init_result_queue = Queue() + for proc, cmdline in enumerate(worker_cmdlines): + prefix = "test%i" % proc if len(worker_cmdlines) > 1 else "" + group_queue.put(tuple((cmdline, prefix))) + + # create a pool of worker threads + # we will initialize child in the initializer, and we don't need to + # close the child because when the pool worker gets destroyed, child + # closes the process + pool = Pool(processes=len(worker_cmdlines), + initializer=pool_init, + initargs=(group_queue, init_result_queue)) + + results = [] + + # process all initialization results + for _ in range(len(worker_cmdlines)): + self.__process_result(init_result_queue.get()) + + # run all tests asynchronously + for test in test_group: + result = pool.apply_async(run_test, (self.target, test)) + results.append(result) + + # tell the pool to stop all processes once done + pool.close() + + # iterate while we have group execution results to get + while len(results) > 0: + # iterate over a copy to be able to safely delete results + # this iterates over a list of group results + for async_result in results[:]: + # if the thread hasn't finished yet, continue + if not async_result.ready(): + continue + + res = async_result.get() + + self.__process_result(res) + + # remove result from results list once we're done with it + results.remove(async_result) + # iterate over test groups and run tests associated with them def run_all_tests(self): # filter groups @@ -253,77 +349,60 @@ class AutotestRunner: self.non_parallel_tests) ) - # create a pool of worker threads - pool = multiprocessing.Pool(processes=1) - - results = [] + parallel_cmdlines = [] + # FreeBSD doesn't have NUMA support + numa_nodes = get_numa_nodes() + if len(numa_nodes) > 0: + for proc in range(self.n_processes): + # spread cpu affinity between NUMA nodes to have less chance of + # running out of memory while running multiple test apps in + # parallel. to do that, alternate between NUMA nodes in a round + # robin fashion, and pick an arbitrary CPU from that node to + # taskset our execution to + numa_node = numa_nodes[self.active_processes % len(numa_nodes)] + cpu_nr = first_cpu_on_node(numa_node) + parallel_cmdlines += [self.__get_cmdline(cpu_nr)] + # increase number of active processes so that the next cmdline + # gets a different NUMA node + self.active_processes += 1 + else: + parallel_cmdlines = [self.cmdline] * self.n_processes - # whatever happens, try to save as much logs as possible - try: + print("Running tests with %d workers" % self.n_processes) - # create table header - print("") - print("Test name".ljust(30) + "Test result".ljust(29) + - "Test".center(9) + "Total".center(9)) - print("=" * 80) + # create table header + print("") + print("Test name".ljust(30) + "Test result".ljust(29) + + "Test".center(9) + "Total".center(9)) + print("=" * 80) - # print out skipped autotests if there were any - if len(self.skipped): - print("Skipped autotests:") + if len(self.skipped): + print("Skipped autotests:") - # print out any skipped tests - for result in self.skipped: - # unpack result tuple - test_result, result_str, test_name, _, _, _ = result - self.csvwriter.writerow([test_name, test_result, - result_str]) + # print out any skipped tests + for result in self.skipped: + # unpack result tuple + test_result, result_str, test_name, _, _, _ = result + self.csvwriter.writerow([test_name, test_result, result_str]) - t = ("%s:" % test_name).ljust(30) - t += result_str.ljust(29) - t += "[00m 00s]" + t = ("%s:" % test_name).ljust(30) + t += result_str.ljust(29) + t += "[00m 00s]" - print(t) + print(t) - # make a note of tests start time - self.start = time.time() + # make a note of tests start time + self.start = time.time() + # whatever happens, try to save as much logs as possible + try: if len(self.parallel_tests) > 0: print("Parallel autotests:") - # assign worker threads to run test groups - for test_group in self.parallel_tests: - result = pool.apply_async(run_test_group, - [self.__get_cmdline(), - "", - self.target, - test_group]) - results.append(result) - - # iterate while we have group execution results to get - while len(results) > 0: - - # iterate over a copy to be able to safely delete results - # this iterates over a list of group results - for group_result in results[:]: - - # if the thread hasn't finished yet, continue - if not group_result.ready(): - continue - - res = group_result.get() - - self.__process_result(res) - - # remove result from results list once we're done with it - results.remove(group_result) + self.__run_test_group(self.parallel_tests, parallel_cmdlines) if len(self.non_parallel_tests) > 0: print("Non-parallel autotests:") - # run non_parallel tests. they are run one by one, synchronously - for test_group in self.non_parallel_tests: - group_result = run_test_group( - self.__get_cmdline(), "", self.target, test_group) - - self.__process_result(group_result) + self.__run_test_group(self.non_parallel_tests, [self.cmdline]) # get total run time cur_time = time.time()