diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/framework.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/framework.py')
-rw-r--r-- | test/framework.py | 880 |
1 files changed, 524 insertions, 356 deletions
diff --git a/test/framework.py b/test/framework.py index 8065518ff7a..05e59b577cb 100644 --- a/test/framework.py +++ b/test/framework.py @@ -37,8 +37,15 @@ from vpp_papi import VppEnum import vpp_papi from vpp_papi.vpp_stats import VPPStats from vpp_papi.vpp_transport_socket import VppTransportSocketIOError -from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \ - get_logger, colorize +from log import ( + RED, + GREEN, + YELLOW, + double_line_delim, + single_line_delim, + get_logger, + colorize, +) from vpp_object import VppObjectRegistry from util import ppp, is_core_present from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror @@ -49,7 +56,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply logger = logging.getLogger(__name__) # Set up an empty logger for the testcase that can be overridden as necessary -null_logger = logging.getLogger('VppTestCase') +null_logger = logging.getLogger("VppTestCase") null_logger.addHandler(logging.NullHandler()) PASS = 0 @@ -72,10 +79,13 @@ if config.debug_framework: class VppDiedError(Exception): - """ exception for reporting that the subprocess has died.""" + """exception for reporting that the subprocess has died.""" - signals_by_value = {v: k for k, v in signal.__dict__.items() if - k.startswith('SIG') and not k.startswith('SIG_')} + signals_by_value = { + v: k + for k, v in signal.__dict__.items() + if k.startswith("SIG") and not k.startswith("SIG_") + } def __init__(self, rv=None, testcase=None, method_name=None): self.rv = rv @@ -89,15 +99,16 @@ class VppDiedError(Exception): pass if testcase is None and method_name is None: - in_msg = '' + in_msg = "" else: - in_msg = ' while running %s.%s' % (testcase, method_name) + in_msg = " while running %s.%s" % (testcase, method_name) if self.rv: - msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\ - % (in_msg, self.rv, ' [%s]' % - (self.signal_name if - self.signal_name is not None else '')) + msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % ( + in_msg, + self.rv, + " [%s]" % (self.signal_name if self.signal_name is not None else ""), + ) else: msg = "VPP subprocess died unexpectedly%s." % in_msg @@ -110,6 +121,7 @@ class _PacketInfo(object): Help process information about the next packet. Set variables to default values. """ + #: Store the index of the packet. index = -1 #: Store the index of the source packet generator interface of the packet. @@ -133,19 +145,23 @@ class _PacketInfo(object): def pump_output(testclass): - """ pump output from vpp stdout/stderr to proper queues """ + """pump output from vpp stdout/stderr to proper queues""" stdout_fragment = "" stderr_fragment = "" while not testclass.pump_thread_stop_flag.is_set(): - readable = select.select([testclass.vpp.stdout.fileno(), - testclass.vpp.stderr.fileno(), - testclass.pump_thread_wakeup_pipe[0]], - [], [])[0] + readable = select.select( + [ + testclass.vpp.stdout.fileno(), + testclass.vpp.stderr.fileno(), + testclass.pump_thread_wakeup_pipe[0], + ], + [], + [], + )[0] if testclass.vpp.stdout.fileno() in readable: read = os.read(testclass.vpp.stdout.fileno(), 102400) if len(read) > 0: - split = read.decode('ascii', - errors='backslashreplace').splitlines(True) + split = read.decode("ascii", errors="backslashreplace").splitlines(True) if len(stdout_fragment) > 0: split[0] = "%s%s" % (stdout_fragment, split[0]) if len(split) > 0 and split[-1].endswith("\n"): @@ -156,13 +172,11 @@ def pump_output(testclass): testclass.vpp_stdout_deque.extend(split[:limit]) if not config.cache_vpp_output: for line in split[:limit]: - testclass.logger.info( - "VPP STDOUT: %s" % line.rstrip("\n")) + testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n")) if testclass.vpp.stderr.fileno() in readable: read = os.read(testclass.vpp.stderr.fileno(), 102400) if len(read) > 0: - split = read.decode('ascii', - errors='backslashreplace').splitlines(True) + split = read.decode("ascii", errors="backslashreplace").splitlines(True) if len(stderr_fragment) > 0: split[0] = "%s%s" % (stderr_fragment, split[0]) if len(split) > 0 and split[-1].endswith("\n"): @@ -174,14 +188,13 @@ def pump_output(testclass): testclass.vpp_stderr_deque.extend(split[:limit]) if not config.cache_vpp_output: for line in split[:limit]: - testclass.logger.error( - "VPP STDERR: %s" % line.rstrip("\n")) + testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n")) # ignoring the dummy pipe here intentionally - the # flag will take care of properly terminating the loop def _is_platform_aarch64(): - return platform.machine() == 'aarch64' + return platform.machine() == "aarch64" is_platform_aarch64 = _is_platform_aarch64() @@ -191,6 +204,7 @@ class KeepAliveReporter(object): """ Singleton object which reports test start to parent process """ + _shared_state = {} def __init__(self): @@ -216,7 +230,7 @@ class KeepAliveReporter(object): return if isclass(test): - desc = '%s (%s)' % (desc, unittest.util.strclass(test)) + desc = "%s (%s)" % (desc, unittest.util.strclass(test)) else: desc = test.id() @@ -240,6 +254,7 @@ def create_tag_decorator(e): except AttributeError: cls.test_tags = [e] return cls + return decorator @@ -250,7 +265,7 @@ tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN) class DummyVpp: returncode = None - pid = 0xcafebafe + pid = 0xCAFEBAFE def poll(self): pass @@ -300,7 +315,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def has_tag(cls, tag): - """ if the test case has a given tag - return true """ + """if the test case has a given tag - return true""" try: return tag in cls.test_tags except AttributeError: @@ -309,15 +324,15 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def is_tagged_run_solo(cls): - """ if the test case class is timing-sensitive - return true """ + """if the test case class is timing-sensitive - return true""" return cls.has_tag(TestCaseTag.RUN_SOLO) @classmethod def skip_fixme_asan(cls): - """ if @tag_fixme_asan & ASan is enabled - mark for skip """ + """if @tag_fixme_asan & ASan is enabled - mark for skip""" if cls.has_tag(TestCaseTag.FIXME_ASAN): - vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '') - if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args: + vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "") + if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args: cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls) @classmethod @@ -364,7 +379,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def setUpConstants(cls): - """ Set-up the test case class based on environment variables """ + """Set-up the test case class based on environment variables""" cls.step = config.step cls.plugin_path = ":".join(config.vpp_plugin_dir) cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir) @@ -385,34 +400,92 @@ class VppTestCase(CPUInterface, unittest.TestCase): api_fuzzing = config.api_fuzz if api_fuzzing is None: - api_fuzzing = 'off' + api_fuzzing = "off" cls.vpp_cmdline = [ config.vpp, - "unix", "{", "nodaemon", debug_cli, "full-coredump", - coredump_size, "runtime-dir", cls.tempdir, "}", - "api-trace", "{", "on", "}", - "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}", - "cpu", "{", "main-core", str(cls.cpus[0]), ] + "unix", + "{", + "nodaemon", + debug_cli, + "full-coredump", + coredump_size, + "runtime-dir", + cls.tempdir, + "}", + "api-trace", + "{", + "on", + "}", + "api-segment", + "{", + "prefix", + cls.get_api_segment_prefix(), + "}", + "cpu", + "{", + "main-core", + str(cls.cpus[0]), + ] if cls.extern_plugin_path not in (None, ""): - cls.extra_vpp_plugin_config.append( - "add-path %s" % cls.extern_plugin_path) + cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path) if cls.get_vpp_worker_count(): - cls.vpp_cmdline.extend([ - "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]) - cls.vpp_cmdline.extend([ - "}", - "physmem", "{", "max-size", "32m", "}", - "statseg", "{", "socket-name", cls.get_stats_sock_path(), - cls.extra_vpp_statseg_config, "}", - "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}", - "node { ", default_variant, "}", - "api-fuzz {", api_fuzzing, "}", - "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}", - "plugin", "rdma_plugin.so", "{", "disable", "}", - "plugin", "lisp_unittest_plugin.so", "{", "enable", "}", - "plugin", "unittest_plugin.so", "{", "enable", "}" - ] + cls.extra_vpp_plugin_config + ["}", ]) + cls.vpp_cmdline.extend( + ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])] + ) + cls.vpp_cmdline.extend( + [ + "}", + "physmem", + "{", + "max-size", + "32m", + "}", + "statseg", + "{", + "socket-name", + cls.get_stats_sock_path(), + cls.extra_vpp_statseg_config, + "}", + "socksvr", + "{", + "socket-name", + cls.get_api_sock_path(), + "}", + "node { ", + default_variant, + "}", + "api-fuzz {", + api_fuzzing, + "}", + "plugins", + "{", + "plugin", + "dpdk_plugin.so", + "{", + "disable", + "}", + "plugin", + "rdma_plugin.so", + "{", + "disable", + "}", + "plugin", + "lisp_unittest_plugin.so", + "{", + "enable", + "}", + "plugin", + "unittest_plugin.so", + "{", + "enable", + "}", + ] + + cls.extra_vpp_plugin_config + + [ + "}", + ] + ) if cls.extra_vpp_punt_config is not None: cls.vpp_cmdline.extend(cls.extra_vpp_punt_config) @@ -435,17 +508,23 @@ class VppTestCase(CPUInterface, unittest.TestCase): print(single_line_delim) print("You can debug VPP using:") if cls.debug_gdbserver: - print(f"sudo gdb {config.vpp} " - f"-ex 'target remote localhost:{cls.gdbserver_port}'") - print("Now is the time to attach gdb by running the above " - "command, set up breakpoints etc., then resume VPP from " - "within gdb by issuing the 'continue' command") + print( + f"sudo gdb {config.vpp} " + f"-ex 'target remote localhost:{cls.gdbserver_port}'" + ) + print( + "Now is the time to attach gdb by running the above " + "command, set up breakpoints etc., then resume VPP from " + "within gdb by issuing the 'continue' command" + ) cls.gdbserver_port += 1 elif cls.debug_gdb: print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'") - print("Now is the time to attach gdb by running the above " - "command and set up breakpoints etc., then resume VPP from" - " within gdb by issuing the 'continue' command") + print( + "Now is the time to attach gdb by running the above " + "command and set up breakpoints etc., then resume VPP from" + " within gdb by issuing the 'continue' command" + ) print(single_line_delim) input("Press ENTER to continue running the testcase...") @@ -459,31 +538,35 @@ class VppTestCase(CPUInterface, unittest.TestCase): cmdline = cls.vpp_cmdline if cls.debug_gdbserver: - gdbserver = '/usr/bin/gdbserver' - if not os.path.isfile(gdbserver) or\ - not os.access(gdbserver, os.X_OK): - raise Exception("gdbserver binary '%s' does not exist or is " - "not executable" % gdbserver) - - cmdline = [gdbserver, 'localhost:{port}' - .format(port=cls.gdbserver_port)] + cls.vpp_cmdline + gdbserver = "/usr/bin/gdbserver" + if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK): + raise Exception( + "gdbserver binary '%s' does not exist or is " + "not executable" % gdbserver + ) + + cmdline = [ + gdbserver, + "localhost:{port}".format(port=cls.gdbserver_port), + ] + cls.vpp_cmdline cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline)) try: - cls.vpp = subprocess.Popen(cmdline, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + cls.vpp = subprocess.Popen( + cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) except subprocess.CalledProcessError as e: - cls.logger.critical("Subprocess returned with non-0 return code: (" - "%s)", e.returncode) + cls.logger.critical( + "Subprocess returned with non-0 return code: (%s)", e.returncode + ) raise except OSError as e: - cls.logger.critical("Subprocess returned with OS error: " - "(%s) %s", e.errno, e.strerror) + cls.logger.critical( + "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror + ) raise except Exception as e: - cls.logger.exception("Subprocess returned unexpected from " - "%s:", cmdline) + cls.logger.exception("Subprocess returned unexpected from %s:", cmdline) raise cls.wait_for_enter() @@ -504,11 +587,11 @@ class VppTestCase(CPUInterface, unittest.TestCase): ok = True break if not ok: - cls.logger.error("Timed out waiting for coredump to complete:" - " %s", corefile) + cls.logger.error( + "Timed out waiting for coredump to complete: %s", corefile + ) else: - cls.logger.error("Coredump complete: %s, size %d", - corefile, curr_size) + cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size) @classmethod def get_stats_sock_path(cls): @@ -554,21 +637,24 @@ class VppTestCase(CPUInterface, unittest.TestCase): super(VppTestCase, cls).setUpClass() cls.logger = get_logger(cls.__name__) random.seed(config.rnd_seed) - if hasattr(cls, 'parallel_handler'): + if hasattr(cls, "parallel_handler"): cls.logger.addHandler(cls.parallel_handler) cls.logger.propagate = False cls.set_debug_flags(config.debug) cls.tempdir = cls.get_tempdir() cls.create_file_handler() cls.file_handler.setFormatter( - Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s', - datefmt="%H:%M:%S")) + Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S") + ) cls.file_handler.setLevel(DEBUG) cls.logger.addHandler(cls.file_handler) cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__) os.chdir(cls.tempdir) - cls.logger.info("Temporary dir is %s, api socket is %s", - cls.tempdir, cls.get_api_sock_path()) + cls.logger.info( + "Temporary dir is %s, api socket is %s", + cls.tempdir, + cls.get_api_sock_path(), + ) cls.logger.debug("Random seed is %s", config.rnd_seed) cls.setUpConstants() cls.reset_packet_infos() @@ -586,9 +672,10 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.attach_vpp() else: cls.run_vpp() - cls.reporter.send_keep_alive(cls, 'setUpClass') + cls.reporter.send_keep_alive(cls, "setUpClass") VppTestResult.current_test_case_info = TestCaseInfo( - cls.logger, cls.tempdir, cls.vpp.pid, config.vpp) + cls.logger, cls.tempdir, cls.vpp.pid, config.vpp + ) cls.vpp_stdout_deque = deque() cls.vpp_stderr_deque = deque() if not cls.debug_attach: @@ -599,8 +686,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.pump_thread.start() if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach: cls.vapi_response_timeout = 0 - cls.vapi = VppPapiProvider(cls.__name__, cls, - cls.vapi_response_timeout) + cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout) if cls.step: hook = hookmodule.StepHook(cls) else: @@ -613,7 +699,8 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.vpp_startup_failed = True cls.logger.critical( "VPP died shortly after startup, check the" - " output to standard error for possible cause") + " output to standard error for possible cause" + ) raise try: cls.vapi.connect() @@ -622,9 +709,14 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.vapi.disconnect() if cls.debug_gdbserver: - print(colorize("You're running VPP inside gdbserver but " - "VPP-API connection failed, did you forget " - "to 'continue' VPP from within gdb?", RED)) + print( + colorize( + "You're running VPP inside gdbserver but " + "VPP-API connection failed, did you forget " + "to 'continue' VPP from within gdb?", + RED, + ) + ) raise e if cls.debug_attach: last_line = cls.vapi.cli("show thread").split("\n")[-2] @@ -641,7 +733,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def _debug_quit(cls): - if (cls.debug_gdbserver or cls.debug_gdb): + if cls.debug_gdbserver or cls.debug_gdb: try: cls.vpp.poll() @@ -650,8 +742,10 @@ class VppTestCase(CPUInterface, unittest.TestCase): print(double_line_delim) print("VPP or GDB server is still running") print(single_line_delim) - input("When done debugging, press ENTER to kill the " - "process and finish running the testcase...") + input( + "When done debugging, press ENTER to kill the " + "process and finish running the testcase..." + ) except AttributeError: pass @@ -663,25 +757,23 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls._debug_quit() # first signal that we want to stop the pump thread, then wake it up - if hasattr(cls, 'pump_thread_stop_flag'): + if hasattr(cls, "pump_thread_stop_flag"): cls.pump_thread_stop_flag.set() - if hasattr(cls, 'pump_thread_wakeup_pipe'): - os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up') - if hasattr(cls, 'pump_thread'): + if hasattr(cls, "pump_thread_wakeup_pipe"): + os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up") + if hasattr(cls, "pump_thread"): cls.logger.debug("Waiting for pump thread to stop") cls.pump_thread.join() - if hasattr(cls, 'vpp_stderr_reader_thread'): + if hasattr(cls, "vpp_stderr_reader_thread"): cls.logger.debug("Waiting for stderr pump to stop") cls.vpp_stderr_reader_thread.join() - if hasattr(cls, 'vpp'): - if hasattr(cls, 'vapi'): + if hasattr(cls, "vpp"): + if hasattr(cls, "vapi"): cls.logger.debug(cls.vapi.vpp.get_stats()) - cls.logger.debug("Disconnecting class vapi client on %s", - cls.__name__) + cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__) cls.vapi.disconnect() - cls.logger.debug("Deleting class vapi attribute on %s", - cls.__name__) + cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__) del cls.vapi cls.vpp.poll() if not cls.debug_attach and cls.vpp.returncode is None: @@ -694,8 +786,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): except subprocess.TimeoutExpired: cls.vpp.kill() outs, errs = cls.vpp.communicate() - cls.logger.debug("Deleting class vpp attribute on %s", - cls.__name__) + cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__) if not cls.debug_attach: cls.vpp.stdout.close() cls.vpp.stderr.close() @@ -708,32 +799,31 @@ class VppTestCase(CPUInterface, unittest.TestCase): stdout_log = cls.logger.info stderr_log = cls.logger.info - if hasattr(cls, 'vpp_stdout_deque'): + if hasattr(cls, "vpp_stdout_deque"): stdout_log(single_line_delim) - stdout_log('VPP output to stdout while running %s:', cls.__name__) + stdout_log("VPP output to stdout while running %s:", cls.__name__) stdout_log(single_line_delim) vpp_output = "".join(cls.vpp_stdout_deque) - with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f: + with open(cls.tempdir + "/vpp_stdout.txt", "w") as f: f.write(vpp_output) - stdout_log('\n%s', vpp_output) + stdout_log("\n%s", vpp_output) stdout_log(single_line_delim) - if hasattr(cls, 'vpp_stderr_deque'): + if hasattr(cls, "vpp_stderr_deque"): stderr_log(single_line_delim) - stderr_log('VPP output to stderr while running %s:', cls.__name__) + stderr_log("VPP output to stderr while running %s:", cls.__name__) stderr_log(single_line_delim) vpp_output = "".join(cls.vpp_stderr_deque) - with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f: + with open(cls.tempdir + "/vpp_stderr.txt", "w") as f: f.write(vpp_output) - stderr_log('\n%s', vpp_output) + stderr_log("\n%s", vpp_output) stderr_log(single_line_delim) @classmethod def tearDownClass(cls): - """ Perform final cleanup after running all tests in this test-case """ - cls.logger.debug("--- tearDownClass() for %s called ---" % - cls.__name__) - cls.reporter.send_keep_alive(cls, 'tearDownClass') + """Perform final cleanup after running all tests in this test-case""" + cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__) + cls.reporter.send_keep_alive(cls, "tearDownClass") cls.quit() cls.file_handler.close() cls.reset_packet_infos() @@ -741,14 +831,15 @@ class VppTestCase(CPUInterface, unittest.TestCase): debug_internal.on_tear_down_class(cls) def show_commands_at_teardown(self): - """ Allow subclass specific teardown logging additions.""" + """Allow subclass specific teardown logging additions.""" self.logger.info("--- No test specific show commands provided. ---") def tearDown(self): - """ Show various debug prints after each test """ - self.logger.debug("--- tearDown() for %s.%s(%s) called ---" % - (self.__class__.__name__, self._testMethodName, - self._testMethodDoc)) + """Show various debug prints after each test""" + self.logger.debug( + "--- tearDown() for %s.%s(%s) called ---" + % (self.__class__.__name__, self._testMethodName, self._testMethodDoc) + ) try: if not self.vpp_dead: @@ -769,32 +860,35 @@ class VppTestCase(CPUInterface, unittest.TestCase): tmp_api_trace = "/tmp/%s" % api_trace vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace) self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace)) - self.logger.info("Moving %s to %s\n" % (tmp_api_trace, - vpp_api_trace_log)) + self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log)) os.rename(tmp_api_trace, vpp_api_trace_log) except VppTransportSocketIOError: - self.logger.debug("VppTransportSocketIOError: Vpp dead. " - "Cannot log show commands.") + self.logger.debug( + "VppTransportSocketIOError: Vpp dead. Cannot log show commands." + ) self.vpp_dead = True else: self.registry.unregister_all(self.logger) def setUp(self): - """ Clear trace before running each test""" + """Clear trace before running each test""" super(VppTestCase, self).setUp() self.reporter.send_keep_alive(self) if self.vpp_dead: - raise VppDiedError(rv=None, testcase=self.__class__.__name__, - method_name=self._testMethodName) - self.sleep(.1, "during setUp") + raise VppDiedError( + rv=None, + testcase=self.__class__.__name__, + method_name=self._testMethodName, + ) + self.sleep(0.1, "during setUp") self.vpp_stdout_deque.append( - "--- test setUp() for %s.%s(%s) starts here ---\n" % - (self.__class__.__name__, self._testMethodName, - self._testMethodDoc)) + "--- test setUp() for %s.%s(%s) starts here ---\n" + % (self.__class__.__name__, self._testMethodName, self._testMethodDoc) + ) self.vpp_stderr_deque.append( - "--- test setUp() for %s.%s(%s) starts here ---\n" % - (self.__class__.__name__, self._testMethodName, - self._testMethodDoc)) + "--- test setUp() for %s.%s(%s) starts here ---\n" + % (self.__class__.__name__, self._testMethodName, self._testMethodDoc) + ) self.vapi.cli("clear trace") # store the test instance inside the test class - so that objects # holding the class can access instance methods (like assertEqual) @@ -816,7 +910,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def register_pcap(cls, intf, worker): - """ Register a pcap in the testclass """ + """Register a pcap in the testclass""" # add to the list of captures with current timestamp cls._pcaps.append((intf, worker)) @@ -824,14 +918,14 @@ class VppTestCase(CPUInterface, unittest.TestCase): def get_vpp_time(cls): # processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT" # returns float("2.190522") - timestr = cls.vapi.cli('show clock') - head, sep, tail = timestr.partition(',') - head, sep, tail = head.partition('Time now') + timestr = cls.vapi.cli("show clock") + head, sep, tail = timestr.partition(",") + head, sep, tail = head.partition("Time now") return float(tail) @classmethod def sleep_on_vpp_time(cls, sec): - """ Sleep according to time in VPP world """ + """Sleep according to time in VPP world""" # On a busy system with many processes # we might end up with VPP time being slower than real world # So take that into account when waiting for VPP to do something @@ -841,34 +935,31 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def pg_start(cls, trace=True): - """ Enable the PG, wait till it is done, then clean up """ + """Enable the PG, wait till it is done, then clean up""" for (intf, worker) in cls._old_pcaps: - intf.handle_old_pcap_file(intf.get_in_path(worker), - intf.in_history_counter) + intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter) cls._old_pcaps = [] if trace: cls.vapi.cli("clear trace") cls.vapi.cli("trace add pg-input 1000") - cls.vapi.cli('packet-generator enable') + cls.vapi.cli("packet-generator enable") # PG, when starts, runs to completion - # so let's avoid a race condition, # and wait a little till it's done. # Then clean it up - and then be gone. deadline = time.time() + 300 - while cls.vapi.cli('show packet-generator').find("Yes") != -1: + while cls.vapi.cli("show packet-generator").find("Yes") != -1: cls.sleep(0.01) # yield if time.time() > deadline: cls.logger.error("Timeout waiting for pg to stop") break for intf, worker in cls._pcaps: - cls.vapi.cli('packet-generator delete %s' % - intf.get_cap_name(worker)) + cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker)) cls._old_pcaps = cls._pcaps cls._pcaps = [] @classmethod - def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, - mode=None): + def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None): """ Create packet-generator interfaces. @@ -887,26 +978,30 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0): pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, - pgmode.PG_API_MODE_IP4) + return cls.create_pg_interfaces_internal( + interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4 + ) @classmethod def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0): pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, - pgmode.PG_API_MODE_IP6) + return cls.create_pg_interfaces_internal( + interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6 + ) @classmethod def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0): pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, - pgmode.PG_API_MODE_ETHERNET) + return cls.create_pg_interfaces_internal( + interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET + ) @classmethod def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0): pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal(interfaces, gso, gso_size, - pgmode.PG_API_MODE_ETHERNET) + return cls.create_pg_interfaces_internal( + interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET + ) @classmethod def create_loopback_interfaces(cls, count): @@ -937,7 +1032,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): return result @staticmethod - def extend_packet(packet, size, padding=' '): + def extend_packet(packet, size, padding=" "): """ Extend packet to given size by padding with spaces or custom padding NOTE: Currently works only when Raw layer is present. @@ -955,7 +1050,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def reset_packet_infos(cls): - """ Reset the list of packet info objects and packet counts to zero """ + """Reset the list of packet info objects and packet counts to zero""" cls._packet_infos = {} cls._packet_count_for_dst_if_idx = {} @@ -997,11 +1092,10 @@ class VppTestCase(CPUInterface, unittest.TestCase): """ # retrieve payload, currently 18 bytes (4 x ints + 1 short) - return pack('iiiih', info.index, info.src, - info.dst, info.ip, info.proto) + return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto) @staticmethod - def payload_to_info(payload, payload_field='load'): + def payload_to_info(payload, payload_field="load"): """ Convert packet payload to _PacketInfo object @@ -1018,12 +1112,11 @@ class VppTestCase(CPUInterface, unittest.TestCase): payload_b = getattr(payload, payload_field)[:18] info = _PacketInfo() - info.index, info.src, info.dst, info.ip, info.proto \ - = unpack('iiiih', payload_b) + info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b) # some SRv6 TCs depend on get an exception if bad values are detected if info.index > 0x4000: - raise ValueError('Index value is invalid') + raise ValueError("Index value is invalid") return info @@ -1086,32 +1179,38 @@ class VppTestCase(CPUInterface, unittest.TestCase): return try: msg = "Invalid %s: %d('%s') does not match expected value %d('%s')" - msg = msg % (getdoc(name_or_class).strip(), - real_value, str(name_or_class(real_value)), - expected_value, str(name_or_class(expected_value))) + msg = msg % ( + getdoc(name_or_class).strip(), + real_value, + str(name_or_class(real_value)), + expected_value, + str(name_or_class(expected_value)), + ) except Exception: msg = "Invalid %s: %s does not match expected value %s" % ( - name_or_class, real_value, expected_value) + name_or_class, + real_value, + expected_value, + ) self.assertEqual(real_value, expected_value, msg) - def assert_in_range(self, - real_value, - expected_min, - expected_max, - name=None): + def assert_in_range(self, real_value, expected_min, expected_max, name=None): if name is None: msg = None else: msg = "Invalid %s: %s out of range <%s,%s>" % ( - name, real_value, expected_min, expected_max) + name, + real_value, + expected_min, + expected_max, + ) self.assertTrue(expected_min <= real_value <= expected_max, msg) - def assert_packet_checksums_valid(self, packet, - ignore_zero_udp_checksums=True): + def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True): received = packet.__class__(scapy.compat.raw(packet)) - udp_layers = ['UDP', 'UDPerror'] - checksum_fields = ['cksum', 'chksum'] + udp_layers = ["UDP", "UDPerror"] + checksum_fields = ["cksum", "chksum"] checksums = [] counter = 0 temp = received.__class__(scapy.compat.raw(received)) @@ -1122,9 +1221,11 @@ class VppTestCase(CPUInterface, unittest.TestCase): layer.remove_payload() for cf in checksum_fields: if hasattr(layer, cf): - if ignore_zero_udp_checksums and \ - 0 == getattr(layer, cf) and \ - layer.name in udp_layers: + if ( + ignore_zero_udp_checksums + and 0 == getattr(layer, cf) + and layer.name in udp_layers + ): continue delattr(temp.getlayer(counter), cf) checksums.append((counter, cf)) @@ -1137,71 +1238,76 @@ class VppTestCase(CPUInterface, unittest.TestCase): for layer, cf in checksums: calc_sum = getattr(temp[layer], cf) self.assert_equal( - getattr(received[layer], cf), calc_sum, - "packet checksum on layer #%d: %s" % (layer, temp[layer].name)) + getattr(received[layer], cf), + calc_sum, + "packet checksum on layer #%d: %s" % (layer, temp[layer].name), + ) self.logger.debug( - "Checksum field `%s` on `%s` layer has correct value `%s`" % - (cf, temp[layer].name, calc_sum)) - - def assert_checksum_valid(self, received_packet, layer, - field_name='chksum', - ignore_zero_checksum=False): - """ Check checksum of received packet on given layer """ + "Checksum field `%s` on `%s` layer has correct value `%s`" + % (cf, temp[layer].name, calc_sum) + ) + + def assert_checksum_valid( + self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False + ): + """Check checksum of received packet on given layer""" received_packet_checksum = getattr(received_packet[layer], field_name) if ignore_zero_checksum and 0 == received_packet_checksum: return - recalculated = received_packet.__class__( - scapy.compat.raw(received_packet)) + recalculated = received_packet.__class__(scapy.compat.raw(received_packet)) delattr(recalculated[layer], field_name) recalculated = recalculated.__class__(scapy.compat.raw(recalculated)) - self.assert_equal(received_packet_checksum, - getattr(recalculated[layer], field_name), - "packet checksum on layer: %s" % layer) - - def assert_ip_checksum_valid(self, received_packet, - ignore_zero_checksum=False): - self.assert_checksum_valid(received_packet, 'IP', - ignore_zero_checksum=ignore_zero_checksum) - - def assert_tcp_checksum_valid(self, received_packet, - ignore_zero_checksum=False): - self.assert_checksum_valid(received_packet, 'TCP', - ignore_zero_checksum=ignore_zero_checksum) - - def assert_udp_checksum_valid(self, received_packet, - ignore_zero_checksum=True): - self.assert_checksum_valid(received_packet, 'UDP', - ignore_zero_checksum=ignore_zero_checksum) + self.assert_equal( + received_packet_checksum, + getattr(recalculated[layer], field_name), + "packet checksum on layer: %s" % layer, + ) + + def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False): + self.assert_checksum_valid( + received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum + ) + + def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False): + self.assert_checksum_valid( + received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum + ) + + def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True): + self.assert_checksum_valid( + received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum + ) def assert_embedded_icmp_checksum_valid(self, received_packet): if received_packet.haslayer(IPerror): - self.assert_checksum_valid(received_packet, 'IPerror') + self.assert_checksum_valid(received_packet, "IPerror") if received_packet.haslayer(TCPerror): - self.assert_checksum_valid(received_packet, 'TCPerror') + self.assert_checksum_valid(received_packet, "TCPerror") if received_packet.haslayer(UDPerror): - self.assert_checksum_valid(received_packet, 'UDPerror', - ignore_zero_checksum=True) + self.assert_checksum_valid( + received_packet, "UDPerror", ignore_zero_checksum=True + ) if received_packet.haslayer(ICMPerror): - self.assert_checksum_valid(received_packet, 'ICMPerror') + self.assert_checksum_valid(received_packet, "ICMPerror") def assert_icmp_checksum_valid(self, received_packet): - self.assert_checksum_valid(received_packet, 'ICMP') + self.assert_checksum_valid(received_packet, "ICMP") self.assert_embedded_icmp_checksum_valid(received_packet) def assert_icmpv6_checksum_valid(self, pkt): if pkt.haslayer(ICMPv6DestUnreach): - self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum') + self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum") self.assert_embedded_icmp_checksum_valid(pkt) if pkt.haslayer(ICMPv6EchoRequest): - self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum') + self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum") if pkt.haslayer(ICMPv6EchoReply): - self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum') + self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum") def get_counter(self, counter): if counter.startswith("/"): counter_value = self.statistics.get_counter(counter) else: - counters = self.vapi.cli("sh errors").split('\n') + counters = self.vapi.cli("sh errors").split("\n") counter_value = 0 for i in range(1, len(counters) - 1): results = counters[i].split() @@ -1210,8 +1316,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): break return counter_value - def assert_counter_equal(self, counter, expected_value, - thread=None, index=0): + def assert_counter_equal(self, counter, expected_value, thread=None, index=0): c = self.get_counter(counter) if thread is not None: c = c[thread][index] @@ -1221,13 +1326,13 @@ class VppTestCase(CPUInterface, unittest.TestCase): def assert_packet_counter_equal(self, counter, expected_value): counter_value = self.get_counter(counter) - self.assert_equal(counter_value, expected_value, - "packet counter `%s'" % counter) + self.assert_equal( + counter_value, expected_value, "packet counter `%s'" % counter + ) def assert_error_counter_equal(self, counter, expected_value): counter_value = self.statistics[counter].sum() - self.assert_equal(counter_value, expected_value, - "error counter `%s'" % counter) + self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter) @classmethod def sleep(cls, timeout, remark=None): @@ -1238,7 +1343,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): # https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa if timeout == 0: # yield quantum - if hasattr(os, 'sched_yield'): + if hasattr(os, "sched_yield"): os.sched_yield() else: time.sleep(0) @@ -1249,13 +1354,18 @@ class VppTestCase(CPUInterface, unittest.TestCase): time.sleep(timeout) after = time.time() if after - before > 2 * timeout: - cls.logger.error("unexpected self.sleep() result - " - "slept for %es instead of ~%es!", - after - before, timeout) + cls.logger.error( + "unexpected self.sleep() result - slept for %es instead of ~%es!", + after - before, + timeout, + ) cls.logger.debug( "Finished sleep (%s) - slept %es (wanted %es)", - remark, after - before, timeout) + remark, + after - before, + timeout, + ) def virtual_sleep(self, timeout, remark=None): self.logger.debug("Moving VPP time by %s (%s)", timeout, remark) @@ -1285,7 +1395,8 @@ class VppTestCase(CPUInterface, unittest.TestCase): stats_snapshot[cntr].sum() + diff, f"'{cntr}' counter value (previous value: " f"{stats_snapshot[cntr].sum()}, " - f"expected diff: {diff})") + f"expected diff: {diff})", + ) else: try: self.assert_equal( @@ -1293,7 +1404,8 @@ class VppTestCase(CPUInterface, unittest.TestCase): stats_snapshot[cntr][:, sw_if_index].sum() + diff, f"'{cntr}' counter value (previous value: " f"{stats_snapshot[cntr][:, sw_if_index].sum()}, " - f"expected diff: {diff})") + f"expected diff: {diff})", + ) except IndexError: # if diff is 0, then this most probably a case where # test declares multiple interfaces but traffic hasn't @@ -1302,8 +1414,9 @@ class VppTestCase(CPUInterface, unittest.TestCase): if 0 != diff: raise - def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None, - stats_diff=None, trace=True, msg=None): + def send_and_assert_no_replies( + self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None + ): if stats_diff: stats_snapshot = self.snapshot_stats(stats_diff) @@ -1324,8 +1437,17 @@ class VppTestCase(CPUInterface, unittest.TestCase): if stats_diff: self.compare_stats_with_snapshot(stats_diff, stats_snapshot) - def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None, - trace=True, msg=None, stats_diff=None): + def send_and_expect( + self, + intf, + pkts, + output, + n_rx=None, + worker=None, + trace=True, + msg=None, + stats_diff=None, + ): if stats_diff: stats_snapshot = self.snapshot_stats(stats_diff) @@ -1343,8 +1465,9 @@ class VppTestCase(CPUInterface, unittest.TestCase): return rx - def send_and_expect_load_balancing(self, input, pkts, outputs, - worker=None, trace=True): + def send_and_expect_load_balancing( + self, input, pkts, outputs, worker=None, trace=True + ): self.pg_send(input, pkts, worker=worker, trace=trace) rxs = [] for oo in outputs: @@ -1355,9 +1478,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): self.logger.debug(self.vapi.cli("show trace")) return rxs - def send_and_expect_some(self, intf, pkts, output, - worker=None, - trace=True): + def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True): self.pg_send(intf, pkts, worker=worker, trace=trace) rx = output._get_capture(1) if trace: @@ -1366,8 +1487,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): self.assertTrue(len(rx) < len(pkts)) return rx - def send_and_expect_only(self, intf, pkts, output, timeout=None, - stats_diff=None): + def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None): if stats_diff: stats_snapshot = self.snapshot_stats(stats_diff) @@ -1427,8 +1547,7 @@ class VppTestResult(unittest.TestResult): core_crash_test_cases_info = set() current_test_case_info = None - def __init__(self, stream=None, descriptions=None, verbosity=None, - runner=None): + def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None): """ :param stream File descriptor to store where to report test results. Set to the standard error stream by default. @@ -1453,9 +1572,9 @@ class VppTestResult(unittest.TestResult): """ if self.current_test_case_info: self.current_test_case_info.logger.debug( - "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__, - test._testMethodName, - test._testMethodDoc)) + "--- addSuccess() %s.%s(%s) called" + % (test.__class__.__name__, test._testMethodName, test._testMethodDoc) + ) unittest.TestResult.addSuccess(self, test) self.result_string = colorize("OK", GREEN) @@ -1471,9 +1590,14 @@ class VppTestResult(unittest.TestResult): """ if self.current_test_case_info: self.current_test_case_info.logger.debug( - "--- addSkip() %s.%s(%s) called, reason is %s" % - (test.__class__.__name__, test._testMethodName, - test._testMethodDoc, reason)) + "--- addSkip() %s.%s(%s) called, reason is %s" + % ( + test.__class__.__name__, + test._testMethodName, + test._testMethodDoc, + reason, + ) + ) unittest.TestResult.addSkip(self, test, reason) self.result_string = colorize("SKIP", YELLOW) @@ -1488,17 +1612,18 @@ class VppTestResult(unittest.TestResult): failed_dir = config.failed_dir link_path = os.path.join( failed_dir, - '%s-FAILED' % - os.path.basename(self.current_test_case_info.tempdir)) + "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir), + ) self.current_test_case_info.logger.debug( - "creating a link to the failed test") + "creating a link to the failed test" + ) self.current_test_case_info.logger.debug( - "os.symlink(%s, %s)" % - (self.current_test_case_info.tempdir, link_path)) + "os.symlink(%s, %s)" + % (self.current_test_case_info.tempdir, link_path) + ) if os.path.exists(link_path): - self.current_test_case_info.logger.debug( - 'symlink already exists') + self.current_test_case_info.logger.debug("symlink already exists") else: os.symlink(self.current_test_case_info.tempdir, link_path) @@ -1506,7 +1631,7 @@ class VppTestResult(unittest.TestResult): self.current_test_case_info.logger.error(e) def send_result_through_pipe(self, test, result): - if hasattr(self, 'test_framework_result_pipe'): + if hasattr(self, "test_framework_result_pipe"): pipe = self.test_framework_result_pipe if pipe: pipe.send((test.id(), result)) @@ -1516,32 +1641,37 @@ class VppTestResult(unittest.TestResult): if isinstance(test, unittest.suite._ErrorHolder): test_name = test.description else: - test_name = '%s.%s(%s)' % (test.__class__.__name__, - test._testMethodName, - test._testMethodDoc) + test_name = "%s.%s(%s)" % ( + test.__class__.__name__, + test._testMethodName, + test._testMethodDoc, + ) self.current_test_case_info.logger.debug( - "--- %s() %s called, err is %s" % - (fn_name, test_name, err)) + "--- %s() %s called, err is %s" % (fn_name, test_name, err) + ) self.current_test_case_info.logger.debug( - "formatted exception is:\n%s" % - "".join(format_exception(*err))) + "formatted exception is:\n%s" % "".join(format_exception(*err)) + ) def add_error(self, test, err, unittest_fn, error_type): if error_type == FAIL: - self.log_error(test, err, 'addFailure') + self.log_error(test, err, "addFailure") error_type_str = colorize("FAIL", RED) elif error_type == ERROR: - self.log_error(test, err, 'addError') + self.log_error(test, err, "addError") error_type_str = colorize("ERROR", RED) else: - raise Exception('Error type %s cannot be used to record an ' - 'error or a failure' % error_type) + raise Exception( + "Error type %s cannot be used to record an " + "error or a failure" % error_type + ) unittest_fn(self, test, err) if self.current_test_case_info: - self.result_string = "%s [ temp dir used by test case: %s ]" % \ - (error_type_str, - self.current_test_case_info.tempdir) + self.result_string = "%s [ temp dir used by test case: %s ]" % ( + error_type_str, + self.current_test_case_info.tempdir, + ) self.symlink_failed() self.failed_test_cases_info.add(self.current_test_case_info) if is_core_present(self.current_test_case_info.tempdir): @@ -1550,12 +1680,12 @@ class VppTestResult(unittest.TestResult): test_name = str(test) else: test_name = "'{!s}' ({!s})".format( - get_testcase_doc_name(test), test.id()) + get_testcase_doc_name(test), test.id() + ) self.current_test_case_info.core_crash_test = test_name - self.core_crash_test_cases_info.add( - self.current_test_case_info) + self.core_crash_test_cases_info.add(self.current_test_case_info) else: - self.result_string = '%s [no temp dir]' % error_type_str + self.result_string = "%s [no temp dir]" % error_type_str self.send_result_through_pipe(test, error_type) @@ -1613,15 +1743,13 @@ class VppTestResult(unittest.TestResult): # This block may overwrite the colorized title above, # but we want this to stand out and be fixed if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS): - test_title = colorize( - f"FIXME with VPP workers: {test_title}", RED) + test_title = colorize(f"FIXME with VPP workers: {test_title}", RED) if test.has_tag(TestCaseTag.FIXME_ASAN): - test_title = colorize( - f"FIXME with ASAN: {test_title}", RED) + test_title = colorize(f"FIXME with ASAN: {test_title}", RED) test.skip_fixme_asan() - if hasattr(test, 'vpp_worker_count'): + if hasattr(test, "vpp_worker_count"): if test.vpp_worker_count == 0: test_title += " [main thread only]" elif test.vpp_worker_count == 1: @@ -1633,7 +1761,9 @@ class VppTestResult(unittest.TestResult): test_title = colorize( f"{test_title} [skipped - not enough cpus, " f"required={test.__class__.get_cpus_required()}, " - f"available={max_vpp_cpus}]", YELLOW) + f"available={max_vpp_cpus}]", + YELLOW, + ) print(double_line_delim) print(test_title) @@ -1644,8 +1774,7 @@ class VppTestResult(unittest.TestResult): self.start_test = time.time() unittest.TestResult.startTest(self, test) if self.verbosity > 0: - self.stream.writeln( - "Starting " + self.getDescription(test) + " ...") + self.stream.writeln("Starting " + self.getDescription(test) + " ...") self.stream.writeln(single_line_delim) def stopTest(self, test): @@ -1659,14 +1788,19 @@ class VppTestResult(unittest.TestResult): if self.verbosity > 0: self.stream.writeln(single_line_delim) - self.stream.writeln("%-73s%s" % (self.getDescription(test), - self.result_string)) + self.stream.writeln( + "%-73s%s" % (self.getDescription(test), self.result_string) + ) self.stream.writeln(single_line_delim) else: - self.stream.writeln("%-68s %4.2f %s" % - (self.getDescription(test), - time.time() - self.start_test, - self.result_string)) + self.stream.writeln( + "%-68s %4.2f %s" + % ( + self.getDescription(test), + time.time() - self.start_test, + self.result_string, + ) + ) self.send_result_through_pipe(test, TEST_RUN) @@ -1676,12 +1810,12 @@ class VppTestResult(unittest.TestResult): """ if len(self.errors) > 0 or len(self.failures) > 0: self.stream.writeln() - self.printErrorList('ERROR', self.errors) - self.printErrorList('FAIL', self.failures) + self.printErrorList("ERROR", self.errors) + self.printErrorList("FAIL", self.failures) # ^^ that is the last output from unittest before summary if not self.runner.print_summary: - devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w')) + devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w")) self.stream = devnull self.runner.stream = devnull @@ -1696,8 +1830,7 @@ class VppTestResult(unittest.TestResult): """ for test, err in errors: self.stream.writeln(double_line_delim) - self.stream.writeln("%s: %s" % - (flavour, self.getDescription(test))) + self.stream.writeln("%s: %s" % (flavour, self.getDescription(test))) self.stream.writeln(single_line_delim) self.stream.writeln("%s" % err) @@ -1712,14 +1845,23 @@ class VppTestRunner(unittest.TextTestRunner): """Class maintaining the results of the tests""" return VppTestResult - def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1, - result_pipe=None, failfast=False, buffer=False, - resultclass=None, print_summary=True, **kwargs): + def __init__( + self, + keep_alive_pipe=None, + descriptions=True, + verbosity=1, + result_pipe=None, + failfast=False, + buffer=False, + resultclass=None, + print_summary=True, + **kwargs, + ): # ignore stream setting here, use hard-coded stdout to be in sync # with prints from VppTestCase methods ... - super(VppTestRunner, self).__init__(sys.stdout, descriptions, - verbosity, failfast, buffer, - resultclass, **kwargs) + super(VppTestRunner, self).__init__( + sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs + ) KeepAliveReporter.pipe = keep_alive_pipe self.orig_stream = self.stream @@ -1728,10 +1870,7 @@ class VppTestRunner(unittest.TextTestRunner): self.print_summary = print_summary def _makeResult(self): - return self.resultclass(self.stream, - self.descriptions, - self.verbosity, - self) + return self.resultclass(self.stream, self.descriptions, self.verbosity, self) def run(self, test): """ @@ -1754,91 +1893,120 @@ class Worker(Thread): super(Worker, self).__init__(*args, **kwargs) self.logger = logger self.args = executable_args - if hasattr(self, 'testcase') and self.testcase.debug_all: + if hasattr(self, "testcase") and self.testcase.debug_all: if self.testcase.debug_gdbserver: - self.args = ['/usr/bin/gdbserver', 'localhost:{port}' - .format(port=self.testcase.gdbserver_port)] + args - elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'): + self.args = [ + "/usr/bin/gdbserver", + "localhost:{port}".format(port=self.testcase.gdbserver_port), + ] + args + elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"): self.args.append(self.wait_for_gdb) self.app_bin = executable_args[0] self.app_name = os.path.basename(self.app_bin) - if hasattr(self, 'role'): - self.app_name += ' {role}'.format(role=self.role) + if hasattr(self, "role"): + self.app_name += " {role}".format(role=self.role) self.process = None self.result = None env = {} if env is None else env self.env = copy.deepcopy(env) def wait_for_enter(self): - if not hasattr(self, 'testcase'): + if not hasattr(self, "testcase"): return if self.testcase.debug_all and self.testcase.debug_gdbserver: print() print(double_line_delim) - print("Spawned GDB Server for '{app}' with PID: {pid}" - .format(app=self.app_name, pid=self.process.pid)) + print( + "Spawned GDB Server for '{app}' with PID: {pid}".format( + app=self.app_name, pid=self.process.pid + ) + ) elif self.testcase.debug_all and self.testcase.debug_gdb: print() print(double_line_delim) - print("Spawned '{app}' with PID: {pid}" - .format(app=self.app_name, pid=self.process.pid)) + print( + "Spawned '{app}' with PID: {pid}".format( + app=self.app_name, pid=self.process.pid + ) + ) else: return print(single_line_delim) print("You can debug '{app}' using:".format(app=self.app_name)) if self.testcase.debug_gdbserver: - print("sudo gdb " + self.app_bin + - " -ex 'target remote localhost:{port}'" - .format(port=self.testcase.gdbserver_port)) - print("Now is the time to attach gdb by running the above " - "command, set up breakpoints etc., then resume from " - "within gdb by issuing the 'continue' command") + print( + "sudo gdb " + + self.app_bin + + " -ex 'target remote localhost:{port}'".format( + port=self.testcase.gdbserver_port + ) + ) + print( + "Now is the time to attach gdb by running the above " + "command, set up breakpoints etc., then resume from " + "within gdb by issuing the 'continue' command" + ) self.testcase.gdbserver_port += 1 elif self.testcase.debug_gdb: - print("sudo gdb " + self.app_bin + - " -ex 'attach {pid}'".format(pid=self.process.pid)) - print("Now is the time to attach gdb by running the above " - "command and set up breakpoints etc., then resume from" - " within gdb by issuing the 'continue' command") + print( + "sudo gdb " + + self.app_bin + + " -ex 'attach {pid}'".format(pid=self.process.pid) + ) + print( + "Now is the time to attach gdb by running the above " + "command and set up breakpoints etc., then resume from" + " within gdb by issuing the 'continue' command" + ) print(single_line_delim) input("Press ENTER to continue running the testcase...") def run(self): executable = self.args[0] if not os.path.exists(executable) or not os.access( - executable, os.F_OK | os.X_OK): + executable, os.F_OK | os.X_OK + ): # Exit code that means some system file did not exist, # could not be opened, or had some other kind of error. self.result = os.EX_OSFILE raise EnvironmentError( - "executable '%s' is not found or executable." % executable) - self.logger.debug("Running executable '{app}': '{cmd}'" - .format(app=self.app_name, - cmd=' '.join(self.args))) + "executable '%s' is not found or executable." % executable + ) + self.logger.debug( + "Running executable '{app}': '{cmd}'".format( + app=self.app_name, cmd=" ".join(self.args) + ) + ) env = os.environ.copy() env.update(self.env) env["CK_LOG_FILE_NAME"] = "-" self.process = subprocess.Popen( - ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env, - preexec_fn=os.setpgrp, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + ["stdbuf", "-o0", "-e0"] + self.args, + shell=False, + env=env, + preexec_fn=os.setpgrp, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) self.wait_for_enter() out, err = self.process.communicate() self.logger.debug("Finished running `{app}'".format(app=self.app_name)) self.logger.info("Return code is `%s'" % self.process.returncode) self.logger.info(single_line_delim) - self.logger.info("Executable `{app}' wrote to stdout:" - .format(app=self.app_name)) + self.logger.info( + "Executable `{app}' wrote to stdout:".format(app=self.app_name) + ) self.logger.info(single_line_delim) - self.logger.info(out.decode('utf-8')) + self.logger.info(out.decode("utf-8")) self.logger.info(single_line_delim) - self.logger.info("Executable `{app}' wrote to stderr:" - .format(app=self.app_name)) + self.logger.info( + "Executable `{app}' wrote to stderr:".format(app=self.app_name) + ) self.logger.info(single_line_delim) - self.logger.info(err.decode('utf-8')) + self.logger.info(err.decode("utf-8")) self.logger.info(single_line_delim) self.result = self.process.returncode -if __name__ == '__main__': +if __name__ == "__main__": pass |