aboutsummaryrefslogtreecommitdiffstats
path: root/test/run_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/run_tests.py')
-rw-r--r--test/run_tests.py511
1 files changed, 305 insertions, 206 deletions
diff --git a/test/run_tests.py b/test/run_tests.py
index 8cb631ed660..5df37efba6b 100644
--- a/test/run_tests.py
+++ b/test/run_tests.py
@@ -16,12 +16,28 @@ from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
import framework
from config import config, num_cpus, available_cpus, max_vpp_cpus
-from framework import VppTestRunner, VppTestCase, \
- get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
- TEST_RUN, SKIP_CPU_SHORTAGE
+from framework import (
+ VppTestRunner,
+ VppTestCase,
+ get_testcase_doc_name,
+ get_test_description,
+ PASS,
+ FAIL,
+ ERROR,
+ SKIP,
+ TEST_RUN,
+ SKIP_CPU_SHORTAGE,
+)
from debug import spawn_gdb, start_vpp_in_gdb
-from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
- colorize, single_line_delim
+from log import (
+ get_parallel_logger,
+ double_line_delim,
+ RED,
+ YELLOW,
+ GREEN,
+ colorize,
+ single_line_delim,
+)
from discover_tests import discover_tests
import sanity_run_vpp
from subprocess import check_output, CalledProcessError
@@ -50,7 +66,7 @@ class StreamQueueManager(BaseManager):
pass
-StreamQueueManager.register('StreamQueue', StreamQueue)
+StreamQueueManager.register("StreamQueue", StreamQueue)
class TestResult(dict):
@@ -68,9 +84,11 @@ class TestResult(dict):
self.testcases_by_id = testcases_by_id
def was_successful(self):
- return 0 == len(self[FAIL]) == len(self[ERROR]) \
- and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+ return (
+ 0 == len(self[FAIL]) == len(self[ERROR])
+ and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
== self.testcase_suite.countTestCases()
+ )
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
@@ -90,10 +108,11 @@ class TestResult(dict):
def get_testcase_names(self, test_id):
# could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
setup_teardown_match = re.match(
- r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
+ r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
+ )
if setup_teardown_match:
test_name, _, _, testcase_name = setup_teardown_match.groups()
- if len(testcase_name.split('.')) == 2:
+ if len(testcase_name.split(".")) == 2:
for key in self.testcases_by_id.keys():
if key.startswith(testcase_name):
testcase_name = key
@@ -107,8 +126,7 @@ class TestResult(dict):
def _get_test_description(self, test_id):
if test_id in self.testcases_by_id:
- desc = get_test_description(descriptions,
- self.testcases_by_id[test_id])
+ desc = get_test_description(descriptions, self.testcases_by_id[test_id])
else:
desc = test_id
return desc
@@ -121,17 +139,20 @@ class TestResult(dict):
return doc_name
-def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
- finished_pipe, result_pipe, logger):
+def test_runner_wrapper(
+ suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
+):
sys.stdout = stdouterr_queue
sys.stderr = stdouterr_queue
VppTestCase.parallel_handler = logger.handlers[0]
- result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
- descriptions=descriptions,
- verbosity=config.verbose,
- result_pipe=result_pipe,
- failfast=config.failfast,
- print_summary=False).run(suite)
+ result = VppTestRunner(
+ keep_alive_pipe=keep_alive_pipe,
+ descriptions=descriptions,
+ verbosity=config.verbose,
+ result_pipe=result_pipe,
+ failfast=config.failfast,
+ print_summary=False,
+ ).run(suite)
finished_pipe.send(result.wasSuccessful())
finished_pipe.close()
keep_alive_pipe.close()
@@ -139,21 +160,23 @@ def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
class TestCaseWrapper(object):
def __init__(self, testcase_suite, manager):
- self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
- duplex=False)
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
- self.child = Process(target=test_runner_wrapper,
- args=(testcase_suite,
- self.keep_alive_child_end,
- self.stdouterr_queue,
- self.finished_child_end,
- self.result_child_end,
- self.logger)
- )
+ self.child = Process(
+ target=test_runner_wrapper,
+ args=(
+ testcase_suite,
+ self.keep_alive_child_end,
+ self.stdouterr_queue,
+ self.finished_child_end,
+ self.result_child_end,
+ self.logger,
+ ),
+ )
self.child.start()
self.last_test_temp_dir = None
self.last_test_vpp_binary = None
@@ -187,18 +210,20 @@ class TestCaseWrapper(object):
if self.last_test_id in self.testcases_by_id:
test = self.testcases_by_id[self.last_test_id]
class_name = unittest.util.strclass(test.__class__)
- test_name = "'{}' ({})".format(get_test_description(descriptions,
- test),
- self.last_test_id)
+ test_name = "'{}' ({})".format(
+ get_test_description(descriptions, test), self.last_test_id
+ )
else:
test_name = self.last_test_id
- class_name = re.match(r'((tearDownClass)|(setUpClass)) '
- r'\((.+\..+)\)', test_name).groups()[3]
+ class_name = re.match(
+ r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
+ ).groups()[3]
if class_name not in self.testclasess_with_core:
self.testclasess_with_core[class_name] = (
test_name,
self.last_test_vpp_binary,
- self.last_test_temp_dir)
+ self.last_test_temp_dir,
+ )
def close_pipes(self):
self.keep_alive_child_end.close()
@@ -219,8 +244,9 @@ class TestCaseWrapper(object):
return self.testcase_suite.get_assigned_cpus()
-def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
- read_testcases):
+def stdouterr_reader_wrapper(
+ unread_testcases, finished_unread_testcases, read_testcases
+):
read_testcase = None
while read_testcases.is_set() or unread_testcases:
if finished_unread_testcases:
@@ -229,7 +255,7 @@ def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
elif unread_testcases:
read_testcase = unread_testcases.pop()
if read_testcase:
- data = ''
+ data = ""
while data is not None:
sys.stdout.write(data)
data = read_testcase.stdouterr_queue.get()
@@ -243,52 +269,62 @@ def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
if last_test_temp_dir:
# Need to create link in case of a timeout or core dump without failure
lttd = os.path.basename(last_test_temp_dir)
- link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
+ link_path = "%s%s-FAILED" % (config.failed_dir, lttd)
if not os.path.exists(link_path):
os.symlink(last_test_temp_dir, link_path)
- logger.error("Symlink to failed testcase directory: %s -> %s"
- % (link_path, lttd))
+ logger.error(
+ "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
+ )
# Report core existence
core_path = get_core_path(last_test_temp_dir)
if os.path.exists(core_path):
logger.error(
- "Core-file exists in test temporary directory: %s!" %
- core_path)
+ "Core-file exists in test temporary directory: %s!" % core_path
+ )
check_core_path(logger, core_path)
logger.debug("Running 'file %s':" % core_path)
try:
info = check_output(["file", core_path])
logger.debug(info)
except CalledProcessError as e:
- logger.error("Subprocess returned with return code "
- "while running `file' utility on core-file "
- "returned: "
- "rc=%s", e.returncode)
+ logger.error(
+ "Subprocess returned with return code "
+ "while running `file' utility on core-file "
+ "returned: "
+ "rc=%s",
+ e.returncode,
+ )
except OSError as e:
- logger.error("Subprocess returned with OS error while "
- "running 'file' utility "
- "on core-file: "
- "(%s) %s", e.errno, e.strerror)
+ logger.error(
+ "Subprocess returned with OS error while "
+ "running 'file' utility "
+ "on core-file: "
+ "(%s) %s",
+ e.errno,
+ e.strerror,
+ )
except Exception as e:
- logger.exception("Unexpected error running `file' utility "
- "on core-file")
+ logger.exception("Unexpected error running `file' utility on core-file")
logger.error(f"gdb {vpp_binary} {core_path}")
if vpp_pid:
# Copy api post mortem
api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
if os.path.isfile(api_post_mortem_path):
- logger.error("Copying api_post_mortem.%d to %s" %
- (vpp_pid, last_test_temp_dir))
+ logger.error(
+ "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
+ )
shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
if is_core_present(tempdir):
if debug_core:
- print('VPP core detected in %s. Last test running was %s' %
- (tempdir, core_crash_test))
+ print(
+ "VPP core detected in %s. Last test running was %s"
+ % (tempdir, core_crash_test)
+ )
print(single_line_delim)
spawn_gdb(vpp_binary, get_core_path(tempdir))
print(single_line_delim)
@@ -305,10 +341,9 @@ def handle_cores(failed_testcases):
check_and_handle_core(vpp_binary, tempdir, test)
-def process_finished_testsuite(wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results):
+def process_finished_testsuite(
+ wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
+):
results.append(wrapped_testcase_suite.result)
finished_testcase_suites.add(wrapped_testcase_suite)
stop_run = False
@@ -317,10 +352,12 @@ def process_finished_testsuite(wrapped_testcase_suite,
if not wrapped_testcase_suite.was_successful():
failed_wrapped_testcases.add(wrapped_testcase_suite)
- handle_failed_suite(wrapped_testcase_suite.logger,
- wrapped_testcase_suite.last_test_temp_dir,
- wrapped_testcase_suite.vpp_pid,
- wrapped_testcase_suite.last_test_vpp_binary,)
+ handle_failed_suite(
+ wrapped_testcase_suite.logger,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ )
return stop_run
@@ -355,17 +392,17 @@ def run_forked(testcase_suites):
nonlocal wrapped_testcase_suites
nonlocal unread_testcases
nonlocal free_cpus
- suite.assign_cpus(free_cpus[:suite.cpus_used])
- free_cpus = free_cpus[suite.cpus_used:]
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used :]
wrapper = TestCaseWrapper(suite, manager)
wrapped_testcase_suites.add(wrapper)
unread_testcases.add(wrapper)
on_suite_start(suite)
def can_run_suite(suite):
- return (tests_running < max_concurrent_tests and
- (suite.cpus_used <= len(free_cpus) or
- suite.cpus_used > max_vpp_cpus))
+ return tests_running < max_concurrent_tests and (
+ suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
+ )
while free_cpus and testcase_suites:
a_suite = testcase_suites[0]
@@ -385,10 +422,10 @@ def run_forked(testcase_suites):
read_from_testcases = threading.Event()
read_from_testcases.set()
- stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
- args=(unread_testcases,
- finished_unread_testcases,
- read_from_testcases))
+ stdouterr_thread = threading.Thread(
+ target=stdouterr_reader_wrapper,
+ args=(unread_testcases, finished_unread_testcases, read_from_testcases),
+ )
stdouterr_thread.start()
failed_wrapped_testcases = set()
@@ -400,59 +437,75 @@ def run_forked(testcase_suites):
for wrapped_testcase_suite in wrapped_testcase_suites:
while wrapped_testcase_suite.result_parent_end.poll():
wrapped_testcase_suite.result.process_result(
- *wrapped_testcase_suite.result_parent_end.recv())
+ *wrapped_testcase_suite.result_parent_end.recv()
+ )
wrapped_testcase_suite.last_heard = time.time()
while wrapped_testcase_suite.keep_alive_parent_end.poll():
- wrapped_testcase_suite.last_test, \
- wrapped_testcase_suite.last_test_vpp_binary, \
- wrapped_testcase_suite.last_test_temp_dir, \
- wrapped_testcase_suite.vpp_pid = \
- wrapped_testcase_suite.keep_alive_parent_end.recv()
+ (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
if wrapped_testcase_suite.finished_parent_end.poll():
wrapped_testcase_suite.finished_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
continue
fail = False
- if wrapped_testcase_suite.last_heard + config.timeout < \
- time.time():
+ if wrapped_testcase_suite.last_heard + config.timeout < time.time():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process timed out "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
elif not wrapped_testcase_suite.child.is_alive():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif wrapped_testcase_suite.last_test_temp_dir and \
- wrapped_testcase_suite.last_test_vpp_binary:
- if is_core_present(
- wrapped_testcase_suite.last_test_temp_dir):
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif (
+ wrapped_testcase_suite.last_test_temp_dir
+ and wrapped_testcase_suite.last_test_vpp_binary
+ ):
+ if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
wrapped_testcase_suite.add_testclass_with_core()
if wrapped_testcase_suite.core_detected_at is None:
- wrapped_testcase_suite.core_detected_at = \
- time.time()
- elif wrapped_testcase_suite.core_detected_at + \
- core_timeout < time.time():
+ wrapped_testcase_suite.core_detected_at = time.time()
+ elif (
+ wrapped_testcase_suite.core_detected_at + core_timeout
+ < time.time()
+ ):
wrapped_testcase_suite.logger.critical(
"Child test runner process unresponsive and "
"core-file exists in test temporary directory "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
fail = True
if fail:
@@ -461,19 +514,23 @@ def run_forked(testcase_suites):
# terminating the child process tends to leave orphan
# VPP process around
if wrapped_testcase_suite.vpp_pid:
- os.kill(wrapped_testcase_suite.vpp_pid,
- signal.SIGTERM)
+ os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
except OSError:
# already dead
pass
wrapped_testcase_suite.result.crashed = True
wrapped_testcase_suite.result.process_result(
- wrapped_testcase_suite.last_test_id, ERROR)
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
+ wrapped_testcase_suite.last_test_id, ERROR
+ )
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
for finished_testcase in finished_testcase_suites:
# Somewhat surprisingly, the join below may
@@ -484,9 +541,9 @@ def run_forked(testcase_suites):
join_end = time.time()
if join_end - join_start >= test_finished_join_timeout:
finished_testcase.logger.error(
- "Timeout joining finished test: %s (pid %d)" %
- (finished_testcase.last_test,
- finished_testcase.child.pid))
+ "Timeout joining finished test: %s (pid %d)"
+ % (finished_testcase.last_test, finished_testcase.child.pid)
+ )
finished_testcase.close_pipes()
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
@@ -548,7 +605,7 @@ class TestSuiteWrapper(unittest.TestSuite):
class SplitToSuitesCallback:
def __init__(self, filter_callback):
self.suites = {}
- self.suite_name = 'default'
+ self.suite_name = "default"
self.filter_callback = filter_callback
self.filtered = TestSuiteWrapper()
@@ -573,28 +630,27 @@ def parse_test_filter(test_filter):
filter_class_name = None
filter_func_name = None
if f:
- if '.' in f:
- parts = f.split('.')
+ if "." in f:
+ parts = f.split(".")
if len(parts) > 3:
- raise Exception("Unrecognized %s option: %s" %
- (test_option, f))
+ raise Exception("Unrecognized %s option: %s" % (test_option, f))
if len(parts) > 2:
- if parts[2] not in ('*', ''):
+ if parts[2] not in ("*", ""):
filter_func_name = parts[2]
- if parts[1] not in ('*', ''):
+ if parts[1] not in ("*", ""):
filter_class_name = parts[1]
- if parts[0] not in ('*', ''):
- if parts[0].startswith('test_'):
+ if parts[0] not in ("*", ""):
+ if parts[0].startswith("test_"):
filter_file_name = parts[0]
else:
- filter_file_name = 'test_%s' % parts[0]
+ filter_file_name = "test_%s" % parts[0]
else:
- if f.startswith('test_'):
+ if f.startswith("test_"):
filter_file_name = f
else:
- filter_file_name = 'test_%s' % f
+ filter_file_name = "test_%s" % f
if filter_file_name:
- filter_file_name = '%s.py' % filter_file_name
+ filter_file_name = "%s.py" % filter_file_name
return filter_file_name, filter_class_name, filter_func_name
@@ -608,7 +664,7 @@ def filter_tests(tests, filter_cb):
result.addTest(x)
elif isinstance(t, unittest.TestCase):
# this is a single test
- parts = t.id().split('.')
+ parts = t.id().split(".")
# t.id() for common cases like this:
# test_classifier.TestClassifier.test_acl_ip
# apply filtering only if it is so
@@ -645,11 +701,11 @@ class FilterByClassList:
self.classes_with_filenames = classes_with_filenames
def __call__(self, file_name, class_name, func_name):
- return '.'.join([file_name, class_name]) in self.classes_with_filenames
+ return ".".join([file_name, class_name]) in self.classes_with_filenames
def suite_from_failed(suite, failed):
- failed = {x.rsplit('.', 1)[0] for x in failed}
+ failed = {x.rsplit(".", 1)[0] for x in failed}
filter_cb = FilterByClassList(failed)
suite = filter_tests(suite, filter_cb)
return suite
@@ -695,9 +751,9 @@ class AllResults(dict):
return retval
def print_results(self):
- print('')
+ print("")
print(double_line_delim)
- print('TEST RESULTS:')
+ print("TEST RESULTS:")
def indent_results(lines):
lines = list(filter(None, lines))
@@ -707,62 +763,86 @@ class AllResults(dict):
padding = " " * (maximum - l.index(":"))
print(f"{padding}{l}")
- indent_results([
- f'Scheduled tests: {self.all_testcases}',
- f'Executed tests: {self[TEST_RUN]}',
- f'Passed tests: {colorize(self[PASS], GREEN)}',
- f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
- if self[SKIP] else None,
- f'Not Executed tests: {colorize(self.not_executed, RED)}'
- if self.not_executed else None,
- f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
- f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
- 'Tests skipped due to lack of CPUS: '
- f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
- if self[SKIP_CPU_SHORTAGE] else None
- ])
+ indent_results(
+ [
+ f"Scheduled tests: {self.all_testcases}",
+ f"Executed tests: {self[TEST_RUN]}",
+ f"Passed tests: {colorize(self[PASS], GREEN)}",
+ f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
+ if self[SKIP]
+ else None,
+ f"Not Executed tests: {colorize(self.not_executed, RED)}"
+ if self.not_executed
+ else None,
+ f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
+ f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
+ "Tests skipped due to lack of CPUS: "
+ f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
+ if self[SKIP_CPU_SHORTAGE]
+ else None,
+ ]
+ )
if self.all_failed > 0:
- print('FAILURES AND ERRORS IN TESTS:')
+ print("FAILURES AND ERRORS IN TESTS:")
for result in self.results_per_suite:
failed_testcase_ids = result[FAIL]
errored_testcase_ids = result[ERROR]
old_testcase_name = None
if failed_testcase_ids:
for failed_test_id in failed_testcase_ids:
- new_testcase_name, test_name = \
- result.get_testcase_names(failed_test_id)
+ new_testcase_name, test_name = result.get_testcase_names(
+ failed_test_id
+ )
if new_testcase_name != old_testcase_name:
- print(' Testcase name: {}'.format(
- colorize(new_testcase_name, RED)))
+ print(
+ " Testcase name: {}".format(
+ colorize(new_testcase_name, RED)
+ )
+ )
old_testcase_name = new_testcase_name
- print(' FAILURE: {} [{}]'.format(
- colorize(test_name, RED), failed_test_id))
+ print(
+ " FAILURE: {} [{}]".format(
+ colorize(test_name, RED), failed_test_id
+ )
+ )
if errored_testcase_ids:
for errored_test_id in errored_testcase_ids:
- new_testcase_name, test_name = \
- result.get_testcase_names(errored_test_id)
+ new_testcase_name, test_name = result.get_testcase_names(
+ errored_test_id
+ )
if new_testcase_name != old_testcase_name:
- print(' Testcase name: {}'.format(
- colorize(new_testcase_name, RED)))
+ print(
+ " Testcase name: {}".format(
+ colorize(new_testcase_name, RED)
+ )
+ )
old_testcase_name = new_testcase_name
- print(' ERROR: {} [{}]'.format(
- colorize(test_name, RED), errored_test_id))
+ print(
+ " ERROR: {} [{}]".format(
+ colorize(test_name, RED), errored_test_id
+ )
+ )
if self.testsuites_no_tests_run:
- print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
+ print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
tc_classes = set()
for testsuite in self.testsuites_no_tests_run:
for testcase in testsuite:
tc_classes.add(get_testcase_doc_name(testcase))
for tc_class in tc_classes:
- print(' {}'.format(colorize(tc_class, RED)))
+ print(" {}".format(colorize(tc_class, RED)))
if self[SKIP_CPU_SHORTAGE]:
print()
- print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
- ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print(
+ colorize(
+ " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
print(double_line_delim)
- print('')
+ print("")
@property
def not_executed(self):
@@ -805,7 +885,7 @@ def parse_results(results):
return return_code, results_per_suite.rerun
-if __name__ == '__main__':
+if __name__ == "__main__":
print(f"Config is: {config}")
@@ -831,35 +911,41 @@ if __name__ == '__main__':
print(f"OS reports {num_cpus} available cpu(s).")
test_jobs = config.jobs
- if test_jobs == 'auto':
+ if test_jobs == "auto":
if run_interactive:
max_concurrent_tests = 1
- print('Interactive mode required, running tests consecutively.')
+ print("Interactive mode required, running tests consecutively.")
else:
max_concurrent_tests = num_cpus
- print(f"Running at most {max_concurrent_tests} python test "
- "processes concurrently.")
+ print(
+ f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently."
+ )
else:
max_concurrent_tests = test_jobs
- print(f"Running at most {max_concurrent_tests} python test processes "
- "concurrently as set by 'TEST_JOBS'.")
+ print(
+ f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'."
+ )
print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
if run_interactive and max_concurrent_tests > 1:
raise NotImplementedError(
- 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
- 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
- 'supported')
+ "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
+ "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
+ "supported"
+ )
descriptions = True
print("Running tests using custom test runner.")
- filter_file, filter_class, filter_func = \
- parse_test_filter(config.filter)
+ filter_file, filter_class, filter_func = parse_test_filter(config.filter)
- print("Selected filters: file=%s, class=%s, function=%s" % (
- filter_file, filter_class, filter_func))
+ print(
+ "Selected filters: file=%s, class=%s, function=%s"
+ % (filter_file, filter_class, filter_func)
+ )
filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
@@ -882,17 +968,19 @@ if __name__ == '__main__':
# in stopTest() (for that to trigger, test function must run)
for t in testcase_suite:
for m in dir(t):
- if m.startswith('test_'):
+ if m.startswith("test_"):
setattr(t, m, lambda: t.skipTest("not enough cpus"))
- setattr(t.__class__, 'setUpClass', lambda: None)
- setattr(t.__class__, 'tearDownClass', lambda: None)
- setattr(t, 'setUp', lambda: None)
- setattr(t, 'tearDown', lambda: None)
+ setattr(t.__class__, "setUpClass", lambda: None)
+ setattr(t.__class__, "tearDownClass", lambda: None)
+ setattr(t, "setUp", lambda: None)
+ setattr(t, "tearDown", lambda: None)
t.__class__.skipped_due_to_cpu_lack = True
suites.append(testcase_suite)
- print("%s out of %s tests match specified filters" % (
- tests_amount, tests_amount + cb.filtered.countTestCases()))
+ print(
+ "%s out of %s tests match specified filters"
+ % (tests_amount, tests_amount + cb.filtered.countTestCases())
+ )
if not config.extended:
print("Not running extended tests (some tests will be skipped)")
@@ -903,49 +991,60 @@ if __name__ == '__main__':
if run_interactive and suites:
# don't fork if requiring interactive terminal
- print('Running tests in foreground in the current process')
+ print("Running tests in foreground in the current process")
full_suite = unittest.TestSuite()
free_cpus = list(available_cpus)
cpu_shortage = False
for suite in suites:
if suite.cpus_used <= max_vpp_cpus:
- suite.assign_cpus(free_cpus[:suite.cpus_used])
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
else:
suite.assign_cpus([])
cpu_shortage = True
full_suite.addTests(suites)
- result = VppTestRunner(verbosity=config.verbose,
- failfast=config.failfast,
- print_summary=True).run(full_suite)
+ result = VppTestRunner(
+ verbosity=config.verbose, failfast=config.failfast, print_summary=True
+ ).run(full_suite)
was_successful = result.wasSuccessful()
if not was_successful:
for test_case_info in result.failed_test_cases_info:
- handle_failed_suite(test_case_info.logger,
- test_case_info.tempdir,
- test_case_info.vpp_pid,
- config.vpp)
+ handle_failed_suite(
+ test_case_info.logger,
+ test_case_info.tempdir,
+ test_case_info.vpp_pid,
+ config.vpp,
+ )
if test_case_info in result.core_crash_test_cases_info:
- check_and_handle_core(test_case_info.vpp_bin_path,
- test_case_info.tempdir,
- test_case_info.core_crash_test)
+ check_and_handle_core(
+ test_case_info.vpp_bin_path,
+ test_case_info.tempdir,
+ test_case_info.core_crash_test,
+ )
if cpu_shortage:
print()
- print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
- ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print(
+ colorize(
+ "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
print()
sys.exit(not was_successful)
else:
- print('Running each VPPTestCase in a separate background process'
- f' with at most {max_concurrent_tests} parallel python test '
- 'process(es)')
+ print(
+ "Running each VPPTestCase in a separate background process"
+ f" with at most {max_concurrent_tests} parallel python test "
+ "process(es)"
+ )
exit_code = 0
while suites and attempts > 0:
results = run_forked(suites)
exit_code, suites = parse_results(results)
attempts -= 1
if exit_code == 0:
- print('Test run was successful')
+ print("Test run was successful")
else:
- print('%s attempt(s) left.' % attempts)
+ print("%s attempt(s) left." % attempts)
sys.exit(exit_code)