path: root/test/run_tests.py
author    Klement Sekera <klement.sekera@gmail.com>  2022-04-26 19:02:15 +0200
committer Ole Trøan <otroan@employees.org>           2022-05-10 18:52:08 +0000
commit    d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree      4f786cfd8ebc2443cb11e11b74c8657204068898 /test/run_tests.py
parent    f90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is a much faster, stable, PEP8-compliant code style checker that also offers automatic formatting. It aims to be very stable and to produce the smallest possible diffs. It is used by many projects, small and big. Running checkstyle with black takes a few seconds and produces terse output. Thus, test-checkstyle-diff is no longer necessary.

Expand the scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting.

Note: the python virtualenv has been consolidated in test/Makefile and test/requirements*.txt, which will eventually be moved to a central location. This is required to simplify the automated generation of docker executor images in the CI.

Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
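For context, black provides both a check-only mode (--check, optionally with --diff) and an in-place formatting mode. The sketch below shows how targets such as checkstyle-python and fixstyle-python might drive black from Python; it is illustrative only — the actual wiring lives in the repository Makefiles, which are not part of this diff, and the paths and helper names here are assumptions.

    # Minimal sketch (not part of this commit): invoking black for style
    # checking vs. automatic formatting. Paths and function names are
    # illustrative assumptions.
    import subprocess
    import sys

    def check_style(paths=("test",)):
        # --check --diff reports files that would be reformatted without
        # modifying them; returncode is nonzero if changes are needed.
        return subprocess.run(
            [sys.executable, "-m", "black", "--check", "--diff", *paths]
        ).returncode

    def fix_style(paths=("test",)):
        # Plain invocation rewrites the files in place.
        return subprocess.run(
            [sys.executable, "-m", "black", *paths]
        ).returncode

    if __name__ == "__main__":
        sys.exit(check_style())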
Diffstat (limited to 'test/run_tests.py')
-rw-r--r--  test/run_tests.py  511
1 file changed, 305 insertions, 206 deletions
diff --git a/test/run_tests.py b/test/run_tests.py
index 8cb631ed660..5df37efba6b 100644
--- a/test/run_tests.py
+++ b/test/run_tests.py
@@ -16,12 +16,28 @@ from multiprocessing.queues import Queue
from multiprocessing.managers import BaseManager
import framework
from config import config, num_cpus, available_cpus, max_vpp_cpus
-from framework import VppTestRunner, VppTestCase, \
- get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \
- TEST_RUN, SKIP_CPU_SHORTAGE
+from framework import (
+ VppTestRunner,
+ VppTestCase,
+ get_testcase_doc_name,
+ get_test_description,
+ PASS,
+ FAIL,
+ ERROR,
+ SKIP,
+ TEST_RUN,
+ SKIP_CPU_SHORTAGE,
+)
from debug import spawn_gdb, start_vpp_in_gdb
-from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \
- colorize, single_line_delim
+from log import (
+ get_parallel_logger,
+ double_line_delim,
+ RED,
+ YELLOW,
+ GREEN,
+ colorize,
+ single_line_delim,
+)
from discover_tests import discover_tests
import sanity_run_vpp
from subprocess import check_output, CalledProcessError
@@ -50,7 +66,7 @@ class StreamQueueManager(BaseManager):
pass
-StreamQueueManager.register('StreamQueue', StreamQueue)
+StreamQueueManager.register("StreamQueue", StreamQueue)
class TestResult(dict):
@@ -68,9 +84,11 @@ class TestResult(dict):
self.testcases_by_id = testcases_by_id
def was_successful(self):
- return 0 == len(self[FAIL]) == len(self[ERROR]) \
- and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \
+ return (
+ 0 == len(self[FAIL]) == len(self[ERROR])
+ and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE])
== self.testcase_suite.countTestCases()
+ )
def no_tests_run(self):
return 0 == len(self[TEST_RUN])
@@ -90,10 +108,11 @@ class TestResult(dict):
def get_testcase_names(self, test_id):
# could be tearDownClass (test_ipsec_esp.TestIpsecEsp1)
setup_teardown_match = re.match(
- r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id)
+ r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id
+ )
if setup_teardown_match:
test_name, _, _, testcase_name = setup_teardown_match.groups()
- if len(testcase_name.split('.')) == 2:
+ if len(testcase_name.split(".")) == 2:
for key in self.testcases_by_id.keys():
if key.startswith(testcase_name):
testcase_name = key
@@ -107,8 +126,7 @@ class TestResult(dict):
def _get_test_description(self, test_id):
if test_id in self.testcases_by_id:
- desc = get_test_description(descriptions,
- self.testcases_by_id[test_id])
+ desc = get_test_description(descriptions, self.testcases_by_id[test_id])
else:
desc = test_id
return desc
@@ -121,17 +139,20 @@ class TestResult(dict):
return doc_name
-def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
- finished_pipe, result_pipe, logger):
+def test_runner_wrapper(
+ suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger
+):
sys.stdout = stdouterr_queue
sys.stderr = stdouterr_queue
VppTestCase.parallel_handler = logger.handlers[0]
- result = VppTestRunner(keep_alive_pipe=keep_alive_pipe,
- descriptions=descriptions,
- verbosity=config.verbose,
- result_pipe=result_pipe,
- failfast=config.failfast,
- print_summary=False).run(suite)
+ result = VppTestRunner(
+ keep_alive_pipe=keep_alive_pipe,
+ descriptions=descriptions,
+ verbosity=config.verbose,
+ result_pipe=result_pipe,
+ failfast=config.failfast,
+ print_summary=False,
+ ).run(suite)
finished_pipe.send(result.wasSuccessful())
finished_pipe.close()
keep_alive_pipe.close()
@@ -139,21 +160,23 @@ def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue,
class TestCaseWrapper(object):
def __init__(self, testcase_suite, manager):
- self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(
- duplex=False)
+ self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False)
self.finished_parent_end, self.finished_child_end = Pipe(duplex=False)
self.result_parent_end, self.result_child_end = Pipe(duplex=False)
self.testcase_suite = testcase_suite
self.stdouterr_queue = manager.StreamQueue(ctx=get_context())
self.logger = get_parallel_logger(self.stdouterr_queue)
- self.child = Process(target=test_runner_wrapper,
- args=(testcase_suite,
- self.keep_alive_child_end,
- self.stdouterr_queue,
- self.finished_child_end,
- self.result_child_end,
- self.logger)
- )
+ self.child = Process(
+ target=test_runner_wrapper,
+ args=(
+ testcase_suite,
+ self.keep_alive_child_end,
+ self.stdouterr_queue,
+ self.finished_child_end,
+ self.result_child_end,
+ self.logger,
+ ),
+ )
self.child.start()
self.last_test_temp_dir = None
self.last_test_vpp_binary = None
@@ -187,18 +210,20 @@ class TestCaseWrapper(object):
if self.last_test_id in self.testcases_by_id:
test = self.testcases_by_id[self.last_test_id]
class_name = unittest.util.strclass(test.__class__)
- test_name = "'{}' ({})".format(get_test_description(descriptions,
- test),
- self.last_test_id)
+ test_name = "'{}' ({})".format(
+ get_test_description(descriptions, test), self.last_test_id
+ )
else:
test_name = self.last_test_id
- class_name = re.match(r'((tearDownClass)|(setUpClass)) '
- r'\((.+\..+)\)', test_name).groups()[3]
+ class_name = re.match(
+ r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name
+ ).groups()[3]
if class_name not in self.testclasess_with_core:
self.testclasess_with_core[class_name] = (
test_name,
self.last_test_vpp_binary,
- self.last_test_temp_dir)
+ self.last_test_temp_dir,
+ )
def close_pipes(self):
self.keep_alive_child_end.close()
@@ -219,8 +244,9 @@ class TestCaseWrapper(object):
return self.testcase_suite.get_assigned_cpus()
-def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
- read_testcases):
+def stdouterr_reader_wrapper(
+ unread_testcases, finished_unread_testcases, read_testcases
+):
read_testcase = None
while read_testcases.is_set() or unread_testcases:
if finished_unread_testcases:
@@ -229,7 +255,7 @@ def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases,
elif unread_testcases:
read_testcase = unread_testcases.pop()
if read_testcase:
- data = ''
+ data = ""
while data is not None:
sys.stdout.write(data)
data = read_testcase.stdouterr_queue.get()
@@ -243,52 +269,62 @@ def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary):
if last_test_temp_dir:
# Need to create link in case of a timeout or core dump without failure
lttd = os.path.basename(last_test_temp_dir)
- link_path = '%s%s-FAILED' % (config.failed_dir, lttd)
+ link_path = "%s%s-FAILED" % (config.failed_dir, lttd)
if not os.path.exists(link_path):
os.symlink(last_test_temp_dir, link_path)
- logger.error("Symlink to failed testcase directory: %s -> %s"
- % (link_path, lttd))
+ logger.error(
+ "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd)
+ )
# Report core existence
core_path = get_core_path(last_test_temp_dir)
if os.path.exists(core_path):
logger.error(
- "Core-file exists in test temporary directory: %s!" %
- core_path)
+ "Core-file exists in test temporary directory: %s!" % core_path
+ )
check_core_path(logger, core_path)
logger.debug("Running 'file %s':" % core_path)
try:
info = check_output(["file", core_path])
logger.debug(info)
except CalledProcessError as e:
- logger.error("Subprocess returned with return code "
- "while running `file' utility on core-file "
- "returned: "
- "rc=%s", e.returncode)
+ logger.error(
+ "Subprocess returned with return code "
+ "while running `file' utility on core-file "
+ "returned: "
+ "rc=%s",
+ e.returncode,
+ )
except OSError as e:
- logger.error("Subprocess returned with OS error while "
- "running 'file' utility "
- "on core-file: "
- "(%s) %s", e.errno, e.strerror)
+ logger.error(
+ "Subprocess returned with OS error while "
+ "running 'file' utility "
+ "on core-file: "
+ "(%s) %s",
+ e.errno,
+ e.strerror,
+ )
except Exception as e:
- logger.exception("Unexpected error running `file' utility "
- "on core-file")
+ logger.exception("Unexpected error running `file' utility on core-file")
logger.error(f"gdb {vpp_binary} {core_path}")
if vpp_pid:
# Copy api post mortem
api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid
if os.path.isfile(api_post_mortem_path):
- logger.error("Copying api_post_mortem.%d to %s" %
- (vpp_pid, last_test_temp_dir))
+ logger.error(
+ "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir)
+ )
shutil.copy2(api_post_mortem_path, last_test_temp_dir)
def check_and_handle_core(vpp_binary, tempdir, core_crash_test):
if is_core_present(tempdir):
if debug_core:
- print('VPP core detected in %s. Last test running was %s' %
- (tempdir, core_crash_test))
+ print(
+ "VPP core detected in %s. Last test running was %s"
+ % (tempdir, core_crash_test)
+ )
print(single_line_delim)
spawn_gdb(vpp_binary, get_core_path(tempdir))
print(single_line_delim)
@@ -305,10 +341,9 @@ def handle_cores(failed_testcases):
check_and_handle_core(vpp_binary, tempdir, test)
-def process_finished_testsuite(wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results):
+def process_finished_testsuite(
+ wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results
+):
results.append(wrapped_testcase_suite.result)
finished_testcase_suites.add(wrapped_testcase_suite)
stop_run = False
@@ -317,10 +352,12 @@ def process_finished_testsuite(wrapped_testcase_suite,
if not wrapped_testcase_suite.was_successful():
failed_wrapped_testcases.add(wrapped_testcase_suite)
- handle_failed_suite(wrapped_testcase_suite.logger,
- wrapped_testcase_suite.last_test_temp_dir,
- wrapped_testcase_suite.vpp_pid,
- wrapped_testcase_suite.last_test_vpp_binary,)
+ handle_failed_suite(
+ wrapped_testcase_suite.logger,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ )
return stop_run
@@ -355,17 +392,17 @@ def run_forked(testcase_suites):
nonlocal wrapped_testcase_suites
nonlocal unread_testcases
nonlocal free_cpus
- suite.assign_cpus(free_cpus[:suite.cpus_used])
- free_cpus = free_cpus[suite.cpus_used:]
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
+ free_cpus = free_cpus[suite.cpus_used :]
wrapper = TestCaseWrapper(suite, manager)
wrapped_testcase_suites.add(wrapper)
unread_testcases.add(wrapper)
on_suite_start(suite)
def can_run_suite(suite):
- return (tests_running < max_concurrent_tests and
- (suite.cpus_used <= len(free_cpus) or
- suite.cpus_used > max_vpp_cpus))
+ return tests_running < max_concurrent_tests and (
+ suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus
+ )
while free_cpus and testcase_suites:
a_suite = testcase_suites[0]
@@ -385,10 +422,10 @@ def run_forked(testcase_suites):
read_from_testcases = threading.Event()
read_from_testcases.set()
- stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper,
- args=(unread_testcases,
- finished_unread_testcases,
- read_from_testcases))
+ stdouterr_thread = threading.Thread(
+ target=stdouterr_reader_wrapper,
+ args=(unread_testcases, finished_unread_testcases, read_from_testcases),
+ )
stdouterr_thread.start()
failed_wrapped_testcases = set()
@@ -400,59 +437,75 @@ def run_forked(testcase_suites):
for wrapped_testcase_suite in wrapped_testcase_suites:
while wrapped_testcase_suite.result_parent_end.poll():
wrapped_testcase_suite.result.process_result(
- *wrapped_testcase_suite.result_parent_end.recv())
+ *wrapped_testcase_suite.result_parent_end.recv()
+ )
wrapped_testcase_suite.last_heard = time.time()
while wrapped_testcase_suite.keep_alive_parent_end.poll():
- wrapped_testcase_suite.last_test, \
- wrapped_testcase_suite.last_test_vpp_binary, \
- wrapped_testcase_suite.last_test_temp_dir, \
- wrapped_testcase_suite.vpp_pid = \
- wrapped_testcase_suite.keep_alive_parent_end.recv()
+ (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_vpp_binary,
+ wrapped_testcase_suite.last_test_temp_dir,
+ wrapped_testcase_suite.vpp_pid,
+ ) = wrapped_testcase_suite.keep_alive_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
if wrapped_testcase_suite.finished_parent_end.poll():
wrapped_testcase_suite.finished_parent_end.recv()
wrapped_testcase_suite.last_heard = time.time()
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
continue
fail = False
- if wrapped_testcase_suite.last_heard + config.timeout < \
- time.time():
+ if wrapped_testcase_suite.last_heard + config.timeout < time.time():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process timed out "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
elif not wrapped_testcase_suite.child.is_alive():
fail = True
wrapped_testcase_suite.logger.critical(
"Child test runner process unexpectedly died "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
- elif wrapped_testcase_suite.last_test_temp_dir and \
- wrapped_testcase_suite.last_test_vpp_binary:
- if is_core_present(
- wrapped_testcase_suite.last_test_temp_dir):
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
+ elif (
+ wrapped_testcase_suite.last_test_temp_dir
+ and wrapped_testcase_suite.last_test_vpp_binary
+ ):
+ if is_core_present(wrapped_testcase_suite.last_test_temp_dir):
wrapped_testcase_suite.add_testclass_with_core()
if wrapped_testcase_suite.core_detected_at is None:
- wrapped_testcase_suite.core_detected_at = \
- time.time()
- elif wrapped_testcase_suite.core_detected_at + \
- core_timeout < time.time():
+ wrapped_testcase_suite.core_detected_at = time.time()
+ elif (
+ wrapped_testcase_suite.core_detected_at + core_timeout
+ < time.time()
+ ):
wrapped_testcase_suite.logger.critical(
"Child test runner process unresponsive and "
"core-file exists in test temporary directory "
- "(last test running was `%s' in `%s')!" %
- (wrapped_testcase_suite.last_test,
- wrapped_testcase_suite.last_test_temp_dir))
+ "(last test running was `%s' in `%s')!"
+ % (
+ wrapped_testcase_suite.last_test,
+ wrapped_testcase_suite.last_test_temp_dir,
+ )
+ )
fail = True
if fail:
@@ -461,19 +514,23 @@ def run_forked(testcase_suites):
# terminating the child process tends to leave orphan
# VPP process around
if wrapped_testcase_suite.vpp_pid:
- os.kill(wrapped_testcase_suite.vpp_pid,
- signal.SIGTERM)
+ os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM)
except OSError:
# already dead
pass
wrapped_testcase_suite.result.crashed = True
wrapped_testcase_suite.result.process_result(
- wrapped_testcase_suite.last_test_id, ERROR)
- stop_run = process_finished_testsuite(
- wrapped_testcase_suite,
- finished_testcase_suites,
- failed_wrapped_testcases,
- results) or stop_run
+ wrapped_testcase_suite.last_test_id, ERROR
+ )
+ stop_run = (
+ process_finished_testsuite(
+ wrapped_testcase_suite,
+ finished_testcase_suites,
+ failed_wrapped_testcases,
+ results,
+ )
+ or stop_run
+ )
for finished_testcase in finished_testcase_suites:
# Somewhat surprisingly, the join below may
@@ -484,9 +541,9 @@ def run_forked(testcase_suites):
join_end = time.time()
if join_end - join_start >= test_finished_join_timeout:
finished_testcase.logger.error(
- "Timeout joining finished test: %s (pid %d)" %
- (finished_testcase.last_test,
- finished_testcase.child.pid))
+ "Timeout joining finished test: %s (pid %d)"
+ % (finished_testcase.last_test, finished_testcase.child.pid)
+ )
finished_testcase.close_pipes()
wrapped_testcase_suites.remove(finished_testcase)
finished_unread_testcases.add(finished_testcase)
@@ -548,7 +605,7 @@ class TestSuiteWrapper(unittest.TestSuite):
class SplitToSuitesCallback:
def __init__(self, filter_callback):
self.suites = {}
- self.suite_name = 'default'
+ self.suite_name = "default"
self.filter_callback = filter_callback
self.filtered = TestSuiteWrapper()
@@ -573,28 +630,27 @@ def parse_test_filter(test_filter):
filter_class_name = None
filter_func_name = None
if f:
- if '.' in f:
- parts = f.split('.')
+ if "." in f:
+ parts = f.split(".")
if len(parts) > 3:
- raise Exception("Unrecognized %s option: %s" %
- (test_option, f))
+ raise Exception("Unrecognized %s option: %s" % (test_option, f))
if len(parts) > 2:
- if parts[2] not in ('*', ''):
+ if parts[2] not in ("*", ""):
filter_func_name = parts[2]
- if parts[1] not in ('*', ''):
+ if parts[1] not in ("*", ""):
filter_class_name = parts[1]
- if parts[0] not in ('*', ''):
- if parts[0].startswith('test_'):
+ if parts[0] not in ("*", ""):
+ if parts[0].startswith("test_"):
filter_file_name = parts[0]
else:
- filter_file_name = 'test_%s' % parts[0]
+ filter_file_name = "test_%s" % parts[0]
else:
- if f.startswith('test_'):
+ if f.startswith("test_"):
filter_file_name = f
else:
- filter_file_name = 'test_%s' % f
+ filter_file_name = "test_%s" % f
if filter_file_name:
- filter_file_name = '%s.py' % filter_file_name
+ filter_file_name = "%s.py" % filter_file_name
return filter_file_name, filter_class_name, filter_func_name
@@ -608,7 +664,7 @@ def filter_tests(tests, filter_cb):
result.addTest(x)
elif isinstance(t, unittest.TestCase):
# this is a single test
- parts = t.id().split('.')
+ parts = t.id().split(".")
# t.id() for common cases like this:
# test_classifier.TestClassifier.test_acl_ip
# apply filtering only if it is so
@@ -645,11 +701,11 @@ class FilterByClassList:
self.classes_with_filenames = classes_with_filenames
def __call__(self, file_name, class_name, func_name):
- return '.'.join([file_name, class_name]) in self.classes_with_filenames
+ return ".".join([file_name, class_name]) in self.classes_with_filenames
def suite_from_failed(suite, failed):
- failed = {x.rsplit('.', 1)[0] for x in failed}
+ failed = {x.rsplit(".", 1)[0] for x in failed}
filter_cb = FilterByClassList(failed)
suite = filter_tests(suite, filter_cb)
return suite
@@ -695,9 +751,9 @@ class AllResults(dict):
return retval
def print_results(self):
- print('')
+ print("")
print(double_line_delim)
- print('TEST RESULTS:')
+ print("TEST RESULTS:")
def indent_results(lines):
lines = list(filter(None, lines))
@@ -707,62 +763,86 @@ class AllResults(dict):
padding = " " * (maximum - l.index(":"))
print(f"{padding}{l}")
- indent_results([
- f'Scheduled tests: {self.all_testcases}',
- f'Executed tests: {self[TEST_RUN]}',
- f'Passed tests: {colorize(self[PASS], GREEN)}',
- f'Skipped tests: {colorize(self[SKIP], YELLOW)}'
- if self[SKIP] else None,
- f'Not Executed tests: {colorize(self.not_executed, RED)}'
- if self.not_executed else None,
- f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None,
- f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None,
- 'Tests skipped due to lack of CPUS: '
- f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}'
- if self[SKIP_CPU_SHORTAGE] else None
- ])
+ indent_results(
+ [
+ f"Scheduled tests: {self.all_testcases}",
+ f"Executed tests: {self[TEST_RUN]}",
+ f"Passed tests: {colorize(self[PASS], GREEN)}",
+ f"Skipped tests: {colorize(self[SKIP], YELLOW)}"
+ if self[SKIP]
+ else None,
+ f"Not Executed tests: {colorize(self.not_executed, RED)}"
+ if self.not_executed
+ else None,
+ f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None,
+ f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None,
+ "Tests skipped due to lack of CPUS: "
+ f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}"
+ if self[SKIP_CPU_SHORTAGE]
+ else None,
+ ]
+ )
if self.all_failed > 0:
- print('FAILURES AND ERRORS IN TESTS:')
+ print("FAILURES AND ERRORS IN TESTS:")
for result in self.results_per_suite:
failed_testcase_ids = result[FAIL]
errored_testcase_ids = result[ERROR]
old_testcase_name = None
if failed_testcase_ids:
for failed_test_id in failed_testcase_ids:
- new_testcase_name, test_name = \
- result.get_testcase_names(failed_test_id)
+ new_testcase_name, test_name = result.get_testcase_names(
+ failed_test_id
+ )
if new_testcase_name != old_testcase_name:
- print(' Testcase name: {}'.format(
- colorize(new_testcase_name, RED)))
+ print(
+ " Testcase name: {}".format(
+ colorize(new_testcase_name, RED)
+ )
+ )
old_testcase_name = new_testcase_name
- print(' FAILURE: {} [{}]'.format(
- colorize(test_name, RED), failed_test_id))
+ print(
+ " FAILURE: {} [{}]".format(
+ colorize(test_name, RED), failed_test_id
+ )
+ )
if errored_testcase_ids:
for errored_test_id in errored_testcase_ids:
- new_testcase_name, test_name = \
- result.get_testcase_names(errored_test_id)
+ new_testcase_name, test_name = result.get_testcase_names(
+ errored_test_id
+ )
if new_testcase_name != old_testcase_name:
- print(' Testcase name: {}'.format(
- colorize(new_testcase_name, RED)))
+ print(
+ " Testcase name: {}".format(
+ colorize(new_testcase_name, RED)
+ )
+ )
old_testcase_name = new_testcase_name
- print(' ERROR: {} [{}]'.format(
- colorize(test_name, RED), errored_test_id))
+ print(
+ " ERROR: {} [{}]".format(
+ colorize(test_name, RED), errored_test_id
+ )
+ )
if self.testsuites_no_tests_run:
- print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:')
+ print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:")
tc_classes = set()
for testsuite in self.testsuites_no_tests_run:
for testcase in testsuite:
tc_classes.add(get_testcase_doc_name(testcase))
for tc_class in tc_classes:
- print(' {}'.format(colorize(tc_class, RED)))
+ print(" {}".format(colorize(tc_class, RED)))
if self[SKIP_CPU_SHORTAGE]:
print()
- print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
- ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print(
+ colorize(
+ " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
print(double_line_delim)
- print('')
+ print("")
@property
def not_executed(self):
@@ -805,7 +885,7 @@ def parse_results(results):
return return_code, results_per_suite.rerun
-if __name__ == '__main__':
+if __name__ == "__main__":
print(f"Config is: {config}")
@@ -831,35 +911,41 @@ if __name__ == '__main__':
print(f"OS reports {num_cpus} available cpu(s).")
test_jobs = config.jobs
- if test_jobs == 'auto':
+ if test_jobs == "auto":
if run_interactive:
max_concurrent_tests = 1
- print('Interactive mode required, running tests consecutively.')
+ print("Interactive mode required, running tests consecutively.")
else:
max_concurrent_tests = num_cpus
- print(f"Running at most {max_concurrent_tests} python test "
- "processes concurrently.")
+ print(
+ f"Running at most {max_concurrent_tests} python test "
+ "processes concurrently."
+ )
else:
max_concurrent_tests = test_jobs
- print(f"Running at most {max_concurrent_tests} python test processes "
- "concurrently as set by 'TEST_JOBS'.")
+ print(
+ f"Running at most {max_concurrent_tests} python test processes "
+ "concurrently as set by 'TEST_JOBS'."
+ )
print(f"Using at most {max_vpp_cpus} cpus for VPP threads.")
if run_interactive and max_concurrent_tests > 1:
raise NotImplementedError(
- 'Running tests interactively (DEBUG is gdb[server] or ATTACH or '
- 'STEP is set) in parallel (TEST_JOBS is more than 1) is not '
- 'supported')
+ "Running tests interactively (DEBUG is gdb[server] or ATTACH or "
+ "STEP is set) in parallel (TEST_JOBS is more than 1) is not "
+ "supported"
+ )
descriptions = True
print("Running tests using custom test runner.")
- filter_file, filter_class, filter_func = \
- parse_test_filter(config.filter)
+ filter_file, filter_class, filter_func = parse_test_filter(config.filter)
- print("Selected filters: file=%s, class=%s, function=%s" % (
- filter_file, filter_class, filter_func))
+ print(
+ "Selected filters: file=%s, class=%s, function=%s"
+ % (filter_file, filter_class, filter_func)
+ )
filter_cb = FilterByTestOption(filter_file, filter_class, filter_func)
@@ -882,17 +968,19 @@ if __name__ == '__main__':
# in stopTest() (for that to trigger, test function must run)
for t in testcase_suite:
for m in dir(t):
- if m.startswith('test_'):
+ if m.startswith("test_"):
setattr(t, m, lambda: t.skipTest("not enough cpus"))
- setattr(t.__class__, 'setUpClass', lambda: None)
- setattr(t.__class__, 'tearDownClass', lambda: None)
- setattr(t, 'setUp', lambda: None)
- setattr(t, 'tearDown', lambda: None)
+ setattr(t.__class__, "setUpClass", lambda: None)
+ setattr(t.__class__, "tearDownClass", lambda: None)
+ setattr(t, "setUp", lambda: None)
+ setattr(t, "tearDown", lambda: None)
t.__class__.skipped_due_to_cpu_lack = True
suites.append(testcase_suite)
- print("%s out of %s tests match specified filters" % (
- tests_amount, tests_amount + cb.filtered.countTestCases()))
+ print(
+ "%s out of %s tests match specified filters"
+ % (tests_amount, tests_amount + cb.filtered.countTestCases())
+ )
if not config.extended:
print("Not running extended tests (some tests will be skipped)")
@@ -903,49 +991,60 @@ if __name__ == '__main__':
if run_interactive and suites:
# don't fork if requiring interactive terminal
- print('Running tests in foreground in the current process')
+ print("Running tests in foreground in the current process")
full_suite = unittest.TestSuite()
free_cpus = list(available_cpus)
cpu_shortage = False
for suite in suites:
if suite.cpus_used <= max_vpp_cpus:
- suite.assign_cpus(free_cpus[:suite.cpus_used])
+ suite.assign_cpus(free_cpus[: suite.cpus_used])
else:
suite.assign_cpus([])
cpu_shortage = True
full_suite.addTests(suites)
- result = VppTestRunner(verbosity=config.verbose,
- failfast=config.failfast,
- print_summary=True).run(full_suite)
+ result = VppTestRunner(
+ verbosity=config.verbose, failfast=config.failfast, print_summary=True
+ ).run(full_suite)
was_successful = result.wasSuccessful()
if not was_successful:
for test_case_info in result.failed_test_cases_info:
- handle_failed_suite(test_case_info.logger,
- test_case_info.tempdir,
- test_case_info.vpp_pid,
- config.vpp)
+ handle_failed_suite(
+ test_case_info.logger,
+ test_case_info.tempdir,
+ test_case_info.vpp_pid,
+ config.vpp,
+ )
if test_case_info in result.core_crash_test_cases_info:
- check_and_handle_core(test_case_info.vpp_bin_path,
- test_case_info.tempdir,
- test_case_info.core_crash_test)
+ check_and_handle_core(
+ test_case_info.vpp_bin_path,
+ test_case_info.tempdir,
+ test_case_info.core_crash_test,
+ )
if cpu_shortage:
print()
- print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT'
- ' ENOUGH CPUS AVAILABLE', YELLOW))
+ print(
+ colorize(
+ "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT"
+ " ENOUGH CPUS AVAILABLE",
+ YELLOW,
+ )
+ )
print()
sys.exit(not was_successful)
else:
- print('Running each VPPTestCase in a separate background process'
- f' with at most {max_concurrent_tests} parallel python test '
- 'process(es)')
+ print(
+ "Running each VPPTestCase in a separate background process"
+ f" with at most {max_concurrent_tests} parallel python test "
+ "process(es)"
+ )
exit_code = 0
while suites and attempts > 0:
results = run_forked(suites)
exit_code, suites = parse_results(results)
attempts -= 1
if exit_code == 0:
- print('Test run was successful')
+ print("Test run was successful")
else:
- print('%s attempt(s) left.' % attempts)
+ print("%s attempt(s) left." % attempts)
sys.exit(exit_code)