pdump: replace constant for device name size
[dpdk.git] / app / test / autotest_runner.py
index 36941a4..8aa4d45 100644 (file)
@@ -1,10 +1,10 @@
+#!/usr/bin/env python3
 # SPDX-License-Identifier: BSD-3-Clause
 # Copyright(c) 2010-2014 Intel Corporation
 
 # The main logic behind running autotests in parallel
 
-from __future__ import print_function
-import StringIO
+import io
 import csv
 from multiprocessing import Pool, Queue
 import pexpect
@@ -43,11 +43,14 @@ def get_numa_nodes():
 # find first (or any, really) CPU on a particular node, will be used to spread
 # processes around NUMA nodes to avoid exhausting memory on particular node
 def first_cpu_on_node(node_nr):
-    cpu_path = glob.glob("/sys/devices/system/node/node%d/cpu*" % node_nr)[0]
-    cpu_name = os.path.basename(cpu_path)
-    m = re.match(r"cpu(\d+)", cpu_name)
-    return int(m.group(1))
-
+    cpu_path = glob.glob("/sys/devices/system/node/node%d/cpu*" % node_nr)
+    r = re.compile(r"cpu(\d+)")
+    cpu_name = filter(None,
+            map(r.match,
+                map(os.path.basename, cpu_path)
+            )
+    )
+    return int(next(cpu_name).group(1))
 
 pool_child = None  # per-process child
 
@@ -71,7 +74,7 @@ def pool_init(queue, result_queue):
     cmdline = "%s %s" % (cmdline, prefix_cmdline)
 
     # prepare logging of init
-    startuplog = StringIO.StringIO()
+    startuplog = io.StringIO()
 
     # run test app
     try:
@@ -79,8 +82,7 @@ def pool_init(queue, result_queue):
         print("\n%s %s\n" % ("=" * 20, prefix), file=startuplog)
         print("\ncmdline=%s" % cmdline, file=startuplog)
 
-        pool_child = pexpect.spawn(cmdline, logfile=startuplog)
-
+        pool_child = pexpect.spawn(cmdline, logfile=startuplog, encoding='utf-8')
         # wait for target to boot
         if not wait_prompt(pool_child):
             pool_child.close()
@@ -131,7 +133,7 @@ def run_test(target, test):
     # create log buffer for each test
     # in multiprocessing environment, the logging would be
     # interleaved and will create a mess, hence the buffering
-    logfile = StringIO.StringIO()
+    logfile = io.StringIO()
     pool_child.logfile = logfile
 
     # make a note when the test started
@@ -186,21 +188,30 @@ class AutotestRunner:
     n_tests = 0
     fails = 0
     log_buffers = []
-    blacklist = []
-    whitelist = []
+    blocklist = []
+    allowlist = []
 
-    def __init__(self, cmdline, target, blacklist, whitelist, n_processes):
+    def __init__(self, cmdline, target, blocklist, allowlist, n_processes):
         self.cmdline = cmdline
         self.target = target
-        self.binary = cmdline.split()[0]
-        self.blacklist = blacklist
-        self.whitelist = whitelist
+        self.blocklist = blocklist
+        self.allowlist = allowlist
         self.skipped = []
         self.parallel_tests = []
         self.non_parallel_tests = []
         self.n_processes = n_processes
         self.active_processes = 0
 
+        # parse the binary for available test commands
+        binary = cmdline.split()[0]
+        stripped = 'not stripped' not in \
+                   subprocess.check_output(['file', binary]).decode()
+        if not stripped:
+            symbols = subprocess.check_output(['nm', binary]).decode()
+            self.avail_cmds = re.findall('test_register_(\w+)', symbols)
+        else:
+            self.avail_cmds = None
+
         # log file filename
         logfile = "%s.log" % target
         csvfile = "%s.csv" % target
@@ -258,7 +269,7 @@ class AutotestRunner:
         self.csvwriter.writerow([test_name, test_result, result_str])
 
     # this function checks individual test and decides if this test should be in
-    # the group by comparing it against  whitelist/blacklist. it also checks if
+    # the group by comparing it against allowlist/blocklist. it also checks if
     # the test is compiled into the binary, and marks it as skipped if necessary
     def __filter_test(self, test):
         test_cmd = test["Command"]
@@ -268,27 +279,17 @@ class AutotestRunner:
         if "_autotest" in test_id:
             test_id = test_id[:-len("_autotest")]
 
-        # filter out blacklisted/whitelisted tests
-        if self.blacklist and test_id in self.blacklist:
+        # filter out blocked/allowed tests
+        if self.blocklist and test_id in self.blocklist:
             return False
-        if self.whitelist and test_id not in self.whitelist:
+        if self.allowlist and test_id not in self.allowlist:
             return False
 
         # if test wasn't compiled in, remove it as well
-
-        # parse the binary for available test commands
-        stripped = 'not stripped' not in \
-                   subprocess.check_output(['file', self.binary])
-        if not stripped:
-            symbols = subprocess.check_output(['nm',
-                                               self.binary]).decode('utf-8')
-            avail_cmds = re.findall('test_register_(\w+)', symbols)
-
-            if test_cmd not in avail_cmds:
-                # notify user
-                result = 0, "Skipped [Not compiled]", test_id, 0, "", None
-                self.skipped.append(tuple(result))
-                return False
+        if self.avail_cmds and test_cmd not in self.avail_cmds:
+            result = 0, "Skipped [Not compiled]", test_id, 0, "", None
+            self.skipped.append(tuple(result))
+            return False
 
         return True