summaryrefslogtreecommitdiffstats
path: root/test/asf/asfframework.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/asf/asfframework.py')
-rw-r--r--test/asf/asfframework.py463
1 files changed, 100 insertions, 363 deletions
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py
index 1214fbfecd2..024d7f0127d 100644
--- a/test/asf/asfframework.py
+++ b/test/asf/asfframework.py
@@ -15,6 +15,7 @@ import random
import copy
import platform
import shutil
+from pathlib import Path
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
@@ -22,16 +23,11 @@ from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
-from struct import pack, unpack
-from config import config, available_cpus, num_cpus, max_vpp_cpus
+from config import config, max_vpp_cpus
import hook as hookmodule
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
-from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
-from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
@@ -45,13 +41,13 @@ from log import (
colorize,
)
from vpp_object import VppObjectRegistry
-from util import ppp, is_core_present
+from util import is_core_present
from test_result_code import TestResultCode
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
-null_logger = logging.getLogger("VppTestCase")
+null_logger = logging.getLogger("VppAsfTestCase")
null_logger.addHandler(logging.NullHandler())
@@ -103,35 +99,6 @@ class VppDiedError(Exception):
super(VppDiedError, self).__init__(msg)
-class _PacketInfo(object):
- """Private class to create packet info object.
-
- Help process information about the next packet.
- Set variables to default values.
- """
-
- #: Store the index of the packet.
- index = -1
- #: Store the index of the source packet generator interface of the packet.
- src = -1
- #: Store the index of the destination packet generator interface
- #: of the packet.
- dst = -1
- #: Store expected ip version
- ip = -1
- #: Store expected upper protocol
- proto = -1
- #: Store the copy of the former packet.
- data = None
-
- def __eq__(self, other):
- index = self.index == other.index
- src = self.src == other.src
- dst = self.dst == other.dst
- data = self.data == other.data
- return index and src and dst and data
-
-
def pump_output(testclass):
"""pump output from vpp stdout/stderr to proper queues"""
stdout_fragment = ""
@@ -188,6 +155,36 @@ def _is_platform_aarch64():
is_platform_aarch64 = _is_platform_aarch64()
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+
+
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
@@ -233,6 +230,12 @@ class TestCaseTag(Enum):
FIXME_VPP_WORKERS = 2
# marks the suites broken when ASan is enabled
FIXME_ASAN = 3
+ # marks suites broken on Ubuntu-22.04
+ FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
@@ -249,6 +252,9 @@ def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
+tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
class DummyVpp:
@@ -276,7 +282,7 @@ class CPUInterface(ABC):
cls.cpus = cpus
-class VppTestCase(CPUInterface, unittest.TestCase):
+class VppAsfTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
@@ -288,19 +294,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
vapi_response_timeout = 5
remove_configured_vpp_objects_on_tear_down = True
- @property
- def packet_infos(self):
- """List of packet infos"""
- return self._packet_infos
-
- @classmethod
- def get_packet_count_for_if_idx(cls, dst_if_index):
- """Get the number of packet info for specified destination if index"""
- if dst_if_index in cls._packet_count_for_dst_if_idx:
- return cls._packet_count_for_dst_if_idx[dst_if_index]
- else:
- return 0
-
@classmethod
def has_tag(cls, tag):
"""if the test case has a given tag - return true"""
@@ -598,7 +591,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
if cls.debug_attach:
tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
else:
- tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
+ tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(tmpdir, ignore_errors=True)
os.mkdir(tmpdir)
@@ -610,7 +603,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
return
- logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
+ logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(logdir, ignore_errors=True)
os.mkdir(logdir)
@@ -622,8 +615,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
- super(VppTestCase, cls).setUpClass()
+ super(VppAsfTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
+ cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---")
random.seed(config.rnd_seed)
if hasattr(cls, "parallel_handler"):
cls.logger.addHandler(cls.parallel_handler)
@@ -645,9 +639,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
- cls.reset_packet_infos()
- cls._pcaps = []
- cls._old_pcaps = []
cls.verbose = 0
cls.vpp_dead = False
cls.registry = VppObjectRegistry()
@@ -684,6 +675,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
try:
hook.poll_vpp()
except VppDiedError:
+ cls.wait_for_coredump()
cls.vpp_startup_failed = True
cls.logger.critical(
"VPP died shortly after startup, check the"
@@ -718,6 +710,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.logger.debug("Exception connecting to VPP: %s" % e)
cls.quit()
raise e
+ cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---")
@classmethod
def _debug_quit(cls):
@@ -810,13 +803,13 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def tearDownClass(cls):
"""Perform final cleanup after running all tests in this test-case"""
- cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---")
cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
- cls.reset_packet_infos()
if config.debug_framework:
debug_internal.on_tear_down_class(cls)
+ cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---")
def show_commands_at_teardown(self):
"""Allow subclass specific teardown logging additions."""
@@ -825,8 +818,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
def tearDown(self):
"""Show various debug prints after each test"""
self.logger.debug(
- "--- tearDown() for %s.%s(%s) called ---"
- % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---"
)
try:
@@ -857,12 +849,29 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.vpp_dead = True
else:
self.registry.unregister_all(self.logger)
+ # Remove any leftover pcap files
+ if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path)
+ for p in Path(testcase_dir).glob("pg*.pcap"):
+ self.logger.debug(f"Removing {p}")
+ p.unlink()
+ self.logger.debug(
+ f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
def setUp(self):
"""Clear trace before running each test"""
- super(VppTestCase, self).setUp()
+ super(VppAsfTestCase, self).setUp()
+ self.logger.debug(
+ f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+ # Save testname include in pcap history filenames
+ if hasattr(self, "pg_interfaces"):
+ for i in self.pg_interfaces:
+ i.test_name = self._testMethodName
self.reporter.send_keep_alive(self)
if self.vpp_dead:
+ self.wait_for_coredump()
raise VppDiedError(
rv=None,
testcase=self.__class__.__name__,
@@ -881,26 +890,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
type(self).test_instance = self
-
- @classmethod
- def pg_enable_capture(cls, interfaces=None):
- """
- Enable capture on packet-generator interfaces
-
- :param interfaces: iterable interface indexes (if None,
- use self.pg_interfaces)
-
- """
- if interfaces is None:
- interfaces = cls.pg_interfaces
- for i in interfaces:
- i.enable_capture()
-
- @classmethod
- def register_pcap(cls, intf, worker):
- """Register a pcap in the testclass"""
- # add to the list of captures with current timestamp
- cls._pcaps.append((intf, worker))
+ self.logger.debug(
+ f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
@classmethod
def get_vpp_time(cls):
@@ -922,76 +914,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.sleep(0.1)
@classmethod
- def pg_start(cls, trace=True):
- """Enable the PG, wait till it is done, then clean up"""
- for intf, worker in cls._old_pcaps:
- intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
- cls._old_pcaps = []
- if trace:
- cls.vapi.cli("clear trace")
- cls.vapi.cli("trace add pg-input 1000")
- cls.vapi.cli("packet-generator enable")
- # PG, when starts, runs to completion -
- # so let's avoid a race condition,
- # and wait a little till it's done.
- # Then clean it up - and then be gone.
- deadline = time.time() + 300
- while cls.vapi.cli("show packet-generator").find("Yes") != -1:
- cls.sleep(0.01) # yield
- if time.time() > deadline:
- cls.logger.error("Timeout waiting for pg to stop")
- break
- for intf, worker in cls._pcaps:
- cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
- cls._old_pcaps = cls._pcaps
- cls._pcaps = []
-
- @classmethod
- def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
- """
- Create packet-generator interfaces.
-
- :param interfaces: iterable indexes of the interfaces.
- :returns: List of created interfaces.
-
- """
- result = []
- for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size, mode)
- setattr(cls, intf.name, intf)
- result.append(intf)
- cls.pg_interfaces = result
- return result
-
- @classmethod
- def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
- )
-
- @classmethod
- def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
- )
-
- @classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
- @classmethod
- def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
- @classmethod
def create_loopback_interfaces(cls, count):
"""
Create loopback interfaces.
@@ -1005,119 +927,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.lo_interfaces = result
return result
- @classmethod
- def create_bvi_interfaces(cls, count):
- """
- Create BVI interfaces.
-
- :param count: number of interfaces created.
- :returns: List of created interfaces.
- """
- result = [VppBviInterface(cls) for i in range(count)]
- for intf in result:
- setattr(cls, intf.name, intf)
- cls.bvi_interfaces = result
- return result
-
- @classmethod
- def reset_packet_infos(cls):
- """Reset the list of packet info objects and packet counts to zero"""
- cls._packet_infos = {}
- cls._packet_count_for_dst_if_idx = {}
-
- @classmethod
- def create_packet_info(cls, src_if, dst_if):
- """
- Create packet info object containing the source and destination indexes
- and add it to the testcase's packet info list
-
- :param VppInterface src_if: source interface
- :param VppInterface dst_if: destination interface
-
- :returns: _PacketInfo object
-
- """
- info = _PacketInfo()
- info.index = len(cls._packet_infos)
- info.src = src_if.sw_if_index
- info.dst = dst_if.sw_if_index
- if isinstance(dst_if, VppSubInterface):
- dst_idx = dst_if.parent.sw_if_index
- else:
- dst_idx = dst_if.sw_if_index
- if dst_idx in cls._packet_count_for_dst_if_idx:
- cls._packet_count_for_dst_if_idx[dst_idx] += 1
- else:
- cls._packet_count_for_dst_if_idx[dst_idx] = 1
- cls._packet_infos[info.index] = info
- return info
-
- @staticmethod
- def info_to_payload(info):
- """
- Convert _PacketInfo object to packet payload
-
- :param info: _PacketInfo object
-
- :returns: string containing serialized data from packet info
- """
-
- # retrieve payload, currently 18 bytes (4 x ints + 1 short)
- return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
-
- def get_next_packet_info(self, info):
- """
- Iterate over the packet info list stored in the testcase
- Start iteration with first element if info is None
- Continue based on index in info if info is specified
-
- :param info: info or None
- :returns: next info in list or None if no more infos
- """
- if info is None:
- next_index = 0
- else:
- next_index = info.index + 1
- if next_index == len(self._packet_infos):
- return None
- else:
- return self._packet_infos[next_index]
-
- def get_next_packet_info_for_interface(self, src_index, info):
- """
- Search the packet info list for the next packet info with same source
- interface index
-
- :param src_index: source interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info(info)
- if info is None:
- return None
- if info.src == src_index:
- return info
-
- def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
- """
- Search the packet info list for the next packet info with same source
- and destination interface indexes
-
- :param src_index: source interface index to search for
- :param dst_index: destination interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info_for_interface(src_index, info)
- if info is None:
- return None
- if info.dst == dst_index:
- return info
-
def assert_equal(self, real_value, expected_value, name_or_class=None):
if name_or_class is None:
self.assertEqual(real_value, expected_value)
@@ -1152,25 +961,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
self.assertTrue(expected_min <= real_value <= expected_max, msg)
- def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
- self.assert_checksum_valid(
- received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_icmp_checksum_valid(self, received_packet):
- self.assert_checksum_valid(received_packet, "ICMP")
- self.assert_embedded_icmp_checksum_valid(received_packet)
-
def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
@@ -1196,12 +986,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
- def assert_packet_counter_equal(self, counter, expected_value):
- counter_value = self.get_counter(counter)
- self.assert_equal(
- counter_value, expected_value, "packet counter `%s'" % counter
- )
-
def assert_error_counter_equal(self, counter, expected_value):
counter_value = self.statistics[counter].sum()
self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
@@ -1242,11 +1026,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
self.vapi.cli("set clock adjust %s" % timeout)
- def pg_send(self, intf, pkts, worker=None, trace=True):
- intf.add_stream(pkts, worker=worker)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start(trace=trace)
-
def snapshot_stats(self, stats_diff):
"""Return snapshot of interesting stats based on diff dictionary."""
stats_snapshot = {}
@@ -1287,70 +1066,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
) from e
- def send_and_assert_no_replies(
- self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
- ):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
-
- try:
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- i.assert_nothing_captured(timeout=timeout, remark=remark)
- timeout = 0.1
- finally:
- if trace:
- if msg:
- self.logger.debug(f"send_and_assert_no_replies: {msg}")
- self.logger.debug(self.vapi.cli("show trace"))
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- def send_and_expect_load_balancing(
- self, input, pkts, outputs, worker=None, trace=True
- ):
- self.pg_send(input, pkts, worker=worker, trace=trace)
- rxs = []
- for oo in outputs:
- rx = oo._get_capture(1)
- self.assertNotEqual(0, len(rx))
- rxs.append(rx)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- return rxs
-
- def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
- self.pg_send(intf, pkts, worker=worker, trace=trace)
- rx = output._get_capture(1)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- self.assertTrue(len(rx) > 0)
- self.assertTrue(len(rx) < len(pkts))
- return rx
-
- def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
- rx = output.get_capture(len(pkts))
- outputs = [output]
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- if i not in outputs:
- i.assert_nothing_captured(timeout=timeout)
- timeout = 0.1
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- return rx
-
def get_testcase_doc_name(test):
return getdoc(test.__class__).splitlines()[0]
@@ -1364,6 +1079,14 @@ def get_test_description(descriptions, test):
return str(test)
+def get_failed_testcase_linkname(failed_dir, testcase_dirname):
+ return os.path.join(failed_dir, f"{testcase_dirname}-FAILED")
+
+
+def get_testcase_dirname(testcase_class_name):
+ return f"vpp-unittest-{testcase_class_name}"
+
+
class TestCaseInfo(object):
def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
self.logger = logger
@@ -1409,6 +1132,17 @@ class VppTestResult(unittest.TestResult):
self.runner = runner
self.printed = []
+ def decodePcapFiles(self, test, when_configured=False):
+ if when_configured == False or config.decode_pcaps == True:
+ if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path)
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, f"suite{test.__class__.__name__}"
+ )
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, test._testMethodName
+ )
+
def addSuccess(self, test):
"""
Record a test succeeded result
@@ -1417,6 +1151,7 @@ class VppTestResult(unittest.TestResult):
"""
self.log_result("addSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
self.result_code = TestResultCode.PASS
@@ -1424,6 +1159,7 @@ class VppTestResult(unittest.TestResult):
def addExpectedFailure(self, test, err):
self.log_result("addExpectedFailure", test, err)
+ self.decodePcapFiles(test)
super().addExpectedFailure(test, err)
self.result_string = colorize("FAIL", GREEN)
self.result_code = TestResultCode.EXPECTED_FAIL
@@ -1431,6 +1167,7 @@ class VppTestResult(unittest.TestResult):
def addUnexpectedSuccess(self, test):
self.log_result("addUnexpectedSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
super().addUnexpectedSuccess(test)
self.result_string = colorize("OK", RED)
self.result_code = TestResultCode.UNEXPECTED_PASS
@@ -1458,9 +1195,8 @@ class VppTestResult(unittest.TestResult):
if self.current_test_case_info:
try:
failed_dir = config.failed_dir
- link_path = os.path.join(
- failed_dir,
- "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
+ link_path = get_failed_testcase_linkname(
+ failed_dir, os.path.basename(self.current_test_case_info.tempdir)
)
self.current_test_case_info.logger.debug(
@@ -1517,6 +1253,7 @@ class VppTestResult(unittest.TestResult):
error_type_str = colorize("ERROR", RED)
else:
raise Exception(f"Unexpected result code {result_code}")
+ self.decodePcapFiles(test)
unittest_fn(self, test, err)
if self.current_test_case_info:
@@ -1727,7 +1464,7 @@ class VppTestRunner(unittest.TextTestRunner):
**kwargs,
):
# ignore stream setting here, use hard-coded stdout to be in sync
- # with prints from VppTestCase methods ...
+ # with prints from VppAsfTestCase methods ...
super(VppTestRunner, self).__init__(
sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
)