# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
-# Script that uses qemu controlled by python-pexpect to check that
-# all autotests are working in the baremetal environment.
+# Script that uses either test app or qemu controlled by python-pexpect
-import sys, pexpect, time, os, re
+import sys, autotest_data, autotest_runner
-directory = sys.argv[2]
-target = sys.argv[3]
-log_file = "%s.txt"%(target)
-if "baremetal" in target:
- cmdline = "qemu-system-x86_64 -cdrom %s.iso -boot d "%(sys.argv[1])
- cmdline += "-m 2000 -smp 4 -nographic -net nic,model=e1000"
- platform = "QEMU x86_64"
-else:
- cmdline = "%s -c f -n 4"%(sys.argv[1])
- try:
- platform = open("/root/rte_platform_model.txt").read()
- except:
- platform = "unknown"
-print cmdline
+def usage():
+ print"Usage: autotest.py [test app|test iso image]",
+ print "[target] [whitelist|-blacklist]"
-report_hdr=""".. <COPYRIGHT_TAG>
+if len(sys.argv) < 3:
+ usage()
+ sys.exit(1)
-"""
+target = sys.argv[2]
test_whitelist=None
test_blacklist=None
-class SubTest:
- "Defines a subtest"
- def __init__(self, title, function, command=None, timeout=10, genreport=None):
- self.title = title
- self.function = function
- self.command = command
- self.timeout = timeout
- self.genreport = genreport
-
-class AutoTest:
- """This class contains all methods needed to launch several
- automatic tests, archive test results, log, and generate a nice
- test report in restructured text"""
-
- title = "new"
- mainlog = None
- logbuf = None
- literal = 0
- test_list = []
- report_list = []
- child = None
-
- def __init__(self, pexpectchild, filename, mode):
- "Init the Autotest class"
- self.mainlog = file(filename, mode)
- self.child = pexpectchild
- pexpectchild.logfile = self
- def register(self, filename, title, subtest_list):
- "Register a test with a list of subtests"
- test = {}
- test["filename"] = filename
- test["title"] = title
- test["subtest_list"] = subtest_list
- self.test_list.append(test)
-
- def start(self):
- "start the tests, and fill the internal report_list field"
- for t in self.test_list:
- report = {}
- report["date"] = time.asctime()
- report["title"] = t["title"]
- report["filename"] = t["filename"]
- report["subreport_list"] = []
- report["fails"] = 0
- report["success"] = 0
- report["subreport_list"] = []
- for st in t["subtest_list"]:
- if test_whitelist is not None and st.title not in test_whitelist:
- continue
- if test_blacklist is not None and st.title in test_blacklist:
- continue
- subreport = {}
- self.reportbuf = ""
- subreport["title"] = st.title
- subreport["func"] = st.function
- subreport["command"] = st.command
- subreport["timeout"] = st.timeout
- subreport["genreport"] = st.genreport
-
- # launch subtest
- print "%s (%s): "%(subreport["title"], subreport["command"]),
- sys.stdout.flush()
- start = time.time()
- res = subreport["func"](self.child,
- command = subreport["command"],
- timeout = subreport["timeout"])
- t = int(time.time() - start)
-
- subreport["time"] = "%dmn%d"%(t/60, t%60)
- subreport["result"] = res[0] # 0 or -1
- subreport["result_str"] = res[1] # cause of fail
- subreport["logs"] = self.reportbuf
- print "%s [%s]"%(subreport["result_str"], subreport["time"])
- if subreport["result"] == 0:
- report["success"] += 1
- else:
- report["fails"] += 1
- report["subreport_list"].append(subreport)
- self.report_list.append(report)
-
- def gen_report(self):
- for report in self.report_list:
- # main report header and stats
- self.literal = 0
- reportlog = file(report["filename"], "w")
- reportlog.write(report_hdr)
- reportlog.write(report["title"] + "\n")
- reportlog.write(re.sub(".", "=", report["title"]) + "\n\n")
- reportlog.write("Autogenerated test report:\n\n" )
- reportlog.write("- date: **%s**\n"%(report["date"]))
- reportlog.write("- target: **%s**\n"%(target))
- reportlog.write("- success: **%d**\n"%(report["success"]))
- reportlog.write("- fails: **%d**\n"%(report["fails"]))
- reportlog.write("- platform: **%s**\n\n"%(platform))
-
- # summary
- reportlog.write(".. csv-table:: Test results summary\n")
- reportlog.write(' :header: "Name", "Result"\n\n')
- for subreport in report["subreport_list"]:
- if subreport["result"] == 0:
- res_str = "Success"
- else:
- res_str = "Failure"
- reportlog.write(' "%s", "%s"\n'%(subreport["title"], res_str))
- reportlog.write('\n')
-
- # subreports
- for subreport in report["subreport_list"]:
- # print subtitle
- reportlog.write(subreport["title"] + "\n")
- reportlog.write(re.sub(".", "-", subreport["title"]) + "\n\n")
- # print logs
- reportlog.write("::\n \n ")
- s = subreport["logs"].replace("\n", "\n ")
- reportlog.write(s)
- # print result
- reportlog.write("\n\n")
- reportlog.write("**" + subreport["result_str"] + "**\n\n")
- # custom genreport
- if subreport["genreport"] != None:
- s = subreport["genreport"]()
- reportlog.write(s)
-
- reportlog.close()
-
- # displayed on console
- print
- print "-------------------------"
- print
- if report["fails"] == 0:
- print "All test OK"
- else:
- print "%s test(s) failed"%(report["fails"])
-
- # file API, to store logs from pexpect
- def write(self, buf):
- s = buf[:]
- s = s.replace("\r", "")
- self.mainlog.write(s)
- self.reportbuf += s
- def flush(self):
- self.mainlog.flush()
- def close(self):
- self.mainlog.close()
-
-
-# Try to match prompt: return 0 on success, else return -1
-def wait_prompt(child):
- for i in range(3):
- index = child.expect(["RTE>>", pexpect.TIMEOUT], timeout = 1)
- child.sendline("")
- if index == 0:
- return 0
- print "Cannot find prompt"
- return -1
-
-# Try to match prompt after boot: return 0 on success, else return -1
-def wait_boot(child):
- index = child.expect(["RTE>>", pexpect.TIMEOUT],
- timeout = 120)
- if index == 0:
- return 0
- if (wait_prompt(child) == -1):
- print "Target did not boot, failed"
- return -1
- return 0
-
-# quit RTE
-def quit(child):
- if wait_boot(child) != 0:
- return -1, "Cannot find prompt"
- child.sendline("quit")
- return 0, "Success"
-
-# Default function to launch an autotest that does not need to
-# interact with the user. Basically, this function calls the autotest
-# function through command line interface, then check that it displays
-# "Test OK" or "Test Failed".
-def default_autotest(child, command, timeout=10):
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- index = child.expect(["Test OK", "Test Failed",
- pexpect.TIMEOUT], timeout = timeout)
- if index == 1:
- return -1, "Failed"
- elif index == 2:
- return -1, "Failed [Timeout]"
- return 0, "Success"
-
-# wait boot
-def boot_autotest(child, **kargs):
- if wait_boot(child) != 0:
- return -1, "Cannot find prompt"
- return 0, "Success"
-
-# Test memory dump. We need to check that at least one memory zone is
-# displayed.
-def memory_autotest(child, command, **kargs):
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- regexp = "phys:0x[0-9a-f]*, len:0x([0-9a-f]*), virt:0x[0-9a-f]*, socket_id:[0-9]*"
- index = child.expect([regexp, pexpect.TIMEOUT], timeout = 180)
- if index != 0:
- return -1, "Failed: timeout"
- size = int(child.match.groups()[0], 16)
- if size <= 0:
- return -1, "Failed: bad size"
- index = child.expect(["Test OK", "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
- if index == 1:
- return -1, "Failed: C code returned an error"
- elif index == 2:
- return -1, "Failed: timeout"
- return 0, "Success"
-
-# Test some libc functions including scanf. This requires a
-# interaction with the user (simulated in expect), so we cannot use
-# default_autotest() here.
-def string_autotest(child, command, **kargs):
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- index = child.expect(["Now, test scanf, enter this number",
- pexpect.TIMEOUT], timeout = 10)
- if index != 0:
- return -1, "Failed: timeout"
- child.sendline("123456")
- index = child.expect(["number=123456", pexpect.TIMEOUT], timeout = 10)
- if index != 0:
- return -1, "Failed: timeout (2)"
- index = child.expect(["Test OK", "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
- if index != 0:
- return -1, "Failed: C code returned an error"
- return 0, "Success"
-
-# Test spinlock. This requires to check the order of displayed lines:
-# we cannot use default_autotest() here.
-def spinlock_autotest(child, command, **kargs):
- i = 0
- ir = 0
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- while True:
- index = child.expect(["Test OK",
- "Test Failed",
- "Hello from core ([0-9]*) !",
- "Hello from within recursive locks from ([0-9]*) !",
- pexpect.TIMEOUT], timeout = 20)
- # ok
- if index == 0:
- break
-
- # message, check ordering
- elif index == 2:
- if int(child.match.groups()[0]) < i:
- return -1, "Failed: bad order"
- i = int(child.match.groups()[0])
- elif index == 3:
- if int(child.match.groups()[0]) < ir:
- return -1, "Failed: bad order"
- ir = int(child.match.groups()[0])
-
- # fail
- else:
- return -1, "Failed: timeout or error"
-
- return 0, "Success"
-
-
-# Test rwlock. This requires to check the order of displayed lines:
-# we cannot use default_autotest() here.
-def rwlock_autotest(child, command, **kargs):
- i = 0
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- while True:
- index = child.expect(["Test OK",
- "Test Failed",
- "Hello from core ([0-9]*) !",
- "Global write lock taken on master core ([0-9]*)",
- pexpect.TIMEOUT], timeout = 10)
- # ok
- if index == 0:
- if i != 0xffff:
- return -1, "Failed: a message is missing"
- break
-
- # message, check ordering
- elif index == 2:
- if int(child.match.groups()[0]) < i:
- return -1, "Failed: bad order"
- i = int(child.match.groups()[0])
-
- # must be the last message, check ordering
- elif index == 3:
- i = 0xffff
-
- # fail
- else:
- return -1, "Failed: timeout or error"
-
- return 0, "Success"
-
-# Test logs. This requires to check the order of displayed lines:
-# we cannot use default_autotest() here.
-def logs_autotest(child, command, **kargs):
- i = 0
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
-
- log_list = [
- "TESTAPP1: this is a debug level message",
- "TESTAPP1: this is a info level message",
- "TESTAPP1: this is a warning level message",
- "TESTAPP2: this is a info level message",
- "TESTAPP2: this is a warning level message",
- "TESTAPP1: this is a debug level message",
- "TESTAPP1: this is a debug level message",
- "TESTAPP1: this is a info level message",
- "TESTAPP1: this is a warning level message",
- "TESTAPP2: this is a info level message",
- "TESTAPP2: this is a warning level message",
- "TESTAPP1: this is a debug level message",
- ]
-
- for log_msg in log_list:
- index = child.expect([log_msg,
- "Test OK",
- "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
-
- # not ok
- if index != 0:
- return -1, "Failed: timeout or error"
-
- index = child.expect(["Test OK",
- "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
-
- return 0, "Success"
-
-# Test timers. This requires to check the order of displayed lines:
-# we cannot use default_autotest() here.
-def timer_autotest(child, command, **kargs):
- i = 0
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
-
- index = child.expect(["Start timer stress tests \(30 seconds\)",
- "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
-
- # not ok
- if index != 0:
- return -1, "Failed: timeout or error"
-
- index = child.expect(["Start timer basic tests \(30 seconds\)",
- "Test Failed",
- pexpect.TIMEOUT], timeout = 40)
-
- # not ok
- if index != 0:
- return -1, "Failed: timeout or error (2)"
-
- prev_lcore_timer1 = -1
-
- lcore_tim0 = -1
- lcore_tim1 = -1
- lcore_tim2 = -1
- lcore_tim3 = -1
-
- while True:
- index = child.expect(["TESTTIMER: ([0-9]*): callback id=([0-9]*) count=([0-9]*) on core ([0-9]*)",
- "Test OK",
- "Test Failed",
- pexpect.TIMEOUT], timeout = 10)
-
- if index == 1:
- break
-
- if index != 0:
- return -1, "Failed: timeout or error (3)"
-
- try:
- t = int(child.match.groups()[0])
- id = int(child.match.groups()[1])
- cnt = int(child.match.groups()[2])
- lcore = int(child.match.groups()[3])
- except:
- return -1, "Failed: cannot parse output"
-
- # timer0 always expires on the same core when cnt < 20
- if id == 0:
- if lcore_tim0 == -1:
- lcore_tim0 = lcore
- elif lcore != lcore_tim0 and cnt < 20:
- return -1, "Failed: lcore != lcore_tim0 (%d, %d)"%(lcore, lcore_tim0)
- if cnt > 21:
- return -1, "Failed: tim0 cnt > 21"
-
- # timer1 each time expires on a different core
- if id == 1:
- if lcore == lcore_tim1:
- return -1, "Failed: lcore == lcore_tim1 (%d, %d)"%(lcore, lcore_tim1)
- lcore_tim1 = lcore
- if cnt > 10:
- return -1, "Failed: tim1 cnt > 30"
-
- # timer0 always expires on the same core
- if id == 2:
- if lcore_tim2 == -1:
- lcore_tim2 = lcore
- elif lcore != lcore_tim2:
- return -1, "Failed: lcore != lcore_tim2 (%d, %d)"%(lcore, lcore_tim2)
- if cnt > 30:
- return -1, "Failed: tim2 cnt > 30"
-
- # timer0 always expires on the same core
- if id == 3:
- if lcore_tim3 == -1:
- lcore_tim3 = lcore
- elif lcore != lcore_tim3:
- return -1, "Failed: lcore_tim3 changed (%d -> %d)"%(lcore, lcore_tim3)
- if cnt > 30:
- return -1, "Failed: tim3 cnt > 30"
-
- # must be 2 different cores
- if lcore_tim0 == lcore_tim3:
- return -1, "Failed: lcore_tim0 (%d) == lcore_tim3 (%d)"%(lcore_tim0, lcore_tim3)
-
- return 0, "Success"
-
-# Ring autotest
-def ring_autotest(child, command, timeout=10):
- if wait_prompt(child) != 0:
- return -1, "Failed: cannot find prompt"
- child.sendline(command)
- index = child.expect(["Test OK", "Test Failed",
- pexpect.TIMEOUT], timeout = timeout)
- if index != 0:
- return -1, "Failed"
-
- child.sendline("set_watermark test 100")
- child.sendline("set_quota test 16")
- child.sendline("dump_ring test")
- index = child.expect([" watermark=100",
- pexpect.TIMEOUT], timeout = 1)
- if index != 0:
- return -1, "Failed: bad watermark"
-
- index = child.expect([" bulk_default=16",
- pexpect.TIMEOUT], timeout = 1)
- if index != 0:
- return -1, "Failed: bad quota"
-
- return 0, "Success"
-
-def ring_genreport():
- s = "Performance curves\n"
- s += "------------------\n\n"
- sdk = os.getenv("RTE_SDK")
- script = os.path.join(sdk, "app/test/graph_ring.py")
- title ='"Autotest %s %s"'%(target, time.asctime())
- filename = target + ".txt"
- os.system("/usr/bin/python %s %s %s"%(script, filename, title))
- for f in os.listdir("."):
- if not f.startswith("ring"):
- continue
- if not f.endswith(".svg"):
- continue
- # skip single producer/consumer
- if "_sc" in f:
- continue
- if "_sp" in f:
- continue
- f = f[:-4] + ".png"
- s += ".. figure:: ../../images/autotests/%s/%s\n"%(target, f)
- s += " :width: 50%\n\n"
- s += " %s\n\n"%(f)
- return s
-
-def mempool_genreport():
- s = "Performance curves\n"
- s += "------------------\n\n"
- sdk = os.getenv("RTE_SDK")
- script = os.path.join(sdk, "app/test/graph_mempool.py")
- title ='"Autotest %s %s"'%(target, time.asctime())
- filename = target + ".txt"
- os.system("/usr/bin/python %s %s %s"%(script, filename, title))
- for f in os.listdir("."):
- if not f.startswith("mempool"):
- continue
- if not f.endswith(".svg"):
- continue
- # skip when n_keep = 128
- if "_128." in f:
- continue
- f = f[:-4] + ".png"
- s += ".. figure:: ../../images/autotests/%s/%s\n"%(target, f)
- s += " :width: 50%\n\n"
- s += " %s\n\n"%(f)
- return s
-
-#
-# main
-#
-
-if len(sys.argv) > 4:
- testlist=sys.argv[4].split(',')
- if testlist[0].startswith('-'):
- testlist[0]=testlist[0].lstrip('-')
- test_blacklist=testlist
- else:
- test_whitelist=testlist
-
-child = pexpect.spawn(cmdline)
-autotest = AutoTest(child, log_file,'w')
-
-# timeout for memcpy and hash test
+# get blacklist/whitelist
+if len(sys.argv) > 3:
+ testlist = sys.argv[3].split(',')
+ testlist = [test.lower() for test in testlist]
+ if testlist[0].startswith('-'):
+ testlist[0] = testlist[0].lstrip('-')
+ test_blacklist = testlist
+ else:
+ test_whitelist = testlist
+
+# adjust test command line
if "baremetal" in target:
- timeout = 60*180
+ cmdline = "qemu-system-x86_64 -cdrom %s.iso -boot d " % (sys.argv[1])
+ cmdline += "-m 2000 -smp 4 -nographic -net nic,model=e1000"
+ platform = "QEMU x86_64"
else:
- timeout = 180
-
-autotest.register("eal_report.rst", "EAL-%s"%(target),
- [ SubTest("Boot", boot_autotest, "boot_autotest"),
- SubTest("EAL Flags", default_autotest, "eal_flags_autotest"),
- SubTest("Version", default_autotest, "version_autotest"),
- SubTest("PCI", default_autotest, "pci_autotest"),
- SubTest("Memory", memory_autotest, "memory_autotest"),
- SubTest("Lcore launch", default_autotest, "per_lcore_autotest"),
- SubTest("Spinlock", spinlock_autotest, "spinlock_autotest"),
- SubTest("Rwlock", rwlock_autotest, "rwlock_autotest"),
- SubTest("Atomic", default_autotest, "atomic_autotest"),
- SubTest("Byte order", default_autotest, "byteorder_autotest"),
- SubTest("Prefetch", default_autotest, "prefetch_autotest"),
- SubTest("Debug", default_autotest, "debug_autotest"),
- SubTest("Cycles", default_autotest, "cycles_autotest"),
- SubTest("Logs", logs_autotest, "logs_autotest"),
- SubTest("Memzone", default_autotest, "memzone_autotest"),
- SubTest("Cpu flags", default_autotest, "cpuflags_autotest"),
- SubTest("Memcpy", default_autotest, "memcpy_autotest", timeout),
- SubTest("String Functions", default_autotest, "string_autotest"),
- SubTest("Alarm", default_autotest, "alarm_autotest", 30),
- SubTest("Interrupt", default_autotest, "interrupt_autotest"),
- ])
+ cmdline = "%s -c f -n 4"%(sys.argv[1])
-autotest.register("ring_report.rst", "Ring-%s"%(target),
- [ SubTest("Ring", ring_autotest, "ring_autotest", 30*60,
- ring_genreport)
- ])
-
-if "baremetal" in target:
- timeout = 60*60*3
-else:
- timeout = 60*30
+print cmdline
-autotest.register("mempool_report.rst", "Mempool-%s"%(target),
- [ SubTest("Mempool", default_autotest, "mempool_autotest",
- timeout, mempool_genreport)
- ])
-autotest.register("mbuf_report.rst", "Mbuf-%s"%(target),
- [ SubTest("Mbuf", default_autotest, "mbuf_autotest", timeout=120)
- ])
-autotest.register("timer_report.rst", "Timer-%s"%(target),
- [ SubTest("Timer", timer_autotest, "timer_autotest")
- ])
-autotest.register("malloc_report.rst", "Malloc-%s"%(target),
- [ SubTest("Malloc", default_autotest, "malloc_autotest")
- ])
+runner = autotest_runner.AutotestRunner(cmdline, target, test_blacklist, test_whitelist)
-# only do the hash autotest if supported by the platform
-if not (platform.startswith("Intel(R) Core(TM)2 Quad CPU") or
- platform.startswith("QEMU")):
- autotest.register("hash_report.rst", "Hash-%s"%(target),
- [ SubTest("Hash", default_autotest, "hash_autotest", timeout)
- ])
+for test_group in autotest_data.parallel_test_group_list:
+ runner.add_parallel_test_group(test_group)
-autotest.register("lpm_report.rst", "LPM-%s"%(target),
- [ SubTest("Lpm", default_autotest, "lpm_autotest", timeout)
- ])
-autotest.register("eal2_report.rst", "EAL2-%s"%(target),
- [ SubTest("TailQ", default_autotest, "tailq_autotest"),
- SubTest("Errno", default_autotest, "errno_autotest"),
- SubTest("Multiprocess", default_autotest, "multiprocess_autotest")
- ])
+for test_group in autotest_data.non_parallel_test_group_list:
+ runner.add_non_parallel_test_group(test_group)
-autotest.start()
-autotest.gen_report()
+runner.run_all_tests()
-quit(child)
-child.terminate()
-sys.exit(0)