aboutsummaryrefslogtreecommitdiffstats
path: root/test/asf
diff options
context:
space:
mode:
authorDave Wallace <dwallacelf@gmail.com>2023-08-31 00:47:44 -0400
committerAndrew Yourtchenko <ayourtch@gmail.com>2023-11-03 05:06:43 +0000
commit8800f732f868bf54da8adba05e38bd2477895ca5 (patch)
tree41cfeab26058ef7238c1e1e8199a05617a98541e /test/asf
parentaf5684bf18077acf1f448c6f2a62ef1af9f9be05 (diff)
tests: refactor asf framework code
- Make framework.py classes a subset of asfframework.py classes - Remove all packet related code from asfframework.py - Add test class and test case set up debug output to log - Repatriate packet tests from asf to test directory - Remove non-packet related code from framework.py and inherit them from asfframework.py classes - Clean up unused import variables - Re-enable BFD tests on Ubuntu 22.04 and fix intermittent test failures in echo_looped_back testcases (where # control packets verified but not guaranteed to be received during test) - Re-enable Wireguard tests on Ubuntu 22.04 and fix intermittent test failures in handshake ratelimiting testcases and event testcase - Run Wiregard testcase suites solo - Improve debug output in log.txt - Increase VCL/LDP post sleep timeout to allow iperf server to finish cleanly. - Fix pcap history files to be sorted by suite and testcase and ensure order/timestamp is correct based on creation in the testcase. - Decode pcap files for each suite and testcase for all errors or if configured via comandline option / env var - Improve vpp corefile detection to allow complete corefile generation - Disable vm vpp interfaces testcases on debian11 - Clean up failed unittest dir when retrying failed testcases and unify testname directory and failed linknames into framwork functions Type: test Change-Id: I0764f79ea5bb639d278bf635ed2408d4d5220e1e Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/asf')
-rw-r--r--test/asf/asfframework.py463
-rw-r--r--test/asf/lisp.py385
-rw-r--r--test/asf/remote_test.py431
-rw-r--r--test/asf/test_adl.py5
-rw-r--r--test/asf/test_api_client.py5
-rw-r--r--test/asf/test_api_trace.py6
-rw-r--r--test/asf/test_bihash.py5
-rw-r--r--test/asf/test_buffers.py4
-rw-r--r--test/asf/test_cli.py8
-rw-r--r--test/asf/test_counters.py5
-rw-r--r--test/asf/test_crypto.py4
-rw-r--r--test/asf/test_endian.py4
-rw-r--r--test/asf/test_fib.py5
-rw-r--r--test/asf/test_http.py7
-rw-r--r--test/asf/test_http_static.py6
-rw-r--r--test/asf/test_ipfix_export.py196
-rw-r--r--test/asf/test_ipsec_default.py199
-rw-r--r--test/asf/test_ipsec_spd_flow_cache_input.py866
-rw-r--r--test/asf/test_ipsec_spd_flow_cache_output.py765
-rw-r--r--test/asf/test_ipsec_spd_fp_output.py1418
-rw-r--r--test/asf/test_lb_api.py9
-rw-r--r--test/asf/test_mactime.py5
-rw-r--r--test/asf/test_mpcap.py5
-rw-r--r--test/asf/test_node_variants.py4
-rw-r--r--test/asf/test_offload.py5
-rw-r--r--test/asf/test_pcap.py163
-rw-r--r--test/asf/test_policer.py6
-rw-r--r--test/asf/test_quic.py6
-rw-r--r--test/asf/test_session.py17
-rw-r--r--test/asf/test_sparse_vec.py5
-rw-r--r--test/asf/test_string.py5
-rw-r--r--test/asf/test_tap.py4
-rw-r--r--test/asf/test_tcp.py6
-rw-r--r--test/asf/test_tls.py4
-rw-r--r--test/asf/test_vapi.py4
-rw-r--r--test/asf/test_vcl.py8
-rw-r--r--test/asf/test_vhost.py4
-rw-r--r--test/asf/test_vlib.py327
-rw-r--r--test/asf/test_vpe_api.py5
-rw-r--r--test/asf/test_vppinfra.py4
40 files changed, 176 insertions, 5207 deletions
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py
index 1214fbfecd2..024d7f0127d 100644
--- a/test/asf/asfframework.py
+++ b/test/asf/asfframework.py
@@ -15,6 +15,7 @@ import random
import copy
import platform
import shutil
+from pathlib import Path
from collections import deque
from threading import Thread, Event
from inspect import getdoc, isclass
@@ -22,16 +23,11 @@ from traceback import format_exception
from logging import FileHandler, DEBUG, Formatter
from enum import Enum
from abc import ABC, abstractmethod
-from struct import pack, unpack
-from config import config, available_cpus, num_cpus, max_vpp_cpus
+from config import config, max_vpp_cpus
import hook as hookmodule
-from vpp_pg_interface import VppPGInterface
-from vpp_sub_interface import VppSubInterface
from vpp_lo_interface import VppLoInterface
-from vpp_bvi_interface import VppBviInterface
from vpp_papi_provider import VppPapiProvider
-from vpp_papi import VppEnum
import vpp_papi
from vpp_papi.vpp_stats import VPPStats
from vpp_papi.vpp_transport_socket import VppTransportSocketIOError
@@ -45,13 +41,13 @@ from log import (
colorize,
)
from vpp_object import VppObjectRegistry
-from util import ppp, is_core_present
+from util import is_core_present
from test_result_code import TestResultCode
logger = logging.getLogger(__name__)
# Set up an empty logger for the testcase that can be overridden as necessary
-null_logger = logging.getLogger("VppTestCase")
+null_logger = logging.getLogger("VppAsfTestCase")
null_logger.addHandler(logging.NullHandler())
@@ -103,35 +99,6 @@ class VppDiedError(Exception):
super(VppDiedError, self).__init__(msg)
-class _PacketInfo(object):
- """Private class to create packet info object.
-
- Help process information about the next packet.
- Set variables to default values.
- """
-
- #: Store the index of the packet.
- index = -1
- #: Store the index of the source packet generator interface of the packet.
- src = -1
- #: Store the index of the destination packet generator interface
- #: of the packet.
- dst = -1
- #: Store expected ip version
- ip = -1
- #: Store expected upper protocol
- proto = -1
- #: Store the copy of the former packet.
- data = None
-
- def __eq__(self, other):
- index = self.index == other.index
- src = self.src == other.src
- dst = self.dst == other.dst
- data = self.data == other.data
- return index and src and dst and data
-
-
def pump_output(testclass):
"""pump output from vpp stdout/stderr to proper queues"""
stdout_fragment = ""
@@ -188,6 +155,36 @@ def _is_platform_aarch64():
is_platform_aarch64 = _is_platform_aarch64()
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
+is_distro_ubuntu2204 = _is_distro_ubuntu2204()
+
+
+def _is_distro_debian11():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "bullseye" in line:
+ return True
+ return False
+
+
+is_distro_debian11 = _is_distro_debian11()
+
+
+def _is_distro_ubuntu2204():
+ with open("/etc/os-release") as f:
+ for line in f.readlines():
+ if "jammy" in line:
+ return True
+ return False
+
+
class KeepAliveReporter(object):
"""
Singleton object which reports test start to parent process
@@ -233,6 +230,12 @@ class TestCaseTag(Enum):
FIXME_VPP_WORKERS = 2
# marks the suites broken when ASan is enabled
FIXME_ASAN = 3
+ # marks suites broken on Ubuntu-22.04
+ FIXME_UBUNTU2204 = 4
+ # marks suites broken on Debian-11
+ FIXME_DEBIAN11 = 5
+ # marks suites broken on debug vpp image
+ FIXME_VPP_DEBUG = 6
def create_tag_decorator(e):
@@ -249,6 +252,9 @@ def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
+tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
+tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
+tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
class DummyVpp:
@@ -276,7 +282,7 @@ class CPUInterface(ABC):
cls.cpus = cpus
-class VppTestCase(CPUInterface, unittest.TestCase):
+class VppAsfTestCase(CPUInterface, unittest.TestCase):
"""This subclass is a base class for VPP test cases that are implemented as
classes. It provides methods to create and run test case.
"""
@@ -288,19 +294,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
vapi_response_timeout = 5
remove_configured_vpp_objects_on_tear_down = True
- @property
- def packet_infos(self):
- """List of packet infos"""
- return self._packet_infos
-
- @classmethod
- def get_packet_count_for_if_idx(cls, dst_if_index):
- """Get the number of packet info for specified destination if index"""
- if dst_if_index in cls._packet_count_for_dst_if_idx:
- return cls._packet_count_for_dst_if_idx[dst_if_index]
- else:
- return 0
-
@classmethod
def has_tag(cls, tag):
"""if the test case has a given tag - return true"""
@@ -598,7 +591,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
if cls.debug_attach:
tmpdir = f"{config.tmp_dir}/unittest-attach-gdb"
else:
- tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}"
+ tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(tmpdir, ignore_errors=True)
os.mkdir(tmpdir)
@@ -610,7 +603,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt")
return
- logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}"
+ logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}"
if config.wipe_tmp_dir:
shutil.rmtree(logdir, ignore_errors=True)
os.mkdir(logdir)
@@ -622,8 +615,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
Perform class setup before running the testcase
Remove shared memory files, start vpp and connect the vpp-api
"""
- super(VppTestCase, cls).setUpClass()
+ super(VppAsfTestCase, cls).setUpClass()
cls.logger = get_logger(cls.__name__)
+ cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---")
random.seed(config.rnd_seed)
if hasattr(cls, "parallel_handler"):
cls.logger.addHandler(cls.parallel_handler)
@@ -645,9 +639,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
cls.logger.debug("Random seed is %s", config.rnd_seed)
cls.setUpConstants()
- cls.reset_packet_infos()
- cls._pcaps = []
- cls._old_pcaps = []
cls.verbose = 0
cls.vpp_dead = False
cls.registry = VppObjectRegistry()
@@ -684,6 +675,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
try:
hook.poll_vpp()
except VppDiedError:
+ cls.wait_for_coredump()
cls.vpp_startup_failed = True
cls.logger.critical(
"VPP died shortly after startup, check the"
@@ -718,6 +710,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.logger.debug("Exception connecting to VPP: %s" % e)
cls.quit()
raise e
+ cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---")
@classmethod
def _debug_quit(cls):
@@ -810,13 +803,13 @@ class VppTestCase(CPUInterface, unittest.TestCase):
@classmethod
def tearDownClass(cls):
"""Perform final cleanup after running all tests in this test-case"""
- cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__)
+ cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---")
cls.reporter.send_keep_alive(cls, "tearDownClass")
cls.quit()
cls.file_handler.close()
- cls.reset_packet_infos()
if config.debug_framework:
debug_internal.on_tear_down_class(cls)
+ cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---")
def show_commands_at_teardown(self):
"""Allow subclass specific teardown logging additions."""
@@ -825,8 +818,7 @@ class VppTestCase(CPUInterface, unittest.TestCase):
def tearDown(self):
"""Show various debug prints after each test"""
self.logger.debug(
- "--- tearDown() for %s.%s(%s) called ---"
- % (self.__class__.__name__, self._testMethodName, self._testMethodDoc)
+ f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---"
)
try:
@@ -857,12 +849,29 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.vpp_dead = True
else:
self.registry.unregister_all(self.logger)
+ # Remove any leftover pcap files
+ if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path)
+ for p in Path(testcase_dir).glob("pg*.pcap"):
+ self.logger.debug(f"Removing {p}")
+ p.unlink()
+ self.logger.debug(
+ f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
def setUp(self):
"""Clear trace before running each test"""
- super(VppTestCase, self).setUp()
+ super(VppAsfTestCase, self).setUp()
+ self.logger.debug(
+ f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
+ # Save testname include in pcap history filenames
+ if hasattr(self, "pg_interfaces"):
+ for i in self.pg_interfaces:
+ i.test_name = self._testMethodName
self.reporter.send_keep_alive(self)
if self.vpp_dead:
+ self.wait_for_coredump()
raise VppDiedError(
rv=None,
testcase=self.__class__.__name__,
@@ -881,26 +890,9 @@ class VppTestCase(CPUInterface, unittest.TestCase):
# store the test instance inside the test class - so that objects
# holding the class can access instance methods (like assertEqual)
type(self).test_instance = self
-
- @classmethod
- def pg_enable_capture(cls, interfaces=None):
- """
- Enable capture on packet-generator interfaces
-
- :param interfaces: iterable interface indexes (if None,
- use self.pg_interfaces)
-
- """
- if interfaces is None:
- interfaces = cls.pg_interfaces
- for i in interfaces:
- i.enable_capture()
-
- @classmethod
- def register_pcap(cls, intf, worker):
- """Register a pcap in the testclass"""
- # add to the list of captures with current timestamp
- cls._pcaps.append((intf, worker))
+ self.logger.debug(
+ f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---"
+ )
@classmethod
def get_vpp_time(cls):
@@ -922,76 +914,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.sleep(0.1)
@classmethod
- def pg_start(cls, trace=True):
- """Enable the PG, wait till it is done, then clean up"""
- for intf, worker in cls._old_pcaps:
- intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter)
- cls._old_pcaps = []
- if trace:
- cls.vapi.cli("clear trace")
- cls.vapi.cli("trace add pg-input 1000")
- cls.vapi.cli("packet-generator enable")
- # PG, when starts, runs to completion -
- # so let's avoid a race condition,
- # and wait a little till it's done.
- # Then clean it up - and then be gone.
- deadline = time.time() + 300
- while cls.vapi.cli("show packet-generator").find("Yes") != -1:
- cls.sleep(0.01) # yield
- if time.time() > deadline:
- cls.logger.error("Timeout waiting for pg to stop")
- break
- for intf, worker in cls._pcaps:
- cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker))
- cls._old_pcaps = cls._pcaps
- cls._pcaps = []
-
- @classmethod
- def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None):
- """
- Create packet-generator interfaces.
-
- :param interfaces: iterable indexes of the interfaces.
- :returns: List of created interfaces.
-
- """
- result = []
- for i in interfaces:
- intf = VppPGInterface(cls, i, gso, gso_size, mode)
- setattr(cls, intf.name, intf)
- result.append(intf)
- cls.pg_interfaces = result
- return result
-
- @classmethod
- def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4
- )
-
- @classmethod
- def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6
- )
-
- @classmethod
- def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
- @classmethod
- def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0):
- pgmode = VppEnum.vl_api_pg_interface_mode_t
- return cls.create_pg_interfaces_internal(
- interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET
- )
-
- @classmethod
def create_loopback_interfaces(cls, count):
"""
Create loopback interfaces.
@@ -1005,119 +927,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
cls.lo_interfaces = result
return result
- @classmethod
- def create_bvi_interfaces(cls, count):
- """
- Create BVI interfaces.
-
- :param count: number of interfaces created.
- :returns: List of created interfaces.
- """
- result = [VppBviInterface(cls) for i in range(count)]
- for intf in result:
- setattr(cls, intf.name, intf)
- cls.bvi_interfaces = result
- return result
-
- @classmethod
- def reset_packet_infos(cls):
- """Reset the list of packet info objects and packet counts to zero"""
- cls._packet_infos = {}
- cls._packet_count_for_dst_if_idx = {}
-
- @classmethod
- def create_packet_info(cls, src_if, dst_if):
- """
- Create packet info object containing the source and destination indexes
- and add it to the testcase's packet info list
-
- :param VppInterface src_if: source interface
- :param VppInterface dst_if: destination interface
-
- :returns: _PacketInfo object
-
- """
- info = _PacketInfo()
- info.index = len(cls._packet_infos)
- info.src = src_if.sw_if_index
- info.dst = dst_if.sw_if_index
- if isinstance(dst_if, VppSubInterface):
- dst_idx = dst_if.parent.sw_if_index
- else:
- dst_idx = dst_if.sw_if_index
- if dst_idx in cls._packet_count_for_dst_if_idx:
- cls._packet_count_for_dst_if_idx[dst_idx] += 1
- else:
- cls._packet_count_for_dst_if_idx[dst_idx] = 1
- cls._packet_infos[info.index] = info
- return info
-
- @staticmethod
- def info_to_payload(info):
- """
- Convert _PacketInfo object to packet payload
-
- :param info: _PacketInfo object
-
- :returns: string containing serialized data from packet info
- """
-
- # retrieve payload, currently 18 bytes (4 x ints + 1 short)
- return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto)
-
- def get_next_packet_info(self, info):
- """
- Iterate over the packet info list stored in the testcase
- Start iteration with first element if info is None
- Continue based on index in info if info is specified
-
- :param info: info or None
- :returns: next info in list or None if no more infos
- """
- if info is None:
- next_index = 0
- else:
- next_index = info.index + 1
- if next_index == len(self._packet_infos):
- return None
- else:
- return self._packet_infos[next_index]
-
- def get_next_packet_info_for_interface(self, src_index, info):
- """
- Search the packet info list for the next packet info with same source
- interface index
-
- :param src_index: source interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info(info)
- if info is None:
- return None
- if info.src == src_index:
- return info
-
- def get_next_packet_info_for_interface2(self, src_index, dst_index, info):
- """
- Search the packet info list for the next packet info with same source
- and destination interface indexes
-
- :param src_index: source interface index to search for
- :param dst_index: destination interface index to search for
- :param info: packet info - where to start the search
- :returns: packet info or None
-
- """
- while True:
- info = self.get_next_packet_info_for_interface(src_index, info)
- if info is None:
- return None
- if info.dst == dst_index:
- return info
-
def assert_equal(self, real_value, expected_value, name_or_class=None):
if name_or_class is None:
self.assertEqual(real_value, expected_value)
@@ -1152,25 +961,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
self.assertTrue(expected_min <= real_value <= expected_max, msg)
- def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False):
- self.assert_checksum_valid(
- received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True):
- self.assert_checksum_valid(
- received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum
- )
-
- def assert_icmp_checksum_valid(self, received_packet):
- self.assert_checksum_valid(received_packet, "ICMP")
- self.assert_embedded_icmp_checksum_valid(received_packet)
-
def get_counter(self, counter):
if counter.startswith("/"):
counter_value = self.statistics.get_counter(counter)
@@ -1196,12 +986,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
)
self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index))
- def assert_packet_counter_equal(self, counter, expected_value):
- counter_value = self.get_counter(counter)
- self.assert_equal(
- counter_value, expected_value, "packet counter `%s'" % counter
- )
-
def assert_error_counter_equal(self, counter, expected_value):
counter_value = self.statistics[counter].sum()
self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter)
@@ -1242,11 +1026,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
self.logger.debug("Moving VPP time by %s (%s)", timeout, remark)
self.vapi.cli("set clock adjust %s" % timeout)
- def pg_send(self, intf, pkts, worker=None, trace=True):
- intf.add_stream(pkts, worker=worker)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start(trace=trace)
-
def snapshot_stats(self, stats_diff):
"""Return snapshot of interesting stats based on diff dictionary."""
stats_snapshot = {}
@@ -1287,70 +1066,6 @@ class VppTestCase(CPUInterface, unittest.TestCase):
f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}"
) from e
- def send_and_assert_no_replies(
- self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None
- ):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
-
- try:
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- i.assert_nothing_captured(timeout=timeout, remark=remark)
- timeout = 0.1
- finally:
- if trace:
- if msg:
- self.logger.debug(f"send_and_assert_no_replies: {msg}")
- self.logger.debug(self.vapi.cli("show trace"))
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- def send_and_expect_load_balancing(
- self, input, pkts, outputs, worker=None, trace=True
- ):
- self.pg_send(input, pkts, worker=worker, trace=trace)
- rxs = []
- for oo in outputs:
- rx = oo._get_capture(1)
- self.assertNotEqual(0, len(rx))
- rxs.append(rx)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- return rxs
-
- def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True):
- self.pg_send(intf, pkts, worker=worker, trace=trace)
- rx = output._get_capture(1)
- if trace:
- self.logger.debug(self.vapi.cli("show trace"))
- self.assertTrue(len(rx) > 0)
- self.assertTrue(len(rx) < len(pkts))
- return rx
-
- def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None):
- if stats_diff:
- stats_snapshot = self.snapshot_stats(stats_diff)
-
- self.pg_send(intf, pkts)
- rx = output.get_capture(len(pkts))
- outputs = [output]
- if not timeout:
- timeout = 1
- for i in self.pg_interfaces:
- if i not in outputs:
- i.assert_nothing_captured(timeout=timeout)
- timeout = 0.1
-
- if stats_diff:
- self.compare_stats_with_snapshot(stats_diff, stats_snapshot)
-
- return rx
-
def get_testcase_doc_name(test):
return getdoc(test.__class__).splitlines()[0]
@@ -1364,6 +1079,14 @@ def get_test_description(descriptions, test):
return str(test)
+def get_failed_testcase_linkname(failed_dir, testcase_dirname):
+ return os.path.join(failed_dir, f"{testcase_dirname}-FAILED")
+
+
+def get_testcase_dirname(testcase_class_name):
+ return f"vpp-unittest-{testcase_class_name}"
+
+
class TestCaseInfo(object):
def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path):
self.logger = logger
@@ -1409,6 +1132,17 @@ class VppTestResult(unittest.TestResult):
self.runner = runner
self.printed = []
+ def decodePcapFiles(self, test, when_configured=False):
+ if when_configured == False or config.decode_pcaps == True:
+ if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0:
+ testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path)
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, f"suite{test.__class__.__name__}"
+ )
+ test.pg_interfaces[0].decode_pcap_files(
+ testcase_dir, test._testMethodName
+ )
+
def addSuccess(self, test):
"""
Record a test succeeded result
@@ -1417,6 +1151,7 @@ class VppTestResult(unittest.TestResult):
"""
self.log_result("addSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
unittest.TestResult.addSuccess(self, test)
self.result_string = colorize("OK", GREEN)
self.result_code = TestResultCode.PASS
@@ -1424,6 +1159,7 @@ class VppTestResult(unittest.TestResult):
def addExpectedFailure(self, test, err):
self.log_result("addExpectedFailure", test, err)
+ self.decodePcapFiles(test)
super().addExpectedFailure(test, err)
self.result_string = colorize("FAIL", GREEN)
self.result_code = TestResultCode.EXPECTED_FAIL
@@ -1431,6 +1167,7 @@ class VppTestResult(unittest.TestResult):
def addUnexpectedSuccess(self, test):
self.log_result("addUnexpectedSuccess", test)
+ self.decodePcapFiles(test, when_configured=True)
super().addUnexpectedSuccess(test)
self.result_string = colorize("OK", RED)
self.result_code = TestResultCode.UNEXPECTED_PASS
@@ -1458,9 +1195,8 @@ class VppTestResult(unittest.TestResult):
if self.current_test_case_info:
try:
failed_dir = config.failed_dir
- link_path = os.path.join(
- failed_dir,
- "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir),
+ link_path = get_failed_testcase_linkname(
+ failed_dir, os.path.basename(self.current_test_case_info.tempdir)
)
self.current_test_case_info.logger.debug(
@@ -1517,6 +1253,7 @@ class VppTestResult(unittest.TestResult):
error_type_str = colorize("ERROR", RED)
else:
raise Exception(f"Unexpected result code {result_code}")
+ self.decodePcapFiles(test)
unittest_fn(self, test, err)
if self.current_test_case_info:
@@ -1727,7 +1464,7 @@ class VppTestRunner(unittest.TextTestRunner):
**kwargs,
):
# ignore stream setting here, use hard-coded stdout to be in sync
- # with prints from VppTestCase methods ...
+ # with prints from VppAsfTestCase methods ...
super(VppTestRunner, self).__init__(
sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs
)
diff --git a/test/asf/lisp.py b/test/asf/lisp.py
deleted file mode 100644
index 9ebc86a35e3..00000000000
--- a/test/asf/lisp.py
+++ /dev/null
@@ -1,385 +0,0 @@
-import socket
-from ipaddress import ip_network
-
-from vpp_object import VppObject
-
-
-class VppLispLocatorSet(VppObject):
- """Represents LISP locator set in VPP"""
-
- def __init__(self, test, ls_name):
- self._test = test
- self._ls_name = ls_name
-
- @property
- def test(self):
- return self._test
-
- @property
- def ls_name(self):
- return self._ls_name
-
- def add_vpp_config(self):
- self.test.vapi.lisp_add_del_locator_set(locator_set_name=self._ls_name)
- self._test.registry.register(self, self.test.logger)
-
- def get_lisp_locator_sets_dump_entry(self):
- result = self.test.vapi.lisp_locator_set_dump()
- for ls in result:
- if ls.ls_name.strip("\x00") == self._ls_name:
- return ls
- return None
-
- def query_vpp_config(self):
- return self.get_lisp_locator_sets_dump_entry() is not None
-
- def remove_vpp_config(self):
- self.test.vapi.lisp_add_del_locator_set(
- locator_set_name=self._ls_name, is_add=0
- )
-
- def object_id(self):
- return "lisp-locator-set-%s" % self._ls_name
-
-
-class VppLispLocator(VppObject):
- """Represents LISP locator in VPP"""
-
- def __init__(self, test, sw_if_index, ls_name, priority=1, weight=1):
- self._test = test
- self._sw_if_index = sw_if_index
- self._ls_name = ls_name
- self._priority = priority
- self._weight = weight
-
- @property
- def test(self):
- """Test which created this locator"""
- return self._test
-
- @property
- def ls_name(self):
- """Locator set name"""
- return self._ls_name
-
- @property
- def sw_if_index(self):
- return self._sw_if_index
-
- @property
- def priority(self):
- return self._priority
-
- @property
- def weight(self):
- return self._weight
-
- def add_vpp_config(self):
- self.test.vapi.lisp_add_del_locator(
- locator_set_name=self._ls_name,
- sw_if_index=self._sw_if_index,
- priority=self._priority,
- weight=self._weight,
- )
- self._test.registry.register(self, self.test.logger)
-
- def get_lisp_locator_dump_entry(self):
- locators = self.test.vapi.lisp_locator_dump(
- is_index_set=0, ls_name=self._ls_name
- )
- for locator in locators:
- if locator.sw_if_index == self._sw_if_index:
- return locator
- return None
-
- def query_vpp_config(self):
- locator = self.get_lisp_locator_dump_entry()
- return locator is not None
-
- def remove_vpp_config(self):
- self.test.vapi.lisp_add_del_locator(
- locator_set_name=self._ls_name,
- sw_if_index=self._sw_if_index,
- priority=self._priority,
- weight=self._weight,
- is_add=0,
- )
- self._test.registry.register(self, self.test.logger)
-
- def object_id(self):
- return "lisp-locator-%s-%d" % (self._ls_name, self._sw_if_index)
-
-
-class LispEIDType:
- PREFIX = 0
- MAC = 1
- NSH = 2
-
-
-class LispKeyIdType:
- NONE = 0
- SHA1 = 1
- SHA256 = 2
-
-
-class LispEID:
- """Lisp endpoint identifier"""
-
- def __init__(self, eid):
- self.eid = eid
- self._type = -1
-
- # find out whether EID is ip prefix, or MAC
- try:
- self.prefix = ip_network(self.eid)
- self._type = LispEIDType.PREFIX
- return
- except ValueError:
- if self.eid.count(":") == 5: # MAC address
- self.mac = self.eid
- self._type = LispEIDType.MAC
- return
- raise Exception("Unsupported EID format {!s}!".format(eid))
-
- @property
- def eid_type(self):
- return self._type
-
- @property
- def address(self):
- if self.eid_type == LispEIDType.PREFIX:
- return self.prefix
- elif self.eid_type == LispEIDType.MAC:
- return self.mac
- elif self.eid_type == LispEIDType.NSH:
- return Exception("Unimplemented")
-
- @property
- def packed(self):
- if self.eid_type == LispEIDType.PREFIX:
- return {"type": self._type, "address": {"prefix": self.prefix}}
- elif self.eid_type == LispEIDType.MAC:
- return {"type": self._type, "address": {"mac": self.mac}}
- elif self.eid_type == LispEIDType.NSH:
- return Exception("Unimplemented")
-
-
-class LispKey:
- """Lisp Key"""
-
- def __init__(self, key_type, key):
- self._key_type = key_type
- self._key = key
-
- @property
- def packed(self):
- return {"id": self._key_type, "key": self._key}
-
-
-class VppLispMapping(VppObject):
- """Represents common features for remote and local LISP mapping in VPP"""
-
- def __init__(self, test, eid, vni=0, priority=1, weight=1):
- self._eid = LispEID(eid)
- self._test = test
- self._priority = priority
- self._weight = weight
- self._vni = vni
-
- @property
- def test(self):
- return self._test
-
- @property
- def vni(self):
- return self._vni
-
- @property
- def eid(self):
- return self._eid
-
- @property
- def priority(self):
- return self._priority
-
- @property
- def weight(self):
- return self._weight
-
- def get_lisp_mapping_dump_entry(self):
- return self.test.vapi.lisp_eid_table_dump(
- eid_set=1, vni=self._vni, eid=self._eid.packed
- )
-
- def query_vpp_config(self):
- mapping = self.get_lisp_mapping_dump_entry()
- return mapping
-
- def object_id(self):
- return "lisp-mapping-[%s]-%s-%s-%s" % (
- self.vni,
- self.eid.address,
- self.priority,
- self.weight,
- )
-
-
-class VppLocalMapping(VppLispMapping):
- """LISP Local mapping"""
-
- def __init__(
- self,
- test,
- eid,
- ls_name,
- vni=0,
- priority=1,
- weight=1,
- key_id=LispKeyIdType.NONE,
- key="",
- ):
- super(VppLocalMapping, self).__init__(test, eid, vni, priority, weight)
- self._ls_name = ls_name
- self._key = LispKey(key_id, key)
-
- @property
- def ls_name(self):
- return self._ls_name
-
- @property
- def key_id(self):
- return self._key_id
-
- @property
- def key(self):
- return self._key
-
- def add_vpp_config(self):
- self.test.vapi.lisp_add_del_local_eid(
- locator_set_name=self._ls_name,
- eid=self._eid.packed,
- vni=self._vni,
- key=self._key.packed,
- )
- self._test.registry.register(self, self.test.logger)
-
- def remove_vpp_config(self):
- self.test.vapi.lisp_add_del_local_eid(
- locator_set_name=self._ls_name,
- eid=self._eid.packed,
- vni=self._vni,
- is_add=0,
- )
-
- def object_id(self):
- return "lisp-eid-local-mapping-%s[%d]" % (self._eid.address, self._vni)
-
-
-class LispRemoteLocator:
- def __init__(self, addr, priority=1, weight=1):
- self.addr = addr
- self.priority = priority
- self.weight = weight
-
- @property
- def packed(self):
- return {
- "priority": self.priority,
- "weight": self.weight,
- "ip_address": self.addr,
- }
-
-
-class VppRemoteMapping(VppLispMapping):
- def __init__(self, test, eid, rlocs=None, vni=0, priority=1, weight=1):
- super(VppRemoteMapping, self).__init__(test, eid, vni, priority, weight)
- self._rlocs = rlocs
-
- @property
- def rlocs(self):
- rlocs = []
- for rloc in self._rlocs:
- rlocs.append(rloc.packed)
- return rlocs
-
- def add_vpp_config(self):
- self.test.vapi.lisp_add_del_remote_mapping(
- rlocs=self.rlocs,
- deid=self._eid.packed,
- vni=self._vni,
- rloc_num=len(self._rlocs),
- )
- self._test.registry.register(self, self.test.logger)
-
- def remove_vpp_config(self):
- self.test.vapi.lisp_add_del_remote_mapping(
- deid=self._eid.packed, vni=self._vni, is_add=0, rloc_num=0
- )
-
- def object_id(self):
- return "lisp-eid-remote-mapping-%s[%d]" % (self._eid.address, self._vni)
-
-
-class VppLispAdjacency(VppObject):
- """Represents LISP adjacency in VPP"""
-
- def __init__(self, test, leid, reid, vni=0):
- self._leid = LispEID(leid)
- self._reid = LispEID(reid)
- if self._leid.eid_type != self._reid.eid_type:
- raise Exception("remote and local EID are different types!")
- self._vni = vni
- self._test = test
-
- @property
- def test(self):
- return self._test
-
- @property
- def leid(self):
- return self._leid
-
- @property
- def reid(self):
- return self._reid
-
- @property
- def vni(self):
- return self._vni
-
- def add_vpp_config(self):
- self.test.vapi.lisp_add_del_adjacency(
- leid=self._leid.packed, reid=self._reid.packed, vni=self._vni
- )
- self._test.registry.register(self, self.test.logger)
-
- @staticmethod
- def eid_equal(eid, eid_api):
- if eid.eid_type != eid_api.type:
- return False
-
- if eid_api.type == LispEIDType.PREFIX:
- if eid.address.prefixlen != eid_api.address.prefix.prefixlen:
- return False
-
- if eid.address != eid_api.address:
- return False
-
- return True
-
- def query_vpp_config(self):
- res = self.test.vapi.lisp_adjacencies_get(vni=self._vni)
- for adj in res.adjacencies:
- if self.eid_equal(self._leid, adj.leid) and self.eid_equal(
- self._reid, adj.reid
- ):
- return True
- return False
-
- def remove_vpp_config(self):
- self.test.vapi.lisp_add_del_adjacency(
- leid=self._leid.packed, reid=self._reid.packed, vni=self._vni, is_add=0
- )
-
- def object_id(self):
- return "lisp-adjacency-%s-%s[%d]" % (self._leid, self._reid, self._vni)
diff --git a/test/asf/remote_test.py b/test/asf/remote_test.py
deleted file mode 100644
index 7743c7782e4..00000000000
--- a/test/asf/remote_test.py
+++ /dev/null
@@ -1,431 +0,0 @@
-#!/usr/bin/env python3
-
-import inspect
-import os
-import reprlib
-import unittest
-from asfframework import VppTestCase
-from multiprocessing import Process, Pipe
-from pickle import dumps
-import sys
-
-from enum import IntEnum, IntFlag
-
-
-class SerializableClassCopy:
- """
- Empty class used as a basis for a serializable copy of another class.
- """
-
- pass
-
- def __repr__(self):
- return "<SerializableClassCopy dict=%s>" % self.__dict__
-
-
-class RemoteClassAttr:
- """
- Wrapper around attribute of a remotely executed class.
- """
-
- def __init__(self, remote, attr):
- self._path = [attr] if attr else []
- self._remote = remote
-
- def path_to_str(self):
- return ".".join(self._path)
-
- def get_remote_value(self):
- return self._remote._remote_exec(RemoteClass.GET, self.path_to_str())
-
- def __repr__(self):
- return self._remote._remote_exec(RemoteClass.REPR, self.path_to_str())
-
- def __str__(self):
- return self._remote._remote_exec(RemoteClass.STR, self.path_to_str())
-
- def __getattr__(self, attr):
- if attr[0] == "_":
- if not (attr.startswith("__") and attr.endswith("__")):
- raise AttributeError("tried to get private attribute: %s ", attr)
- self._path.append(attr)
- return self
-
- def __setattr__(self, attr, val):
- if attr[0] == "_":
- if not (attr.startswith("__") and attr.endswith("__")):
- super(RemoteClassAttr, self).__setattr__(attr, val)
- return
- self._path.append(attr)
- self._remote._remote_exec(RemoteClass.SETATTR, self.path_to_str(), value=val)
-
- def __call__(self, *args, **kwargs):
- return self._remote._remote_exec(
- RemoteClass.CALL, self.path_to_str(), *args, **kwargs
- )
-
-
-class RemoteClass(Process):
- """
- This class can wrap around and adapt the interface of another class,
- and then delegate its execution to a newly forked child process.
-
- Usage:
-
- #. Create a remotely executed instance of MyClass. ::
-
- object = RemoteClass(MyClass, arg1='foo', arg2='bar')
- object.start_remote()
-
- #. Access the object normally as if it was an instance of your
- class. ::
-
- object.my_attribute = 20
- print object.my_attribute
- print object.my_method(object.my_attribute)
- object.my_attribute.nested_attribute = 'test'
-
- #. If you need the value of a remote attribute, use .get_remote_value
- method. This method is automatically called when needed in the
- context of a remotely executed class. E.g. ::
-
- if (object.my_attribute.get_remote_value() > 20):
- object.my_attribute2 = object.my_attribute
-
- #. Destroy the instance. ::
-
- object.quit_remote()
- object.terminate()
- """
-
- GET = 0 # Get attribute remotely
- CALL = 1 # Call method remotely
- SETATTR = 2 # Set attribute remotely
- REPR = 3 # Get representation of a remote object
- STR = 4 # Get string representation of a remote object
- QUIT = 5 # Quit remote execution
-
- PIPE_PARENT = 0 # Parent end of the pipe
- PIPE_CHILD = 1 # Child end of the pipe
-
- DEFAULT_TIMEOUT = 2 # default timeout for an operation to execute
-
- def __init__(self, cls, *args, **kwargs):
- super(RemoteClass, self).__init__()
- self._cls = cls
- self._args = args
- self._kwargs = kwargs
- self._timeout = RemoteClass.DEFAULT_TIMEOUT
- self._pipe = Pipe() # pipe for input/output arguments
-
- def __repr__(self):
- return reprlib.repr(RemoteClassAttr(self, None))
-
- def __str__(self):
- return str(RemoteClassAttr(self, None))
-
- def __call__(self, *args, **kwargs):
- return self.RemoteClassAttr(self, None)()
-
- def __getattr__(self, attr):
- if attr[0] == "_" or not self.is_alive():
- if not (attr.startswith("__") and attr.endswith("__")):
- if hasattr(super(RemoteClass, self), "__getattr__"):
- return super(RemoteClass, self).__getattr__(attr)
- raise AttributeError("missing: %s", attr)
- return RemoteClassAttr(self, attr)
-
- def __setattr__(self, attr, val):
- if attr[0] == "_" or not self.is_alive():
- if not (attr.startswith("__") and attr.endswith("__")):
- super(RemoteClass, self).__setattr__(attr, val)
- return
- setattr(RemoteClassAttr(self, None), attr, val)
-
- def _remote_exec(self, op, path=None, *args, **kwargs):
- """
- Execute given operation on a given, possibly nested, member remotely.
- """
- # automatically resolve remote objects in the arguments
- mutable_args = list(args)
- for i, val in enumerate(mutable_args):
- if isinstance(val, RemoteClass) or isinstance(val, RemoteClassAttr):
- mutable_args[i] = val.get_remote_value()
- args = tuple(mutable_args)
- for key, val in kwargs.items():
- if isinstance(val, RemoteClass) or isinstance(val, RemoteClassAttr):
- kwargs[key] = val.get_remote_value()
- # send request
- args = self._make_serializable(args)
- kwargs = self._make_serializable(kwargs)
- self._pipe[RemoteClass.PIPE_PARENT].send((op, path, args, kwargs))
- timeout = self._timeout
- # adjust timeout specifically for the .sleep method
- if path is not None and path.split(".")[-1] == "sleep":
- if args and isinstance(args[0], (long, int)):
- timeout += args[0]
- elif "timeout" in kwargs:
- timeout += kwargs["timeout"]
- if not self._pipe[RemoteClass.PIPE_PARENT].poll(timeout):
- return None
- try:
- rv = self._pipe[RemoteClass.PIPE_PARENT].recv()
- rv = self._deserialize(rv)
- return rv
- except EOFError:
- return None
-
- def _get_local_object(self, path):
- """
- Follow the path to obtain a reference on the addressed nested attribute
- """
- obj = self._instance
- for attr in path:
- obj = getattr(obj, attr)
- return obj
-
- def _get_local_value(self, path):
- try:
- return self._get_local_object(path)
- except AttributeError:
- return None
-
- def _call_local_method(self, path, *args, **kwargs):
- try:
- method = self._get_local_object(path)
- return method(*args, **kwargs)
- except AttributeError:
- return None
-
- def _set_local_attr(self, path, value):
- try:
- obj = self._get_local_object(path[:-1])
- setattr(obj, path[-1], value)
- except AttributeError:
- pass
- return None
-
- def _get_local_repr(self, path):
- try:
- obj = self._get_local_object(path)
- return reprlib.repr(obj)
- except AttributeError:
- return None
-
- def _get_local_str(self, path):
- try:
- obj = self._get_local_object(path)
- return str(obj)
- except AttributeError:
- return None
-
- def _serializable(self, obj):
- """Test if the given object is serializable"""
- try:
- dumps(obj)
- return True
- except:
- return False
-
- def _make_obj_serializable(self, obj):
- """
- Make a serializable copy of an object.
- Members which are difficult/impossible to serialize are stripped.
- """
- if self._serializable(obj):
- return obj # already serializable
-
- copy = SerializableClassCopy()
-
- """
- Dictionaries can hold complex values, so we split keys and values into
- separate lists and serialize them individually.
- """
- if type(obj) is dict:
- copy.type = type(obj)
- copy.k_list = list()
- copy.v_list = list()
- for k, v in obj.items():
- copy.k_list.append(self._make_serializable(k))
- copy.v_list.append(self._make_serializable(v))
- return copy
-
- # copy at least serializable attributes and properties
- for name, member in inspect.getmembers(obj):
- # skip private members and non-writable dunder methods.
- if name[0] == "_":
- if name in ["__weakref__"]:
- continue
- if name in ["__dict__"]:
- continue
- if not (name.startswith("__") and name.endswith("__")):
- continue
- if callable(member) and not isinstance(member, property):
- continue
- if not self._serializable(member):
- member = self._make_serializable(member)
- setattr(copy, name, member)
- return copy
-
- def _make_serializable(self, obj):
- """
- Make a serializable copy of an object or a list/tuple of objects.
- Members which are difficult/impossible to serialize are stripped.
- """
- if (type(obj) is list) or (type(obj) is tuple):
- rv = []
- for item in obj:
- rv.append(self._make_serializable(item))
- if type(obj) is tuple:
- rv = tuple(rv)
- return rv
- elif isinstance(obj, IntEnum) or isinstance(obj, IntFlag):
- return obj.value
- else:
- return self._make_obj_serializable(obj)
-
- def _deserialize_obj(self, obj):
- if hasattr(obj, "type"):
- if obj.type is dict:
- _obj = dict()
- for k, v in zip(obj.k_list, obj.v_list):
- _obj[self._deserialize(k)] = self._deserialize(v)
- return _obj
- return obj
-
- def _deserialize(self, obj):
- if (type(obj) is list) or (type(obj) is tuple):
- rv = []
- for item in obj:
- rv.append(self._deserialize(item))
- if type(obj) is tuple:
- rv = tuple(rv)
- return rv
- else:
- return self._deserialize_obj(obj)
-
- def start_remote(self):
- """Start remote execution"""
- self.start()
-
- def quit_remote(self):
- """Quit remote execution"""
- self._remote_exec(RemoteClass.QUIT, None)
-
- def get_remote_value(self):
- """Get value of a remotely held object"""
- return RemoteClassAttr(self, None).get_remote_value()
-
- def set_request_timeout(self, timeout):
- """Change request timeout"""
- self._timeout = timeout
-
- def run(self):
- """
- Create instance of the wrapped class and execute operations
- on it as requested by the parent process.
- """
- self._instance = self._cls(*self._args, **self._kwargs)
- while True:
- try:
- rv = None
- # get request from the parent process
- (op, path, args, kwargs) = self._pipe[RemoteClass.PIPE_CHILD].recv()
- args = self._deserialize(args)
- kwargs = self._deserialize(kwargs)
- path = path.split(".") if path else []
- if op == RemoteClass.GET:
- rv = self._get_local_value(path)
- elif op == RemoteClass.CALL:
- rv = self._call_local_method(path, *args, **kwargs)
- elif op == RemoteClass.SETATTR and "value" in kwargs:
- self._set_local_attr(path, kwargs["value"])
- elif op == RemoteClass.REPR:
- rv = self._get_local_repr(path)
- elif op == RemoteClass.STR:
- rv = self._get_local_str(path)
- elif op == RemoteClass.QUIT:
- break
- else:
- continue
- # send return value
- if not self._serializable(rv):
- rv = self._make_serializable(rv)
- self._pipe[RemoteClass.PIPE_CHILD].send(rv)
- except EOFError:
- break
- self._instance = None # destroy the instance
-
-
-@unittest.skip("Remote Vpp Test Case Class")
-class RemoteVppTestCase(VppTestCase):
- """Re-use VppTestCase to create remote VPP segment
-
- In your test case::
-
- @classmethod
- def setUpClass(cls):
- # fork new process before client connects to VPP
- cls.remote_test = RemoteClass(RemoteVppTestCase)
-
- # start remote process
- cls.remote_test.start_remote()
-
- # set up your test case
- super(MyTestCase, cls).setUpClass()
-
- # set up remote test
- cls.remote_test.setUpClass(cls.tempdir)
-
- @classmethod
- def tearDownClass(cls):
- # tear down remote test
- cls.remote_test.tearDownClass()
-
- # stop remote process
- cls.remote_test.quit_remote()
-
- # tear down your test case
- super(MyTestCase, cls).tearDownClass()
- """
-
- def __init__(self):
- super(RemoteVppTestCase, self).__init__("emptyTest")
-
- # Note: __del__ is a 'Finalizer" not a 'Destructor'.
- # https://docs.python.org/3/reference/datamodel.html#object.__del__
- def __del__(self):
- if hasattr(self, "vpp"):
- self.vpp.poll()
- if self.vpp.returncode is None:
- self.vpp.terminate()
- self.vpp.communicate()
-
- @classmethod
- def setUpClass(cls, tempdir):
- # disable features unsupported in remote VPP
- orig_env = dict(os.environ)
- if "STEP" in os.environ:
- del os.environ["STEP"]
- if "DEBUG" in os.environ:
- del os.environ["DEBUG"]
- cls.tempdir_prefix = os.path.basename(tempdir) + "/"
- super(RemoteVppTestCase, cls).setUpClass()
- os.environ = orig_env
-
- @classmethod
- def tearDownClass(cls):
- super(RemoteVppTestCase, cls).tearDownClass()
-
- @unittest.skip("Empty test")
- def emptyTest(self):
- """Do nothing"""
- pass
-
- def setTestFunctionInfo(self, name, doc):
- """
- Store the name and documentation string of currently executed test
- in the main VPP for logging purposes.
- """
- self._testMethodName = name
- self._testMethodDoc = doc
diff --git a/test/asf/test_adl.py b/test/asf/test_adl.py
index bd1602ca8bb..7e5ca8dcbe3 100644
--- a/test/asf/test_adl.py
+++ b/test/asf/test_adl.py
@@ -2,11 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestAdl(VppTestCase):
+class TestAdl(VppAsfTestCase):
"""Allow/Deny Plugin Unit Test Cases"""
@classmethod
diff --git a/test/asf/test_api_client.py b/test/asf/test_api_client.py
index 97744c6ba1b..3f0fc8a1020 100644
--- a/test/asf/test_api_client.py
+++ b/test/asf/test_api_client.py
@@ -2,11 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestAPIClient(VppTestCase):
+class TestAPIClient(VppAsfTestCase):
"""API Internal client Test Cases"""
def test_client_unittest(self):
diff --git a/test/asf/test_api_trace.py b/test/asf/test_api_trace.py
index e38b81a4c7d..8776a79f0ac 100644
--- a/test/asf/test_api_trace.py
+++ b/test/asf/test_api_trace.py
@@ -1,12 +1,10 @@
-import os
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_papi import VppEnum
+from asfframework import VppAsfTestCase, VppTestRunner
import json
import shutil
-class TestJsonApiTrace(VppTestCase):
+class TestJsonApiTrace(VppAsfTestCase):
"""JSON API trace related tests"""
@classmethod
diff --git a/test/asf/test_bihash.py b/test/asf/test_bihash.py
index 24639bd7a3b..b7df894be05 100644
--- a/test/asf/test_bihash.py
+++ b/test/asf/test_bihash.py
@@ -3,11 +3,10 @@
import unittest
from config import config
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestBihash(VppTestCase):
+class TestBihash(VppAsfTestCase):
"""Bihash Test Cases"""
@classmethod
diff --git a/test/asf/test_buffers.py b/test/asf/test_buffers.py
index b3a2b6d3d69..d22326f172a 100644
--- a/test/asf/test_buffers.py
+++ b/test/asf/test_buffers.py
@@ -1,9 +1,9 @@
#!/usr/bin/env python3
-from asfframework import VppTestCase
+from asfframework import VppAsfTestCase
-class TestBuffers(VppTestCase):
+class TestBuffers(VppAsfTestCase):
"""Buffer C Unit Tests"""
@classmethod
diff --git a/test/asf/test_cli.py b/test/asf/test_cli.py
index 808497f63d0..25ce3330d54 100644
--- a/test/asf/test_cli.py
+++ b/test/asf/test_cli.py
@@ -1,16 +1,14 @@
#!/usr/bin/env python3
"""CLI functional tests"""
-import datetime
-import time
import unittest
from vpp_papi import VPPIOError
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestCLI(VppTestCase):
+class TestCLI(VppAsfTestCase):
"""CLI Test Case"""
maxDiff = None
@@ -50,7 +48,7 @@ class TestCLI(VppTestCase):
self.assertEqual(rv.retval, 0)
-class TestCLIExtendedVapiTimeout(VppTestCase):
+class TestCLIExtendedVapiTimeout(VppAsfTestCase):
maxDiff = None
@classmethod
diff --git a/test/asf/test_counters.py b/test/asf/test_counters.py
index d3fc56a52c0..086189ae517 100644
--- a/test/asf/test_counters.py
+++ b/test/asf/test_counters.py
@@ -1,11 +1,10 @@
#!/usr/bin/env python3
-from asfframework import VppTestCase
-from asfframework import tag_fixme_vpp_workers
+from asfframework import VppAsfTestCase, tag_fixme_vpp_workers
@tag_fixme_vpp_workers
-class TestCounters(VppTestCase):
+class TestCounters(VppAsfTestCase):
"""Counters C Unit Tests"""
@classmethod
diff --git a/test/asf/test_crypto.py b/test/asf/test_crypto.py
index f39cb46470e..56c96b69575 100644
--- a/test/asf/test_crypto.py
+++ b/test/asf/test_crypto.py
@@ -2,10 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestCrypto(VppTestCase):
+class TestCrypto(VppAsfTestCase):
"""Crypto Test Case"""
@classmethod
diff --git a/test/asf/test_endian.py b/test/asf/test_endian.py
index 4509ad86133..9caed0efc4a 100644
--- a/test/asf/test_endian.py
+++ b/test/asf/test_endian.py
@@ -12,13 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import asfframework
+from asfframework import VppAsfTestCase
import vpp_papi_provider
F64_ONE = 1.0
-class TestEndian(asfframework.VppTestCase):
+class TestEndian(VppAsfTestCase):
"""TestEndian"""
def test_f64_endian_value(self):
diff --git a/test/asf/test_fib.py b/test/asf/test_fib.py
index bbc10d1c178..9d391f57ed1 100644
--- a/test/asf/test_fib.py
+++ b/test/asf/test_fib.py
@@ -2,12 +2,11 @@
import unittest
-from asfframework import tag_fixme_vpp_workers
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner, tag_fixme_vpp_workers
@tag_fixme_vpp_workers
-class TestFIB(VppTestCase):
+class TestFIB(VppAsfTestCase):
"""FIB Test Case"""
@classmethod
diff --git a/test/asf/test_http.py b/test/asf/test_http.py
index fd8cb7c506a..64f911c7bfa 100644
--- a/test/asf/test_http.py
+++ b/test/asf/test_http.py
@@ -2,15 +2,12 @@
""" Vpp HTTP tests """
import unittest
-import os
-import subprocess
import http.client
-from asfframework import VppTestCase, VppTestRunner, Worker
-from vpp_devices import VppTAPInterface
+from asfframework import VppAsfTestCase, VppTestRunner
@unittest.skip("Requires root")
-class TestHttpTps(VppTestCase):
+class TestHttpTps(VppAsfTestCase):
"""HTTP test class"""
@classmethod
diff --git a/test/asf/test_http_static.py b/test/asf/test_http_static.py
index 504ffa35236..1d87f4c75bd 100644
--- a/test/asf/test_http_static.py
+++ b/test/asf/test_http_static.py
@@ -1,5 +1,5 @@
from config import config
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
import unittest
import subprocess
import tempfile
@@ -15,7 +15,7 @@ from vpp_qemu_utils import (
"http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
)
@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
-class TestHttpStaticVapi(VppTestCase):
+class TestHttpStaticVapi(VppAsfTestCase):
"""enable the http static server and send requests [VAPI]"""
@classmethod
@@ -82,7 +82,7 @@ class TestHttpStaticVapi(VppTestCase):
"http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests"
)
@unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli")
-class TestHttpStaticCli(VppTestCase):
+class TestHttpStaticCli(VppAsfTestCase):
"""enable the static http server and send requests [CLI]"""
@classmethod
diff --git a/test/asf/test_ipfix_export.py b/test/asf/test_ipfix_export.py
deleted file mode 100644
index be4239edbec..00000000000
--- a/test/asf/test_ipfix_export.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python3
-from __future__ import print_function
-import binascii
-import random
-import socket
-import unittest
-import time
-import re
-
-from asfframework import VppTestCase
-from vpp_object import VppObject
-from vpp_pg_interface import CaptureTimeoutError
-from vpp_ip_route import VppIpRoute, VppRoutePath
-from ipaddress import ip_address, IPv4Address, IPv6Address
-from socket import AF_INET, AF_INET6
-
-
-class TestIpfixExporter(VppTestCase):
- """Ipfix Exporter Tests"""
-
- def setUp(self):
- super(TestIpfixExporter, self).setUp()
- self.create_pg_interfaces(range(4))
- for i in self.pg_interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
- i.config_ip6()
- i.resolve_ndp()
- i.disable_ipv6_ra()
-
- def tearDown(self):
- super(TestIpfixExporter, self).tearDown()
- for i in self.pg_interfaces:
- i.unconfig_ip4()
- i.unconfig_ip6()
- i.admin_down()
-
- def find_exp_by_collector_addr(self, exporters, addr):
- """Find the exporter in the list of exportes with the given addr"""
-
- for exp in exporters:
- if exp.collector_address == IPv4Address(addr):
- return exp
- return None
-
- def verify_exporter_detail(
- self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20
- ):
- self.assertTrue(exp is not None)
- self.assert_equal(exp.collector_address, collector_addr)
- self.assert_equal(exp.src_address, src_addr)
- self.assert_equal(exp.collector_port, collector_port)
- self.assert_equal(exp.path_mtu, mtu)
- self.assert_equal(exp.template_interval, interval)
-
- def test_create_multipe_exporters(self):
- """test that we can create and dump multiple exporters"""
-
- mtu = 1400
- interval = 20
- port = 4739
-
- # Old API - always gives us pool index 0.
- self.vapi.set_ipfix_exporter(
- collector_address=self.pg1.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu,
- template_interval=interval,
- )
-
- exporters = self.vapi.ipfix_exporter_dump()
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- # create a 2nd exporter
- self.vapi.ipfix_exporter_create_delete(
- collector_address=self.pg2.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu,
- template_interval=interval,
- is_create=True,
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- self.assertTrue(len(exporters) == 2)
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- # Create a 3rd exporter
- self.vapi.ipfix_exporter_create_delete(
- collector_address=self.pg3.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu,
- template_interval=interval,
- is_create=True,
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- self.assertTrue(len(exporters) == 3)
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- # Modify the 2nd exporter.
- self.vapi.ipfix_exporter_create_delete(
- collector_address=self.pg2.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu + 1,
- template_interval=interval + 1,
- is_create=True,
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- self.assertTrue(len(exporters) == 3)
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4)
- self.verify_exporter_detail(
- exp,
- IPv4Address(self.pg2.remote_ip4),
- IPv4Address(self.pg0.local_ip4),
- mtu=mtu + 1,
- interval=interval + 1,
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- # Delete 2nd exporter
- self.vapi.ipfix_exporter_create_delete(
- collector_address=self.pg2.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu,
- template_interval=interval,
- is_create=False,
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- self.assertTrue(len(exporters) == 2)
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
- exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
-
- # Delete final exporter (exporter in slot 0 can not be deleted)
- self.vapi.ipfix_exporter_create_delete(
- collector_address=self.pg3.remote_ip4,
- src_address=self.pg0.local_ip4,
- collector_port=4739,
- path_mtu=mtu,
- template_interval=interval,
- is_create=False,
- )
-
- exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get))
- self.assertTrue(len(exporters) == 1)
- exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4)
- self.verify_exporter_detail(
- exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4)
- )
diff --git a/test/asf/test_ipsec_default.py b/test/asf/test_ipsec_default.py
deleted file mode 100644
index 2fefb771c48..00000000000
--- a/test/asf/test_ipsec_default.py
+++ /dev/null
@@ -1,199 +0,0 @@
-import socket
-import unittest
-
-from util import ppp
-from asfframework import VppTestRunner
-from template_ipsec import IpsecDefaultTemplate
-
-"""
-When an IPSec SPD is configured on an interface, any inbound packets
-not matching inbound policies, or outbound packets not matching outbound
-policies, must be dropped by default as per RFC4301.
-
-This test uses simple IPv4 forwarding on interfaces with IPSec enabled
-to check if packets with no matching rules are dropped by default.
-
-The basic setup is a single SPD bound to two interfaces, pg0 and pg1.
-
- ┌────┐ ┌────┐
- │SPD1│ │SPD1│
- ├────┤ ─────> ├────┤
- │PG0 │ │PG1 │
- └────┘ └────┘
-
-First, both inbound and outbound BYPASS policies are configured allowing
-traffic to pass from pg0 -> pg1.
-
-Packets are captured and verified at pg1.
-
-Then either the inbound or outbound policies are removed and we verify
-packets are dropped as expected.
-
-"""
-
-
-class IPSecInboundDefaultDrop(IpsecDefaultTemplate):
- """IPSec: inbound packets drop by default with no matching rule"""
-
- def test_ipsec_inbound_default_drop(self):
- # configure two interfaces and bind the same SPD to both
- self.create_interfaces(2)
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- pkt_count = 5
-
- # catch-all inbound BYPASS policy, all interfaces
- inbound_policy = self.spd_add_rem_policy(
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # outbound BYPASS policy allowing traffic from pg0->pg1
- outbound_policy = self.spd_add_rem_policy(
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # create a packet stream pg0->pg1 + add to pg0
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- self.pg0.add_stream(packets0)
-
- # with inbound BYPASS rule at pg0, we expect to see forwarded
- # packets on pg1
- self.pg_interfaces[1].enable_capture()
- self.pg_start()
- cap1 = self.pg1.get_capture()
- for packet in cap1:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(cap1.res))
- # verify captures on pg1
- self.verify_capture(self.pg0, self.pg1, cap1)
- # verify policies matched correct number of times
- self.verify_policy_match(pkt_count, inbound_policy)
- self.verify_policy_match(pkt_count, outbound_policy)
-
- # remove inbound catch-all BYPASS rule, traffic should now be dropped
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- remove=True,
- )
-
- # create another packet stream pg0->pg1 + add to pg0
- packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
- self.pg0.add_stream(packets1)
- self.pg_interfaces[1].enable_capture()
- self.pg_start()
- # confirm traffic has now been dropped
- self.pg1.assert_nothing_captured(
- remark="inbound pkts with no matching" "rules NOT dropped by default"
- )
- # both policies should not have matched any further packets
- # since we've dropped at input stage
- self.verify_policy_match(pkt_count, outbound_policy)
- self.verify_policy_match(pkt_count, inbound_policy)
-
-
-class IPSecOutboundDefaultDrop(IpsecDefaultTemplate):
- """IPSec: outbound packets drop by default with no matching rule"""
-
- def test_ipsec_inbound_default_drop(self):
- # configure two interfaces and bind the same SPD to both
- self.create_interfaces(2)
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- pkt_count = 5
-
- # catch-all inbound BYPASS policy, all interfaces
- inbound_policy = self.spd_add_rem_policy(
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # outbound BYPASS policy allowing traffic from pg0->pg1
- outbound_policy = self.spd_add_rem_policy(
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # create a packet stream pg0->pg1 + add to pg0
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- self.pg0.add_stream(packets0)
-
- # with outbound BYPASS rule allowing pg0->pg1, we expect to see
- # forwarded packets on pg1
- self.pg_interfaces[1].enable_capture()
- self.pg_start()
- cap1 = self.pg1.get_capture()
- for packet in cap1:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(cap1.res))
- # verify captures on pg1
- self.verify_capture(self.pg0, self.pg1, cap1)
- # verify policies matched correct number of times
- self.verify_policy_match(pkt_count, inbound_policy)
- self.verify_policy_match(pkt_count, outbound_policy)
-
- # remove outbound rule
- self.spd_add_rem_policy(
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
-
- # create another packet stream pg0->pg1 + add to pg0
- packets1 = self.create_stream(self.pg0, self.pg1, pkt_count)
- self.pg0.add_stream(packets1)
- self.pg_interfaces[1].enable_capture()
- self.pg_start()
- # confirm traffic was dropped and not forwarded
- self.pg1.assert_nothing_captured(
- remark="outbound pkts with no matching rules NOT dropped " "by default"
- )
- # inbound rule should have matched twice the # of pkts now
- self.verify_policy_match(pkt_count * 2, inbound_policy)
- # as dropped at outbound, outbound policy is the same
- self.verify_policy_match(pkt_count, outbound_policy)
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_ipsec_spd_flow_cache_input.py b/test/asf/test_ipsec_spd_flow_cache_input.py
deleted file mode 100644
index bab130dfa18..00000000000
--- a/test/asf/test_ipsec_spd_flow_cache_input.py
+++ /dev/null
@@ -1,866 +0,0 @@
-from os import remove
-import socket
-import unittest
-
-from util import ppp
-from asfframework import VppTestRunner
-from template_ipsec import SpdFlowCacheTemplate
-
-
-class SpdFlowCacheInbound(SpdFlowCacheTemplate):
- # Override setUpConstants to enable inbound flow cache in config
- @classmethod
- def setUpConstants(cls):
- super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"])
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
-
-class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (add bypass)"""
-
- def test_ipsec_spd_inbound_bypass(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec inbound SPD policy lookup.
- #
- # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
- # - High priority rule action is set to DISCARD.
- # - Low priority rule action is set to BYPASS.
- #
- # Since BYPASS rules take precedence over DISCARD
- # (the order being PROTECT, BYPASS, DISCARD) we expect the
- # BYPASS rule to match and traffic to be correctly forwarded.
- self.create_interfaces(2)
- pkt_count = 5
-
- self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
-
- # create input rules
- # bypass rule should take precedence over discard rule,
- # even though it's lower priority
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=15,
- policy_type="discard",
- )
-
- # create output rule so we can capture forwarded packets
- policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface
- self.pg0.add_stream(packets)
- self.pg1.enable_capture()
- self.pg_start()
-
- # check capture on pg1
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check input policy has been cached
- self.verify_num_inbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (add discard)"""
-
- def test_ipsec_spd_inbound_discard(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec inbound SPD policy lookup.
- # 1 DISCARD rule is added, so all traffic should be dropped.
- self.create_interfaces(2)
- pkt_count = 5
-
- self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
-
- # create input rule
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="discard",
- )
-
- # create output rule so we can capture forwarded packets
- policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface
- self.pg0.add_stream(packets)
- self.pg1.enable_capture()
- self.pg_start()
- # inbound discard rule should have dropped traffic
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # only inbound discard rule should have been cached
- self.verify_num_inbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseRemoveInbound(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (remove bypass)"""
-
- def test_ipsec_spd_inbound_remove(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec inbound SPD policy lookup.
- #
- # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
- # - High priority rule action is set to DISCARD.
- # - Low priority rule action is set to BYPASS.
- #
- # Since BYPASS rules take precedence over DISCARD
- # (the order being PROTECT, BYPASS, DISCARD) we expect the
- # BYPASS rule to match and traffic to be correctly forwarded.
- #
- # The BYPASS rules is then removed, and we check that all traffic
- # is now correctly dropped.
- self.create_interfaces(2)
- pkt_count = 5
-
- self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
-
- # create input rules
- # bypass rule should take precedence over discard rule,
- # even though it's lower priority
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=15,
- policy_type="discard",
- )
-
- # create output rule so we can capture forwarded packets
- policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface
- self.pg0.add_stream(packets)
- self.pg1.enable_capture()
- self.pg_start()
-
- # check capture on pg1
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check input policy has been cached
- self.verify_num_inbound_flow_cache_entries(1)
-
- # remove the input bypass rule
- self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # verify flow cache counter has been reset by rule removal
- self.verify_num_inbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg1.enable_capture() # flush the old capture
- self.pg_start()
-
- # inbound discard rule should have dropped traffic
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # by removing the bypass rule, we should have reset the flow cache
- # we only expect the discard rule to now be in the flow cache
- self.verify_num_inbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseReaddInbound(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (add, remove, re-add bypass)"""
-
- def test_ipsec_spd_inbound_readd(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec inbound SPD policy lookup.
- #
- # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
- # - High priority rule action is set to DISCARD.
- # - Low priority rule action is set to BYPASS.
- #
- # Since BYPASS rules take precedence over DISCARD
- # (the order being PROTECT, BYPASS, DISCARD) we expect the
- # BYPASS rule to match and traffic to be correctly forwarded.
- #
- # The BYPASS rules is then removed, and we check that all traffic
- # is now correctly dropped.
- #
- # The BYPASS rule is then readded, checking traffic is not forwarded
- # correctly again
- self.create_interfaces(2)
- pkt_count = 5
-
- self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
-
- # create input rules
- # bypass rule should take precedence over discard rule,
- # even though it's lower priority
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # inbound, priority 15
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=15,
- policy_type="discard",
- )
-
- # create output rule so we can capture forwarded packets
- policy_2 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface
- self.pg0.add_stream(packets)
- self.pg1.enable_capture()
- self.pg_start()
-
- # check capture on pg1
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check input policy has been cached
- self.verify_num_inbound_flow_cache_entries(1)
-
- # remove the input bypass rule
- self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # verify flow cache counter has been reset by rule removal
- self.verify_num_inbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg1.enable_capture() # flush the old capture
- self.pg_start()
-
- # inbound discard rule should have dropped traffic
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # by removing the bypass rule, flow cache was reset
- # we only expect the discard rule to now be in the flow cache
- self.verify_num_inbound_flow_cache_entries(1)
-
- # readd the input bypass rule
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # verify flow cache counter has been reset by rule addition
- self.verify_num_inbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg1.enable_capture() # flush the old capture
- self.pg_start()
-
- # check capture on pg1
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count * 2, policy_2)
- # by readding the bypass rule, we reset the flow cache
- # we only expect the bypass rule to now be in the flow cache
- self.verify_num_inbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseMultipleInbound(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (multiple interfaces, multiple rules)"""
-
- def test_ipsec_spd_inbound_multiple(self):
- # In this test case, packets in IPv4 FWD path are configured to go
- # through IPSec outbound SPD policy lookup.
- #
- # Multiples rules on multiple interfaces are tested at the same time.
- # 3x interfaces are configured, binding the same SPD to each.
- # Each interface has 1 SPD rule- 2x BYPASS and 1x DISCARD
- #
- # Traffic should be forwarded with destinations pg1 & pg2
- # and dropped to pg0.
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add input rules on all interfaces
- # pg0 -> pg1
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # pg1 -> pg2
- policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg2,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # pg2 -> pg0
- policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg0,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="discard",
- )
-
- # create output rules covering the the full ip range
- # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
- policy_3 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # check flow cache is empty (0 active elements) before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures from ifs
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rules
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check flow/policy match was cached for: 3x input policies
- self.verify_num_inbound_flow_cache_entries(3)
-
-
-class IPSec4SpdTestCaseOverwriteStaleInbound(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (overwrite stale entries)"""
-
- def test_ipsec_spd_inbound_overwrite(self):
- # The operation of the flow cache is setup so that the entire cache
- # is invalidated when adding or removing an SPD policy rule.
- # For performance, old cache entries are not zero'd, but remain
- # in the table as "stale" entries. If a flow matches a stale entry,
- # and the epoch count does NOT match the current count, the entry
- # is overwritten.
- # In this test, 3 active rules are created and matched to enter
- # them into the flow cache.
- # A single entry is removed to invalidate the entire cache.
- # We then readd the rule and test that overwriting of the previous
- # stale entries occurs as expected, and that the flow cache entry
- # counter is updated correctly.
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add input rules on all interfaces
- # pg0 -> pg1
- policy_0 = self.spd_add_rem_policy( # inbound
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # pg1 -> pg2
- policy_1 = self.spd_add_rem_policy( # inbound
- 1,
- self.pg2,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # pg2 -> pg0
- policy_2 = self.spd_add_rem_policy( # inbound
- 1,
- self.pg0,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="discard",
- )
-
- # create output rules covering the the full ip range
- # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
- policy_3 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # check flow cache is empty (0 active elements) before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures from ifs
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rules
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check flow/policy match was cached for: 3x input policies
- self.verify_num_inbound_flow_cache_entries(3)
-
- # adding an outbound policy should not invalidate output flow cache
- self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=1,
- policy_type="bypass",
- all_ips=True,
- )
- # check inbound flow cache counter has not been reset
- self.verify_num_inbound_flow_cache_entries(3)
-
- # remove + readd bypass policy - flow cache counter will be reset,
- # and there will be 3x stale entries in flow cache
- self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # readd policy
- policy_0 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # check counter was reset
- self.verify_num_inbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- for pg in self.pg_interfaces:
- pg.enable_capture() # flush previous captures
- self.pg_start()
-
- # get captures from ifs
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rules
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count * 2, policy_1)
- self.verify_policy_match(pkt_count * 2, policy_2)
- # we are overwriting 3x stale entries - check flow cache counter
- # is correct
- self.verify_num_inbound_flow_cache_entries(3)
-
-
-class IPSec4SpdTestCaseCollisionInbound(SpdFlowCacheInbound):
- """ IPSec/IPv4 inbound: Policy mode test case with flow cache \
- (hash collision)"""
-
- # Override class setup to restrict hash table size to 16 buckets.
- # This forces using only the lower 4 bits of the hash as a key,
- # making hash collisions easy to find.
- @classmethod
- def setUpConstants(cls):
- super(SpdFlowCacheInbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(
- [
- "ipsec",
- "{",
- "ipv4-inbound-spd-flow-cache on",
- "ipv4-inbound-spd-hash-buckets 16",
- "}",
- ]
- )
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
- def test_ipsec_spd_inbound_collision(self):
- # The flow cache operation is setup to overwrite an entry
- # if a hash collision occurs.
- # In this test, 2 packets are configured that result in a
- # hash with the same lower 4 bits.
- # After the first packet is received, there should be one
- # active entry in the flow cache.
- # After the second packet with the same lower 4 bit hash
- # is received, this should overwrite the same entry.
- # Therefore there will still be a total of one (1) entry,
- # in the flow cache with two matching policies.
- # crc32_supported() method is used to check cpu for crc32
- # intrinsic support for hashing.
- # If crc32 is not supported, we fall back to clib_xxhash()
- self.create_interfaces(4)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
-
- # create output rules covering the the full ip range
- # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets
- policy_0 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- capture_intfs = []
- if self.crc32_supported(): # create crc32 collision on last 4 bits
- hashed_with_crc32 = True
- # add matching rules
- policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg3,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
-
- # we expect to get captures on pg1 + pg3
- capture_intfs.append(self.pg1)
- capture_intfs.append(self.pg3)
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
-
- # create the packet streams
- # packet hashes to:
- # ad727628
- packets1 = self.create_stream(self.pg2, self.pg1, pkt_count, 1, 1)
- # b5512898
- packets2 = self.create_stream(self.pg0, self.pg3, pkt_count, 1, 1)
- # add the streams to the source interfaces
- self.pg2.add_stream(packets1)
- self.pg0.add_stream(packets2)
- else: # create xxhash collision on last 4 bits
- hashed_with_crc32 = False
- # add matching rules
- policy_1 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- policy_2 = self.spd_add_rem_policy( # inbound, priority 10
- 1,
- self.pg2,
- self.pg3,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
-
- capture_intfs.append(self.pg1)
- capture_intfs.append(self.pg2)
-
- # check flow cache is empty before sending traffic
- self.verify_num_inbound_flow_cache_entries(0)
-
- # create the packet streams
- # 2f8f90f557eef12c
- packets1 = self.create_stream(self.pg2, self.pg1, pkt_count, 1, 1)
- # 6b7f9987719ffc1c
- packets2 = self.create_stream(self.pg3, self.pg2, pkt_count, 1, 1)
- # add the streams to the source interfaces
- self.pg2.add_stream(packets1)
- self.pg3.add_stream(packets2)
-
- # enable capture on interfaces we expect capture on & send pkts
- for pg in capture_intfs:
- pg.enable_capture()
- self.pg_start()
-
- # get captures
- if_caps = []
- for pg in capture_intfs:
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rule
- if hashed_with_crc32:
- self.verify_capture(self.pg2, self.pg1, if_caps[0])
- self.verify_capture(self.pg0, self.pg3, if_caps[1])
- else: # hashed with xxhash
- self.verify_capture(self.pg2, self.pg1, if_caps[0])
- self.verify_capture(self.pg3, self.pg2, if_caps[1])
-
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- self.verify_policy_match(pkt_count * 2, policy_0) # output policy
- # we have matched 2 policies, but due to the hash collision
- # one active entry is expected
- self.verify_num_inbound_flow_cache_entries(1)
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_ipsec_spd_flow_cache_output.py b/test/asf/test_ipsec_spd_flow_cache_output.py
deleted file mode 100644
index ec68c3ed7d8..00000000000
--- a/test/asf/test_ipsec_spd_flow_cache_output.py
+++ /dev/null
@@ -1,765 +0,0 @@
-import socket
-import unittest
-
-from util import ppp
-from asfframework import VppTestRunner
-from template_ipsec import SpdFlowCacheTemplate
-
-
-class SpdFlowCacheOutbound(SpdFlowCacheTemplate):
- # Override setUpConstants to enable outbound flow cache in config
- @classmethod
- def setUpConstants(cls):
- super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"])
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
-
-class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (add rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # check policy in SPD has been cached after traffic
- # matched BYPASS rule in SPD
- self.verify_num_outbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseRemoveOutbound(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (remove rule)"""
-
- def test_ipsec_spd_outbound_remove(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # High priority rule is then removed.
- # Traffic sent on pg0 interface should match low priority
- # rule and should be discarded after SPD lookup.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify capture on pg1
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # check policy in SPD has been cached after traffic
- # matched BYPASS rule in SPD
- self.verify_num_outbound_flow_cache_entries(1)
-
- # now remove the bypass rule
- self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # verify flow cache counter has been reset by rule removal
- self.verify_num_outbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # all packets will be dropped by SPD rule
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- # previous stale entry in flow cache should have been overwritten,
- # with one active entry
- self.verify_num_outbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseReaddOutbound(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (add, remove, re-add)"""
-
- def test_ipsec_spd_outbound_readd(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- # High priority rule is then removed.
- # Traffic sent on pg0 interface should match low priority
- # rule and should be discarded after SPD lookup.
- # Readd high priority rule.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # check flow cache is empty before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify capture on pg1
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # check policy in SPD has been cached after traffic
- # matched BYPASS rule in SPD
- self.verify_num_outbound_flow_cache_entries(1)
-
- # now remove the bypass rule, leaving only the discard rule
- self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # verify flow cache counter has been reset by rule removal
- self.verify_num_outbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # all packets will be dropped by SPD rule
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- # previous stale entry in flow cache should have been overwritten
- self.verify_num_outbound_flow_cache_entries(1)
-
- # now readd the bypass rule
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- # verify flow cache counter has been reset by rule addition
- self.verify_num_outbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # get capture
- capture = self.pg1.get_capture(pkt_count)
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- # previous stale entry in flow cache should have been overwritten
- self.verify_num_outbound_flow_cache_entries(1)
-
-
-class IPSec4SpdTestCaseMultipleOutbound(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (multiple interfaces, multiple rules)"""
-
- def test_ipsec_spd_outbound_multiple(self):
- # In this test case, packets in IPv4 FWD path are configured to go
- # through IPSec outbound SPD policy lookup.
- # Multiples rules on multiple interfaces are tested at the same time.
- # 3x interfaces are configured, binding the same SPD to each.
- # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
- # On pg0 & pg1, the BYPASS rule is HIGH priority
- # On pg2, the DISCARD rule is HIGH priority
- # Traffic should be received on pg0 & pg1 and dropped on pg2.
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add rules on all interfaces
- policy_01 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_02 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_11 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_12 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_21 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="bypass",
- )
- policy_22 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="discard",
- )
-
- # interfaces bound to an SPD, will by default drop inbound
- # traffic with no matching policies. add catch-all inbound
- # bypass rule to SPD:
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # check flow cache is empty (0 active elements) before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
- self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
-
- # verify captures that matched BYPASS rule
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all packets that were expected to match rules, matched
- # pg0 -> pg1
- self.verify_policy_match(pkt_count, policy_01)
- self.verify_policy_match(0, policy_02)
- # pg1 -> pg2
- self.verify_policy_match(pkt_count, policy_11)
- self.verify_policy_match(0, policy_12)
- # pg2 -> pg0
- self.verify_policy_match(0, policy_21)
- self.verify_policy_match(pkt_count, policy_22)
- # check that 3 matching policies in SPD have been cached
- self.verify_num_outbound_flow_cache_entries(3)
-
-
-class IPSec4SpdTestCaseOverwriteStaleOutbound(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (overwrite stale entries)"""
-
- def test_ipsec_spd_outbound_overwrite(self):
- # The operation of the flow cache is setup so that the entire cache
- # is invalidated when adding or removing an SPD policy rule.
- # For performance, old cache entries are not zero'd, but remain
- # in the table as "stale" entries. If a flow matches a stale entry,
- # and the epoch count does NOT match the current count, the entry
- # is overwritten.
- # In this test, 3 active rules are created and matched to enter
- # them into the flow cache.
- # A single entry is removed to invalidate the entire cache.
- # We then readd the rule and test that overwriting of the previous
- # stale entries occurs as expected, and that the flow cache entry
- # counter is updated correctly.
- self.create_interfaces(3)
- pkt_count = 2
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add output rules on all interfaces
- # pg0 -> pg1
- policy_0 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- # pg1 -> pg2
- policy_1 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- # pg2 -> pg0
- policy_2 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="discard",
- )
-
- # interfaces bound to an SPD, will by default drop inbound
- # traffic with no matching policies. add catch-all inbound
- # bypass rule to SPD:
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # check flow cache is empty (0 active elements) before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures from ifs
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rules
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- self.verify_policy_match(pkt_count, policy_2)
- # check flow/policy match was cached for: 3x output policies
- self.verify_num_outbound_flow_cache_entries(3)
-
- # adding an inbound policy should not invalidate output flow cache
- self.spd_add_rem_policy( # inbound
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- )
- # check flow cache counter has not been reset
- self.verify_num_outbound_flow_cache_entries(3)
-
- # remove a bypass policy - flow cache counter will be reset, and
- # there will be 3x stale entries in flow cache
- self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
- # readd policy
- policy_0 = self.spd_add_rem_policy( # outbound
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- # check counter was reset with flow cache invalidation
- self.verify_num_outbound_flow_cache_entries(0)
-
- # resend the same packets
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- for pg in self.pg_interfaces:
- pg.enable_capture() # flush previous captures
- self.pg_start()
-
- # get captures from ifs
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD Add - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # verify captures that matched BYPASS rules
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count * 2, policy_1)
- self.verify_policy_match(pkt_count * 2, policy_2)
- # we are overwriting 3x stale entries - check flow cache counter
- # is correct
- self.verify_num_outbound_flow_cache_entries(3)
-
-
-class IPSec4SpdTestCaseCollisionOutbound(SpdFlowCacheOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with flow cache \
- (hash collision)"""
-
- # Override class setup to restrict vector size to 16 elements.
- # This forces using only the lower 4 bits of the hash as a key,
- # making hash collisions easy to find.
- @classmethod
- def setUpConstants(cls):
- super(SpdFlowCacheOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(
- [
- "ipsec",
- "{",
- "ipv4-outbound-spd-flow-cache on",
- "ipv4-outbound-spd-hash-buckets 16",
- "}",
- ]
- )
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
- def test_ipsec_spd_outbound_collision(self):
- # The flow cache operation is setup to overwrite an entry
- # if a hash collision occurs.
- # In this test, 2 packets are configured that result in a
- # hash with the same lower 4 bits.
- # After the first packet is received, there should be one
- # active entry in the flow cache.
- # After the second packet with the same lower 4 bit hash
- # is received, this should overwrite the same entry.
- # Therefore there will still be a total of one (1) entry,
- # in the flow cache with two matching policies.
- # crc32_supported() method is used to check cpu for crc32
- # intrinsic support for hashing.
- # If crc32 is not supported, we fall back to clib_xxhash()
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add rules
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # interfaces bound to an SPD, will by default drop inbound
- # traffic with no matching policies. add catch-all inbound
- # bypass rule to SPD:
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # check flow cache is empty (0 active elements) before sending traffic
- self.verify_num_outbound_flow_cache_entries(0)
-
- # create the packet streams generating collision on last 4 bits
- if self.crc32_supported():
- # packet hashes to:
- # 432c99c2
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count, 1, 1)
- # 31f8f3f2
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count, 6, 6)
- else: # clib_xxhash
- # ec3a258551bc0306
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count, 2, 2)
- # 61fee526d18d7a6
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count, 3, 3)
-
- # add the streams to the source interfaces
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures from ifs - the proper pkt_count of packets was saved by
- # create_packet_info() based on dst_if parameter
- if_caps = []
- for pg in [self.pg2, self.pg0]: # we are expecting captures on pg2/pg0
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
- self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
-
- # verify captures that matched BYPASS rule
- self.verify_capture(self.pg1, self.pg2, if_caps[0])
- self.verify_capture(self.pg2, self.pg0, if_caps[1])
- # verify all packets that were expected to match rules, matched
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
- # we have matched 2 policies, but due to the hash collision
- # one active entry is expected
- self.verify_num_outbound_flow_cache_entries(1)
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_ipsec_spd_fp_output.py b/test/asf/test_ipsec_spd_fp_output.py
deleted file mode 100644
index a92669a4f3f..00000000000
--- a/test/asf/test_ipsec_spd_fp_output.py
+++ /dev/null
@@ -1,1418 +0,0 @@
-import socket
-import unittest
-import ipaddress
-
-from util import ppp
-from asfframework import VppTestRunner
-from template_ipsec import IPSecIPv4Fwd
-from template_ipsec import IPSecIPv6Fwd
-
-
-class SpdFastPathOutbound(IPSecIPv4Fwd):
- # Override setUpConstants to enable outbound fast path in config
- @classmethod
- def setUpConstants(cls):
- super(SpdFastPathOutbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"])
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
-
-class SpdFastPathIPv6Outbound(IPSecIPv6Fwd):
- # Override setUpConstants to enable outbound fast path in config
- @classmethod
- def setUpConstants(cls):
- super(SpdFastPathIPv6Outbound, cls).setUpConstants()
- cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"])
- cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline))
-
-
-class IPSec4SpdTestCaseAdd(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1111
- s_port_e = 1111
- d_port_s = 2222
- d_port_e = 2222
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add all ips port range rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1000
- s_port_e = 2023
- d_port_s = 5000
- d_port_e = 6023
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- all_ips=True,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add ips range with any port rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4)
- s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
- d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
- d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add all ips range rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- # in this test we define ranges of ports and ip addresses.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1000
- s_port_e = 1000 + 1023
- d_port_s = 5000
- d_port_e = 5000 + 1023
-
- s_ip_s = ipaddress.ip_address(
- int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24
- )
- s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
- d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4)
- d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add all ips ports rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # Low priority rule action is set to BYPASS all ips.
- # High priority rule action is set to DISCARD all ips.
- # Traffic sent on pg0 interface when LOW priority rule is added,
- # expect the packet is being sent out to pg1. Then HIGH priority
- # rule is added and send the same traffic to pg0, this time expect
- # the traffic is dropped.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
-
- policy_1 = self.spd_add_rem_policy( # outbound, priority 20
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=20,
- policy_type="discard",
- all_ips=True,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # assert nothing captured on pg0 and pg1
- self.pg0.assert_nothing_captured()
- self.pg1.assert_nothing_captured()
-
-
-class IPSec4SpdTestCaseRemove(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (remove rule)"""
-
- def test_ipsec_spd_outbound_remove(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # High priority rule is then removed.
- # Traffic sent on pg0 interface should match low priority
- # rule and should be discarded after SPD lookup.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify capture on pg1
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # now remove the bypass rule
- self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # all packets will be dropped by SPD rule
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
-
-
-class IPSec4SpdTestCaseReadd(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (add, remove, re-add)"""
-
- def test_ipsec_spd_outbound_readd(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- # High priority rule is then removed.
- # Traffic sent on pg0 interface should match low priority
- # rule and should be discarded after SPD lookup.
- # Readd high priority rule.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify capture on pg1
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # remove the bypass rule, leaving only the discard rule
- self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # all packets will be dropped by SPD rule
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
-
- # now readd the bypass rule
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # get capture
- capture = self.pg1.get_capture(pkt_count)
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
-
-
-class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound):
- """ IPSec/IPv4 outbound: Policy mode test case with fast path \
- (multiple interfaces, multiple rules)"""
-
- def test_ipsec_spd_outbound_multiple(self):
- # In this test case, packets in IPv4 FWD path are configured to go
- # through IPSec outbound SPD policy lookup.
- # Multiples rules on multiple interfaces are tested at the same time.
- # 3x interfaces are configured, binding the same SPD to each.
- # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
- # On pg0 & pg1, the BYPASS rule is HIGH priority
- # On pg2, the DISCARD rule is HIGH priority
- # Traffic should be received on pg0 & pg1 and dropped on pg2.
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add rules on all interfaces
- policy_01 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_02 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_11 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_12 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_21 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="bypass",
- )
- policy_22 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="discard",
- )
-
- # interfaces bound to an SPD, will by default drop inbound
- # traffic with no matching policies. add catch-all inbound
- # bypass rule to SPD:
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
- self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
-
- # verify captures that matched BYPASS rule
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all packets that were expected to match rules, matched
- # pg0 -> pg1
- self.verify_policy_match(pkt_count, policy_01)
- self.verify_policy_match(0, policy_02)
- # pg1 -> pg2
- self.verify_policy_match(pkt_count, policy_11)
- self.verify_policy_match(0, policy_12)
- # pg2 -> pg0
- self.verify_policy_match(0, policy_21)
- self.verify_policy_match(pkt_count, policy_22)
-
-
-class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (add rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1111
- s_port_e = 1111
- d_port_s = 2222
- d_port_e = 2222
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (add all ips ports rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # Low priority rule action is set to BYPASS all ips.
- # High priority rule action is set to DISCARD all ips.
- # Traffic sent on pg0 interface when LOW priority rule is added,
- # expect the packet is being sent out to pg1. Then HIGH priority
- # rule is added and send the same traffic to pg0, this time expect
- # the traffic is dropped.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
-
- policy_1 = self.spd_add_rem_policy( # outbound, priority 20
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=20,
- policy_type="discard",
- all_ips=True,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # assert nothing captured on pg0 and pg1
- self.pg0.assert_nothing_captured()
- self.pg1.assert_nothing_captured()
-
-
-class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (add all ips port range rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1000
- s_port_e = 2023
- d_port_s = 5000
- d_port_e = 6023
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- all_ips=True,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (add ips range with any port rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6)
- s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5)
- d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
- d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0)
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound):
- """ IPSec/IPvr6 outbound: Policy mode test case with fast path \
- (add all ips range rule)"""
-
- def test_ipsec_spd_outbound_add(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- # in this test we define ranges of ports and ip addresses.
- self.create_interfaces(2)
- pkt_count = 5
- s_port_s = 1000
- s_port_e = 1000 + 1023
- d_port_s = 5000
- d_port_e = 5000 + 1023
-
- s_ip_s = ipaddress.ip_address(
- int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24
- )
- s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255)
- d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6)
- d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255)
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- ip_range=True,
- local_ip_start=s_ip_s,
- local_ip_stop=s_ip_e,
- remote_ip_start=d_ip_s,
- remote_ip_stop=d_ip_e,
- local_port_start=s_port_s,
- local_port_stop=s_port_e,
- remote_port_start=d_port_s,
- remote_port_stop=d_port_e,
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
-
-
-class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (add, remove, re-add)"""
-
- def test_ipsec_spd_outbound_readd(self):
- # In this test case, packets in IPv4 FWD path are configured
- # to go through IPSec outbound SPD policy lookup.
- # 2 SPD rules (1 HIGH and 1 LOW) are added.
- # High priority rule action is set to BYPASS.
- # Low priority rule action is set to DISCARD.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- # High priority rule is then removed.
- # Traffic sent on pg0 interface should match low priority
- # rule and should be discarded after SPD lookup.
- # Readd high priority rule.
- # Traffic sent on pg0 interface should match high priority
- # rule and should be sent out on pg1 interface.
- self.create_interfaces(2)
- pkt_count = 5
- self.spd_create_and_intf_add(1, [self.pg1])
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_1 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- # create the packet stream
- packets = self.create_stream(self.pg0, self.pg1, pkt_count)
- # add the stream to the source interface + enable capture
- self.pg0.add_stream(packets)
- self.pg0.enable_capture()
- self.pg1.enable_capture()
- # start the packet generator
- self.pg_start()
- # get capture
- capture = self.pg1.get_capture()
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify capture on pg1
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(0, policy_1)
- # remove the bypass rule, leaving only the discard rule
- self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- remove=True,
- )
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # all packets will be dropped by SPD rule
- self.pg1.assert_nothing_captured()
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
-
- # now readd the bypass rule
- policy_0 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
-
- # resend the same packets
- self.pg0.add_stream(packets)
- self.pg0.enable_capture() # flush the old captures
- self.pg1.enable_capture()
- self.pg_start()
-
- # get capture
- capture = self.pg1.get_capture(pkt_count)
- for packet in capture:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(capture.res))
-
- # assert nothing captured on pg0
- self.pg0.assert_nothing_captured()
- # verify captured packets
- self.verify_capture(self.pg0, self.pg1, capture)
- # verify all policies matched the expected number of times
- self.verify_policy_match(pkt_count, policy_0)
- self.verify_policy_match(pkt_count, policy_1)
-
-
-class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound):
- """ IPSec/IPv6 outbound: Policy mode test case with fast path \
- (multiple interfaces, multiple rules)"""
-
- def test_ipsec_spd_outbound_multiple(self):
- # In this test case, packets in IPv4 FWD path are configured to go
- # through IPSec outbound SPD policy lookup.
- # Multiples rules on multiple interfaces are tested at the same time.
- # 3x interfaces are configured, binding the same SPD to each.
- # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD).
- # On pg0 & pg1, the BYPASS rule is HIGH priority
- # On pg2, the DISCARD rule is HIGH priority
- # Traffic should be received on pg0 & pg1 and dropped on pg2.
- self.create_interfaces(3)
- pkt_count = 5
- # bind SPD to all interfaces
- self.spd_create_and_intf_add(1, self.pg_interfaces)
- # add rules on all interfaces
- policy_01 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_02 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg0,
- self.pg1,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_11 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="bypass",
- )
- policy_12 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg1,
- self.pg2,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="discard",
- )
-
- policy_21 = self.spd_add_rem_policy( # outbound, priority 5
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=5,
- policy_type="bypass",
- )
- policy_22 = self.spd_add_rem_policy( # outbound, priority 10
- 1,
- self.pg2,
- self.pg0,
- socket.IPPROTO_UDP,
- is_out=1,
- priority=10,
- policy_type="discard",
- )
-
- # interfaces bound to an SPD, will by default drop inbound
- # traffic with no matching policies. add catch-all inbound
- # bypass rule to SPD:
- self.spd_add_rem_policy( # inbound, all interfaces
- 1,
- None,
- None,
- socket.IPPROTO_UDP,
- is_out=0,
- priority=10,
- policy_type="bypass",
- all_ips=True,
- )
-
- # create the packet streams
- packets0 = self.create_stream(self.pg0, self.pg1, pkt_count)
- packets1 = self.create_stream(self.pg1, self.pg2, pkt_count)
- packets2 = self.create_stream(self.pg2, self.pg0, pkt_count)
- # add the streams to the source interfaces
- self.pg0.add_stream(packets0)
- self.pg1.add_stream(packets1)
- self.pg2.add_stream(packets2)
- # enable capture on all interfaces
- for pg in self.pg_interfaces:
- pg.enable_capture()
- # start the packet generator
- self.pg_start()
-
- # get captures
- if_caps = []
- for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2
- if_caps.append(pg.get_capture())
- for packet in if_caps[-1]:
- try:
- self.logger.debug(ppp("SPD - Got packet:", packet))
- except Exception:
- self.logger.error(ppp("Unexpected or invalid packet:", packet))
- raise
- self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res))
- self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res))
-
- # verify captures that matched BYPASS rule
- self.verify_capture(self.pg0, self.pg1, if_caps[0])
- self.verify_capture(self.pg1, self.pg2, if_caps[1])
- # verify that traffic to pg0 matched DISCARD rule and was dropped
- self.pg0.assert_nothing_captured()
- # verify all packets that were expected to match rules, matched
- # pg0 -> pg1
- self.verify_policy_match(pkt_count, policy_01)
- self.verify_policy_match(0, policy_02)
- # pg1 -> pg2
- self.verify_policy_match(pkt_count, policy_11)
- self.verify_policy_match(0, policy_12)
- # pg2 -> pg0
- self.verify_policy_match(0, policy_21)
- self.verify_policy_match(pkt_count, policy_22)
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_lb_api.py b/test/asf/test_lb_api.py
index b1e04a93ec3..9608d0473a6 100644
--- a/test/asf/test_lb_api.py
+++ b/test/asf/test_lb_api.py
@@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import asfframework
-import ipaddress
+from asfframework import VppAsfTestCase
DEFAULT_VIP = "lb_vip_details(_0=978, context=12, vip=vl_api_lb_ip_addr_t(pfx=IPv6Network(u'::/0'), protocol=<vl_api_ip_proto_t.IP_API_PROTO_RESERVED: 255>, port=0), encap=<vl_api_lb_encap_type_t.LB_API_ENCAP_TYPE_GRE4: 0>, dscp=<vl_api_ip_dscp_t.IP_API_DSCP_CS0: 0>, srv_type=<vl_api_lb_srv_type_t.LB_API_SRV_TYPE_CLUSTERIP: 0>, target_port=0, flow_table_length=0)" # noqa
-class TestLbEmptyApi(asfframework.VppTestCase):
+class TestLbEmptyApi(VppAsfTestCase):
"""TestLbEmptyApi"""
def test_lb_empty_vip_dump(self):
@@ -35,7 +34,7 @@ class TestLbEmptyApi(asfframework.VppTestCase):
self.assertEqual(rv, [], "Expected: [] Received: %r." % rv)
-class TestLbApi(asfframework.VppTestCase):
+class TestLbApi(VppAsfTestCase):
"""TestLbApi"""
def test_lb_vip_dump(self):
@@ -56,7 +55,7 @@ class TestLbApi(asfframework.VppTestCase):
self.vapi.cli("lb vip 2001::/16 del")
-class TestLbAsApi(asfframework.VppTestCase):
+class TestLbAsApi(VppAsfTestCase):
"""TestLbAsApi"""
def test_lb_as_dump(self):
diff --git a/test/asf/test_mactime.py b/test/asf/test_mactime.py
index 1becd6f2eb3..215bd132cf0 100644
--- a/test/asf/test_mactime.py
+++ b/test/asf/test_mactime.py
@@ -3,11 +3,10 @@
import unittest
from config import config
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestMactime(VppTestCase):
+class TestMactime(VppAsfTestCase):
"""Mactime Unit Test Cases"""
@classmethod
diff --git a/test/asf/test_mpcap.py b/test/asf/test_mpcap.py
index 854182d84a2..ed8ce1e0ea9 100644
--- a/test/asf/test_mpcap.py
+++ b/test/asf/test_mpcap.py
@@ -2,12 +2,11 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
import os
-class TestMpcap(VppTestCase):
+class TestMpcap(VppAsfTestCase):
"""Mpcap Unit Test Cases"""
@classmethod
diff --git a/test/asf/test_node_variants.py b/test/asf/test_node_variants.py
index 5762664ca93..c0c7cc35658 100644
--- a/test/asf/test_node_variants.py
+++ b/test/asf/test_node_variants.py
@@ -2,7 +2,7 @@
import re
import unittest
import platform
-from asfframework import VppTestCase
+from asfframework import VppAsfTestCase
def checkX86():
@@ -19,7 +19,7 @@ def skipVariant(variant):
return checkX86() and match is not None
-class TestNodeVariant(VppTestCase):
+class TestNodeVariant(VppAsfTestCase):
"""Test Node Variants"""
@classmethod
diff --git a/test/asf/test_offload.py b/test/asf/test_offload.py
index ce5a65d98df..4c800129094 100644
--- a/test/asf/test_offload.py
+++ b/test/asf/test_offload.py
@@ -2,11 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestOffload(VppTestCase):
+class TestOffload(VppAsfTestCase):
"""Offload Unit Test Cases"""
@classmethod
diff --git a/test/asf/test_pcap.py b/test/asf/test_pcap.py
deleted file mode 100644
index c2ba1384533..00000000000
--- a/test/asf/test_pcap.py
+++ /dev/null
@@ -1,163 +0,0 @@
-#!/usr/bin/env python3
-
-import os
-import unittest
-
-from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
-from scapy.packet import Raw
-
-from asfframework import VppTestCase, VppTestRunner
-
-
-class TestPcap(VppTestCase):
- """Pcap Unit Test Cases"""
-
- @classmethod
- def setUpClass(cls):
- super(TestPcap, cls).setUpClass()
-
- cls.create_pg_interfaces(range(1))
- for i in cls.pg_interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
-
- @classmethod
- def tearDownClass(cls):
- for i in cls.pg_interfaces:
- i.admin_down()
-
- super(TestPcap, cls).tearDownClass()
-
- def setUp(self):
- super(TestPcap, self).setUp()
-
- def tearDown(self):
- super(TestPcap, self).tearDown()
-
- # This is a code coverage test, but it only runs for 0.3 seconds
- # might as well just run it...
- def test_pcap_unittest(self):
- """PCAP Capture Tests"""
- cmds = [
- "loop create",
- "set int ip address loop0 11.22.33.1/24",
- "set int state loop0 up",
- "loop create",
- "set int ip address loop1 11.22.34.1/24",
- "set int state loop1 up",
- "set ip neighbor loop1 11.22.34.44 03:00:11:22:34:44",
- "packet-generator new {\n"
- " name s0\n"
- " limit 10\n"
- " size 128-128\n"
- " interface loop0\n"
- " tx-interface loop1\n"
- " node loop1-output\n"
- " buffer-flags ip4 offload\n"
- " buffer-offload-flags offload-ip-cksum offload-udp-cksum\n"
- " data {\n"
- " IP4: 1.2.3 -> dead.0000.0001\n"
- " UDP: 11.22.33.44 -> 11.22.34.44\n"
- " ttl 2 checksum 13\n"
- " UDP: 1234 -> 2345\n"
- " checksum 11\n"
- " incrementing 114\n"
- " }\n"
- "}",
- "pcap dispatch trace on max 100 buffer-trace pg-input 10",
- "pa en",
- "pcap dispatch trace off",
- "pcap trace rx tx max 1000 intfc any",
- "pa en",
- "pcap trace status",
- "pcap trace rx tx off",
- "classify filter pcap mask l3 ip4 src match l3 ip4 src 11.22.33.44",
- "pcap trace rx tx max 1000 intfc any file filt.pcap filter",
- "show cla t verbose 2",
- "show cla t verbose",
- "show cla t",
- "pa en",
- "pcap trace rx tx off",
- "classify filter pcap del mask l3 ip4 src",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
- self.assertTrue(os.path.exists("/tmp/dispatch.pcap"))
- self.assertTrue(os.path.exists("/tmp/rxtx.pcap"))
- self.assertTrue(os.path.exists("/tmp/filt.pcap"))
- os.remove("/tmp/dispatch.pcap")
- os.remove("/tmp/rxtx.pcap")
- os.remove("/tmp/filt.pcap")
-
- def test_pcap_trace_api(self):
- """PCAP API Tests"""
-
- pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
- / IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=2)
- / UDP(sport=1234, dport=2345)
- / Raw(b"\xa5" * 128)
- )
-
- self.vapi.pcap_trace_on(
- capture_rx=True,
- capture_tx=True,
- max_packets=1000,
- sw_if_index=0,
- filename="trace_any.pcap",
- )
- self.pg_send(self.pg0, pkt * 10)
- self.vapi.pcap_trace_off()
-
- self.vapi.cli(
- f"classify filter pcap mask l3 ip4 src match l3 ip4 src {self.pg0.local_ip4}"
- )
- self.vapi.pcap_trace_on(
- capture_rx=True,
- capture_tx=True,
- filter=True,
- max_packets=1000,
- sw_if_index=0,
- filename="trace_any_filter.pcap",
- )
- self.pg_send(self.pg0, pkt * 10)
- self.vapi.pcap_trace_off()
- self.vapi.cli("classify filter pcap del mask l3 ip4 src")
-
- pkt = (
- Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac)
- # wrong destination address
- / IP(src=self.pg0.local_ip4, dst=self.pg0.local_ip4, ttl=2)
- / UDP(sport=1234, dport=2345)
- / Raw(b"\xa5" * 128)
- )
-
- self.vapi.pcap_trace_on(
- capture_drop=True,
- max_packets=1000,
- sw_if_index=0,
- error="{ip4-local}.{spoofed_local_packets}",
- filename="trace_drop_err.pcap",
- )
- self.pg_send(self.pg0, pkt * 10)
- self.vapi.pcap_trace_off()
-
- self.assertTrue(os.path.exists("/tmp/trace_any.pcap"))
- self.assertTrue(os.path.exists("/tmp/trace_any_filter.pcap"))
- self.assertTrue(os.path.exists("/tmp/trace_drop_err.pcap"))
- os.remove("/tmp/trace_any.pcap")
- os.remove("/tmp/trace_any_filter.pcap")
- os.remove("/tmp/trace_drop_err.pcap")
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_policer.py b/test/asf/test_policer.py
index c23ec00956d..9c01bf0fc1c 100644
--- a/test/asf/test_policer.py
+++ b/test/asf/test_policer.py
@@ -3,8 +3,8 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_policer import VppPolicer, PolicerAction
+from asfframework import VppAsfTestCase, VppTestRunner
+from vpp_policer import VppPolicer
# Default for the tests is 10s of "Green" packets at 8Mbps, ie. 10M bytes.
# The policer helper CLI "sends" 500 byte packets, so default is 20000.
@@ -23,7 +23,7 @@ CBURST = 100000 # Committed burst in bytes
EBURST = 200000 # Excess burst in bytes
-class TestPolicer(VppTestCase):
+class TestPolicer(VppAsfTestCase):
"""Policer Test Case"""
def run_policer_test(
diff --git a/test/asf/test_quic.py b/test/asf/test_quic.py
index 2414186ab13..e453bd5b3e5 100644
--- a/test/asf/test_quic.py
+++ b/test/asf/test_quic.py
@@ -3,11 +3,9 @@
import unittest
import os
-import subprocess
import signal
from config import config
-from framework import tag_fixme_vpp_workers
-from framework import VppTestCase, VppTestRunner, Worker
+from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_vpp_workers
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
@@ -53,7 +51,7 @@ class QUICAppWorker(Worker):
@unittest.skipIf("quic" in config.excluded_plugins, "Exclude QUIC plugin tests")
-class QUICTestCase(VppTestCase):
+class QUICTestCase(VppAsfTestCase):
"""QUIC Test Case"""
timeout = 20
diff --git a/test/asf/test_session.py b/test/asf/test_session.py
index 885d66c6863..64f59df5758 100644
--- a/test/asf/test_session.py
+++ b/test/asf/test_session.py
@@ -2,14 +2,17 @@
import unittest
-from asfframework import tag_fixme_vpp_workers
-from asfframework import VppTestCase, VppTestRunner
-from asfframework import tag_run_solo
+from asfframework import (
+ VppAsfTestCase,
+ VppTestRunner,
+ tag_fixme_vpp_workers,
+ tag_run_solo,
+)
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
@tag_fixme_vpp_workers
-class TestSession(VppTestCase):
+class TestSession(VppAsfTestCase):
"""Session Test Case"""
@classmethod
@@ -106,7 +109,7 @@ class TestSession(VppTestCase):
@tag_fixme_vpp_workers
-class TestSessionUnitTests(VppTestCase):
+class TestSessionUnitTests(VppAsfTestCase):
"""Session Unit Tests Case"""
@classmethod
@@ -135,7 +138,7 @@ class TestSessionUnitTests(VppTestCase):
@tag_run_solo
-class TestSegmentManagerTests(VppTestCase):
+class TestSegmentManagerTests(VppAsfTestCase):
"""SVM Fifo Unit Tests Case"""
@classmethod
@@ -162,7 +165,7 @@ class TestSegmentManagerTests(VppTestCase):
@tag_run_solo
-class TestSvmFifoUnitTests(VppTestCase):
+class TestSvmFifoUnitTests(VppAsfTestCase):
"""SVM Fifo Unit Tests Case"""
@classmethod
diff --git a/test/asf/test_sparse_vec.py b/test/asf/test_sparse_vec.py
index 614bc2e94bc..cf0afd8aaf3 100644
--- a/test/asf/test_sparse_vec.py
+++ b/test/asf/test_sparse_vec.py
@@ -2,11 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestSparseVec(VppTestCase):
+class TestSparseVec(VppAsfTestCase):
"""SparseVec Test Cases"""
@classmethod
diff --git a/test/asf/test_string.py b/test/asf/test_string.py
index 3a861ef97a8..2eeecd7dfd8 100644
--- a/test/asf/test_string.py
+++ b/test/asf/test_string.py
@@ -2,11 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestString(VppTestCase):
+class TestString(VppAsfTestCase):
"""String Test Cases"""
@classmethod
diff --git a/test/asf/test_tap.py b/test/asf/test_tap.py
index 1a9d0ac56b8..c436ec6b6ae 100644
--- a/test/asf/test_tap.py
+++ b/test/asf/test_tap.py
@@ -1,7 +1,7 @@
import unittest
import os
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
from vpp_devices import VppTAPInterface
@@ -10,7 +10,7 @@ def check_tuntap_driver_access():
@unittest.skip("Requires root")
-class TestTAP(VppTestCase):
+class TestTAP(VppAsfTestCase):
"""TAP Test Case"""
def test_tap_add_del(self):
diff --git a/test/asf/test_tcp.py b/test/asf/test_tcp.py
index 4a16d573668..69fc5c472a5 100644
--- a/test/asf/test_tcp.py
+++ b/test/asf/test_tcp.py
@@ -2,11 +2,11 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
-class TestTCP(VppTestCase):
+class TestTCP(VppAsfTestCase):
"""TCP Test Case"""
@classmethod
@@ -93,7 +93,7 @@ class TestTCP(VppTestCase):
ip_t10.remove_vpp_config()
-class TestTCPUnitTests(VppTestCase):
+class TestTCPUnitTests(VppAsfTestCase):
"TCP Unit Tests"
@classmethod
diff --git a/test/asf/test_tls.py b/test/asf/test_tls.py
index e70c63d9a32..d2d1d9a4747 100644
--- a/test/asf/test_tls.py
+++ b/test/asf/test_tls.py
@@ -5,7 +5,7 @@ import os
import re
import subprocess
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
@@ -52,7 +52,7 @@ def checkAll():
return ret
-class TestTLS(VppTestCase):
+class TestTLS(VppAsfTestCase):
"""TLS Qat Test Case."""
@classmethod
diff --git a/test/asf/test_vapi.py b/test/asf/test_vapi.py
index 2eb47b59017..10d9411dbb6 100644
--- a/test/asf/test_vapi.py
+++ b/test/asf/test_vapi.py
@@ -5,10 +5,10 @@ import unittest
import os
import signal
from config import config
-from asfframework import VppTestCase, VppTestRunner, Worker
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
-class VAPITestCase(VppTestCase):
+class VAPITestCase(VppAsfTestCase):
"""VAPI test"""
@classmethod
diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py
index 59c077ee4f7..a1113b863e8 100644
--- a/test/asf/test_vcl.py
+++ b/test/asf/test_vcl.py
@@ -7,8 +7,8 @@ import subprocess
import signal
import glob
from config import config
-from asfframework import VppTestCase, VppTestRunner, Worker
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath, FibPathProto
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
+from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
iperf3 = "/usr/bin/iperf3"
@@ -58,7 +58,7 @@ class VCLAppWorker(Worker):
super(VCLAppWorker, self).__init__(self.args, logger, env, *args, **kwargs)
-class VCLTestCase(VppTestCase):
+class VCLTestCase(VppAsfTestCase):
"""VCL Test Class"""
session_startup = ["poll-main"]
@@ -84,7 +84,7 @@ class VCLTestCase(VppTestCase):
self.timeout = 20
self.echo_phrase = "Hello, world! Jenny is a friend of mine."
self.pre_test_sleep = 0.3
- self.post_test_sleep = 0.2
+ self.post_test_sleep = 1
self.sapi_client_sock = ""
self.sapi_server_sock = ""
diff --git a/test/asf/test_vhost.py b/test/asf/test_vhost.py
index eb584633d5a..622716cafe3 100644
--- a/test/asf/test_vhost.py
+++ b/test/asf/test_vhost.py
@@ -2,12 +2,12 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
from vpp_vhost_interface import VppVhostInterface
-class TesVhostInterface(VppTestCase):
+class TesVhostInterface(VppAsfTestCase):
"""Vhost User Test Case"""
@classmethod
diff --git a/test/asf/test_vlib.py b/test/asf/test_vlib.py
deleted file mode 100644
index dce08b823b4..00000000000
--- a/test/asf/test_vlib.py
+++ /dev/null
@@ -1,327 +0,0 @@
-#!/usr/bin/env python3
-
-import unittest
-import pexpect
-import time
-import signal
-from config import config
-from asfframework import VppTestCase, VppTestRunner
-from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
-from scapy.layers.inet import IP, ICMP
-from scapy.layers.l2 import Ether
-from scapy.packet import Raw
-
-
-@unittest.skipUnless(config.gcov, "part of code coverage tests")
-class TestVlib(VppTestCase):
- """Vlib Unit Test Cases"""
-
- vpp_worker_count = 1
-
- @classmethod
- def setUpClass(cls):
- super(TestVlib, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestVlib, cls).tearDownClass()
-
- def setUp(self):
- super(TestVlib, self).setUp()
-
- def tearDown(self):
- super(TestVlib, self).tearDown()
-
- def test_vlib_main_unittest(self):
- """Vlib main.c Code Coverage Test"""
-
- cmds = [
- "loopback create",
- "packet-generator new {\n"
- " name vlib\n"
- " limit 15\n"
- " size 128-128\n"
- " interface loop0\n"
- " node ethernet-input\n"
- " data {\n"
- " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
- " ICMP: db00::1 -> db00::2\n"
- " incrementing 30\n"
- " }\n"
- "}\n",
- "event-logger trace dispatch",
- "event-logger stop",
- "event-logger clear",
- "event-logger resize 102400",
- "event-logger restart",
- "pcap dispatch trace on max 100 buffer-trace pg-input 15",
- "pa en",
- "show event-log 100 all",
- "event-log save",
- "event-log save foo",
- "pcap dispatch trace",
- "pcap dispatch trace status",
- "pcap dispatch trace off",
- "show vlib frame-allocation",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
- def test_vlib_node_cli_unittest(self):
- """Vlib node_cli.c Code Coverage Test"""
-
- cmds = [
- "loopback create",
- "packet-generator new {\n"
- " name vlib\n"
- " limit 15\n"
- " size 128-128\n"
- " interface loop0\n"
- " node ethernet-input\n"
- " data {\n"
- " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
- " ICMP: db00::1 -> db00::2\n"
- " incrementing 30\n"
- " }\n"
- "}\n",
- "show vlib graph",
- "show vlib graph ethernet-input",
- "show vlib graphviz",
- "show vlib graphviz graphviz.dot",
- "pa en",
- "show runtime ethernet-input",
- "show runtime brief verbose max summary",
- "clear runtime",
- "show node index 1",
- "show node ethernet-input",
- "show node pg-input",
- "set node function",
- "set node function no-such-node",
- "set node function cdp-input default",
- "set node function ethernet-input default",
- "set node function ethernet-input bozo",
- "set node function ethernet-input",
- "show \t",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
- def test_vlib_buffer_c_unittest(self):
- """Vlib buffer.c Code Coverage Test"""
-
- cmds = [
- "loopback create",
- "packet-generator new {\n"
- " name vlib\n"
- " limit 15\n"
- " size 128-128\n"
- " interface loop0\n"
- " node ethernet-input\n"
- " data {\n"
- " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n"
- " ICMP: db00::1 -> db00::2\n"
- " incrementing 30\n"
- " }\n"
- "}\n",
- "event-logger trace",
- "event-logger trace enable",
- "event-logger trace api cli barrier",
- "pa en",
- "show interface bogus",
- "event-logger trace disable api cli barrier",
- "event-logger trace circuit-node ethernet-input",
- "event-logger trace circuit-node ethernet-input disable",
- "clear interfaces",
- "test vlib",
- "test vlib2",
- "show memory api-segment stats-segment main-heap verbose",
- "leak-check { show memory }",
- "show cpu",
- "memory-trace main-heap",
- "memory-trace main-heap api-segment stats-segment",
- "leak-check { show version }",
- "show version ?",
- "comment { show version }",
- "uncomment { show version }",
- "show memory main-heap",
- "show memory bogus",
- "choices",
- "test heap-validate",
- "memory-trace main-heap disable",
- "show buffers",
- "show eve",
- "show help",
- "show ip ",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
- def test_vlib_format_unittest(self):
- """Vlib format.c Code Coverage Test"""
-
- cmds = [
- "loopback create",
- "classify filter pcap mask l2 proto match l2 proto 0x86dd",
- "classify filter pcap del",
- "test format-vlib",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
- def test_vlib_main_unittest(self):
- """Private Binary API Segment Test (takes 70 seconds)"""
-
- vat_path = config.vpp + "_api_test"
- vat = pexpect.spawn(vat_path, ["socket-name", self.get_api_sock_path()])
- vat.expect("vat# ", timeout=10)
- vat.sendline("sock_init_shm")
- vat.expect("vat# ", timeout=10)
- vat.sendline("sh api cli")
- vat.kill(signal.SIGKILL)
- vat.wait()
- self.logger.info("vat terminated, 70 second wait for the Reaper")
- time.sleep(70)
- self.logger.info("Reaper should be complete...")
-
- def test_pool(self):
- """Fixed-size Pool Test"""
-
- cmds = [
- "test pool",
- ]
-
- for cmd in cmds:
- r = self.vapi.cli_return_response(cmd)
- if r.retval != 0:
- if hasattr(r, "reply"):
- self.logger.info(cmd + " FAIL reply " + r.reply)
- else:
- self.logger.info(cmd + " FAIL retval " + str(r.retval))
-
-
-class TestVlibFrameLeak(VppTestCase):
- """Vlib Frame Leak Test Cases"""
-
- vpp_worker_count = 1
-
- @classmethod
- def setUpClass(cls):
- super(TestVlibFrameLeak, cls).setUpClass()
-
- @classmethod
- def tearDownClass(cls):
- super(TestVlibFrameLeak, cls).tearDownClass()
-
- def setUp(self):
- super(TestVlibFrameLeak, self).setUp()
- # create 1 pg interface
- self.create_pg_interfaces(range(1))
-
- for i in self.pg_interfaces:
- i.admin_up()
- i.config_ip4()
- i.resolve_arp()
-
- def tearDown(self):
- super(TestVlibFrameLeak, self).tearDown()
- for i in self.pg_interfaces:
- i.unconfig_ip4()
- i.admin_down()
-
- def test_vlib_mw_refork_frame_leak(self):
- """Vlib worker thread refork leak test case"""
- icmp_id = 0xB
- icmp_seq = 5
- icmp_load = b"\x0a" * 18
- pkt = (
- Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
- / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
- / ICMP(id=icmp_id, seq=icmp_seq)
- / Raw(load=icmp_load)
- )
-
- # Send a packet
- self.pg0.add_stream(pkt)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx = self.pg0.get_capture(1)
-
- self.assertEquals(len(rx), 1)
- rx = rx[0]
- ether = rx[Ether]
- ipv4 = rx[IP]
-
- self.assertEqual(ether.src, self.pg0.local_mac)
- self.assertEqual(ether.dst, self.pg0.remote_mac)
-
- self.assertEqual(ipv4.src, self.pg0.local_ip4)
- self.assertEqual(ipv4.dst, self.pg0.remote_ip4)
-
- # Save allocated frame count
- frame_allocated = {}
- for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]:
- spl = fs.split()
- thread = int(spl[0])
- size = int(spl[1])
- alloc = int(spl[2])
- key = (thread, size)
- frame_allocated[key] = alloc
-
- # cause reforks
- _ = self.create_loopback_interfaces(1)
-
- # send the same packet
- self.pg0.add_stream(pkt)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx = self.pg0.get_capture(1)
-
- self.assertEquals(len(rx), 1)
- rx = rx[0]
- ether = rx[Ether]
- ipv4 = rx[IP]
-
- self.assertEqual(ether.src, self.pg0.local_mac)
- self.assertEqual(ether.dst, self.pg0.remote_mac)
-
- self.assertEqual(ipv4.src, self.pg0.local_ip4)
- self.assertEqual(ipv4.dst, self.pg0.remote_ip4)
-
- # Check that no frame were leaked during refork
- for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]:
- spl = fs.split()
- thread = int(spl[0])
- size = int(spl[1])
- alloc = int(spl[2])
- key = (thread, size)
- self.assertEqual(frame_allocated[key], alloc)
-
-
-if __name__ == "__main__":
- unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_vpe_api.py b/test/asf/test_vpe_api.py
index 426a3878c59..4d866ec906a 100644
--- a/test/asf/test_vpe_api.py
+++ b/test/asf/test_vpe_api.py
@@ -13,13 +13,12 @@
# limitations under the License.
import datetime
import time
-import unittest
-from asfframework import VppTestCase
+from asfframework import VppAsfTestCase
enable_print = False
-class TestVpeApi(VppTestCase):
+class TestVpeApi(VppAsfTestCase):
"""TestVpeApi"""
def test_log_dump_default(self):
diff --git a/test/asf/test_vppinfra.py b/test/asf/test_vppinfra.py
index 4b49628cf58..56391bfb13c 100644
--- a/test/asf/test_vppinfra.py
+++ b/test/asf/test_vppinfra.py
@@ -2,10 +2,10 @@
import unittest
-from asfframework import VppTestCase, VppTestRunner
+from asfframework import VppAsfTestCase, VppTestRunner
-class TestVppinfra(VppTestCase):
+class TestVppinfra(VppAsfTestCase):
"""Vppinfra Unit Test Cases"""
@classmethod