diff options
author | juraj.linkes <juraj.linkes@pantheon.tech> | 2018-07-16 14:22:01 +0200 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2018-08-23 14:19:35 +0000 |
commit | 184870ac5a266c37987e4a4d97ab4d4efefacb1f (patch) | |
tree | 5f9bb444c2f1670b6620b2c62177cb0642ac37fc /test/framework.py | |
parent | 99ddcc3f8b5549252fcf834e2a25aa39ba0b880e (diff) |
CSIT-1139: Implement parallel test execution
The implementation of parallel test execution in VPP Test Framework.
- VPPTestCase test methods are grouped together
- tests are running in separate processes
- VPP instances spawned by tests are assigned to different cores
- output from these processes is redirected through pipes and printed
out testcase by testcase
- TEST_JOBS env var is used to specify the number of parallel processes
- improved test summary
- a bit of code cleanup
Change-Id: I9ca93904d9fe2c3daf980500c64a8611838ae28c
Signed-off-by: juraj.linkes <juraj.linkes@pantheon.tech>
Diffstat (limited to 'test/framework.py')
-rw-r--r-- | test/framework.py | 204 |
1 files changed, 72 insertions, 132 deletions
diff --git a/test/framework.py b/test/framework.py index 4f7c76a6939..6a5477d2ea7 100644 --- a/test/framework.py +++ b/test/framework.py @@ -11,6 +11,7 @@ import time import faulthandler import random import copy +import psutil from collections import deque from threading import Thread, Event from inspect import getdoc, isclass @@ -212,6 +213,35 @@ class VppTestCase(unittest.TestCase): raise Exception("Unrecognized DEBUG option: '%s'" % d) @classmethod + def get_least_used_cpu(self): + cpu_usage_list = [set(range(psutil.cpu_count()))] + vpp_processes = [p for p in psutil.process_iter(attrs=['pid', 'name']) + if 'vpp_main' == p.info['name']] + for vpp_process in vpp_processes: + for cpu_usage_set in cpu_usage_list: + try: + cpu_num = vpp_process.cpu_num() + if cpu_num in cpu_usage_set: + cpu_usage_set_index = cpu_usage_list.index( + cpu_usage_set) + if cpu_usage_set_index == len(cpu_usage_list) - 1: + cpu_usage_list.append({cpu_num}) + else: + cpu_usage_list[cpu_usage_set_index + 1].add( + cpu_num) + cpu_usage_set.remove(cpu_num) + break + except psutil.NoSuchProcess: + pass + + for cpu_usage_set in cpu_usage_list: + if len(cpu_usage_set) > 0: + min_usage_set = cpu_usage_set + break + + return random.choice(tuple(min_usage_set)) + + @classmethod def setUpConstants(cls): """ Set-up the test case class based on environment variables """ s = os.getenv("STEP", "n") @@ -241,10 +271,14 @@ class VppTestCase(unittest.TestCase): coredump_size = "coredump-size %s" % size if coredump_size is None: coredump_size = "coredump-size unlimited" + + cpu_core_number = cls.get_least_used_cpu() + cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "full-coredump", coredump_size, "}", "api-trace", "{", "on", "}", "api-segment", "{", "prefix", cls.shm_prefix, "}", + "cpu", "{", "main-core", str(cpu_core_number), "}", "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}", "plugin", "unittest_plugin.so", "{", "enable", "}", "}", ] @@ -310,7 +344,10 @@ class VppTestCase(unittest.TestCase): """ gc.collect() # run garbage collection first random.seed() - cls.logger = getLogger(cls.__name__) + if not hasattr(cls, 'logger'): + cls.logger = getLogger(cls.__name__) + else: + cls.logger.name = cls.__name__ cls.tempdir = tempfile.mkdtemp( prefix='vpp-unittest-%s-' % cls.__name__) cls.file_handler = FileHandler("%s/log.txt" % cls.tempdir) @@ -319,7 +356,7 @@ class VppTestCase(unittest.TestCase): datefmt="%H:%M:%S")) cls.file_handler.setLevel(DEBUG) cls.logger.addHandler(cls.file_handler) - cls.shm_prefix = cls.tempdir.split("/")[-1] + cls.shm_prefix = os.path.basename(cls.tempdir) os.chdir(cls.tempdir) cls.logger.info("Temporary dir is %s, shm prefix is %s", cls.tempdir, cls.shm_prefix) @@ -392,8 +429,11 @@ class VppTestCase(unittest.TestCase): raw_input("When done debugging, press ENTER to kill the " "process and finish running the testcase...") - cls.pump_thread_stop_flag.set() - os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up') + # first signal that we want to stop the pump thread, then wake it up + if hasattr(cls, 'pump_thread_stop_flag'): + cls.pump_thread_stop_flag.set() + if hasattr(cls, 'pump_thread_wakeup_pipe'): + os.write(cls.pump_thread_wakeup_pipe[1], 'ding dong wake up') if hasattr(cls, 'pump_thread'): cls.logger.debug("Waiting for pump thread to stop") cls.pump_thread.join() @@ -859,6 +899,19 @@ class VppTestCase(unittest.TestCase): return rx +def get_testcase_doc_name(test): + return getdoc(test.__class__).splitlines()[0] + + +def get_test_description(descriptions, test): + # TODO: if none print warning not raise exception + short_description = test.shortDescription() + if descriptions and short_description: + return short_description + else: + return str(test) + + class TestCasePrinter(object): _shared_state = {} @@ -870,7 +923,7 @@ class TestCasePrinter(object): def print_test_case_heading_if_first_time(self, case): if case.__class__ not in self._test_case_set: print(double_line_delim) - print(colorize(getdoc(case.__class__).splitlines()[0], YELLOW)) + print(colorize(get_testcase_doc_name(case), GREEN)) print(double_line_delim) self._test_case_set.add(case.__class__) @@ -944,41 +997,22 @@ class VppTestResult(unittest.TestResult): if hasattr(test, 'tempdir'): try: failed_dir = os.getenv('VPP_TEST_FAILED_DIR') - link_path = '%s/%s-FAILED' % (failed_dir, - test.tempdir.split("/")[-1]) + link_path = os.path.join(failed_dir, '%s-FAILED' % + os.path.basename(test.tempdir)) if logger: logger.debug("creating a link to the failed test") logger.debug("os.symlink(%s, %s)" % (test.tempdir, link_path)) - os.symlink(test.tempdir, link_path) + if os.path.exists(link_path): + if logger: + logger.debug('symlink already exists') + else: + os.symlink(test.tempdir, link_path) + except Exception as e: if logger: logger.error(e) - def send_failure_through_pipe(self, test): - if hasattr(self, 'test_framework_failed_pipe'): - pipe = self.test_framework_failed_pipe - if pipe: - if test.__class__.__name__ == "_ErrorHolder": - x = str(test) - if x.startswith("setUpClass"): - # x looks like setUpClass (test_function.test_class) - cls = x.split(".")[1].split(")")[0] - for t in self.test_suite: - if t.__class__.__name__ == cls: - pipe.send(t.__class__) - break - else: - raise Exception("Can't find class name `%s' " - "(from ErrorHolder) in test suite " - "`%s'" % (cls, self.test_suite)) - else: - raise Exception("FIXME: unexpected special case - " - "ErrorHolder description is `%s'" % - str(test)) - else: - pipe.send(test.__class__) - def addFailure(self, test, err): """ Record a test failed result @@ -1002,8 +1036,6 @@ class VppTestResult(unittest.TestResult): else: self.result_string = colorize("FAIL", RED) + ' [no temp dir]' - self.send_failure_through_pipe(test) - def addError(self, test, err): """ Record a test error result @@ -1027,8 +1059,6 @@ class VppTestResult(unittest.TestResult): else: self.result_string = colorize("ERROR", RED) + ' [no temp dir]' - self.send_failure_through_pipe(test) - def getDescription(self, test): """ Get test description @@ -1037,12 +1067,7 @@ class VppTestResult(unittest.TestResult): :returns: test description """ - # TODO: if none print warning not raise exception - short_description = test.shortDescription() - if self.descriptions and short_description: - return short_description - else: - return str(test) + return get_test_description(self.descriptions, test) def startTest(self, test): """ @@ -1100,22 +1125,6 @@ class VppTestResult(unittest.TestResult): self.stream.writeln("%s" % err) -class Filter_by_test_option: - def __init__(self, filter_file_name, filter_class_name, filter_func_name): - self.filter_file_name = filter_file_name - self.filter_class_name = filter_class_name - self.filter_func_name = filter_func_name - - def __call__(self, file_name, class_name, func_name): - if self.filter_file_name and file_name != self.filter_file_name: - return False - if self.filter_class_name and class_name != self.filter_class_name: - return False - if self.filter_func_name and func_name != self.filter_func_name: - return False - return True - - class VppTestRunner(unittest.TextTestRunner): """ A basic test runner implementation which prints results to standard error. @@ -1125,9 +1134,8 @@ class VppTestRunner(unittest.TextTestRunner): """Class maintaining the results of the tests""" return VppTestResult - def __init__(self, keep_alive_pipe=None, failed_pipe=None, - stream=sys.stderr, descriptions=True, - verbosity=1, failfast=False, buffer=False, resultclass=None): + def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1, + failfast=False, buffer=False, resultclass=None): # ignore stream setting here, use hard-coded stdout to be in sync # with prints from VppTestCase methods ... super(VppTestRunner, self).__init__(sys.stdout, descriptions, @@ -1135,63 +1143,6 @@ class VppTestRunner(unittest.TextTestRunner): resultclass) reporter = KeepAliveReporter() reporter.pipe = keep_alive_pipe - # this is super-ugly, but very simple to implement and works as long - # as we run only one test at the same time - VppTestResult.test_framework_failed_pipe = failed_pipe - - test_option = "TEST" - - def parse_test_option(self): - f = os.getenv(self.test_option, None) - filter_file_name = None - filter_class_name = None - filter_func_name = None - if f: - if '.' in f: - parts = f.split('.') - if len(parts) > 3: - raise Exception("Unrecognized %s option: %s" % - (self.test_option, f)) - if len(parts) > 2: - if parts[2] not in ('*', ''): - filter_func_name = parts[2] - if parts[1] not in ('*', ''): - filter_class_name = parts[1] - if parts[0] not in ('*', ''): - if parts[0].startswith('test_'): - filter_file_name = parts[0] - else: - filter_file_name = 'test_%s' % parts[0] - else: - if f.startswith('test_'): - filter_file_name = f - else: - filter_file_name = 'test_%s' % f - return filter_file_name, filter_class_name, filter_func_name - - @staticmethod - def filter_tests(tests, filter_cb): - result = unittest.suite.TestSuite() - for t in tests: - if isinstance(t, unittest.suite.TestSuite): - # this is a bunch of tests, recursively filter... - x = VppTestRunner.filter_tests(t, filter_cb) - if x.countTestCases() > 0: - result.addTest(x) - elif isinstance(t, unittest.TestCase): - # this is a single test - parts = t.id().split('.') - # t.id() for common cases like this: - # test_classifier.TestClassifier.test_acl_ip - # apply filtering only if it is so - if len(parts) == 3: - if not filter_cb(parts[0], parts[1], parts[2]): - continue - result.addTest(t) - else: - # unexpected object, don't touch it - result.addTest(t) - return result def run(self, test): """ @@ -1201,20 +1152,9 @@ class VppTestRunner(unittest.TextTestRunner): """ faulthandler.enable() # emit stack trace to stderr if killed by signal - print("Running tests using custom test runner") # debug message - filter_file, filter_class, filter_func = self.parse_test_option() - print("Active filters: file=%s, class=%s, function=%s" % ( - filter_file, filter_class, filter_func)) - filter_cb = Filter_by_test_option( - filter_file, filter_class, filter_func) - filtered = self.filter_tests(test, filter_cb) - print("%s out of %s tests match specified filters" % ( - filtered.countTestCases(), test.countTestCases())) - if not running_extended_tests(): - print("Not running extended tests (some tests will be skipped)") - # super-ugly hack #2 - VppTestResult.test_suite = filtered - return super(VppTestRunner, self).run(filtered) + + result = super(VppTestRunner, self).run(test) + return result class Worker(Thread): |