diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/run_tests.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/run_tests.py')
-rw-r--r-- | test/run_tests.py | 511 |
1 files changed, 305 insertions, 206 deletions
diff --git a/test/run_tests.py b/test/run_tests.py index 8cb631ed660..5df37efba6b 100644 --- a/test/run_tests.py +++ b/test/run_tests.py @@ -16,12 +16,28 @@ from multiprocessing.queues import Queue from multiprocessing.managers import BaseManager import framework from config import config, num_cpus, available_cpus, max_vpp_cpus -from framework import VppTestRunner, VppTestCase, \ - get_testcase_doc_name, get_test_description, PASS, FAIL, ERROR, SKIP, \ - TEST_RUN, SKIP_CPU_SHORTAGE +from framework import ( + VppTestRunner, + VppTestCase, + get_testcase_doc_name, + get_test_description, + PASS, + FAIL, + ERROR, + SKIP, + TEST_RUN, + SKIP_CPU_SHORTAGE, +) from debug import spawn_gdb, start_vpp_in_gdb -from log import get_parallel_logger, double_line_delim, RED, YELLOW, GREEN, \ - colorize, single_line_delim +from log import ( + get_parallel_logger, + double_line_delim, + RED, + YELLOW, + GREEN, + colorize, + single_line_delim, +) from discover_tests import discover_tests import sanity_run_vpp from subprocess import check_output, CalledProcessError @@ -50,7 +66,7 @@ class StreamQueueManager(BaseManager): pass -StreamQueueManager.register('StreamQueue', StreamQueue) +StreamQueueManager.register("StreamQueue", StreamQueue) class TestResult(dict): @@ -68,9 +84,11 @@ class TestResult(dict): self.testcases_by_id = testcases_by_id def was_successful(self): - return 0 == len(self[FAIL]) == len(self[ERROR]) \ - and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) \ + return ( + 0 == len(self[FAIL]) == len(self[ERROR]) + and len(self[PASS] + self[SKIP] + self[SKIP_CPU_SHORTAGE]) == self.testcase_suite.countTestCases() + ) def no_tests_run(self): return 0 == len(self[TEST_RUN]) @@ -90,10 +108,11 @@ class TestResult(dict): def get_testcase_names(self, test_id): # could be tearDownClass (test_ipsec_esp.TestIpsecEsp1) setup_teardown_match = re.match( - r'((tearDownClass)|(setUpClass)) \((.+\..+)\)', test_id) + r"((tearDownClass)|(setUpClass)) \((.+\..+)\)", test_id + ) if setup_teardown_match: test_name, _, _, testcase_name = setup_teardown_match.groups() - if len(testcase_name.split('.')) == 2: + if len(testcase_name.split(".")) == 2: for key in self.testcases_by_id.keys(): if key.startswith(testcase_name): testcase_name = key @@ -107,8 +126,7 @@ class TestResult(dict): def _get_test_description(self, test_id): if test_id in self.testcases_by_id: - desc = get_test_description(descriptions, - self.testcases_by_id[test_id]) + desc = get_test_description(descriptions, self.testcases_by_id[test_id]) else: desc = test_id return desc @@ -121,17 +139,20 @@ class TestResult(dict): return doc_name -def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue, - finished_pipe, result_pipe, logger): +def test_runner_wrapper( + suite, keep_alive_pipe, stdouterr_queue, finished_pipe, result_pipe, logger +): sys.stdout = stdouterr_queue sys.stderr = stdouterr_queue VppTestCase.parallel_handler = logger.handlers[0] - result = VppTestRunner(keep_alive_pipe=keep_alive_pipe, - descriptions=descriptions, - verbosity=config.verbose, - result_pipe=result_pipe, - failfast=config.failfast, - print_summary=False).run(suite) + result = VppTestRunner( + keep_alive_pipe=keep_alive_pipe, + descriptions=descriptions, + verbosity=config.verbose, + result_pipe=result_pipe, + failfast=config.failfast, + print_summary=False, + ).run(suite) finished_pipe.send(result.wasSuccessful()) finished_pipe.close() keep_alive_pipe.close() @@ -139,21 +160,23 @@ def test_runner_wrapper(suite, keep_alive_pipe, stdouterr_queue, class TestCaseWrapper(object): def __init__(self, testcase_suite, manager): - self.keep_alive_parent_end, self.keep_alive_child_end = Pipe( - duplex=False) + self.keep_alive_parent_end, self.keep_alive_child_end = Pipe(duplex=False) self.finished_parent_end, self.finished_child_end = Pipe(duplex=False) self.result_parent_end, self.result_child_end = Pipe(duplex=False) self.testcase_suite = testcase_suite self.stdouterr_queue = manager.StreamQueue(ctx=get_context()) self.logger = get_parallel_logger(self.stdouterr_queue) - self.child = Process(target=test_runner_wrapper, - args=(testcase_suite, - self.keep_alive_child_end, - self.stdouterr_queue, - self.finished_child_end, - self.result_child_end, - self.logger) - ) + self.child = Process( + target=test_runner_wrapper, + args=( + testcase_suite, + self.keep_alive_child_end, + self.stdouterr_queue, + self.finished_child_end, + self.result_child_end, + self.logger, + ), + ) self.child.start() self.last_test_temp_dir = None self.last_test_vpp_binary = None @@ -187,18 +210,20 @@ class TestCaseWrapper(object): if self.last_test_id in self.testcases_by_id: test = self.testcases_by_id[self.last_test_id] class_name = unittest.util.strclass(test.__class__) - test_name = "'{}' ({})".format(get_test_description(descriptions, - test), - self.last_test_id) + test_name = "'{}' ({})".format( + get_test_description(descriptions, test), self.last_test_id + ) else: test_name = self.last_test_id - class_name = re.match(r'((tearDownClass)|(setUpClass)) ' - r'\((.+\..+)\)', test_name).groups()[3] + class_name = re.match( + r"((tearDownClass)|(setUpClass)) " r"\((.+\..+)\)", test_name + ).groups()[3] if class_name not in self.testclasess_with_core: self.testclasess_with_core[class_name] = ( test_name, self.last_test_vpp_binary, - self.last_test_temp_dir) + self.last_test_temp_dir, + ) def close_pipes(self): self.keep_alive_child_end.close() @@ -219,8 +244,9 @@ class TestCaseWrapper(object): return self.testcase_suite.get_assigned_cpus() -def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases, - read_testcases): +def stdouterr_reader_wrapper( + unread_testcases, finished_unread_testcases, read_testcases +): read_testcase = None while read_testcases.is_set() or unread_testcases: if finished_unread_testcases: @@ -229,7 +255,7 @@ def stdouterr_reader_wrapper(unread_testcases, finished_unread_testcases, elif unread_testcases: read_testcase = unread_testcases.pop() if read_testcase: - data = '' + data = "" while data is not None: sys.stdout.write(data) data = read_testcase.stdouterr_queue.get() @@ -243,52 +269,62 @@ def handle_failed_suite(logger, last_test_temp_dir, vpp_pid, vpp_binary): if last_test_temp_dir: # Need to create link in case of a timeout or core dump without failure lttd = os.path.basename(last_test_temp_dir) - link_path = '%s%s-FAILED' % (config.failed_dir, lttd) + link_path = "%s%s-FAILED" % (config.failed_dir, lttd) if not os.path.exists(link_path): os.symlink(last_test_temp_dir, link_path) - logger.error("Symlink to failed testcase directory: %s -> %s" - % (link_path, lttd)) + logger.error( + "Symlink to failed testcase directory: %s -> %s" % (link_path, lttd) + ) # Report core existence core_path = get_core_path(last_test_temp_dir) if os.path.exists(core_path): logger.error( - "Core-file exists in test temporary directory: %s!" % - core_path) + "Core-file exists in test temporary directory: %s!" % core_path + ) check_core_path(logger, core_path) logger.debug("Running 'file %s':" % core_path) try: info = check_output(["file", core_path]) logger.debug(info) except CalledProcessError as e: - logger.error("Subprocess returned with return code " - "while running `file' utility on core-file " - "returned: " - "rc=%s", e.returncode) + logger.error( + "Subprocess returned with return code " + "while running `file' utility on core-file " + "returned: " + "rc=%s", + e.returncode, + ) except OSError as e: - logger.error("Subprocess returned with OS error while " - "running 'file' utility " - "on core-file: " - "(%s) %s", e.errno, e.strerror) + logger.error( + "Subprocess returned with OS error while " + "running 'file' utility " + "on core-file: " + "(%s) %s", + e.errno, + e.strerror, + ) except Exception as e: - logger.exception("Unexpected error running `file' utility " - "on core-file") + logger.exception("Unexpected error running `file' utility on core-file") logger.error(f"gdb {vpp_binary} {core_path}") if vpp_pid: # Copy api post mortem api_post_mortem_path = "/tmp/api_post_mortem.%d" % vpp_pid if os.path.isfile(api_post_mortem_path): - logger.error("Copying api_post_mortem.%d to %s" % - (vpp_pid, last_test_temp_dir)) + logger.error( + "Copying api_post_mortem.%d to %s" % (vpp_pid, last_test_temp_dir) + ) shutil.copy2(api_post_mortem_path, last_test_temp_dir) def check_and_handle_core(vpp_binary, tempdir, core_crash_test): if is_core_present(tempdir): if debug_core: - print('VPP core detected in %s. Last test running was %s' % - (tempdir, core_crash_test)) + print( + "VPP core detected in %s. Last test running was %s" + % (tempdir, core_crash_test) + ) print(single_line_delim) spawn_gdb(vpp_binary, get_core_path(tempdir)) print(single_line_delim) @@ -305,10 +341,9 @@ def handle_cores(failed_testcases): check_and_handle_core(vpp_binary, tempdir, test) -def process_finished_testsuite(wrapped_testcase_suite, - finished_testcase_suites, - failed_wrapped_testcases, - results): +def process_finished_testsuite( + wrapped_testcase_suite, finished_testcase_suites, failed_wrapped_testcases, results +): results.append(wrapped_testcase_suite.result) finished_testcase_suites.add(wrapped_testcase_suite) stop_run = False @@ -317,10 +352,12 @@ def process_finished_testsuite(wrapped_testcase_suite, if not wrapped_testcase_suite.was_successful(): failed_wrapped_testcases.add(wrapped_testcase_suite) - handle_failed_suite(wrapped_testcase_suite.logger, - wrapped_testcase_suite.last_test_temp_dir, - wrapped_testcase_suite.vpp_pid, - wrapped_testcase_suite.last_test_vpp_binary,) + handle_failed_suite( + wrapped_testcase_suite.logger, + wrapped_testcase_suite.last_test_temp_dir, + wrapped_testcase_suite.vpp_pid, + wrapped_testcase_suite.last_test_vpp_binary, + ) return stop_run @@ -355,17 +392,17 @@ def run_forked(testcase_suites): nonlocal wrapped_testcase_suites nonlocal unread_testcases nonlocal free_cpus - suite.assign_cpus(free_cpus[:suite.cpus_used]) - free_cpus = free_cpus[suite.cpus_used:] + suite.assign_cpus(free_cpus[: suite.cpus_used]) + free_cpus = free_cpus[suite.cpus_used :] wrapper = TestCaseWrapper(suite, manager) wrapped_testcase_suites.add(wrapper) unread_testcases.add(wrapper) on_suite_start(suite) def can_run_suite(suite): - return (tests_running < max_concurrent_tests and - (suite.cpus_used <= len(free_cpus) or - suite.cpus_used > max_vpp_cpus)) + return tests_running < max_concurrent_tests and ( + suite.cpus_used <= len(free_cpus) or suite.cpus_used > max_vpp_cpus + ) while free_cpus and testcase_suites: a_suite = testcase_suites[0] @@ -385,10 +422,10 @@ def run_forked(testcase_suites): read_from_testcases = threading.Event() read_from_testcases.set() - stdouterr_thread = threading.Thread(target=stdouterr_reader_wrapper, - args=(unread_testcases, - finished_unread_testcases, - read_from_testcases)) + stdouterr_thread = threading.Thread( + target=stdouterr_reader_wrapper, + args=(unread_testcases, finished_unread_testcases, read_from_testcases), + ) stdouterr_thread.start() failed_wrapped_testcases = set() @@ -400,59 +437,75 @@ def run_forked(testcase_suites): for wrapped_testcase_suite in wrapped_testcase_suites: while wrapped_testcase_suite.result_parent_end.poll(): wrapped_testcase_suite.result.process_result( - *wrapped_testcase_suite.result_parent_end.recv()) + *wrapped_testcase_suite.result_parent_end.recv() + ) wrapped_testcase_suite.last_heard = time.time() while wrapped_testcase_suite.keep_alive_parent_end.poll(): - wrapped_testcase_suite.last_test, \ - wrapped_testcase_suite.last_test_vpp_binary, \ - wrapped_testcase_suite.last_test_temp_dir, \ - wrapped_testcase_suite.vpp_pid = \ - wrapped_testcase_suite.keep_alive_parent_end.recv() + ( + wrapped_testcase_suite.last_test, + wrapped_testcase_suite.last_test_vpp_binary, + wrapped_testcase_suite.last_test_temp_dir, + wrapped_testcase_suite.vpp_pid, + ) = wrapped_testcase_suite.keep_alive_parent_end.recv() wrapped_testcase_suite.last_heard = time.time() if wrapped_testcase_suite.finished_parent_end.poll(): wrapped_testcase_suite.finished_parent_end.recv() wrapped_testcase_suite.last_heard = time.time() - stop_run = process_finished_testsuite( - wrapped_testcase_suite, - finished_testcase_suites, - failed_wrapped_testcases, - results) or stop_run + stop_run = ( + process_finished_testsuite( + wrapped_testcase_suite, + finished_testcase_suites, + failed_wrapped_testcases, + results, + ) + or stop_run + ) continue fail = False - if wrapped_testcase_suite.last_heard + config.timeout < \ - time.time(): + if wrapped_testcase_suite.last_heard + config.timeout < time.time(): fail = True wrapped_testcase_suite.logger.critical( "Child test runner process timed out " - "(last test running was `%s' in `%s')!" % - (wrapped_testcase_suite.last_test, - wrapped_testcase_suite.last_test_temp_dir)) + "(last test running was `%s' in `%s')!" + % ( + wrapped_testcase_suite.last_test, + wrapped_testcase_suite.last_test_temp_dir, + ) + ) elif not wrapped_testcase_suite.child.is_alive(): fail = True wrapped_testcase_suite.logger.critical( "Child test runner process unexpectedly died " - "(last test running was `%s' in `%s')!" % - (wrapped_testcase_suite.last_test, - wrapped_testcase_suite.last_test_temp_dir)) - elif wrapped_testcase_suite.last_test_temp_dir and \ - wrapped_testcase_suite.last_test_vpp_binary: - if is_core_present( - wrapped_testcase_suite.last_test_temp_dir): + "(last test running was `%s' in `%s')!" + % ( + wrapped_testcase_suite.last_test, + wrapped_testcase_suite.last_test_temp_dir, + ) + ) + elif ( + wrapped_testcase_suite.last_test_temp_dir + and wrapped_testcase_suite.last_test_vpp_binary + ): + if is_core_present(wrapped_testcase_suite.last_test_temp_dir): wrapped_testcase_suite.add_testclass_with_core() if wrapped_testcase_suite.core_detected_at is None: - wrapped_testcase_suite.core_detected_at = \ - time.time() - elif wrapped_testcase_suite.core_detected_at + \ - core_timeout < time.time(): + wrapped_testcase_suite.core_detected_at = time.time() + elif ( + wrapped_testcase_suite.core_detected_at + core_timeout + < time.time() + ): wrapped_testcase_suite.logger.critical( "Child test runner process unresponsive and " "core-file exists in test temporary directory " - "(last test running was `%s' in `%s')!" % - (wrapped_testcase_suite.last_test, - wrapped_testcase_suite.last_test_temp_dir)) + "(last test running was `%s' in `%s')!" + % ( + wrapped_testcase_suite.last_test, + wrapped_testcase_suite.last_test_temp_dir, + ) + ) fail = True if fail: @@ -461,19 +514,23 @@ def run_forked(testcase_suites): # terminating the child process tends to leave orphan # VPP process around if wrapped_testcase_suite.vpp_pid: - os.kill(wrapped_testcase_suite.vpp_pid, - signal.SIGTERM) + os.kill(wrapped_testcase_suite.vpp_pid, signal.SIGTERM) except OSError: # already dead pass wrapped_testcase_suite.result.crashed = True wrapped_testcase_suite.result.process_result( - wrapped_testcase_suite.last_test_id, ERROR) - stop_run = process_finished_testsuite( - wrapped_testcase_suite, - finished_testcase_suites, - failed_wrapped_testcases, - results) or stop_run + wrapped_testcase_suite.last_test_id, ERROR + ) + stop_run = ( + process_finished_testsuite( + wrapped_testcase_suite, + finished_testcase_suites, + failed_wrapped_testcases, + results, + ) + or stop_run + ) for finished_testcase in finished_testcase_suites: # Somewhat surprisingly, the join below may @@ -484,9 +541,9 @@ def run_forked(testcase_suites): join_end = time.time() if join_end - join_start >= test_finished_join_timeout: finished_testcase.logger.error( - "Timeout joining finished test: %s (pid %d)" % - (finished_testcase.last_test, - finished_testcase.child.pid)) + "Timeout joining finished test: %s (pid %d)" + % (finished_testcase.last_test, finished_testcase.child.pid) + ) finished_testcase.close_pipes() wrapped_testcase_suites.remove(finished_testcase) finished_unread_testcases.add(finished_testcase) @@ -548,7 +605,7 @@ class TestSuiteWrapper(unittest.TestSuite): class SplitToSuitesCallback: def __init__(self, filter_callback): self.suites = {} - self.suite_name = 'default' + self.suite_name = "default" self.filter_callback = filter_callback self.filtered = TestSuiteWrapper() @@ -573,28 +630,27 @@ def parse_test_filter(test_filter): filter_class_name = None filter_func_name = None if f: - if '.' in f: - parts = f.split('.') + if "." in f: + parts = f.split(".") if len(parts) > 3: - raise Exception("Unrecognized %s option: %s" % - (test_option, f)) + raise Exception("Unrecognized %s option: %s" % (test_option, f)) if len(parts) > 2: - if parts[2] not in ('*', ''): + if parts[2] not in ("*", ""): filter_func_name = parts[2] - if parts[1] not in ('*', ''): + if parts[1] not in ("*", ""): filter_class_name = parts[1] - if parts[0] not in ('*', ''): - if parts[0].startswith('test_'): + if parts[0] not in ("*", ""): + if parts[0].startswith("test_"): filter_file_name = parts[0] else: - filter_file_name = 'test_%s' % parts[0] + filter_file_name = "test_%s" % parts[0] else: - if f.startswith('test_'): + if f.startswith("test_"): filter_file_name = f else: - filter_file_name = 'test_%s' % f + filter_file_name = "test_%s" % f if filter_file_name: - filter_file_name = '%s.py' % filter_file_name + filter_file_name = "%s.py" % filter_file_name return filter_file_name, filter_class_name, filter_func_name @@ -608,7 +664,7 @@ def filter_tests(tests, filter_cb): result.addTest(x) elif isinstance(t, unittest.TestCase): # this is a single test - parts = t.id().split('.') + parts = t.id().split(".") # t.id() for common cases like this: # test_classifier.TestClassifier.test_acl_ip # apply filtering only if it is so @@ -645,11 +701,11 @@ class FilterByClassList: self.classes_with_filenames = classes_with_filenames def __call__(self, file_name, class_name, func_name): - return '.'.join([file_name, class_name]) in self.classes_with_filenames + return ".".join([file_name, class_name]) in self.classes_with_filenames def suite_from_failed(suite, failed): - failed = {x.rsplit('.', 1)[0] for x in failed} + failed = {x.rsplit(".", 1)[0] for x in failed} filter_cb = FilterByClassList(failed) suite = filter_tests(suite, filter_cb) return suite @@ -695,9 +751,9 @@ class AllResults(dict): return retval def print_results(self): - print('') + print("") print(double_line_delim) - print('TEST RESULTS:') + print("TEST RESULTS:") def indent_results(lines): lines = list(filter(None, lines)) @@ -707,62 +763,86 @@ class AllResults(dict): padding = " " * (maximum - l.index(":")) print(f"{padding}{l}") - indent_results([ - f'Scheduled tests: {self.all_testcases}', - f'Executed tests: {self[TEST_RUN]}', - f'Passed tests: {colorize(self[PASS], GREEN)}', - f'Skipped tests: {colorize(self[SKIP], YELLOW)}' - if self[SKIP] else None, - f'Not Executed tests: {colorize(self.not_executed, RED)}' - if self.not_executed else None, - f'Failures: {colorize(self[FAIL], RED)}' if self[FAIL] else None, - f'Errors: {colorize(self[ERROR], RED)}' if self[ERROR] else None, - 'Tests skipped due to lack of CPUS: ' - f'{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}' - if self[SKIP_CPU_SHORTAGE] else None - ]) + indent_results( + [ + f"Scheduled tests: {self.all_testcases}", + f"Executed tests: {self[TEST_RUN]}", + f"Passed tests: {colorize(self[PASS], GREEN)}", + f"Skipped tests: {colorize(self[SKIP], YELLOW)}" + if self[SKIP] + else None, + f"Not Executed tests: {colorize(self.not_executed, RED)}" + if self.not_executed + else None, + f"Failures: {colorize(self[FAIL], RED)}" if self[FAIL] else None, + f"Errors: {colorize(self[ERROR], RED)}" if self[ERROR] else None, + "Tests skipped due to lack of CPUS: " + f"{colorize(self[SKIP_CPU_SHORTAGE], YELLOW)}" + if self[SKIP_CPU_SHORTAGE] + else None, + ] + ) if self.all_failed > 0: - print('FAILURES AND ERRORS IN TESTS:') + print("FAILURES AND ERRORS IN TESTS:") for result in self.results_per_suite: failed_testcase_ids = result[FAIL] errored_testcase_ids = result[ERROR] old_testcase_name = None if failed_testcase_ids: for failed_test_id in failed_testcase_ids: - new_testcase_name, test_name = \ - result.get_testcase_names(failed_test_id) + new_testcase_name, test_name = result.get_testcase_names( + failed_test_id + ) if new_testcase_name != old_testcase_name: - print(' Testcase name: {}'.format( - colorize(new_testcase_name, RED))) + print( + " Testcase name: {}".format( + colorize(new_testcase_name, RED) + ) + ) old_testcase_name = new_testcase_name - print(' FAILURE: {} [{}]'.format( - colorize(test_name, RED), failed_test_id)) + print( + " FAILURE: {} [{}]".format( + colorize(test_name, RED), failed_test_id + ) + ) if errored_testcase_ids: for errored_test_id in errored_testcase_ids: - new_testcase_name, test_name = \ - result.get_testcase_names(errored_test_id) + new_testcase_name, test_name = result.get_testcase_names( + errored_test_id + ) if new_testcase_name != old_testcase_name: - print(' Testcase name: {}'.format( - colorize(new_testcase_name, RED))) + print( + " Testcase name: {}".format( + colorize(new_testcase_name, RED) + ) + ) old_testcase_name = new_testcase_name - print(' ERROR: {} [{}]'.format( - colorize(test_name, RED), errored_test_id)) + print( + " ERROR: {} [{}]".format( + colorize(test_name, RED), errored_test_id + ) + ) if self.testsuites_no_tests_run: - print('TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:') + print("TESTCASES WHERE NO TESTS WERE SUCCESSFULLY EXECUTED:") tc_classes = set() for testsuite in self.testsuites_no_tests_run: for testcase in testsuite: tc_classes.add(get_testcase_doc_name(testcase)) for tc_class in tc_classes: - print(' {}'.format(colorize(tc_class, RED))) + print(" {}".format(colorize(tc_class, RED))) if self[SKIP_CPU_SHORTAGE]: print() - print(colorize(' SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT' - ' ENOUGH CPUS AVAILABLE', YELLOW)) + print( + colorize( + " SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT" + " ENOUGH CPUS AVAILABLE", + YELLOW, + ) + ) print(double_line_delim) - print('') + print("") @property def not_executed(self): @@ -805,7 +885,7 @@ def parse_results(results): return return_code, results_per_suite.rerun -if __name__ == '__main__': +if __name__ == "__main__": print(f"Config is: {config}") @@ -831,35 +911,41 @@ if __name__ == '__main__': print(f"OS reports {num_cpus} available cpu(s).") test_jobs = config.jobs - if test_jobs == 'auto': + if test_jobs == "auto": if run_interactive: max_concurrent_tests = 1 - print('Interactive mode required, running tests consecutively.') + print("Interactive mode required, running tests consecutively.") else: max_concurrent_tests = num_cpus - print(f"Running at most {max_concurrent_tests} python test " - "processes concurrently.") + print( + f"Running at most {max_concurrent_tests} python test " + "processes concurrently." + ) else: max_concurrent_tests = test_jobs - print(f"Running at most {max_concurrent_tests} python test processes " - "concurrently as set by 'TEST_JOBS'.") + print( + f"Running at most {max_concurrent_tests} python test processes " + "concurrently as set by 'TEST_JOBS'." + ) print(f"Using at most {max_vpp_cpus} cpus for VPP threads.") if run_interactive and max_concurrent_tests > 1: raise NotImplementedError( - 'Running tests interactively (DEBUG is gdb[server] or ATTACH or ' - 'STEP is set) in parallel (TEST_JOBS is more than 1) is not ' - 'supported') + "Running tests interactively (DEBUG is gdb[server] or ATTACH or " + "STEP is set) in parallel (TEST_JOBS is more than 1) is not " + "supported" + ) descriptions = True print("Running tests using custom test runner.") - filter_file, filter_class, filter_func = \ - parse_test_filter(config.filter) + filter_file, filter_class, filter_func = parse_test_filter(config.filter) - print("Selected filters: file=%s, class=%s, function=%s" % ( - filter_file, filter_class, filter_func)) + print( + "Selected filters: file=%s, class=%s, function=%s" + % (filter_file, filter_class, filter_func) + ) filter_cb = FilterByTestOption(filter_file, filter_class, filter_func) @@ -882,17 +968,19 @@ if __name__ == '__main__': # in stopTest() (for that to trigger, test function must run) for t in testcase_suite: for m in dir(t): - if m.startswith('test_'): + if m.startswith("test_"): setattr(t, m, lambda: t.skipTest("not enough cpus")) - setattr(t.__class__, 'setUpClass', lambda: None) - setattr(t.__class__, 'tearDownClass', lambda: None) - setattr(t, 'setUp', lambda: None) - setattr(t, 'tearDown', lambda: None) + setattr(t.__class__, "setUpClass", lambda: None) + setattr(t.__class__, "tearDownClass", lambda: None) + setattr(t, "setUp", lambda: None) + setattr(t, "tearDown", lambda: None) t.__class__.skipped_due_to_cpu_lack = True suites.append(testcase_suite) - print("%s out of %s tests match specified filters" % ( - tests_amount, tests_amount + cb.filtered.countTestCases())) + print( + "%s out of %s tests match specified filters" + % (tests_amount, tests_amount + cb.filtered.countTestCases()) + ) if not config.extended: print("Not running extended tests (some tests will be skipped)") @@ -903,49 +991,60 @@ if __name__ == '__main__': if run_interactive and suites: # don't fork if requiring interactive terminal - print('Running tests in foreground in the current process') + print("Running tests in foreground in the current process") full_suite = unittest.TestSuite() free_cpus = list(available_cpus) cpu_shortage = False for suite in suites: if suite.cpus_used <= max_vpp_cpus: - suite.assign_cpus(free_cpus[:suite.cpus_used]) + suite.assign_cpus(free_cpus[: suite.cpus_used]) else: suite.assign_cpus([]) cpu_shortage = True full_suite.addTests(suites) - result = VppTestRunner(verbosity=config.verbose, - failfast=config.failfast, - print_summary=True).run(full_suite) + result = VppTestRunner( + verbosity=config.verbose, failfast=config.failfast, print_summary=True + ).run(full_suite) was_successful = result.wasSuccessful() if not was_successful: for test_case_info in result.failed_test_cases_info: - handle_failed_suite(test_case_info.logger, - test_case_info.tempdir, - test_case_info.vpp_pid, - config.vpp) + handle_failed_suite( + test_case_info.logger, + test_case_info.tempdir, + test_case_info.vpp_pid, + config.vpp, + ) if test_case_info in result.core_crash_test_cases_info: - check_and_handle_core(test_case_info.vpp_bin_path, - test_case_info.tempdir, - test_case_info.core_crash_test) + check_and_handle_core( + test_case_info.vpp_bin_path, + test_case_info.tempdir, + test_case_info.core_crash_test, + ) if cpu_shortage: print() - print(colorize('SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT' - ' ENOUGH CPUS AVAILABLE', YELLOW)) + print( + colorize( + "SOME TESTS WERE SKIPPED BECAUSE THERE ARE NOT" + " ENOUGH CPUS AVAILABLE", + YELLOW, + ) + ) print() sys.exit(not was_successful) else: - print('Running each VPPTestCase in a separate background process' - f' with at most {max_concurrent_tests} parallel python test ' - 'process(es)') + print( + "Running each VPPTestCase in a separate background process" + f" with at most {max_concurrent_tests} parallel python test " + "process(es)" + ) exit_code = 0 while suites and attempts > 0: results = run_forked(suites) exit_code, suites = parse_results(results) attempts -= 1 if exit_code == 0: - print('Test run was successful') + print("Test run was successful") else: - print('%s attempt(s) left.' % attempts) + print("%s attempt(s) left." % attempts) sys.exit(exit_code) |