aboutsummaryrefslogtreecommitdiffstats
path: root/test/framework.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/framework.py')
-rw-r--r--test/framework.py880
1 files changed, 524 insertions, 356 deletions
diff --git a/test/framework.py b/test/framework.py
index 8065518ff7a..05e59b577cb 100644
--- a/test/framework.py
+++ b/test/framework.py
@@ -37,8 +37,15 @@ from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
-from log import RED, GREEN, YELLOW, double_line_delim, single_line_delim, \
- get_logger, colorize
+from log import (
+ RED,
+ GREEN,
+ YELLOW,
+ double_line_delim,
+ single_line_delim,
+ get_logger,
+ colorize,
+)
from vpp_object import VppObjectRegistry
from util import ppp, is_core_present
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
@@ -49,7 +56,7 @@ from scapy.layers.inet6 import ICMPv6EchoReply
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
-null_logger = logging.getLogger('VppTestCase')
+null_logger = logging.getLogger("VppTestCase")
null_logger.addHandler(logging.NullHandler())
PASS = 0
@@ -72,10 +79,13 @@ if config.debug_framework:
class VppDiedError(Exception):
- """ exception for reporting that the subprocess has died."""
+ """exception for reporting that the subprocess has died."""
- signals_by_value = {v: k for k, v in signal.__dict__.items() if
- k.startswith('SIG') and not k.startswith('SIG_')}
+ signals_by_value = {
+ v: k
+ for k, v in signal.__dict__.items()
+ if k.startswith("SIG") and not k.startswith("SIG_")
+ }
def __init__(self, rv=None, testcase=None, method_name=None):
self.rv = rv
@@ -89,15 +99,16 @@ class VppDiedError(Exception):
pass
if testcase is None and method_name is None:
- in_msg = ''
+ in_msg = ""
else:
- in_msg = ' while running %s.%s' % (testcase, method_name)
+ in_msg = " while running %s.%s" % (testcase, method_name)
if self.rv:
- msg = "VPP subprocess died unexpectedly%s with return code: %d%s."\
- % (in_msg, self.rv, ' [%s]' %
- (self.signal_name if
- self.signal_name is not None else ''))
+ msg = "VPP subprocess died unexpectedly%s with return code: %d%s." % (
+ in_msg,
+ self.rv,
+ " [%s]" % (self.signal_name if self.signal_name is not None else ""),
+ )
else:
msg = "VPP subprocess died unexpectedly%s." % in_msg
@@ -110,6 +121,7 @@ class _PacketInfo(object):
Help process information about the next packet.
Set variables to default values.
"""
+
#: Store the index of the packet.
index = -1
#: Store the index of the source packet generator interface of the packet.
@@ -133,19 +145,23 @@ class _PacketInfo(object):
def pump_output(testclass):
- """ pump output from vpp stdout/stderr to proper queues """
+ """pump output from vpp stdout/stderr to proper queues"""
stdout_fragment = ""
stderr_fragment = ""
while not testclass.pump_thread_stop_flag.is_set():
- readable = select.select([testclass.vpp.stdout.fileno(),
- testclass.vpp.stderr.fileno(),
- testclass.pump_thread_wakeup_pipe[0]],
- [], [])[0]
+ readable = select.select(
+ [
+ testclass.vpp.stdout.fileno(),
+ testclass.vpp.stderr.fileno(),
+ testclass.pump_thread_wakeup_pipe[0],
+ ],
+ [],
+ [],
+ )[0]
if testclass.vpp.stdout.fileno() in readable:
read = os.read(testclass.vpp.stdout.fileno(), 102400)
if len(read) > 0:
- split = read.decode('ascii',
- errors='backslashreplace').splitlines(True)
+ split = read.decode("ascii", errors="backslashreplace").splitlines(True)
if len(stdout_fragment) > 0:
split[0] = "%s%s" % (stdout_fragment, split[0])
if len(split) > 0 and split[-1].endswith("\n"):
@@ -156,13 +172,11 @@ def pump_output(testclass):
testclass.vpp_stdout_deque.extend(split[:limit])
if not config.cache_vpp_output:
for line in split[:limit]:
- testclass.logger.info(
- "VPP STDOUT: %s" % line.rstrip("\n"))
+ testclass.logger.info("VPP STDOUT: %s" % line.rstrip("\n"))
if testclass.vpp.stderr.fileno() in readable:
read = os.read(testclass.vpp.stderr.fileno(), 102400)
if len(read) > 0:
- split = read.decode('ascii',
- errors='backslashreplace').splitlines(True)
+ split = read.decode("ascii", errors="backslashreplace").splitlines(True)
if len(stderr_fragment) > 0:
split[0] = "%s%s" % (stderr_fragment, split[0])
if len(split) > 0 and split[-1].endswith("\n"):
@@ -174,14 +188,13 @@ def pump_output(testclass):
testclass.vpp_stderr_deque.extend(split[:limit])
if not config.cache_vpp_output:
for line in split[:limit]:
- testclass.logger.error(
- "VPP STDERR: %s" % line.rstrip("\n"))
+ testclass.logger.error("VPP STDERR: %s" % line.rstrip("\n"))
# ignoring the dummy pipe here intentionally - the
# flag will take care of properly terminating the loop
def _is_platform_aarch64():
- return platform.machine() == 'aarch64'
+ return platform.machine() == "aarch64"
is_platform_aarch64 = _is_platform_aarch64()
@@ -191,6 +204,7 @@ class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
"""
+
_shared_state = {}
def __init__(self):
@@ -216,7 +230,7 @@ class KeepAliveReporter(object):
return
if isclass(test):
- desc = '%s (%s)' % (desc, unittest.util.strclass(test))
+ desc = "%s (%s)" % (desc, unittest.util.strclass(test))
else:
desc = test.id()
@@ -240,6 +254,7 @@ def create_tag_decorator(e):
except AttributeError:
cls.test_tags = [e]
return cls
+
return decorator
@@ -250,7 +265,7 @@ tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
class DummyVpp:
returncode = None
- pid = 0xcafebafe
+ pid = 0xCAFEBAFE
def poll(self):
pass
@@ -300,7 +315,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def has_tag(cls, tag):
- """ if the test case has a given tag - return true """
+ """if the test case has a given tag - return true"""
try:
return tag in cls.test_tags
except AttributeError:
@@ -309,15 +324,15 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def is_tagged_run_solo(cls):
- """ if the test case class is timing-sensitive - return true """
+ """if the test case class is timing-sensitive - return true"""
return cls.has_tag(TestCaseTag.RUN_SOLO)
@classmethod
def skip_fixme_asan(cls):
- """ if @tag_fixme_asan & ASan is enabled - mark for skip """
+ """if @tag_fixme_asan & ASan is enabled - mark for skip"""
if cls.has_tag(TestCaseTag.FIXME_ASAN):
- vpp_extra_cmake_args = os.environ.get('VPP_EXTRA_CMAKE_ARGS', '')
- if 'DVPP_ENABLE_SANITIZE_ADDR=ON' in vpp_extra_cmake_args:
+ vpp_extra_cmake_args = os.environ.get("VPP_EXTRA_CMAKE_ARGS", "")
+ if "DVPP_ENABLE_SANITIZE_ADDR=ON" in vpp_extra_cmake_args:
cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
@classmethod
@@ -364,7 +379,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def setUpConstants(cls):
- """ Set-up the test case class based on environment variables """
+ """Set-up the test case class based on environment variables"""
cls.step = config.step
cls.plugin_path = ":".join(config.vpp_plugin_dir)
cls.test_plugin_path = ":".join(config.vpp_test_plugin_dir)
@@ -385,34 +400,92 @@ class VppTestCase(CPUInterface, unittest.TestCase):
api_fuzzing = config.api_fuzz
if api_fuzzing is None:
- api_fuzzing = 'off'
+ api_fuzzing = "off"
cls.vpp_cmdline = [
config.vpp,
- "unix", "{", "nodaemon", debug_cli, "full-coredump",
- coredump_size, "runtime-dir", cls.tempdir, "}",
- "api-trace", "{", "on", "}",
- "api-segment", "{", "prefix", cls.get_api_segment_prefix(), "}",
- "cpu", "{", "main-core", str(cls.cpus[0]), ]
+ "unix",
+ "{",
+ "nodaemon",
+ debug_cli,
+ "full-coredump",
+ coredump_size,
+ "runtime-dir",
+ cls.tempdir,
+ "}",
+ "api-trace",
+ "{",
+ "on",
+ "}",
+ "api-segment",
+ "{",
+ "prefix",
+ cls.get_api_segment_prefix(),
+ "}",
+ "cpu",
+ "{",
+ "main-core",
+ str(cls.cpus[0]),
+ ]
if cls.extern_plugin_path not in (None, ""):
- cls.extra_vpp_plugin_config.append(
- "add-path %s" % cls.extern_plugin_path)
+ cls.extra_vpp_plugin_config.append("add-path %s" % cls.extern_plugin_path)
if cls.get_vpp_worker_count():
- cls.vpp_cmdline.extend([
- "corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])])
- cls.vpp_cmdline.extend([
- "}",
- "physmem", "{", "max-size", "32m", "}",
- "statseg", "{", "socket-name", cls.get_stats_sock_path(),
- cls.extra_vpp_statseg_config, "}",
- "socksvr", "{", "socket-name", cls.get_api_sock_path(), "}",
- "node { ", default_variant, "}",
- "api-fuzz {", api_fuzzing, "}",
- "plugins", "{", "plugin", "dpdk_plugin.so", "{", "disable", "}",
- "plugin", "rdma_plugin.so", "{", "disable", "}",
- "plugin", "lisp_unittest_plugin.so", "{", "enable", "}",
- "plugin", "unittest_plugin.so", "{", "enable", "}"
- ] + cls.extra_vpp_plugin_config + ["}", ])
+ cls.vpp_cmdline.extend(
+ ["corelist-workers", ",".join([str(x) for x in cls.cpus[1:]])]
+ )
+ cls.vpp_cmdline.extend(
+ [
+ "}",
+ "physmem",
+ "{",
+ "max-size",
+ "32m",
+ "}",
+ "statseg",
+ "{",
+ "socket-name",
+ cls.get_stats_sock_path(),
+ cls.extra_vpp_statseg_config,
+ "}",
+ "socksvr",
+ "{",
+ "socket-name",
+ cls.get_api_sock_path(),
+ "}",
+ "node { ",
+ default_variant,
+ "}",
+ "api-fuzz {",
+ api_fuzzing,
+ "}",
+ "plugins",
+ "{",
+ "plugin",
+ "dpdk_plugin.so",
+ "{",
+ "disable",
+ "}",
+ "plugin",
+ "rdma_plugin.so",
+ "{",
+ "disable",
+ "}",
+ "plugin",
+ "lisp_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ ]
+ + cls.extra_vpp_plugin_config
+ + [
+ "}",
+ ]
+ )
if cls.extra_vpp_punt_config is not None:
cls.vpp_cmdline.extend(cls.extra_vpp_punt_config)
@@ -435,17 +508,23 @@ class VppTestCase(CPUInterface, unittest.TestCase):
print(single_line_delim)
print("You can debug VPP using:")
if cls.debug_gdbserver:
- print(f"sudo gdb {config.vpp} "
- f"-ex 'target remote localhost:{cls.gdbserver_port}'")
- print("Now is the time to attach gdb by running the above "
- "command, set up breakpoints etc., then resume VPP from "
- "within gdb by issuing the 'continue' command")
+ print(
+ f"sudo gdb {config.vpp} "
+ f"-ex 'target remote localhost:{cls.gdbserver_port}'"
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command, set up breakpoints etc., then resume VPP from "
+ "within gdb by issuing the 'continue' command"
+ )
cls.gdbserver_port += 1
elif cls.debug_gdb:
print(f"sudo gdb {config.vpp} -ex 'attach {cls.vpp.pid}'")
- print("Now is the time to attach gdb by running the above "
- "command and set up breakpoints etc., then resume VPP from"
- " within gdb by issuing the 'continue' command")
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command and set up breakpoints etc., then resume VPP from"
+ " within gdb by issuing the 'continue' command"
+ )
print(single_line_delim)
input("Press ENTER to continue running the testcase...")
@@ -459,31 +538,35 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cmdline = cls.vpp_cmdline
if cls.debug_gdbserver:
- gdbserver = '/usr/bin/gdbserver'
- if not os.path.isfile(gdbserver) or\
- not os.access(gdbserver, os.X_OK):
- raise Exception("gdbserver binary '%s' does not exist or is "
- "not executable" % gdbserver)
-
- cmdline = [gdbserver, 'localhost:{port}'
- .format(port=cls.gdbserver_port)] + cls.vpp_cmdline
+ gdbserver = "/usr/bin/gdbserver"
+ if not os.path.isfile(gdbserver) or not os.access(gdbserver, os.X_OK):
+ raise Exception(
+ "gdbserver binary '%s' does not exist or is "
+ "not executable" % gdbserver
+ )
+
+ cmdline = [
+ gdbserver,
+ "localhost:{port}".format(port=cls.gdbserver_port),
+ ] + cls.vpp_cmdline
cls.logger.info("Gdbserver cmdline is %s", " ".join(cmdline))
try:
- cls.vpp = subprocess.Popen(cmdline,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ cls.vpp = subprocess.Popen(
+ cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE
+ )
except subprocess.CalledProcessError as e:
- cls.logger.critical("Subprocess returned with non-0 return code: ("
- "%s)", e.returncode)
+ cls.logger.critical(
+ "Subprocess returned with non-0 return code: (%s)", e.returncode
+ )
raise
except OSError as e:
- cls.logger.critical("Subprocess returned with OS error: "
- "(%s) %s", e.errno, e.strerror)
+ cls.logger.critical(
+ "Subprocess returned with OS error: (%s) %s", e.errno, e.strerror
+ )
raise
except Exception as e:
- cls.logger.exception("Subprocess returned unexpected from "
- "%s:", cmdline)
+ cls.logger.exception("Subprocess returned unexpected from %s:", cmdline)
raise
cls.wait_for_enter()
@@ -504,11 +587,11 @@ class VppTestCase(CPUInterface, unittest.TestCase):
ok = True
break
if not ok:
- cls.logger.error("Timed out waiting for coredump to complete:"
- " %s", corefile)
+ cls.logger.error(
+ "Timed out waiting for coredump to complete: %s", corefile
+ )
else:
- cls.logger.error("Coredump complete: %s, size %d",
- corefile, curr_size)
+ cls.logger.error("Coredump complete: %s, size %d", corefile, curr_size)
@classmethod
def get_stats_sock_path(cls):
@@ -554,21 +637,24 @@ class VppTestCase(CPUInterface, unittest.TestCase):
super(VppTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
random.seed(config.rnd_seed)
- if hasattr(cls, 'parallel_handler'):
+ if hasattr(cls, "parallel_handler"):
cls.logger.addHandler(cls.parallel_handler)
cls.logger.propagate = False
cls.set_debug_flags(config.debug)
cls.tempdir = cls.get_tempdir()
cls.create_file_handler()
cls.file_handler.setFormatter(
- Formatter(fmt='%(asctime)s,%(msecs)03d %(message)s',
- datefmt="%H:%M:%S"))
+ Formatter(fmt="%(asctime)s,%(msecs)03d %(message)s", datefmt="%H:%M:%S")
+ )
cls.file_handler.setLevel(DEBUG)
cls.logger.addHandler(cls.file_handler)
cls.logger.debug("--- setUpClass() for %s called ---" % cls.__name__)
os.chdir(cls.tempdir)
- cls.logger.info("Temporary dir is %s, api socket is %s",
- cls.tempdir, cls.get_api_sock_path())
+ cls.logger.info(
+ "Temporary dir is %s, api socket is %s",
+ cls.tempdir,
+ cls.get_api_sock_path(),
+ )
cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
cls.reset_packet_infos()
@@ -586,9 +672,10 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.attach_vpp()
else:
cls.run_vpp()
- cls.reporter.send_keep_alive(cls, 'setUpClass')
+ cls.reporter.send_keep_alive(cls, "setUpClass")
VppTestResult.current_test_case_info = TestCaseInfo(
- cls.logger, cls.tempdir, cls.vpp.pid, config.vpp)
+ cls.logger, cls.tempdir, cls.vpp.pid, config.vpp
+ )
cls.vpp_stdout_deque = deque()
cls.vpp_stderr_deque = deque()
if not cls.debug_attach:
@@ -599,8 +686,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.pump_thread.start()
if cls.debug_gdb or cls.debug_gdbserver or cls.debug_attach:
cls.vapi_response_timeout = 0
- cls.vapi = VppPapiProvider(cls.__name__, cls,
- cls.vapi_response_timeout)
+ cls.vapi = VppPapiProvider(cls.__name__, cls, cls.vapi_response_timeout)
if cls.step:
hook = hookmodule.StepHook(cls)
else:
@@ -613,7 +699,8 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.vpp_startup_failed = True
cls.logger.critical(
"VPP died shortly after startup, check the"
- " output to standard error for possible cause")
+ " output to standard error for possible cause"
+ )
raise
try:
cls.vapi.connect()
@@ -622,9 +709,14 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.vapi.disconnect()
if cls.debug_gdbserver:
- print(colorize("You're running VPP inside gdbserver but "
- "VPP-API connection failed, did you forget "
- "to 'continue' VPP from within gdb?", RED))
+ print(
+ colorize(
+ "You're running VPP inside gdbserver but "
+ "VPP-API connection failed, did you forget "
+ "to 'continue' VPP from within gdb?",
+ RED,
+ )
+ )
raise e
if cls.debug_attach:
last_line = cls.vapi.cli("show thread").split("\n")[-2]
@@ -641,7 +733,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def _debug_quit(cls):
- if (cls.debug_gdbserver or cls.debug_gdb):
+ if cls.debug_gdbserver or cls.debug_gdb:
try:
cls.vpp.poll()
@@ -650,8 +742,10 @@ class VppTestCase(CPUInterface, unittest.TestCase):
print(double_line_delim)
print("VPP or GDB server is still running")
print(single_line_delim)
- input("When done debugging, press ENTER to kill the "
- "process and finish running the testcase...")
+ input(
+ "When done debugging, press ENTER to kill the "
+ "process and finish running the testcase..."
+ )
except AttributeError:
pass
@@ -663,25 +757,23 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls._debug_quit()
# first signal that we want to stop the pump thread, then wake it up
- if hasattr(cls, 'pump_thread_stop_flag'):
+ if hasattr(cls, "pump_thread_stop_flag"):
cls.pump_thread_stop_flag.set()
- if hasattr(cls, 'pump_thread_wakeup_pipe'):
- os.write(cls.pump_thread_wakeup_pipe[1], b'ding dong wake up')
- if hasattr(cls, 'pump_thread'):
+ if hasattr(cls, "pump_thread_wakeup_pipe"):
+ os.write(cls.pump_thread_wakeup_pipe[1], b"ding dong wake up")
+ if hasattr(cls, "pump_thread"):
cls.logger.debug("Waiting for pump thread to stop")
cls.pump_thread.join()
- if hasattr(cls, 'vpp_stderr_reader_thread'):
+ if hasattr(cls, "vpp_stderr_reader_thread"):
cls.logger.debug("Waiting for stderr pump to stop")
cls.vpp_stderr_reader_thread.join()
- if hasattr(cls, 'vpp'):
- if hasattr(cls, 'vapi'):
+ if hasattr(cls, "vpp"):
+ if hasattr(cls, "vapi"):
cls.logger.debug(cls.vapi.vpp.get_stats())
- cls.logger.debug("Disconnecting class vapi client on %s",
- cls.__name__)
+ cls.logger.debug("Disconnecting class vapi client on %s", cls.__name__)
cls.vapi.disconnect()
- cls.logger.debug("Deleting class vapi attribute on %s",
- cls.__name__)
+ cls.logger.debug("Deleting class vapi attribute on %s", cls.__name__)
del cls.vapi
cls.vpp.poll()
if not cls.debug_attach and cls.vpp.returncode is None:
@@ -694,8 +786,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
except subprocess.TimeoutExpired:
cls.vpp.kill()
outs, errs = cls.vpp.communicate()
- cls.logger.debug("Deleting class vpp attribute on %s",
- cls.__name__)
+ cls.logger.debug("Deleting class vpp attribute on %s", cls.__name__)
if not cls.debug_attach:
cls.vpp.stdout.close()
cls.vpp.stderr.close()
@@ -708,32 +799,31 @@ class VppTestCase(CPUInterface, unittest.TestCase):
stdout_log = cls.logger.info
stderr_log = cls.logger.info
- if hasattr(cls, 'vpp_stdout_deque'):
+ if hasattr(cls, "vpp_stdout_deque"):
stdout_log(single_line_delim)
- stdout_log('VPP output to stdout while running %s:', cls.__name__)
+ stdout_log("VPP output to stdout while running %s:", cls.__name__)
stdout_log(single_line_delim)
vpp_output = "".join(cls.vpp_stdout_deque)
- with open(cls.tempdir + '/vpp_stdout.txt', 'w') as f:
+ with open(cls.tempdir + "/vpp_stdout.txt", "w") as f:
f.write(vpp_output)
- stdout_log('\n%s', vpp_output)
+ stdout_log("\n%s", vpp_output)
stdout_log(single_line_delim)
- if hasattr(cls, 'vpp_stderr_deque'):
+ if hasattr(cls, "vpp_stderr_deque"):
stderr_log(single_line_delim)
- stderr_log('VPP output to stderr while running %s:', cls.__name__)
+ stderr_log("VPP output to stderr while running %s:", cls.__name__)
stderr_log(single_line_delim)
vpp_output = "".join(cls.vpp_stderr_deque)
- with open(cls.tempdir + '/vpp_stderr.txt', 'w') as f:
+ with open(cls.tempdir + "/vpp_stderr.txt", "w") as f:
f.write(vpp_output)
- stderr_log('\n%s', vpp_output)
+ stderr_log("\n%s", vpp_output)
stderr_log(single_line_delim)
@classmethod
def tearDownClass(cls):
- """ Perform final cleanup after running all tests in this test-case """
- cls.logger.debug("--- tearDownClass() for %s called ---" %
- cls.__name__)
- cls.reporter.send_keep_alive(cls, 'tearDownClass')
+ """Perform final cleanup after running all tests in this test-case"""
+ cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
cls.reset_packet_infos()
@@ -741,14 +831,15 @@ class VppTestCase(CPUInterface, unittest.TestCase):
debug_internal.on_tear_down_class(cls)
def show_commands_at_teardown(self):
- """ Allow subclass specific teardown logging additions."""
+ """Allow subclass specific teardown logging additions."""
self.logger.info("--- No test specific show commands provided. ---")
def tearDown(self):
- """ Show various debug prints after each test """
- self.logger.debug("--- tearDown() for %s.%s(%s) called ---" %
- (self.__class__.__name__, self._testMethodName,
- self._testMethodDoc))
+ """Show various debug prints after each test"""
+ self.logger.debug(
+ "--- tearDown() for %s.%s(%s) called ---"
+ % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ )
try:
if not self.vpp_dead:
@@ -769,32 +860,35 @@ class VppTestCase(CPUInterface, unittest.TestCase):
tmp_api_trace = "/tmp/%s" % api_trace
vpp_api_trace_log = "%s/%s" % (self.tempdir, api_trace)
self.logger.info(self.vapi.ppcli("api trace save %s" % api_trace))
- self.logger.info("Moving %s to %s\n" % (tmp_api_trace,
- vpp_api_trace_log))
+ self.logger.info("Moving %s to %s\n" % (tmp_api_trace, vpp_api_trace_log))
os.rename(tmp_api_trace, vpp_api_trace_log)
except VppTransportSocketIOError:
- self.logger.debug("VppTransportSocketIOError: Vpp dead. "
- "Cannot log show commands.")
+ self.logger.debug(
+ "VppTransportSocketIOError: Vpp dead. Cannot log show commands."
+ )
self.vpp_dead = True
else:
self.registry.unregister_all(self.logger)
def setUp(self):
- """ Clear trace before running each test"""
+ """Clear trace before running each test"""
super(VppTestCase, self).setUp()
self.reporter.send_keep_alive(self)
if self.vpp_dead:
- raise VppDiedError(rv=None, testcase=self.__class__.__name__,
- method_name=self._testMethodName)
- self.sleep(.1, "during setUp")
+ raise VppDiedError(
+ rv=None,
+ testcase=self.__class__.__name__,
+ method_name=self._testMethodName,
+ )
+ self.sleep(0.1, "during setUp")
self.vpp_stdout_deque.append(
- "--- test setUp() for %s.%s(%s) starts here ---\n" %
- (self.__class__.__name__, self._testMethodName,
- self._testMethodDoc))
+ "--- test setUp() for %s.%s(%s) starts here ---\n"
+ % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ )
self.vpp_stderr_deque.append(
- "--- test setUp() for %s.%s(%s) starts here ---\n" %
- (self.__class__.__name__, self._testMethodName,
- self._testMethodDoc))
+ "--- test setUp() for %s.%s(%s) starts here ---\n"
+ % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ )
self.vapi.cli("clear trace")
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
@@ -816,7 +910,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def register_pcap(cls, intf, worker):
- """ Register a pcap in the testclass """
+ """Register a pcap in the testclass"""
# add to the list of captures with current timestamp
cls._pcaps.append((intf, worker))
@@ -824,14 +918,14 @@ class VppTestCase(CPUInterface, unittest.TestCase):
def get_vpp_time(cls):
# processes e.g. "Time now 2.190522, Wed, 11 Mar 2020 17:29:54 GMT"
# returns float("2.190522")
- timestr = cls.vapi.cli('show clock')
- head, sep, tail = timestr.partition(',')
- head, sep, tail = head.partition('Time now')
+ timestr = cls.vapi.cli("show clock")
+ head, sep, tail = timestr.partition(",")
+ head, sep, tail = head.partition("Time now")
return float(tail)
@classmethod
def sleep_on_vpp_time(cls, sec):
- """ Sleep according to time in VPP world """
+ """Sleep according to time in VPP world"""
# On a busy system with many processes
# we might end up with VPP time being slower than real world
# So take that into account when waiting for VPP to do something
@@ -841,34 +935,31 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def pg_start(cls, trace=True):
- """ Enable the PG, wait till it is done, then clean up """
+ """Enable the PG, wait till it is done, then clean up"""
for (intf, worker) in cls._old_pcaps:
- intf.handle_old_pcap_file(intf.get_in_path(worker),
- intf.in_history_counter)
+ intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
cls._old_pcaps = []
if trace:
cls.vapi.cli("clear trace")
cls.vapi.cli("trace add pg-input 1000")
- cls.vapi.cli('packet-generator enable')
+ cls.vapi.cli("packet-generator enable")
# PG, when starts, runs to completion -
# so let's avoid a race condition,
# and wait a little till it's done.
# Then clean it up - and then be gone.
deadline = time.time() + 300
- while cls.vapi.cli('show packet-generator').find("Yes") != -1:
+ while cls.vapi.cli("show packet-generator").find("Yes") != -1:
cls.sleep(0.01) # yield
if time.time() > deadline:
cls.logger.error("Timeout waiting for pg to stop")
break
for intf, worker in cls._pcaps:
- cls.vapi.cli('packet-generator delete %s' %
- intf.get_cap_name(worker))
+ cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
cls._old_pcaps = cls._pcaps
cls._pcaps = []
@classmethod
- def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0,
- mode=None):
+ def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
"""
Create packet-generator interfaces.
@@ -887,26 +978,30 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
- pgmode.PG_API_MODE_IP4)
+ return cls.create_pg_interfaces_internal(
+ interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
+ )
@classmethod
def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
- pgmode.PG_API_MODE_IP6)
+ return cls.create_pg_interfaces_internal(
+ interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
+ )
@classmethod
def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
- pgmode.PG_API_MODE_ETHERNET)
+ return cls.create_pg_interfaces_internal(
+ interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
+ )
@classmethod
def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(interfaces, gso, gso_size,
- pgmode.PG_API_MODE_ETHERNET)
+ return cls.create_pg_interfaces_internal(
+ interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
+ )
@classmethod
def create_loopback_interfaces(cls, count):
@@ -937,7 +1032,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
return result
@staticmethod
- def extend_packet(packet, size, padding=' '):
+ def extend_packet(packet, size, padding=" "):
"""
Extend packet to given size by padding with spaces or custom padding
NOTE: Currently works only when Raw layer is present.
@@ -955,7 +1050,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def reset_packet_infos(cls):
- """ Reset the list of packet info objects and packet counts to zero """
+ """Reset the list of packet info objects and packet counts to zero"""
cls._packet_infos = {}
cls._packet_count_for_dst_if_idx = {}
@@ -997,11 +1092,10 @@ class VppTestCase(CPUInterface, unittest.TestCase):
"""
# retrieve payload, currently 18 bytes (4 x ints + 1 short)
- return pack('iiiih', info.index, info.src,
- info.dst, info.ip, info.proto)
+ return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
@staticmethod
- def payload_to_info(payload, payload_field='load'):
+ def payload_to_info(payload, payload_field="load"):
"""
Convert packet payload to _PacketInfo object
@@ -1018,12 +1112,11 @@ class VppTestCase(CPUInterface, unittest.TestCase):
payload_b = getattr(payload, payload_field)[:18]
info = _PacketInfo()
- info.index, info.src, info.dst, info.ip, info.proto \
- = unpack('iiiih', payload_b)
+ info.index, info.src, info.dst, info.ip, info.proto = unpack("iiiih", payload_b)
# some SRv6 TCs depend on get an exception if bad values are detected
if info.index > 0x4000:
- raise ValueError('Index value is invalid')
+ raise ValueError("Index value is invalid")
return info
@@ -1086,32 +1179,38 @@ class VppTestCase(CPUInterface, unittest.TestCase):
return
try:
msg = "Invalid %s: %d('%s') does not match expected value %d('%s')"
- msg = msg % (getdoc(name_or_class).strip(),
- real_value, str(name_or_class(real_value)),
- expected_value, str(name_or_class(expected_value)))
+ msg = msg % (
+ getdoc(name_or_class).strip(),
+ real_value,
+ str(name_or_class(real_value)),
+ expected_value,
+ str(name_or_class(expected_value)),
+ )
except Exception:
msg = "Invalid %s: %s does not match expected value %s" % (
- name_or_class, real_value, expected_value)
+ name_or_class,
+ real_value,
+ expected_value,
+ )
self.assertEqual(real_value, expected_value, msg)
- def assert_in_range(self,
- real_value,
- expected_min,
- expected_max,
- name=None):
+ def assert_in_range(self, real_value, expected_min, expected_max, name=None):
if name is None:
msg = None
else:
msg = "Invalid %s: %s out of range <%s,%s>" % (
- name, real_value, expected_min, expected_max)
+ name,
+ real_value,
+ expected_min,
+ expected_max,
+ )
self.assertTrue(expected_min <= real_value <= expected_max, msg)
- def assert_packet_checksums_valid(self, packet,
- ignore_zero_udp_checksums=True):
+ def assert_packet_checksums_valid(self, packet, ignore_zero_udp_checksums=True):
received = packet.__class__(scapy.compat.raw(packet))
- udp_layers = ['UDP', 'UDPerror']
- checksum_fields = ['cksum', 'chksum']
+ udp_layers = ["UDP", "UDPerror"]
+ checksum_fields = ["cksum", "chksum"]
checksums = []
counter = 0
temp = received.__class__(scapy.compat.raw(received))
@@ -1122,9 +1221,11 @@ class VppTestCase(CPUInterface, unittest.TestCase):
layer.remove_payload()
for cf in checksum_fields:
if hasattr(layer, cf):
- if ignore_zero_udp_checksums and \
- 0 == getattr(layer, cf) and \
- layer.name in udp_layers:
+ if (
+ ignore_zero_udp_checksums
+ and 0 == getattr(layer, cf)
+ and layer.name in udp_layers
+ ):
continue
delattr(temp.getlayer(counter), cf)
checksums.append((counter, cf))
@@ -1137,71 +1238,76 @@ class VppTestCase(CPUInterface, unittest.TestCase):
for layer, cf in checksums:
calc_sum = getattr(temp[layer], cf)
self.assert_equal(
- getattr(received[layer], cf), calc_sum,
- "packet checksum on layer #%d: %s" % (layer, temp[layer].name))
+ getattr(received[layer], cf),
+ calc_sum,
+ "packet checksum on layer #%d: %s" % (layer, temp[layer].name),
+ )
self.logger.debug(
- "Checksum field `%s` on `%s` layer has correct value `%s`" %
- (cf, temp[layer].name, calc_sum))
-
- def assert_checksum_valid(self, received_packet, layer,
- field_name='chksum',
- ignore_zero_checksum=False):
- """ Check checksum of received packet on given layer """
+ "Checksum field `%s` on `%s` layer has correct value `%s`"
+ % (cf, temp[layer].name, calc_sum)
+ )
+
+ def assert_checksum_valid(
+ self, received_packet, layer, field_name="chksum", ignore_zero_checksum=False
+ ):
+ """Check checksum of received packet on given layer"""
received_packet_checksum = getattr(received_packet[layer], field_name)
if ignore_zero_checksum and 0 == received_packet_checksum:
return
- recalculated = received_packet.__class__(
- scapy.compat.raw(received_packet))
+ recalculated = received_packet.__class__(scapy.compat.raw(received_packet))
delattr(recalculated[layer], field_name)
recalculated = recalculated.__class__(scapy.compat.raw(recalculated))
- self.assert_equal(received_packet_checksum,
- getattr(recalculated[layer], field_name),
- "packet checksum on layer: %s" % layer)
-
- def assert_ip_checksum_valid(self, received_packet,
- ignore_zero_checksum=False):
- self.assert_checksum_valid(received_packet, 'IP',
- ignore_zero_checksum=ignore_zero_checksum)
-
- def assert_tcp_checksum_valid(self, received_packet,
- ignore_zero_checksum=False):
- self.assert_checksum_valid(received_packet, 'TCP',
- ignore_zero_checksum=ignore_zero_checksum)
-
- def assert_udp_checksum_valid(self, received_packet,
- ignore_zero_checksum=True):
- self.assert_checksum_valid(received_packet, 'UDP',
- ignore_zero_checksum=ignore_zero_checksum)
+ self.assert_equal(
+ received_packet_checksum,
+ getattr(recalculated[layer], field_name),
+ "packet checksum on layer: %s" % layer,
+ )
+
+ def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
+ self.assert_checksum_valid(
+ received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
+ )
+
+ def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
+ self.assert_checksum_valid(
+ received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
+ )
+
+ def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
+ self.assert_checksum_valid(
+ received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
+ )
def assert_embedded_icmp_checksum_valid(self, received_packet):
if received_packet.haslayer(IPerror):
- self.assert_checksum_valid(received_packet, 'IPerror')
+ self.assert_checksum_valid(received_packet, "IPerror")
if received_packet.haslayer(TCPerror):
- self.assert_checksum_valid(received_packet, 'TCPerror')
+ self.assert_checksum_valid(received_packet, "TCPerror")
if received_packet.haslayer(UDPerror):
- self.assert_checksum_valid(received_packet, 'UDPerror',
- ignore_zero_checksum=True)
+ self.assert_checksum_valid(
+ received_packet, "UDPerror", ignore_zero_checksum=True
+ )
if received_packet.haslayer(ICMPerror):
- self.assert_checksum_valid(received_packet, 'ICMPerror')
+ self.assert_checksum_valid(received_packet, "ICMPerror")
def assert_icmp_checksum_valid(self, received_packet):
- self.assert_checksum_valid(received_packet, 'ICMP')
+ self.assert_checksum_valid(received_packet, "ICMP")
self.assert_embedded_icmp_checksum_valid(received_packet)
def assert_icmpv6_checksum_valid(self, pkt):
if pkt.haslayer(ICMPv6DestUnreach):
- self.assert_checksum_valid(pkt, 'ICMPv6DestUnreach', 'cksum')
+ self.assert_checksum_valid(pkt, "ICMPv6DestUnreach", "cksum")
self.assert_embedded_icmp_checksum_valid(pkt)
if pkt.haslayer(ICMPv6EchoRequest):
- self.assert_checksum_valid(pkt, 'ICMPv6EchoRequest', 'cksum')
+ self.assert_checksum_valid(pkt, "ICMPv6EchoRequest", "cksum")
if pkt.haslayer(ICMPv6EchoReply):
- self.assert_checksum_valid(pkt, 'ICMPv6EchoReply', 'cksum')
+ self.assert_checksum_valid(pkt, "ICMPv6EchoReply", "cksum")
def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
else:
- counters = self.vapi.cli("sh errors").split('\n')
+ counters = self.vapi.cli("sh errors").split("\n")
counter_value = 0
for i in range(1, len(counters) - 1):
results = counters[i].split()
@@ -1210,8 +1316,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
break
return counter_value
- def assert_counter_equal(self, counter, expected_value,
- thread=None, index=0):
+ def assert_counter_equal(self, counter, expected_value, thread=None, index=0):
c = self.get_counter(counter)
if thread is not None:
c = c[thread][index]
@@ -1221,13 +1326,13 @@ class VppTestCase(CPUInterface, unittest.TestCase):
def assert_packet_counter_equal(self, counter, expected_value):
counter_value = self.get_counter(counter)
- self.assert_equal(counter_value, expected_value,
- "packet counter `%s'" % counter)
+ self.assert_equal(
+ counter_value, expected_value, "packet counter `%s'" % counter
+ )
def assert_error_counter_equal(self, counter, expected_value):
counter_value = self.statistics[counter].sum()
- self.assert_equal(counter_value, expected_value,
- "error counter `%s'" % counter)
+ self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
@classmethod
def sleep(cls, timeout, remark=None):
@@ -1238,7 +1343,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
# https://github.com/python/cpython/blob/6673decfa0fb078f60587f5cb5e98460eea137c2/Modules/timemodule.c#L1892 # noqa
if timeout == 0:
# yield quantum
- if hasattr(os, 'sched_yield'):
+ if hasattr(os, "sched_yield"):
os.sched_yield()
else:
time.sleep(0)
@@ -1249,13 +1354,18 @@ class VppTestCase(CPUInterface, unittest.TestCase):
time.sleep(timeout)
after = time.time()
if after - before > 2 * timeout:
- cls.logger.error("unexpected self.sleep() result - "
- "slept for %es instead of ~%es!",
- after - before, timeout)
+ cls.logger.error(
+ "unexpected self.sleep() result - slept for %es instead of ~%es!",
+ after - before,
+ timeout,
+ )
cls.logger.debug(
"Finished sleep (%s) - slept %es (wanted %es)",
- remark, after - before, timeout)
+ remark,
+ after - before,
+ timeout,
+ )
def virtual_sleep(self, timeout, remark=None):
self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
@@ -1285,7 +1395,8 @@ class VppTestCase(CPUInterface, unittest.TestCase):
stats_snapshot[cntr].sum() + diff,
f"'{cntr}' counter value (previous value: "
f"{stats_snapshot[cntr].sum()}, "
- f"expected diff: {diff})")
+ f"expected diff: {diff})",
+ )
else:
try:
self.assert_equal(
@@ -1293,7 +1404,8 @@ class VppTestCase(CPUInterface, unittest.TestCase):
stats_snapshot[cntr][:, sw_if_index].sum() + diff,
f"'{cntr}' counter value (previous value: "
f"{stats_snapshot[cntr][:, sw_if_index].sum()}, "
- f"expected diff: {diff})")
+ f"expected diff: {diff})",
+ )
except IndexError:
# if diff is 0, then this most probably a case where
# test declares multiple interfaces but traffic hasn't
@@ -1302,8 +1414,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
if 0 != diff:
raise
- def send_and_assert_no_replies(self, intf, pkts, remark="", timeout=None,
- stats_diff=None, trace=True, msg=None):
+ def send_and_assert_no_replies(
+ self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
+ ):
if stats_diff:
stats_snapshot = self.snapshot_stats(stats_diff)
@@ -1324,8 +1437,17 @@ class VppTestCase(CPUInterface, unittest.TestCase):
if stats_diff:
self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
- def send_and_expect(self, intf, pkts, output, n_rx=None, worker=None,
- trace=True, msg=None, stats_diff=None):
+ def send_and_expect(
+ self,
+ intf,
+ pkts,
+ output,
+ n_rx=None,
+ worker=None,
+ trace=True,
+ msg=None,
+ stats_diff=None,
+ ):
if stats_diff:
stats_snapshot = self.snapshot_stats(stats_diff)
@@ -1343,8 +1465,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
return rx
- def send_and_expect_load_balancing(self, input, pkts, outputs,
- worker=None, trace=True):
+ def send_and_expect_load_balancing(
+ self, input, pkts, outputs, worker=None, trace=True
+ ):
self.pg_send(input, pkts, worker=worker, trace=trace)
rxs = []
for oo in outputs:
@@ -1355,9 +1478,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.logger.debug(self.vapi.cli("show trace"))
return rxs
- def send_and_expect_some(self, intf, pkts, output,
- worker=None,
- trace=True):
+ def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
self.pg_send(intf, pkts, worker=worker, trace=trace)
rx = output._get_capture(1)
if trace:
@@ -1366,8 +1487,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.assertTrue(len(rx) < len(pkts))
return rx
- def send_and_expect_only(self, intf, pkts, output, timeout=None,
- stats_diff=None):
+ def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
if stats_diff:
stats_snapshot = self.snapshot_stats(stats_diff)
@@ -1427,8 +1547,7 @@ class VppTestResult(unittest.TestResult):
core_crash_test_cases_info = set()
current_test_case_info = None
- def __init__(self, stream=None, descriptions=None, verbosity=None,
- runner=None):
+ def __init__(self, stream=None, descriptions=None, verbosity=None, runner=None):
"""
:param stream File descriptor to store where to report test results.
Set to the standard error stream by default.
@@ -1453,9 +1572,9 @@ class VppTestResult(unittest.TestResult):
"""
if self.current_test_case_info:
self.current_test_case_info.logger.debug(
- "--- addSuccess() %s.%s(%s) called" % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc))
+ "--- addSuccess() %s.%s(%s) called"
+ % (test.__class__.__name__, test._testMethodName, test._testMethodDoc)
+ )
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
@@ -1471,9 +1590,14 @@ class VppTestResult(unittest.TestResult):
"""
if self.current_test_case_info:
self.current_test_case_info.logger.debug(
- "--- addSkip() %s.%s(%s) called, reason is %s" %
- (test.__class__.__name__, test._testMethodName,
- test._testMethodDoc, reason))
+ "--- addSkip() %s.%s(%s) called, reason is %s"
+ % (
+ test.__class__.__name__,
+ test._testMethodName,
+ test._testMethodDoc,
+ reason,
+ )
+ )
unittest.TestResult.addSkip(self, test, reason)
self.result_string = colorize("SKIP", YELLOW)
@@ -1488,17 +1612,18 @@ class VppTestResult(unittest.TestResult):
failed_dir = config.failed_dir
link_path = os.path.join(
failed_dir,
- '%s-FAILED' %
- os.path.basename(self.current_test_case_info.tempdir))
+ "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
+ )
self.current_test_case_info.logger.debug(
- "creating a link to the failed test")
+ "creating a link to the failed test"
+ )
self.current_test_case_info.logger.debug(
- "os.symlink(%s, %s)" %
- (self.current_test_case_info.tempdir, link_path))
+ "os.symlink(%s, %s)"
+ % (self.current_test_case_info.tempdir, link_path)
+ )
if os.path.exists(link_path):
- self.current_test_case_info.logger.debug(
- 'symlink already exists')
+ self.current_test_case_info.logger.debug("symlink already exists")
else:
os.symlink(self.current_test_case_info.tempdir, link_path)
@@ -1506,7 +1631,7 @@ class VppTestResult(unittest.TestResult):
self.current_test_case_info.logger.error(e)
def send_result_through_pipe(self, test, result):
- if hasattr(self, 'test_framework_result_pipe'):
+ if hasattr(self, "test_framework_result_pipe"):
pipe = self.test_framework_result_pipe
if pipe:
pipe.send((test.id(), result))
@@ -1516,32 +1641,37 @@ class VppTestResult(unittest.TestResult):
if isinstance(test, unittest.suite._ErrorHolder):
test_name = test.description
else:
- test_name = '%s.%s(%s)' % (test.__class__.__name__,
- test._testMethodName,
- test._testMethodDoc)
+ test_name = "%s.%s(%s)" % (
+ test.__class__.__name__,
+ test._testMethodName,
+ test._testMethodDoc,
+ )
self.current_test_case_info.logger.debug(
- "--- %s() %s called, err is %s" %
- (fn_name, test_name, err))
+ "--- %s() %s called, err is %s" % (fn_name, test_name, err)
+ )
self.current_test_case_info.logger.debug(
- "formatted exception is:\n%s" %
- "".join(format_exception(*err)))
+ "formatted exception is:\n%s" % "".join(format_exception(*err))
+ )
def add_error(self, test, err, unittest_fn, error_type):
if error_type == FAIL:
- self.log_error(test, err, 'addFailure')
+ self.log_error(test, err, "addFailure")
error_type_str = colorize("FAIL", RED)
elif error_type == ERROR:
- self.log_error(test, err, 'addError')
+ self.log_error(test, err, "addError")
error_type_str = colorize("ERROR", RED)
else:
- raise Exception('Error type %s cannot be used to record an '
- 'error or a failure' % error_type)
+ raise Exception(
+ "Error type %s cannot be used to record an "
+ "error or a failure" % error_type
+ )
unittest_fn(self, test, err)
if self.current_test_case_info:
- self.result_string = "%s [ temp dir used by test case: %s ]" % \
- (error_type_str,
- self.current_test_case_info.tempdir)
+ self.result_string = "%s [ temp dir used by test case: %s ]" % (
+ error_type_str,
+ self.current_test_case_info.tempdir,
+ )
self.symlink_failed()
self.failed_test_cases_info.add(self.current_test_case_info)
if is_core_present(self.current_test_case_info.tempdir):
@@ -1550,12 +1680,12 @@ class VppTestResult(unittest.TestResult):
test_name = str(test)
else:
test_name = "'{!s}' ({!s})".format(
- get_testcase_doc_name(test), test.id())
+ get_testcase_doc_name(test), test.id()
+ )
self.current_test_case_info.core_crash_test = test_name
- self.core_crash_test_cases_info.add(
- self.current_test_case_info)
+ self.core_crash_test_cases_info.add(self.current_test_case_info)
else:
- self.result_string = '%s [no temp dir]' % error_type_str
+ self.result_string = "%s [no temp dir]" % error_type_str
self.send_result_through_pipe(test, error_type)
@@ -1613,15 +1743,13 @@ class VppTestResult(unittest.TestResult):
# This block may overwrite the colorized title above,
# but we want this to stand out and be fixed
if test.has_tag(TestCaseTag.FIXME_VPP_WORKERS):
- test_title = colorize(
- f"FIXME with VPP workers: {test_title}", RED)
+ test_title = colorize(f"FIXME with VPP workers: {test_title}", RED)
if test.has_tag(TestCaseTag.FIXME_ASAN):
- test_title = colorize(
- f"FIXME with ASAN: {test_title}", RED)
+ test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
test.skip_fixme_asan()
- if hasattr(test, 'vpp_worker_count'):
+ if hasattr(test, "vpp_worker_count"):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"
elif test.vpp_worker_count == 1:
@@ -1633,7 +1761,9 @@ class VppTestResult(unittest.TestResult):
test_title = colorize(
f"{test_title} [skipped - not enough cpus, "
f"required={test.__class__.get_cpus_required()}, "
- f"available={max_vpp_cpus}]", YELLOW)
+ f"available={max_vpp_cpus}]",
+ YELLOW,
+ )
print(double_line_delim)
print(test_title)
@@ -1644,8 +1774,7 @@ class VppTestResult(unittest.TestResult):
self.start_test = time.time()
unittest.TestResult.startTest(self, test)
if self.verbosity > 0:
- self.stream.writeln(
- "Starting " + self.getDescription(test) + " ...")
+ self.stream.writeln("Starting " + self.getDescription(test) + " ...")
self.stream.writeln(single_line_delim)
def stopTest(self, test):
@@ -1659,14 +1788,19 @@ class VppTestResult(unittest.TestResult):
if self.verbosity > 0:
self.stream.writeln(single_line_delim)
- self.stream.writeln("%-73s%s" % (self.getDescription(test),
- self.result_string))
+ self.stream.writeln(
+ "%-73s%s" % (self.getDescription(test), self.result_string)
+ )
self.stream.writeln(single_line_delim)
else:
- self.stream.writeln("%-68s %4.2f %s" %
- (self.getDescription(test),
- time.time() - self.start_test,
- self.result_string))
+ self.stream.writeln(
+ "%-68s %4.2f %s"
+ % (
+ self.getDescription(test),
+ time.time() - self.start_test,
+ self.result_string,
+ )
+ )
self.send_result_through_pipe(test, TEST_RUN)
@@ -1676,12 +1810,12 @@ class VppTestResult(unittest.TestResult):
"""
if len(self.errors) > 0 or len(self.failures) > 0:
self.stream.writeln()
- self.printErrorList('ERROR', self.errors)
- self.printErrorList('FAIL', self.failures)
+ self.printErrorList("ERROR", self.errors)
+ self.printErrorList("FAIL", self.failures)
# ^^ that is the last output from unittest before summary
if not self.runner.print_summary:
- devnull = unittest.runner._WritelnDecorator(open(os.devnull, 'w'))
+ devnull = unittest.runner._WritelnDecorator(open(os.devnull, "w"))
self.stream = devnull
self.runner.stream = devnull
@@ -1696,8 +1830,7 @@ class VppTestResult(unittest.TestResult):
"""
for test, err in errors:
self.stream.writeln(double_line_delim)
- self.stream.writeln("%s: %s" %
- (flavour, self.getDescription(test)))
+ self.stream.writeln("%s: %s" % (flavour, self.getDescription(test)))
self.stream.writeln(single_line_delim)
self.stream.writeln("%s" % err)
@@ -1712,14 +1845,23 @@ class VppTestRunner(unittest.TextTestRunner):
"""Class maintaining the results of the tests"""
return VppTestResult
- def __init__(self, keep_alive_pipe=None, descriptions=True, verbosity=1,
- result_pipe=None, failfast=False, buffer=False,
- resultclass=None, print_summary=True, **kwargs):
+ def __init__(
+ self,
+ keep_alive_pipe=None,
+ descriptions=True,
+ verbosity=1,
+ result_pipe=None,
+ failfast=False,
+ buffer=False,
+ resultclass=None,
+ print_summary=True,
+ **kwargs,
+ ):
# ignore stream setting here, use hard-coded stdout to be in sync
# with prints from VppTestCase methods ...
- super(VppTestRunner, self).__init__(sys.stdout, descriptions,
- verbosity, failfast, buffer,
- resultclass, **kwargs)
+ super(VppTestRunner, self).__init__(
+ sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
+ )
KeepAliveReporter.pipe = keep_alive_pipe
self.orig_stream = self.stream
@@ -1728,10 +1870,7 @@ class VppTestRunner(unittest.TextTestRunner):
self.print_summary = print_summary
def _makeResult(self):
- return self.resultclass(self.stream,
- self.descriptions,
- self.verbosity,
- self)
+ return self.resultclass(self.stream, self.descriptions, self.verbosity, self)
def run(self, test):
"""
@@ -1754,91 +1893,120 @@ class Worker(Thread):
super(Worker, self).__init__(*args, **kwargs)
self.logger = logger
self.args = executable_args
- if hasattr(self, 'testcase') and self.testcase.debug_all:
+ if hasattr(self, "testcase") and self.testcase.debug_all:
if self.testcase.debug_gdbserver:
- self.args = ['/usr/bin/gdbserver', 'localhost:{port}'
- .format(port=self.testcase.gdbserver_port)] + args
- elif self.testcase.debug_gdb and hasattr(self, 'wait_for_gdb'):
+ self.args = [
+ "/usr/bin/gdbserver",
+ "localhost:{port}".format(port=self.testcase.gdbserver_port),
+ ] + args
+ elif self.testcase.debug_gdb and hasattr(self, "wait_for_gdb"):
self.args.append(self.wait_for_gdb)
self.app_bin = executable_args[0]
self.app_name = os.path.basename(self.app_bin)
- if hasattr(self, 'role'):
- self.app_name += ' {role}'.format(role=self.role)
+ if hasattr(self, "role"):
+ self.app_name += " {role}".format(role=self.role)
self.process = None
self.result = None
env = {} if env is None else env
self.env = copy.deepcopy(env)
def wait_for_enter(self):
- if not hasattr(self, 'testcase'):
+ if not hasattr(self, "testcase"):
return
if self.testcase.debug_all and self.testcase.debug_gdbserver:
print()
print(double_line_delim)
- print("Spawned GDB Server for '{app}' with PID: {pid}"
- .format(app=self.app_name, pid=self.process.pid))
+ print(
+ "Spawned GDB Server for '{app}' with PID: {pid}".format(
+ app=self.app_name, pid=self.process.pid
+ )
+ )
elif self.testcase.debug_all and self.testcase.debug_gdb:
print()
print(double_line_delim)
- print("Spawned '{app}' with PID: {pid}"
- .format(app=self.app_name, pid=self.process.pid))
+ print(
+ "Spawned '{app}' with PID: {pid}".format(
+ app=self.app_name, pid=self.process.pid
+ )
+ )
else:
return
print(single_line_delim)
print("You can debug '{app}' using:".format(app=self.app_name))
if self.testcase.debug_gdbserver:
- print("sudo gdb " + self.app_bin +
- " -ex 'target remote localhost:{port}'"
- .format(port=self.testcase.gdbserver_port))
- print("Now is the time to attach gdb by running the above "
- "command, set up breakpoints etc., then resume from "
- "within gdb by issuing the 'continue' command")
+ print(
+ "sudo gdb "
+ + self.app_bin
+ + " -ex 'target remote localhost:{port}'".format(
+ port=self.testcase.gdbserver_port
+ )
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command, set up breakpoints etc., then resume from "
+ "within gdb by issuing the 'continue' command"
+ )
self.testcase.gdbserver_port += 1
elif self.testcase.debug_gdb:
- print("sudo gdb " + self.app_bin +
- " -ex 'attach {pid}'".format(pid=self.process.pid))
- print("Now is the time to attach gdb by running the above "
- "command and set up breakpoints etc., then resume from"
- " within gdb by issuing the 'continue' command")
+ print(
+ "sudo gdb "
+ + self.app_bin
+ + " -ex 'attach {pid}'".format(pid=self.process.pid)
+ )
+ print(
+ "Now is the time to attach gdb by running the above "
+ "command and set up breakpoints etc., then resume from"
+ " within gdb by issuing the 'continue' command"
+ )
print(single_line_delim)
input("Press ENTER to continue running the testcase...")
def run(self):
executable = self.args[0]
if not os.path.exists(executable) or not os.access(
- executable, os.F_OK | os.X_OK):
+ executable, os.F_OK | os.X_OK
+ ):
# Exit code that means some system file did not exist,
# could not be opened, or had some other kind of error.
self.result = os.EX_OSFILE
raise EnvironmentError(
- "executable '%s' is not found or executable." % executable)
- self.logger.debug("Running executable '{app}': '{cmd}'"
- .format(app=self.app_name,
- cmd=' '.join(self.args)))
+ "executable '%s' is not found or executable." % executable
+ )
+ self.logger.debug(
+ "Running executable '{app}': '{cmd}'".format(
+ app=self.app_name, cmd=" ".join(self.args)
+ )
+ )
env = os.environ.copy()
env.update(self.env)
env["CK_LOG_FILE_NAME"] = "-"
self.process = subprocess.Popen(
- ['stdbuf', '-o0', '-e0'] + self.args, shell=False, env=env,
- preexec_fn=os.setpgrp, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ ["stdbuf", "-o0", "-e0"] + self.args,
+ shell=False,
+ env=env,
+ preexec_fn=os.setpgrp,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
self.wait_for_enter()
out, err = self.process.communicate()
self.logger.debug("Finished running `{app}'".format(app=self.app_name))
self.logger.info("Return code is `%s'" % self.process.returncode)
self.logger.info(single_line_delim)
- self.logger.info("Executable `{app}' wrote to stdout:"
- .format(app=self.app_name))
+ self.logger.info(
+ "Executable `{app}' wrote to stdout:".format(app=self.app_name)
+ )
self.logger.info(single_line_delim)
- self.logger.info(out.decode('utf-8'))
+ self.logger.info(out.decode("utf-8"))
self.logger.info(single_line_delim)
- self.logger.info("Executable `{app}' wrote to stderr:"
- .format(app=self.app_name))
+ self.logger.info(
+ "Executable `{app}' wrote to stderr:".format(app=self.app_name)
+ )
self.logger.info(single_line_delim)
- self.logger.info(err.decode('utf-8'))
+ self.logger.info(err.decode("utf-8"))
self.logger.info(single_line_delim)
self.result = self.process.returncode
-if __name__ == '__main__':
+if __name__ == "__main__":
pass