diff options
Diffstat (limited to 'test/asf')
40 files changed, 176 insertions, 5207 deletions
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py index 1214fbfecd2..024d7f0127d 100644 --- a/test/asf/asfframework.py +++ b/test/asf/asfframework.py @@ -15,6 +15,7 @@ import random import copy import platform import shutil +from pathlib import Path from collections import deque from threading import Thread, Event from inspect import getdoc, isclass @@ -22,16 +23,11 @@ from traceback import format_exception from logging import FileHandler, DEBUG, Formatter from enum import Enum from abc import ABC, abstractmethod -from struct import pack, unpack -from config import config, available_cpus, num_cpus, max_vpp_cpus +from config import config, max_vpp_cpus import hook as hookmodule -from vpp_pg_interface import VppPGInterface -from vpp_sub_interface import VppSubInterface from vpp_lo_interface import VppLoInterface -from vpp_bvi_interface import VppBviInterface from vpp_papi_provider import VppPapiProvider -from vpp_papi import VppEnum import vpp_papi from vpp_papi.vpp_stats import VPPStats from vpp_papi.vpp_transport_socket import VppTransportSocketIOError @@ -45,13 +41,13 @@ from log import ( colorize, ) from vpp_object import VppObjectRegistry -from util import ppp, is_core_present +from util import is_core_present from test_result_code import TestResultCode logger = logging.getLogger(__name__) # Set up an empty logger for the testcase that can be overridden as necessary -null_logger = logging.getLogger("VppTestCase") +null_logger = logging.getLogger("VppAsfTestCase") null_logger.addHandler(logging.NullHandler()) @@ -103,35 +99,6 @@ class VppDiedError(Exception): super(VppDiedError, self).__init__(msg) -class _PacketInfo(object): - """Private class to create packet info object. - - Help process information about the next packet. - Set variables to default values. - """ - - #: Store the index of the packet. - index = -1 - #: Store the index of the source packet generator interface of the packet. - src = -1 - #: Store the index of the destination packet generator interface - #: of the packet. - dst = -1 - #: Store expected ip version - ip = -1 - #: Store expected upper protocol - proto = -1 - #: Store the copy of the former packet. - data = None - - def __eq__(self, other): - index = self.index == other.index - src = self.src == other.src - dst = self.dst == other.dst - data = self.data == other.data - return index and src and dst and data - - def pump_output(testclass): """pump output from vpp stdout/stderr to proper queues""" stdout_fragment = "" @@ -188,6 +155,36 @@ def _is_platform_aarch64(): is_platform_aarch64 = _is_platform_aarch64() +def _is_distro_ubuntu2204(): + with open("/etc/os-release") as f: + for line in f.readlines(): + if "jammy" in line: + return True + return False + + +is_distro_ubuntu2204 = _is_distro_ubuntu2204() + + +def _is_distro_debian11(): + with open("/etc/os-release") as f: + for line in f.readlines(): + if "bullseye" in line: + return True + return False + + +is_distro_debian11 = _is_distro_debian11() + + +def _is_distro_ubuntu2204(): + with open("/etc/os-release") as f: + for line in f.readlines(): + if "jammy" in line: + return True + return False + + class KeepAliveReporter(object): """ Singleton object which reports test start to parent process @@ -233,6 +230,12 @@ class TestCaseTag(Enum): FIXME_VPP_WORKERS = 2 # marks the suites broken when ASan is enabled FIXME_ASAN = 3 + # marks suites broken on Ubuntu-22.04 + FIXME_UBUNTU2204 = 4 + # marks suites broken on Debian-11 + FIXME_DEBIAN11 = 5 + # marks suites broken on debug vpp image + FIXME_VPP_DEBUG = 6 def create_tag_decorator(e): @@ -249,6 +252,9 @@ def create_tag_decorator(e): tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO) tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS) tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN) +tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204) +tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11) +tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG) class DummyVpp: @@ -276,7 +282,7 @@ class CPUInterface(ABC): cls.cpus = cpus -class VppTestCase(CPUInterface, unittest.TestCase): +class VppAsfTestCase(CPUInterface, unittest.TestCase): """This subclass is a base class for VPP test cases that are implemented as classes. It provides methods to create and run test case. """ @@ -288,19 +294,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): vapi_response_timeout = 5 remove_configured_vpp_objects_on_tear_down = True - @property - def packet_infos(self): - """List of packet infos""" - return self._packet_infos - - @classmethod - def get_packet_count_for_if_idx(cls, dst_if_index): - """Get the number of packet info for specified destination if index""" - if dst_if_index in cls._packet_count_for_dst_if_idx: - return cls._packet_count_for_dst_if_idx[dst_if_index] - else: - return 0 - @classmethod def has_tag(cls, tag): """if the test case has a given tag - return true""" @@ -598,7 +591,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): if cls.debug_attach: tmpdir = f"{config.tmp_dir}/unittest-attach-gdb" else: - tmpdir = f"{config.tmp_dir}/vpp-unittest-{cls.__name__}" + tmpdir = f"{config.tmp_dir}/{get_testcase_dirname(cls.__name__)}" if config.wipe_tmp_dir: shutil.rmtree(tmpdir, ignore_errors=True) os.mkdir(tmpdir) @@ -610,7 +603,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.file_handler = FileHandler(f"{cls.tempdir}/log.txt") return - logdir = f"{config.log_dir}/vpp-unittest-{cls.__name__}" + logdir = f"{config.log_dir}/{get_testcase_dirname(cls.__name__)}" if config.wipe_tmp_dir: shutil.rmtree(logdir, ignore_errors=True) os.mkdir(logdir) @@ -622,8 +615,9 @@ class VppTestCase(CPUInterface, unittest.TestCase): Perform class setup before running the testcase Remove shared memory files, start vpp and connect the vpp-api """ - super(VppTestCase, cls).setUpClass() + super(VppAsfTestCase, cls).setUpClass() cls.logger = get_logger(cls.__name__) + cls.logger.debug(f"--- START setUpClass() {cls.__name__} ---") random.seed(config.rnd_seed) if hasattr(cls, "parallel_handler"): cls.logger.addHandler(cls.parallel_handler) @@ -645,9 +639,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): ) cls.logger.debug("Random seed is %s", config.rnd_seed) cls.setUpConstants() - cls.reset_packet_infos() - cls._pcaps = [] - cls._old_pcaps = [] cls.verbose = 0 cls.vpp_dead = False cls.registry = VppObjectRegistry() @@ -684,6 +675,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): try: hook.poll_vpp() except VppDiedError: + cls.wait_for_coredump() cls.vpp_startup_failed = True cls.logger.critical( "VPP died shortly after startup, check the" @@ -718,6 +710,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.logger.debug("Exception connecting to VPP: %s" % e) cls.quit() raise e + cls.logger.debug(f"--- END setUpClass() {cls.__name__} ---") @classmethod def _debug_quit(cls): @@ -810,13 +803,13 @@ class VppTestCase(CPUInterface, unittest.TestCase): @classmethod def tearDownClass(cls): """Perform final cleanup after running all tests in this test-case""" - cls.logger.debug("--- tearDownClass() for %s called ---" % cls.__name__) + cls.logger.debug(f"--- START tearDownClass() {cls.__name__} ---") cls.reporter.send_keep_alive(cls, "tearDownClass") cls.quit() cls.file_handler.close() - cls.reset_packet_infos() if config.debug_framework: debug_internal.on_tear_down_class(cls) + cls.logger.debug(f"--- END tearDownClass() {cls.__name__} ---") def show_commands_at_teardown(self): """Allow subclass specific teardown logging additions.""" @@ -825,8 +818,7 @@ class VppTestCase(CPUInterface, unittest.TestCase): def tearDown(self): """Show various debug prints after each test""" self.logger.debug( - "--- tearDown() for %s.%s(%s) called ---" - % (self.__class__.__name__, self._testMethodName, self._testMethodDoc) + f"--- START tearDown() {self.__class__.__name__}.{self._testMethodName}({self._testMethodDoc}) ---" ) try: @@ -857,12 +849,29 @@ class VppTestCase(CPUInterface, unittest.TestCase): self.vpp_dead = True else: self.registry.unregister_all(self.logger) + # Remove any leftover pcap files + if hasattr(self, "pg_interfaces") and len(self.pg_interfaces) > 0: + testcase_dir = os.path.dirname(self.pg_interfaces[0].out_path) + for p in Path(testcase_dir).glob("pg*.pcap"): + self.logger.debug(f"Removing {p}") + p.unlink() + self.logger.debug( + f"--- END tearDown() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---" + ) def setUp(self): """Clear trace before running each test""" - super(VppTestCase, self).setUp() + super(VppAsfTestCase, self).setUp() + self.logger.debug( + f"--- START setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---" + ) + # Save testname include in pcap history filenames + if hasattr(self, "pg_interfaces"): + for i in self.pg_interfaces: + i.test_name = self._testMethodName self.reporter.send_keep_alive(self) if self.vpp_dead: + self.wait_for_coredump() raise VppDiedError( rv=None, testcase=self.__class__.__name__, @@ -881,26 +890,9 @@ class VppTestCase(CPUInterface, unittest.TestCase): # store the test instance inside the test class - so that objects # holding the class can access instance methods (like assertEqual) type(self).test_instance = self - - @classmethod - def pg_enable_capture(cls, interfaces=None): - """ - Enable capture on packet-generator interfaces - - :param interfaces: iterable interface indexes (if None, - use self.pg_interfaces) - - """ - if interfaces is None: - interfaces = cls.pg_interfaces - for i in interfaces: - i.enable_capture() - - @classmethod - def register_pcap(cls, intf, worker): - """Register a pcap in the testclass""" - # add to the list of captures with current timestamp - cls._pcaps.append((intf, worker)) + self.logger.debug( + f"--- END setUp() {self.__class__.__name__}.{self._testMethodName}('{self._testMethodDoc}') ---" + ) @classmethod def get_vpp_time(cls): @@ -922,76 +914,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.sleep(0.1) @classmethod - def pg_start(cls, trace=True): - """Enable the PG, wait till it is done, then clean up""" - for intf, worker in cls._old_pcaps: - intf.handle_old_pcap_file(intf.get_in_path(worker), intf.in_history_counter) - cls._old_pcaps = [] - if trace: - cls.vapi.cli("clear trace") - cls.vapi.cli("trace add pg-input 1000") - cls.vapi.cli("packet-generator enable") - # PG, when starts, runs to completion - - # so let's avoid a race condition, - # and wait a little till it's done. - # Then clean it up - and then be gone. - deadline = time.time() + 300 - while cls.vapi.cli("show packet-generator").find("Yes") != -1: - cls.sleep(0.01) # yield - if time.time() > deadline: - cls.logger.error("Timeout waiting for pg to stop") - break - for intf, worker in cls._pcaps: - cls.vapi.cli("packet-generator delete %s" % intf.get_cap_name(worker)) - cls._old_pcaps = cls._pcaps - cls._pcaps = [] - - @classmethod - def create_pg_interfaces_internal(cls, interfaces, gso=0, gso_size=0, mode=None): - """ - Create packet-generator interfaces. - - :param interfaces: iterable indexes of the interfaces. - :returns: List of created interfaces. - - """ - result = [] - for i in interfaces: - intf = VppPGInterface(cls, i, gso, gso_size, mode) - setattr(cls, intf.name, intf) - result.append(intf) - cls.pg_interfaces = result - return result - - @classmethod - def create_pg_ip4_interfaces(cls, interfaces, gso=0, gso_size=0): - pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal( - interfaces, gso, gso_size, pgmode.PG_API_MODE_IP4 - ) - - @classmethod - def create_pg_ip6_interfaces(cls, interfaces, gso=0, gso_size=0): - pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal( - interfaces, gso, gso_size, pgmode.PG_API_MODE_IP6 - ) - - @classmethod - def create_pg_interfaces(cls, interfaces, gso=0, gso_size=0): - pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal( - interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET - ) - - @classmethod - def create_pg_ethernet_interfaces(cls, interfaces, gso=0, gso_size=0): - pgmode = VppEnum.vl_api_pg_interface_mode_t - return cls.create_pg_interfaces_internal( - interfaces, gso, gso_size, pgmode.PG_API_MODE_ETHERNET - ) - - @classmethod def create_loopback_interfaces(cls, count): """ Create loopback interfaces. @@ -1005,119 +927,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): cls.lo_interfaces = result return result - @classmethod - def create_bvi_interfaces(cls, count): - """ - Create BVI interfaces. - - :param count: number of interfaces created. - :returns: List of created interfaces. - """ - result = [VppBviInterface(cls) for i in range(count)] - for intf in result: - setattr(cls, intf.name, intf) - cls.bvi_interfaces = result - return result - - @classmethod - def reset_packet_infos(cls): - """Reset the list of packet info objects and packet counts to zero""" - cls._packet_infos = {} - cls._packet_count_for_dst_if_idx = {} - - @classmethod - def create_packet_info(cls, src_if, dst_if): - """ - Create packet info object containing the source and destination indexes - and add it to the testcase's packet info list - - :param VppInterface src_if: source interface - :param VppInterface dst_if: destination interface - - :returns: _PacketInfo object - - """ - info = _PacketInfo() - info.index = len(cls._packet_infos) - info.src = src_if.sw_if_index - info.dst = dst_if.sw_if_index - if isinstance(dst_if, VppSubInterface): - dst_idx = dst_if.parent.sw_if_index - else: - dst_idx = dst_if.sw_if_index - if dst_idx in cls._packet_count_for_dst_if_idx: - cls._packet_count_for_dst_if_idx[dst_idx] += 1 - else: - cls._packet_count_for_dst_if_idx[dst_idx] = 1 - cls._packet_infos[info.index] = info - return info - - @staticmethod - def info_to_payload(info): - """ - Convert _PacketInfo object to packet payload - - :param info: _PacketInfo object - - :returns: string containing serialized data from packet info - """ - - # retrieve payload, currently 18 bytes (4 x ints + 1 short) - return pack("iiiih", info.index, info.src, info.dst, info.ip, info.proto) - - def get_next_packet_info(self, info): - """ - Iterate over the packet info list stored in the testcase - Start iteration with first element if info is None - Continue based on index in info if info is specified - - :param info: info or None - :returns: next info in list or None if no more infos - """ - if info is None: - next_index = 0 - else: - next_index = info.index + 1 - if next_index == len(self._packet_infos): - return None - else: - return self._packet_infos[next_index] - - def get_next_packet_info_for_interface(self, src_index, info): - """ - Search the packet info list for the next packet info with same source - interface index - - :param src_index: source interface index to search for - :param info: packet info - where to start the search - :returns: packet info or None - - """ - while True: - info = self.get_next_packet_info(info) - if info is None: - return None - if info.src == src_index: - return info - - def get_next_packet_info_for_interface2(self, src_index, dst_index, info): - """ - Search the packet info list for the next packet info with same source - and destination interface indexes - - :param src_index: source interface index to search for - :param dst_index: destination interface index to search for - :param info: packet info - where to start the search - :returns: packet info or None - - """ - while True: - info = self.get_next_packet_info_for_interface(src_index, info) - if info is None: - return None - if info.dst == dst_index: - return info - def assert_equal(self, real_value, expected_value, name_or_class=None): if name_or_class is None: self.assertEqual(real_value, expected_value) @@ -1152,25 +961,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): ) self.assertTrue(expected_min <= real_value <= expected_max, msg) - def assert_ip_checksum_valid(self, received_packet, ignore_zero_checksum=False): - self.assert_checksum_valid( - received_packet, "IP", ignore_zero_checksum=ignore_zero_checksum - ) - - def assert_tcp_checksum_valid(self, received_packet, ignore_zero_checksum=False): - self.assert_checksum_valid( - received_packet, "TCP", ignore_zero_checksum=ignore_zero_checksum - ) - - def assert_udp_checksum_valid(self, received_packet, ignore_zero_checksum=True): - self.assert_checksum_valid( - received_packet, "UDP", ignore_zero_checksum=ignore_zero_checksum - ) - - def assert_icmp_checksum_valid(self, received_packet): - self.assert_checksum_valid(received_packet, "ICMP") - self.assert_embedded_icmp_checksum_valid(received_packet) - def get_counter(self, counter): if counter.startswith("/"): counter_value = self.statistics.get_counter(counter) @@ -1196,12 +986,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): ) self.assert_equal(c, expected_value, "counter `%s[%s]'" % (counter, index)) - def assert_packet_counter_equal(self, counter, expected_value): - counter_value = self.get_counter(counter) - self.assert_equal( - counter_value, expected_value, "packet counter `%s'" % counter - ) - def assert_error_counter_equal(self, counter, expected_value): counter_value = self.statistics[counter].sum() self.assert_equal(counter_value, expected_value, "error counter `%s'" % counter) @@ -1242,11 +1026,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): self.logger.debug("Moving VPP time by %s (%s)", timeout, remark) self.vapi.cli("set clock adjust %s" % timeout) - def pg_send(self, intf, pkts, worker=None, trace=True): - intf.add_stream(pkts, worker=worker) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start(trace=trace) - def snapshot_stats(self, stats_diff): """Return snapshot of interesting stats based on diff dictionary.""" stats_snapshot = {} @@ -1287,70 +1066,6 @@ class VppTestCase(CPUInterface, unittest.TestCase): f"Couldn't sum counter: {cntr} on sw_if_index: {sw_if_index}" ) from e - def send_and_assert_no_replies( - self, intf, pkts, remark="", timeout=None, stats_diff=None, trace=True, msg=None - ): - if stats_diff: - stats_snapshot = self.snapshot_stats(stats_diff) - - self.pg_send(intf, pkts) - - try: - if not timeout: - timeout = 1 - for i in self.pg_interfaces: - i.assert_nothing_captured(timeout=timeout, remark=remark) - timeout = 0.1 - finally: - if trace: - if msg: - self.logger.debug(f"send_and_assert_no_replies: {msg}") - self.logger.debug(self.vapi.cli("show trace")) - - if stats_diff: - self.compare_stats_with_snapshot(stats_diff, stats_snapshot) - - def send_and_expect_load_balancing( - self, input, pkts, outputs, worker=None, trace=True - ): - self.pg_send(input, pkts, worker=worker, trace=trace) - rxs = [] - for oo in outputs: - rx = oo._get_capture(1) - self.assertNotEqual(0, len(rx)) - rxs.append(rx) - if trace: - self.logger.debug(self.vapi.cli("show trace")) - return rxs - - def send_and_expect_some(self, intf, pkts, output, worker=None, trace=True): - self.pg_send(intf, pkts, worker=worker, trace=trace) - rx = output._get_capture(1) - if trace: - self.logger.debug(self.vapi.cli("show trace")) - self.assertTrue(len(rx) > 0) - self.assertTrue(len(rx) < len(pkts)) - return rx - - def send_and_expect_only(self, intf, pkts, output, timeout=None, stats_diff=None): - if stats_diff: - stats_snapshot = self.snapshot_stats(stats_diff) - - self.pg_send(intf, pkts) - rx = output.get_capture(len(pkts)) - outputs = [output] - if not timeout: - timeout = 1 - for i in self.pg_interfaces: - if i not in outputs: - i.assert_nothing_captured(timeout=timeout) - timeout = 0.1 - - if stats_diff: - self.compare_stats_with_snapshot(stats_diff, stats_snapshot) - - return rx - def get_testcase_doc_name(test): return getdoc(test.__class__).splitlines()[0] @@ -1364,6 +1079,14 @@ def get_test_description(descriptions, test): return str(test) +def get_failed_testcase_linkname(failed_dir, testcase_dirname): + return os.path.join(failed_dir, f"{testcase_dirname}-FAILED") + + +def get_testcase_dirname(testcase_class_name): + return f"vpp-unittest-{testcase_class_name}" + + class TestCaseInfo(object): def __init__(self, logger, tempdir, vpp_pid, vpp_bin_path): self.logger = logger @@ -1409,6 +1132,17 @@ class VppTestResult(unittest.TestResult): self.runner = runner self.printed = [] + def decodePcapFiles(self, test, when_configured=False): + if when_configured == False or config.decode_pcaps == True: + if hasattr(test, "pg_interfaces") and len(test.pg_interfaces) > 0: + testcase_dir = os.path.dirname(test.pg_interfaces[0].out_path) + test.pg_interfaces[0].decode_pcap_files( + testcase_dir, f"suite{test.__class__.__name__}" + ) + test.pg_interfaces[0].decode_pcap_files( + testcase_dir, test._testMethodName + ) + def addSuccess(self, test): """ Record a test succeeded result @@ -1417,6 +1151,7 @@ class VppTestResult(unittest.TestResult): """ self.log_result("addSuccess", test) + self.decodePcapFiles(test, when_configured=True) unittest.TestResult.addSuccess(self, test) self.result_string = colorize("OK", GREEN) self.result_code = TestResultCode.PASS @@ -1424,6 +1159,7 @@ class VppTestResult(unittest.TestResult): def addExpectedFailure(self, test, err): self.log_result("addExpectedFailure", test, err) + self.decodePcapFiles(test) super().addExpectedFailure(test, err) self.result_string = colorize("FAIL", GREEN) self.result_code = TestResultCode.EXPECTED_FAIL @@ -1431,6 +1167,7 @@ class VppTestResult(unittest.TestResult): def addUnexpectedSuccess(self, test): self.log_result("addUnexpectedSuccess", test) + self.decodePcapFiles(test, when_configured=True) super().addUnexpectedSuccess(test) self.result_string = colorize("OK", RED) self.result_code = TestResultCode.UNEXPECTED_PASS @@ -1458,9 +1195,8 @@ class VppTestResult(unittest.TestResult): if self.current_test_case_info: try: failed_dir = config.failed_dir - link_path = os.path.join( - failed_dir, - "%s-FAILED" % os.path.basename(self.current_test_case_info.tempdir), + link_path = get_failed_testcase_linkname( + failed_dir, os.path.basename(self.current_test_case_info.tempdir) ) self.current_test_case_info.logger.debug( @@ -1517,6 +1253,7 @@ class VppTestResult(unittest.TestResult): error_type_str = colorize("ERROR", RED) else: raise Exception(f"Unexpected result code {result_code}") + self.decodePcapFiles(test) unittest_fn(self, test, err) if self.current_test_case_info: @@ -1727,7 +1464,7 @@ class VppTestRunner(unittest.TextTestRunner): **kwargs, ): # ignore stream setting here, use hard-coded stdout to be in sync - # with prints from VppTestCase methods ... + # with prints from VppAsfTestCase methods ... super(VppTestRunner, self).__init__( sys.stdout, descriptions, verbosity, failfast, buffer, resultclass, **kwargs ) diff --git a/test/asf/lisp.py b/test/asf/lisp.py deleted file mode 100644 index 9ebc86a35e3..00000000000 --- a/test/asf/lisp.py +++ /dev/null @@ -1,385 +0,0 @@ -import socket -from ipaddress import ip_network - -from vpp_object import VppObject - - -class VppLispLocatorSet(VppObject): - """Represents LISP locator set in VPP""" - - def __init__(self, test, ls_name): - self._test = test - self._ls_name = ls_name - - @property - def test(self): - return self._test - - @property - def ls_name(self): - return self._ls_name - - def add_vpp_config(self): - self.test.vapi.lisp_add_del_locator_set(locator_set_name=self._ls_name) - self._test.registry.register(self, self.test.logger) - - def get_lisp_locator_sets_dump_entry(self): - result = self.test.vapi.lisp_locator_set_dump() - for ls in result: - if ls.ls_name.strip("\x00") == self._ls_name: - return ls - return None - - def query_vpp_config(self): - return self.get_lisp_locator_sets_dump_entry() is not None - - def remove_vpp_config(self): - self.test.vapi.lisp_add_del_locator_set( - locator_set_name=self._ls_name, is_add=0 - ) - - def object_id(self): - return "lisp-locator-set-%s" % self._ls_name - - -class VppLispLocator(VppObject): - """Represents LISP locator in VPP""" - - def __init__(self, test, sw_if_index, ls_name, priority=1, weight=1): - self._test = test - self._sw_if_index = sw_if_index - self._ls_name = ls_name - self._priority = priority - self._weight = weight - - @property - def test(self): - """Test which created this locator""" - return self._test - - @property - def ls_name(self): - """Locator set name""" - return self._ls_name - - @property - def sw_if_index(self): - return self._sw_if_index - - @property - def priority(self): - return self._priority - - @property - def weight(self): - return self._weight - - def add_vpp_config(self): - self.test.vapi.lisp_add_del_locator( - locator_set_name=self._ls_name, - sw_if_index=self._sw_if_index, - priority=self._priority, - weight=self._weight, - ) - self._test.registry.register(self, self.test.logger) - - def get_lisp_locator_dump_entry(self): - locators = self.test.vapi.lisp_locator_dump( - is_index_set=0, ls_name=self._ls_name - ) - for locator in locators: - if locator.sw_if_index == self._sw_if_index: - return locator - return None - - def query_vpp_config(self): - locator = self.get_lisp_locator_dump_entry() - return locator is not None - - def remove_vpp_config(self): - self.test.vapi.lisp_add_del_locator( - locator_set_name=self._ls_name, - sw_if_index=self._sw_if_index, - priority=self._priority, - weight=self._weight, - is_add=0, - ) - self._test.registry.register(self, self.test.logger) - - def object_id(self): - return "lisp-locator-%s-%d" % (self._ls_name, self._sw_if_index) - - -class LispEIDType: - PREFIX = 0 - MAC = 1 - NSH = 2 - - -class LispKeyIdType: - NONE = 0 - SHA1 = 1 - SHA256 = 2 - - -class LispEID: - """Lisp endpoint identifier""" - - def __init__(self, eid): - self.eid = eid - self._type = -1 - - # find out whether EID is ip prefix, or MAC - try: - self.prefix = ip_network(self.eid) - self._type = LispEIDType.PREFIX - return - except ValueError: - if self.eid.count(":") == 5: # MAC address - self.mac = self.eid - self._type = LispEIDType.MAC - return - raise Exception("Unsupported EID format {!s}!".format(eid)) - - @property - def eid_type(self): - return self._type - - @property - def address(self): - if self.eid_type == LispEIDType.PREFIX: - return self.prefix - elif self.eid_type == LispEIDType.MAC: - return self.mac - elif self.eid_type == LispEIDType.NSH: - return Exception("Unimplemented") - - @property - def packed(self): - if self.eid_type == LispEIDType.PREFIX: - return {"type": self._type, "address": {"prefix": self.prefix}} - elif self.eid_type == LispEIDType.MAC: - return {"type": self._type, "address": {"mac": self.mac}} - elif self.eid_type == LispEIDType.NSH: - return Exception("Unimplemented") - - -class LispKey: - """Lisp Key""" - - def __init__(self, key_type, key): - self._key_type = key_type - self._key = key - - @property - def packed(self): - return {"id": self._key_type, "key": self._key} - - -class VppLispMapping(VppObject): - """Represents common features for remote and local LISP mapping in VPP""" - - def __init__(self, test, eid, vni=0, priority=1, weight=1): - self._eid = LispEID(eid) - self._test = test - self._priority = priority - self._weight = weight - self._vni = vni - - @property - def test(self): - return self._test - - @property - def vni(self): - return self._vni - - @property - def eid(self): - return self._eid - - @property - def priority(self): - return self._priority - - @property - def weight(self): - return self._weight - - def get_lisp_mapping_dump_entry(self): - return self.test.vapi.lisp_eid_table_dump( - eid_set=1, vni=self._vni, eid=self._eid.packed - ) - - def query_vpp_config(self): - mapping = self.get_lisp_mapping_dump_entry() - return mapping - - def object_id(self): - return "lisp-mapping-[%s]-%s-%s-%s" % ( - self.vni, - self.eid.address, - self.priority, - self.weight, - ) - - -class VppLocalMapping(VppLispMapping): - """LISP Local mapping""" - - def __init__( - self, - test, - eid, - ls_name, - vni=0, - priority=1, - weight=1, - key_id=LispKeyIdType.NONE, - key="", - ): - super(VppLocalMapping, self).__init__(test, eid, vni, priority, weight) - self._ls_name = ls_name - self._key = LispKey(key_id, key) - - @property - def ls_name(self): - return self._ls_name - - @property - def key_id(self): - return self._key_id - - @property - def key(self): - return self._key - - def add_vpp_config(self): - self.test.vapi.lisp_add_del_local_eid( - locator_set_name=self._ls_name, - eid=self._eid.packed, - vni=self._vni, - key=self._key.packed, - ) - self._test.registry.register(self, self.test.logger) - - def remove_vpp_config(self): - self.test.vapi.lisp_add_del_local_eid( - locator_set_name=self._ls_name, - eid=self._eid.packed, - vni=self._vni, - is_add=0, - ) - - def object_id(self): - return "lisp-eid-local-mapping-%s[%d]" % (self._eid.address, self._vni) - - -class LispRemoteLocator: - def __init__(self, addr, priority=1, weight=1): - self.addr = addr - self.priority = priority - self.weight = weight - - @property - def packed(self): - return { - "priority": self.priority, - "weight": self.weight, - "ip_address": self.addr, - } - - -class VppRemoteMapping(VppLispMapping): - def __init__(self, test, eid, rlocs=None, vni=0, priority=1, weight=1): - super(VppRemoteMapping, self).__init__(test, eid, vni, priority, weight) - self._rlocs = rlocs - - @property - def rlocs(self): - rlocs = [] - for rloc in self._rlocs: - rlocs.append(rloc.packed) - return rlocs - - def add_vpp_config(self): - self.test.vapi.lisp_add_del_remote_mapping( - rlocs=self.rlocs, - deid=self._eid.packed, - vni=self._vni, - rloc_num=len(self._rlocs), - ) - self._test.registry.register(self, self.test.logger) - - def remove_vpp_config(self): - self.test.vapi.lisp_add_del_remote_mapping( - deid=self._eid.packed, vni=self._vni, is_add=0, rloc_num=0 - ) - - def object_id(self): - return "lisp-eid-remote-mapping-%s[%d]" % (self._eid.address, self._vni) - - -class VppLispAdjacency(VppObject): - """Represents LISP adjacency in VPP""" - - def __init__(self, test, leid, reid, vni=0): - self._leid = LispEID(leid) - self._reid = LispEID(reid) - if self._leid.eid_type != self._reid.eid_type: - raise Exception("remote and local EID are different types!") - self._vni = vni - self._test = test - - @property - def test(self): - return self._test - - @property - def leid(self): - return self._leid - - @property - def reid(self): - return self._reid - - @property - def vni(self): - return self._vni - - def add_vpp_config(self): - self.test.vapi.lisp_add_del_adjacency( - leid=self._leid.packed, reid=self._reid.packed, vni=self._vni - ) - self._test.registry.register(self, self.test.logger) - - @staticmethod - def eid_equal(eid, eid_api): - if eid.eid_type != eid_api.type: - return False - - if eid_api.type == LispEIDType.PREFIX: - if eid.address.prefixlen != eid_api.address.prefix.prefixlen: - return False - - if eid.address != eid_api.address: - return False - - return True - - def query_vpp_config(self): - res = self.test.vapi.lisp_adjacencies_get(vni=self._vni) - for adj in res.adjacencies: - if self.eid_equal(self._leid, adj.leid) and self.eid_equal( - self._reid, adj.reid - ): - return True - return False - - def remove_vpp_config(self): - self.test.vapi.lisp_add_del_adjacency( - leid=self._leid.packed, reid=self._reid.packed, vni=self._vni, is_add=0 - ) - - def object_id(self): - return "lisp-adjacency-%s-%s[%d]" % (self._leid, self._reid, self._vni) diff --git a/test/asf/remote_test.py b/test/asf/remote_test.py deleted file mode 100644 index 7743c7782e4..00000000000 --- a/test/asf/remote_test.py +++ /dev/null @@ -1,431 +0,0 @@ -#!/usr/bin/env python3 - -import inspect -import os -import reprlib -import unittest -from asfframework import VppTestCase -from multiprocessing import Process, Pipe -from pickle import dumps -import sys - -from enum import IntEnum, IntFlag - - -class SerializableClassCopy: - """ - Empty class used as a basis for a serializable copy of another class. - """ - - pass - - def __repr__(self): - return "<SerializableClassCopy dict=%s>" % self.__dict__ - - -class RemoteClassAttr: - """ - Wrapper around attribute of a remotely executed class. - """ - - def __init__(self, remote, attr): - self._path = [attr] if attr else [] - self._remote = remote - - def path_to_str(self): - return ".".join(self._path) - - def get_remote_value(self): - return self._remote._remote_exec(RemoteClass.GET, self.path_to_str()) - - def __repr__(self): - return self._remote._remote_exec(RemoteClass.REPR, self.path_to_str()) - - def __str__(self): - return self._remote._remote_exec(RemoteClass.STR, self.path_to_str()) - - def __getattr__(self, attr): - if attr[0] == "_": - if not (attr.startswith("__") and attr.endswith("__")): - raise AttributeError("tried to get private attribute: %s ", attr) - self._path.append(attr) - return self - - def __setattr__(self, attr, val): - if attr[0] == "_": - if not (attr.startswith("__") and attr.endswith("__")): - super(RemoteClassAttr, self).__setattr__(attr, val) - return - self._path.append(attr) - self._remote._remote_exec(RemoteClass.SETATTR, self.path_to_str(), value=val) - - def __call__(self, *args, **kwargs): - return self._remote._remote_exec( - RemoteClass.CALL, self.path_to_str(), *args, **kwargs - ) - - -class RemoteClass(Process): - """ - This class can wrap around and adapt the interface of another class, - and then delegate its execution to a newly forked child process. - - Usage: - - #. Create a remotely executed instance of MyClass. :: - - object = RemoteClass(MyClass, arg1='foo', arg2='bar') - object.start_remote() - - #. Access the object normally as if it was an instance of your - class. :: - - object.my_attribute = 20 - print object.my_attribute - print object.my_method(object.my_attribute) - object.my_attribute.nested_attribute = 'test' - - #. If you need the value of a remote attribute, use .get_remote_value - method. This method is automatically called when needed in the - context of a remotely executed class. E.g. :: - - if (object.my_attribute.get_remote_value() > 20): - object.my_attribute2 = object.my_attribute - - #. Destroy the instance. :: - - object.quit_remote() - object.terminate() - """ - - GET = 0 # Get attribute remotely - CALL = 1 # Call method remotely - SETATTR = 2 # Set attribute remotely - REPR = 3 # Get representation of a remote object - STR = 4 # Get string representation of a remote object - QUIT = 5 # Quit remote execution - - PIPE_PARENT = 0 # Parent end of the pipe - PIPE_CHILD = 1 # Child end of the pipe - - DEFAULT_TIMEOUT = 2 # default timeout for an operation to execute - - def __init__(self, cls, *args, **kwargs): - super(RemoteClass, self).__init__() - self._cls = cls - self._args = args - self._kwargs = kwargs - self._timeout = RemoteClass.DEFAULT_TIMEOUT - self._pipe = Pipe() # pipe for input/output arguments - - def __repr__(self): - return reprlib.repr(RemoteClassAttr(self, None)) - - def __str__(self): - return str(RemoteClassAttr(self, None)) - - def __call__(self, *args, **kwargs): - return self.RemoteClassAttr(self, None)() - - def __getattr__(self, attr): - if attr[0] == "_" or not self.is_alive(): - if not (attr.startswith("__") and attr.endswith("__")): - if hasattr(super(RemoteClass, self), "__getattr__"): - return super(RemoteClass, self).__getattr__(attr) - raise AttributeError("missing: %s", attr) - return RemoteClassAttr(self, attr) - - def __setattr__(self, attr, val): - if attr[0] == "_" or not self.is_alive(): - if not (attr.startswith("__") and attr.endswith("__")): - super(RemoteClass, self).__setattr__(attr, val) - return - setattr(RemoteClassAttr(self, None), attr, val) - - def _remote_exec(self, op, path=None, *args, **kwargs): - """ - Execute given operation on a given, possibly nested, member remotely. - """ - # automatically resolve remote objects in the arguments - mutable_args = list(args) - for i, val in enumerate(mutable_args): - if isinstance(val, RemoteClass) or isinstance(val, RemoteClassAttr): - mutable_args[i] = val.get_remote_value() - args = tuple(mutable_args) - for key, val in kwargs.items(): - if isinstance(val, RemoteClass) or isinstance(val, RemoteClassAttr): - kwargs[key] = val.get_remote_value() - # send request - args = self._make_serializable(args) - kwargs = self._make_serializable(kwargs) - self._pipe[RemoteClass.PIPE_PARENT].send((op, path, args, kwargs)) - timeout = self._timeout - # adjust timeout specifically for the .sleep method - if path is not None and path.split(".")[-1] == "sleep": - if args and isinstance(args[0], (long, int)): - timeout += args[0] - elif "timeout" in kwargs: - timeout += kwargs["timeout"] - if not self._pipe[RemoteClass.PIPE_PARENT].poll(timeout): - return None - try: - rv = self._pipe[RemoteClass.PIPE_PARENT].recv() - rv = self._deserialize(rv) - return rv - except EOFError: - return None - - def _get_local_object(self, path): - """ - Follow the path to obtain a reference on the addressed nested attribute - """ - obj = self._instance - for attr in path: - obj = getattr(obj, attr) - return obj - - def _get_local_value(self, path): - try: - return self._get_local_object(path) - except AttributeError: - return None - - def _call_local_method(self, path, *args, **kwargs): - try: - method = self._get_local_object(path) - return method(*args, **kwargs) - except AttributeError: - return None - - def _set_local_attr(self, path, value): - try: - obj = self._get_local_object(path[:-1]) - setattr(obj, path[-1], value) - except AttributeError: - pass - return None - - def _get_local_repr(self, path): - try: - obj = self._get_local_object(path) - return reprlib.repr(obj) - except AttributeError: - return None - - def _get_local_str(self, path): - try: - obj = self._get_local_object(path) - return str(obj) - except AttributeError: - return None - - def _serializable(self, obj): - """Test if the given object is serializable""" - try: - dumps(obj) - return True - except: - return False - - def _make_obj_serializable(self, obj): - """ - Make a serializable copy of an object. - Members which are difficult/impossible to serialize are stripped. - """ - if self._serializable(obj): - return obj # already serializable - - copy = SerializableClassCopy() - - """ - Dictionaries can hold complex values, so we split keys and values into - separate lists and serialize them individually. - """ - if type(obj) is dict: - copy.type = type(obj) - copy.k_list = list() - copy.v_list = list() - for k, v in obj.items(): - copy.k_list.append(self._make_serializable(k)) - copy.v_list.append(self._make_serializable(v)) - return copy - - # copy at least serializable attributes and properties - for name, member in inspect.getmembers(obj): - # skip private members and non-writable dunder methods. - if name[0] == "_": - if name in ["__weakref__"]: - continue - if name in ["__dict__"]: - continue - if not (name.startswith("__") and name.endswith("__")): - continue - if callable(member) and not isinstance(member, property): - continue - if not self._serializable(member): - member = self._make_serializable(member) - setattr(copy, name, member) - return copy - - def _make_serializable(self, obj): - """ - Make a serializable copy of an object or a list/tuple of objects. - Members which are difficult/impossible to serialize are stripped. - """ - if (type(obj) is list) or (type(obj) is tuple): - rv = [] - for item in obj: - rv.append(self._make_serializable(item)) - if type(obj) is tuple: - rv = tuple(rv) - return rv - elif isinstance(obj, IntEnum) or isinstance(obj, IntFlag): - return obj.value - else: - return self._make_obj_serializable(obj) - - def _deserialize_obj(self, obj): - if hasattr(obj, "type"): - if obj.type is dict: - _obj = dict() - for k, v in zip(obj.k_list, obj.v_list): - _obj[self._deserialize(k)] = self._deserialize(v) - return _obj - return obj - - def _deserialize(self, obj): - if (type(obj) is list) or (type(obj) is tuple): - rv = [] - for item in obj: - rv.append(self._deserialize(item)) - if type(obj) is tuple: - rv = tuple(rv) - return rv - else: - return self._deserialize_obj(obj) - - def start_remote(self): - """Start remote execution""" - self.start() - - def quit_remote(self): - """Quit remote execution""" - self._remote_exec(RemoteClass.QUIT, None) - - def get_remote_value(self): - """Get value of a remotely held object""" - return RemoteClassAttr(self, None).get_remote_value() - - def set_request_timeout(self, timeout): - """Change request timeout""" - self._timeout = timeout - - def run(self): - """ - Create instance of the wrapped class and execute operations - on it as requested by the parent process. - """ - self._instance = self._cls(*self._args, **self._kwargs) - while True: - try: - rv = None - # get request from the parent process - (op, path, args, kwargs) = self._pipe[RemoteClass.PIPE_CHILD].recv() - args = self._deserialize(args) - kwargs = self._deserialize(kwargs) - path = path.split(".") if path else [] - if op == RemoteClass.GET: - rv = self._get_local_value(path) - elif op == RemoteClass.CALL: - rv = self._call_local_method(path, *args, **kwargs) - elif op == RemoteClass.SETATTR and "value" in kwargs: - self._set_local_attr(path, kwargs["value"]) - elif op == RemoteClass.REPR: - rv = self._get_local_repr(path) - elif op == RemoteClass.STR: - rv = self._get_local_str(path) - elif op == RemoteClass.QUIT: - break - else: - continue - # send return value - if not self._serializable(rv): - rv = self._make_serializable(rv) - self._pipe[RemoteClass.PIPE_CHILD].send(rv) - except EOFError: - break - self._instance = None # destroy the instance - - -@unittest.skip("Remote Vpp Test Case Class") -class RemoteVppTestCase(VppTestCase): - """Re-use VppTestCase to create remote VPP segment - - In your test case:: - - @classmethod - def setUpClass(cls): - # fork new process before client connects to VPP - cls.remote_test = RemoteClass(RemoteVppTestCase) - - # start remote process - cls.remote_test.start_remote() - - # set up your test case - super(MyTestCase, cls).setUpClass() - - # set up remote test - cls.remote_test.setUpClass(cls.tempdir) - - @classmethod - def tearDownClass(cls): - # tear down remote test - cls.remote_test.tearDownClass() - - # stop remote process - cls.remote_test.quit_remote() - - # tear down your test case - super(MyTestCase, cls).tearDownClass() - """ - - def __init__(self): - super(RemoteVppTestCase, self).__init__("emptyTest") - - # Note: __del__ is a 'Finalizer" not a 'Destructor'. - # https://docs.python.org/3/reference/datamodel.html#object.__del__ - def __del__(self): - if hasattr(self, "vpp"): - self.vpp.poll() - if self.vpp.returncode is None: - self.vpp.terminate() - self.vpp.communicate() - - @classmethod - def setUpClass(cls, tempdir): - # disable features unsupported in remote VPP - orig_env = dict(os.environ) - if "STEP" in os.environ: - del os.environ["STEP"] - if "DEBUG" in os.environ: - del os.environ["DEBUG"] - cls.tempdir_prefix = os.path.basename(tempdir) + "/" - super(RemoteVppTestCase, cls).setUpClass() - os.environ = orig_env - - @classmethod - def tearDownClass(cls): - super(RemoteVppTestCase, cls).tearDownClass() - - @unittest.skip("Empty test") - def emptyTest(self): - """Do nothing""" - pass - - def setTestFunctionInfo(self, name, doc): - """ - Store the name and documentation string of currently executed test - in the main VPP for logging purposes. - """ - self._testMethodName = name - self._testMethodDoc = doc diff --git a/test/asf/test_adl.py b/test/asf/test_adl.py index bd1602ca8bb..7e5ca8dcbe3 100644 --- a/test/asf/test_adl.py +++ b/test/asf/test_adl.py @@ -2,11 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestAdl(VppTestCase): +class TestAdl(VppAsfTestCase): """Allow/Deny Plugin Unit Test Cases""" @classmethod diff --git a/test/asf/test_api_client.py b/test/asf/test_api_client.py index 97744c6ba1b..3f0fc8a1020 100644 --- a/test/asf/test_api_client.py +++ b/test/asf/test_api_client.py @@ -2,11 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestAPIClient(VppTestCase): +class TestAPIClient(VppAsfTestCase): """API Internal client Test Cases""" def test_client_unittest(self): diff --git a/test/asf/test_api_trace.py b/test/asf/test_api_trace.py index e38b81a4c7d..8776a79f0ac 100644 --- a/test/asf/test_api_trace.py +++ b/test/asf/test_api_trace.py @@ -1,12 +1,10 @@ -import os import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_papi import VppEnum +from asfframework import VppAsfTestCase, VppTestRunner import json import shutil -class TestJsonApiTrace(VppTestCase): +class TestJsonApiTrace(VppAsfTestCase): """JSON API trace related tests""" @classmethod diff --git a/test/asf/test_bihash.py b/test/asf/test_bihash.py index 24639bd7a3b..b7df894be05 100644 --- a/test/asf/test_bihash.py +++ b/test/asf/test_bihash.py @@ -3,11 +3,10 @@ import unittest from config import config -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestBihash(VppTestCase): +class TestBihash(VppAsfTestCase): """Bihash Test Cases""" @classmethod diff --git a/test/asf/test_buffers.py b/test/asf/test_buffers.py index b3a2b6d3d69..d22326f172a 100644 --- a/test/asf/test_buffers.py +++ b/test/asf/test_buffers.py @@ -1,9 +1,9 @@ #!/usr/bin/env python3 -from asfframework import VppTestCase +from asfframework import VppAsfTestCase -class TestBuffers(VppTestCase): +class TestBuffers(VppAsfTestCase): """Buffer C Unit Tests""" @classmethod diff --git a/test/asf/test_cli.py b/test/asf/test_cli.py index 808497f63d0..25ce3330d54 100644 --- a/test/asf/test_cli.py +++ b/test/asf/test_cli.py @@ -1,16 +1,14 @@ #!/usr/bin/env python3 """CLI functional tests""" -import datetime -import time import unittest from vpp_papi import VPPIOError -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner -class TestCLI(VppTestCase): +class TestCLI(VppAsfTestCase): """CLI Test Case""" maxDiff = None @@ -50,7 +48,7 @@ class TestCLI(VppTestCase): self.assertEqual(rv.retval, 0) -class TestCLIExtendedVapiTimeout(VppTestCase): +class TestCLIExtendedVapiTimeout(VppAsfTestCase): maxDiff = None @classmethod diff --git a/test/asf/test_counters.py b/test/asf/test_counters.py index d3fc56a52c0..086189ae517 100644 --- a/test/asf/test_counters.py +++ b/test/asf/test_counters.py @@ -1,11 +1,10 @@ #!/usr/bin/env python3 -from asfframework import VppTestCase -from asfframework import tag_fixme_vpp_workers +from asfframework import VppAsfTestCase, tag_fixme_vpp_workers @tag_fixme_vpp_workers -class TestCounters(VppTestCase): +class TestCounters(VppAsfTestCase): """Counters C Unit Tests""" @classmethod diff --git a/test/asf/test_crypto.py b/test/asf/test_crypto.py index f39cb46470e..56c96b69575 100644 --- a/test/asf/test_crypto.py +++ b/test/asf/test_crypto.py @@ -2,10 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner -class TestCrypto(VppTestCase): +class TestCrypto(VppAsfTestCase): """Crypto Test Case""" @classmethod diff --git a/test/asf/test_endian.py b/test/asf/test_endian.py index 4509ad86133..9caed0efc4a 100644 --- a/test/asf/test_endian.py +++ b/test/asf/test_endian.py @@ -12,13 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asfframework +from asfframework import VppAsfTestCase import vpp_papi_provider F64_ONE = 1.0 -class TestEndian(asfframework.VppTestCase): +class TestEndian(VppAsfTestCase): """TestEndian""" def test_f64_endian_value(self): diff --git a/test/asf/test_fib.py b/test/asf/test_fib.py index bbc10d1c178..9d391f57ed1 100644 --- a/test/asf/test_fib.py +++ b/test/asf/test_fib.py @@ -2,12 +2,11 @@ import unittest -from asfframework import tag_fixme_vpp_workers -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner, tag_fixme_vpp_workers @tag_fixme_vpp_workers -class TestFIB(VppTestCase): +class TestFIB(VppAsfTestCase): """FIB Test Case""" @classmethod diff --git a/test/asf/test_http.py b/test/asf/test_http.py index fd8cb7c506a..64f911c7bfa 100644 --- a/test/asf/test_http.py +++ b/test/asf/test_http.py @@ -2,15 +2,12 @@ """ Vpp HTTP tests """ import unittest -import os -import subprocess import http.client -from asfframework import VppTestCase, VppTestRunner, Worker -from vpp_devices import VppTAPInterface +from asfframework import VppAsfTestCase, VppTestRunner @unittest.skip("Requires root") -class TestHttpTps(VppTestCase): +class TestHttpTps(VppAsfTestCase): """HTTP test class""" @classmethod diff --git a/test/asf/test_http_static.py b/test/asf/test_http_static.py index 504ffa35236..1d87f4c75bd 100644 --- a/test/asf/test_http_static.py +++ b/test/asf/test_http_static.py @@ -1,5 +1,5 @@ from config import config -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner import unittest import subprocess import tempfile @@ -15,7 +15,7 @@ from vpp_qemu_utils import ( "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests" ) @unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli") -class TestHttpStaticVapi(VppTestCase): +class TestHttpStaticVapi(VppAsfTestCase): """enable the http static server and send requests [VAPI]""" @classmethod @@ -82,7 +82,7 @@ class TestHttpStaticVapi(VppTestCase): "http_static" in config.excluded_plugins, "Exclude HTTP Static Server plugin tests" ) @unittest.skipIf(config.skip_netns_tests, "netns not available or disabled from cli") -class TestHttpStaticCli(VppTestCase): +class TestHttpStaticCli(VppAsfTestCase): """enable the static http server and send requests [CLI]""" @classmethod diff --git a/test/asf/test_ipfix_export.py b/test/asf/test_ipfix_export.py deleted file mode 100644 index be4239edbec..00000000000 --- a/test/asf/test_ipfix_export.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python3 -from __future__ import print_function -import binascii -import random -import socket -import unittest -import time -import re - -from asfframework import VppTestCase -from vpp_object import VppObject -from vpp_pg_interface import CaptureTimeoutError -from vpp_ip_route import VppIpRoute, VppRoutePath -from ipaddress import ip_address, IPv4Address, IPv6Address -from socket import AF_INET, AF_INET6 - - -class TestIpfixExporter(VppTestCase): - """Ipfix Exporter Tests""" - - def setUp(self): - super(TestIpfixExporter, self).setUp() - self.create_pg_interfaces(range(4)) - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - i.disable_ipv6_ra() - - def tearDown(self): - super(TestIpfixExporter, self).tearDown() - for i in self.pg_interfaces: - i.unconfig_ip4() - i.unconfig_ip6() - i.admin_down() - - def find_exp_by_collector_addr(self, exporters, addr): - """Find the exporter in the list of exportes with the given addr""" - - for exp in exporters: - if exp.collector_address == IPv4Address(addr): - return exp - return None - - def verify_exporter_detail( - self, exp, collector_addr, src_addr, collector_port=4739, mtu=1400, interval=20 - ): - self.assertTrue(exp is not None) - self.assert_equal(exp.collector_address, collector_addr) - self.assert_equal(exp.src_address, src_addr) - self.assert_equal(exp.collector_port, collector_port) - self.assert_equal(exp.path_mtu, mtu) - self.assert_equal(exp.template_interval, interval) - - def test_create_multipe_exporters(self): - """test that we can create and dump multiple exporters""" - - mtu = 1400 - interval = 20 - port = 4739 - - # Old API - always gives us pool index 0. - self.vapi.set_ipfix_exporter( - collector_address=self.pg1.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - ) - - exporters = self.vapi.ipfix_exporter_dump() - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # create a 2nd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 2) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Create a 3rd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg3.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 3) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg2.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Modify the 2nd exporter. - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu + 1, - template_interval=interval + 1, - is_create=True, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 3) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg2.remote_ip4) - self.verify_exporter_detail( - exp, - IPv4Address(self.pg2.remote_ip4), - IPv4Address(self.pg0.local_ip4), - mtu=mtu + 1, - interval=interval + 1, - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Delete 2nd exporter - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg2.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=False, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 2) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - exp = self.find_exp_by_collector_addr(exporters, self.pg3.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg3.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) - - # Delete final exporter (exporter in slot 0 can not be deleted) - self.vapi.ipfix_exporter_create_delete( - collector_address=self.pg3.remote_ip4, - src_address=self.pg0.local_ip4, - collector_port=4739, - path_mtu=mtu, - template_interval=interval, - is_create=False, - ) - - exporters = list(self.vapi.vpp.details_iter(self.vapi.ipfix_all_exporter_get)) - self.assertTrue(len(exporters) == 1) - exp = self.find_exp_by_collector_addr(exporters, self.pg1.remote_ip4) - self.verify_exporter_detail( - exp, IPv4Address(self.pg1.remote_ip4), IPv4Address(self.pg0.local_ip4) - ) diff --git a/test/asf/test_ipsec_default.py b/test/asf/test_ipsec_default.py deleted file mode 100644 index 2fefb771c48..00000000000 --- a/test/asf/test_ipsec_default.py +++ /dev/null @@ -1,199 +0,0 @@ -import socket -import unittest - -from util import ppp -from asfframework import VppTestRunner -from template_ipsec import IpsecDefaultTemplate - -""" -When an IPSec SPD is configured on an interface, any inbound packets -not matching inbound policies, or outbound packets not matching outbound -policies, must be dropped by default as per RFC4301. - -This test uses simple IPv4 forwarding on interfaces with IPSec enabled -to check if packets with no matching rules are dropped by default. - -The basic setup is a single SPD bound to two interfaces, pg0 and pg1. - - ┌────┐ ┌────┐ - │SPD1│ │SPD1│ - ├────┤ ─────> ├────┤ - │PG0 │ │PG1 │ - └────┘ └────┘ - -First, both inbound and outbound BYPASS policies are configured allowing -traffic to pass from pg0 -> pg1. - -Packets are captured and verified at pg1. - -Then either the inbound or outbound policies are removed and we verify -packets are dropped as expected. - -""" - - -class IPSecInboundDefaultDrop(IpsecDefaultTemplate): - """IPSec: inbound packets drop by default with no matching rule""" - - def test_ipsec_inbound_default_drop(self): - # configure two interfaces and bind the same SPD to both - self.create_interfaces(2) - self.spd_create_and_intf_add(1, self.pg_interfaces) - pkt_count = 5 - - # catch-all inbound BYPASS policy, all interfaces - inbound_policy = self.spd_add_rem_policy( - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # outbound BYPASS policy allowing traffic from pg0->pg1 - outbound_policy = self.spd_add_rem_policy( - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # create a packet stream pg0->pg1 + add to pg0 - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - self.pg0.add_stream(packets0) - - # with inbound BYPASS rule at pg0, we expect to see forwarded - # packets on pg1 - self.pg_interfaces[1].enable_capture() - self.pg_start() - cap1 = self.pg1.get_capture() - for packet in cap1: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(cap1.res)) - # verify captures on pg1 - self.verify_capture(self.pg0, self.pg1, cap1) - # verify policies matched correct number of times - self.verify_policy_match(pkt_count, inbound_policy) - self.verify_policy_match(pkt_count, outbound_policy) - - # remove inbound catch-all BYPASS rule, traffic should now be dropped - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - remove=True, - ) - - # create another packet stream pg0->pg1 + add to pg0 - packets1 = self.create_stream(self.pg0, self.pg1, pkt_count) - self.pg0.add_stream(packets1) - self.pg_interfaces[1].enable_capture() - self.pg_start() - # confirm traffic has now been dropped - self.pg1.assert_nothing_captured( - remark="inbound pkts with no matching" "rules NOT dropped by default" - ) - # both policies should not have matched any further packets - # since we've dropped at input stage - self.verify_policy_match(pkt_count, outbound_policy) - self.verify_policy_match(pkt_count, inbound_policy) - - -class IPSecOutboundDefaultDrop(IpsecDefaultTemplate): - """IPSec: outbound packets drop by default with no matching rule""" - - def test_ipsec_inbound_default_drop(self): - # configure two interfaces and bind the same SPD to both - self.create_interfaces(2) - self.spd_create_and_intf_add(1, self.pg_interfaces) - pkt_count = 5 - - # catch-all inbound BYPASS policy, all interfaces - inbound_policy = self.spd_add_rem_policy( - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # outbound BYPASS policy allowing traffic from pg0->pg1 - outbound_policy = self.spd_add_rem_policy( - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # create a packet stream pg0->pg1 + add to pg0 - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - self.pg0.add_stream(packets0) - - # with outbound BYPASS rule allowing pg0->pg1, we expect to see - # forwarded packets on pg1 - self.pg_interfaces[1].enable_capture() - self.pg_start() - cap1 = self.pg1.get_capture() - for packet in cap1: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(cap1.res)) - # verify captures on pg1 - self.verify_capture(self.pg0, self.pg1, cap1) - # verify policies matched correct number of times - self.verify_policy_match(pkt_count, inbound_policy) - self.verify_policy_match(pkt_count, outbound_policy) - - # remove outbound rule - self.spd_add_rem_policy( - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - - # create another packet stream pg0->pg1 + add to pg0 - packets1 = self.create_stream(self.pg0, self.pg1, pkt_count) - self.pg0.add_stream(packets1) - self.pg_interfaces[1].enable_capture() - self.pg_start() - # confirm traffic was dropped and not forwarded - self.pg1.assert_nothing_captured( - remark="outbound pkts with no matching rules NOT dropped " "by default" - ) - # inbound rule should have matched twice the # of pkts now - self.verify_policy_match(pkt_count * 2, inbound_policy) - # as dropped at outbound, outbound policy is the same - self.verify_policy_match(pkt_count, outbound_policy) - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_ipsec_spd_flow_cache_input.py b/test/asf/test_ipsec_spd_flow_cache_input.py deleted file mode 100644 index bab130dfa18..00000000000 --- a/test/asf/test_ipsec_spd_flow_cache_input.py +++ /dev/null @@ -1,866 +0,0 @@ -from os import remove -import socket -import unittest - -from util import ppp -from asfframework import VppTestRunner -from template_ipsec import SpdFlowCacheTemplate - - -class SpdFlowCacheInbound(SpdFlowCacheTemplate): - # Override setUpConstants to enable inbound flow cache in config - @classmethod - def setUpConstants(cls): - super(SpdFlowCacheInbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-flow-cache on", "}"]) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - -class IPSec4SpdTestCaseBypass(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (add bypass)""" - - def test_ipsec_spd_inbound_bypass(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec inbound SPD policy lookup. - # - # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. - # - High priority rule action is set to DISCARD. - # - Low priority rule action is set to BYPASS. - # - # Since BYPASS rules take precedence over DISCARD - # (the order being PROTECT, BYPASS, DISCARD) we expect the - # BYPASS rule to match and traffic to be correctly forwarded. - self.create_interfaces(2) - pkt_count = 5 - - self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) - - # create input rules - # bypass rule should take precedence over discard rule, - # even though it's lower priority - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=15, - policy_type="discard", - ) - - # create output rule so we can capture forwarded packets - policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface - self.pg0.add_stream(packets) - self.pg1.enable_capture() - self.pg_start() - - # check capture on pg1 - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check input policy has been cached - self.verify_num_inbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseDiscard(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (add discard)""" - - def test_ipsec_spd_inbound_discard(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec inbound SPD policy lookup. - # 1 DISCARD rule is added, so all traffic should be dropped. - self.create_interfaces(2) - pkt_count = 5 - - self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) - - # create input rule - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="discard", - ) - - # create output rule so we can capture forwarded packets - policy_1 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface - self.pg0.add_stream(packets) - self.pg1.enable_capture() - self.pg_start() - # inbound discard rule should have dropped traffic - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # only inbound discard rule should have been cached - self.verify_num_inbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseRemoveInbound(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (remove bypass)""" - - def test_ipsec_spd_inbound_remove(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec inbound SPD policy lookup. - # - # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. - # - High priority rule action is set to DISCARD. - # - Low priority rule action is set to BYPASS. - # - # Since BYPASS rules take precedence over DISCARD - # (the order being PROTECT, BYPASS, DISCARD) we expect the - # BYPASS rule to match and traffic to be correctly forwarded. - # - # The BYPASS rules is then removed, and we check that all traffic - # is now correctly dropped. - self.create_interfaces(2) - pkt_count = 5 - - self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) - - # create input rules - # bypass rule should take precedence over discard rule, - # even though it's lower priority - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=15, - policy_type="discard", - ) - - # create output rule so we can capture forwarded packets - policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface - self.pg0.add_stream(packets) - self.pg1.enable_capture() - self.pg_start() - - # check capture on pg1 - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check input policy has been cached - self.verify_num_inbound_flow_cache_entries(1) - - # remove the input bypass rule - self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - remove=True, - ) - # verify flow cache counter has been reset by rule removal - self.verify_num_inbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg1.enable_capture() # flush the old capture - self.pg_start() - - # inbound discard rule should have dropped traffic - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # by removing the bypass rule, we should have reset the flow cache - # we only expect the discard rule to now be in the flow cache - self.verify_num_inbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseReaddInbound(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (add, remove, re-add bypass)""" - - def test_ipsec_spd_inbound_readd(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec inbound SPD policy lookup. - # - # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. - # - High priority rule action is set to DISCARD. - # - Low priority rule action is set to BYPASS. - # - # Since BYPASS rules take precedence over DISCARD - # (the order being PROTECT, BYPASS, DISCARD) we expect the - # BYPASS rule to match and traffic to be correctly forwarded. - # - # The BYPASS rules is then removed, and we check that all traffic - # is now correctly dropped. - # - # The BYPASS rule is then readded, checking traffic is not forwarded - # correctly again - self.create_interfaces(2) - pkt_count = 5 - - self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) - - # create input rules - # bypass rule should take precedence over discard rule, - # even though it's lower priority - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # inbound, priority 15 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=15, - policy_type="discard", - ) - - # create output rule so we can capture forwarded packets - policy_2 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface - self.pg0.add_stream(packets) - self.pg1.enable_capture() - self.pg_start() - - # check capture on pg1 - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check input policy has been cached - self.verify_num_inbound_flow_cache_entries(1) - - # remove the input bypass rule - self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - remove=True, - ) - # verify flow cache counter has been reset by rule removal - self.verify_num_inbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg1.enable_capture() # flush the old capture - self.pg_start() - - # inbound discard rule should have dropped traffic - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # by removing the bypass rule, flow cache was reset - # we only expect the discard rule to now be in the flow cache - self.verify_num_inbound_flow_cache_entries(1) - - # readd the input bypass rule - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # verify flow cache counter has been reset by rule addition - self.verify_num_inbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg1.enable_capture() # flush the old capture - self.pg_start() - - # check capture on pg1 - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count * 2, policy_2) - # by readding the bypass rule, we reset the flow cache - # we only expect the bypass rule to now be in the flow cache - self.verify_num_inbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseMultipleInbound(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (multiple interfaces, multiple rules)""" - - def test_ipsec_spd_inbound_multiple(self): - # In this test case, packets in IPv4 FWD path are configured to go - # through IPSec outbound SPD policy lookup. - # - # Multiples rules on multiple interfaces are tested at the same time. - # 3x interfaces are configured, binding the same SPD to each. - # Each interface has 1 SPD rule- 2x BYPASS and 1x DISCARD - # - # Traffic should be forwarded with destinations pg1 & pg2 - # and dropped to pg0. - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add input rules on all interfaces - # pg0 -> pg1 - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # pg1 -> pg2 - policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg2, - self.pg1, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # pg2 -> pg0 - policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg0, - self.pg2, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="discard", - ) - - # create output rules covering the the full ip range - # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets - policy_3 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # check flow cache is empty (0 active elements) before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures from ifs - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rules - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check flow/policy match was cached for: 3x input policies - self.verify_num_inbound_flow_cache_entries(3) - - -class IPSec4SpdTestCaseOverwriteStaleInbound(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (overwrite stale entries)""" - - def test_ipsec_spd_inbound_overwrite(self): - # The operation of the flow cache is setup so that the entire cache - # is invalidated when adding or removing an SPD policy rule. - # For performance, old cache entries are not zero'd, but remain - # in the table as "stale" entries. If a flow matches a stale entry, - # and the epoch count does NOT match the current count, the entry - # is overwritten. - # In this test, 3 active rules are created and matched to enter - # them into the flow cache. - # A single entry is removed to invalidate the entire cache. - # We then readd the rule and test that overwriting of the previous - # stale entries occurs as expected, and that the flow cache entry - # counter is updated correctly. - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add input rules on all interfaces - # pg0 -> pg1 - policy_0 = self.spd_add_rem_policy( # inbound - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # pg1 -> pg2 - policy_1 = self.spd_add_rem_policy( # inbound - 1, - self.pg2, - self.pg1, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # pg2 -> pg0 - policy_2 = self.spd_add_rem_policy( # inbound - 1, - self.pg0, - self.pg2, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="discard", - ) - - # create output rules covering the the full ip range - # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets - policy_3 = self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # check flow cache is empty (0 active elements) before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures from ifs - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rules - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check flow/policy match was cached for: 3x input policies - self.verify_num_inbound_flow_cache_entries(3) - - # adding an outbound policy should not invalidate output flow cache - self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=1, - policy_type="bypass", - all_ips=True, - ) - # check inbound flow cache counter has not been reset - self.verify_num_inbound_flow_cache_entries(3) - - # remove + readd bypass policy - flow cache counter will be reset, - # and there will be 3x stale entries in flow cache - self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - remove=True, - ) - # readd policy - policy_0 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # check counter was reset - self.verify_num_inbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - for pg in self.pg_interfaces: - pg.enable_capture() # flush previous captures - self.pg_start() - - # get captures from ifs - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rules - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count * 2, policy_1) - self.verify_policy_match(pkt_count * 2, policy_2) - # we are overwriting 3x stale entries - check flow cache counter - # is correct - self.verify_num_inbound_flow_cache_entries(3) - - -class IPSec4SpdTestCaseCollisionInbound(SpdFlowCacheInbound): - """ IPSec/IPv4 inbound: Policy mode test case with flow cache \ - (hash collision)""" - - # Override class setup to restrict hash table size to 16 buckets. - # This forces using only the lower 4 bits of the hash as a key, - # making hash collisions easy to find. - @classmethod - def setUpConstants(cls): - super(SpdFlowCacheInbound, cls).setUpConstants() - cls.vpp_cmdline.extend( - [ - "ipsec", - "{", - "ipv4-inbound-spd-flow-cache on", - "ipv4-inbound-spd-hash-buckets 16", - "}", - ] - ) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - def test_ipsec_spd_inbound_collision(self): - # The flow cache operation is setup to overwrite an entry - # if a hash collision occurs. - # In this test, 2 packets are configured that result in a - # hash with the same lower 4 bits. - # After the first packet is received, there should be one - # active entry in the flow cache. - # After the second packet with the same lower 4 bit hash - # is received, this should overwrite the same entry. - # Therefore there will still be a total of one (1) entry, - # in the flow cache with two matching policies. - # crc32_supported() method is used to check cpu for crc32 - # intrinsic support for hashing. - # If crc32 is not supported, we fall back to clib_xxhash() - self.create_interfaces(4) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - - # create output rules covering the the full ip range - # 0.0.0.0 -> 255.255.255.255, so we can capture forwarded packets - policy_0 = self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - capture_intfs = [] - if self.crc32_supported(): # create crc32 collision on last 4 bits - hashed_with_crc32 = True - # add matching rules - policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg3, - self.pg0, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - - # we expect to get captures on pg1 + pg3 - capture_intfs.append(self.pg1) - capture_intfs.append(self.pg3) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - - # create the packet streams - # packet hashes to: - # ad727628 - packets1 = self.create_stream(self.pg2, self.pg1, pkt_count, 1, 1) - # b5512898 - packets2 = self.create_stream(self.pg0, self.pg3, pkt_count, 1, 1) - # add the streams to the source interfaces - self.pg2.add_stream(packets1) - self.pg0.add_stream(packets2) - else: # create xxhash collision on last 4 bits - hashed_with_crc32 = False - # add matching rules - policy_1 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - policy_2 = self.spd_add_rem_policy( # inbound, priority 10 - 1, - self.pg2, - self.pg3, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - - capture_intfs.append(self.pg1) - capture_intfs.append(self.pg2) - - # check flow cache is empty before sending traffic - self.verify_num_inbound_flow_cache_entries(0) - - # create the packet streams - # 2f8f90f557eef12c - packets1 = self.create_stream(self.pg2, self.pg1, pkt_count, 1, 1) - # 6b7f9987719ffc1c - packets2 = self.create_stream(self.pg3, self.pg2, pkt_count, 1, 1) - # add the streams to the source interfaces - self.pg2.add_stream(packets1) - self.pg3.add_stream(packets2) - - # enable capture on interfaces we expect capture on & send pkts - for pg in capture_intfs: - pg.enable_capture() - self.pg_start() - - # get captures - if_caps = [] - for pg in capture_intfs: - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rule - if hashed_with_crc32: - self.verify_capture(self.pg2, self.pg1, if_caps[0]) - self.verify_capture(self.pg0, self.pg3, if_caps[1]) - else: # hashed with xxhash - self.verify_capture(self.pg2, self.pg1, if_caps[0]) - self.verify_capture(self.pg3, self.pg2, if_caps[1]) - - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - self.verify_policy_match(pkt_count * 2, policy_0) # output policy - # we have matched 2 policies, but due to the hash collision - # one active entry is expected - self.verify_num_inbound_flow_cache_entries(1) - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_ipsec_spd_flow_cache_output.py b/test/asf/test_ipsec_spd_flow_cache_output.py deleted file mode 100644 index ec68c3ed7d8..00000000000 --- a/test/asf/test_ipsec_spd_flow_cache_output.py +++ /dev/null @@ -1,765 +0,0 @@ -import socket -import unittest - -from util import ppp -from asfframework import VppTestRunner -from template_ipsec import SpdFlowCacheTemplate - - -class SpdFlowCacheOutbound(SpdFlowCacheTemplate): - # Override setUpConstants to enable outbound flow cache in config - @classmethod - def setUpConstants(cls): - super(SpdFlowCacheOutbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-flow-cache on", "}"]) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - -class IPSec4SpdTestCaseAdd(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (add rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # check flow cache is empty before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # check policy in SPD has been cached after traffic - # matched BYPASS rule in SPD - self.verify_num_outbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseRemoveOutbound(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (remove rule)""" - - def test_ipsec_spd_outbound_remove(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # High priority rule is then removed. - # Traffic sent on pg0 interface should match low priority - # rule and should be discarded after SPD lookup. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # check flow cache is empty before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify capture on pg1 - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # check policy in SPD has been cached after traffic - # matched BYPASS rule in SPD - self.verify_num_outbound_flow_cache_entries(1) - - # now remove the bypass rule - self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - # verify flow cache counter has been reset by rule removal - self.verify_num_outbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # all packets will be dropped by SPD rule - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - # previous stale entry in flow cache should have been overwritten, - # with one active entry - self.verify_num_outbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseReaddOutbound(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (add, remove, re-add)""" - - def test_ipsec_spd_outbound_readd(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - # High priority rule is then removed. - # Traffic sent on pg0 interface should match low priority - # rule and should be discarded after SPD lookup. - # Readd high priority rule. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # check flow cache is empty before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify capture on pg1 - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # check policy in SPD has been cached after traffic - # matched BYPASS rule in SPD - self.verify_num_outbound_flow_cache_entries(1) - - # now remove the bypass rule, leaving only the discard rule - self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - # verify flow cache counter has been reset by rule removal - self.verify_num_outbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # all packets will be dropped by SPD rule - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - # previous stale entry in flow cache should have been overwritten - self.verify_num_outbound_flow_cache_entries(1) - - # now readd the bypass rule - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - # verify flow cache counter has been reset by rule addition - self.verify_num_outbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # get capture - capture = self.pg1.get_capture(pkt_count) - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - # previous stale entry in flow cache should have been overwritten - self.verify_num_outbound_flow_cache_entries(1) - - -class IPSec4SpdTestCaseMultipleOutbound(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (multiple interfaces, multiple rules)""" - - def test_ipsec_spd_outbound_multiple(self): - # In this test case, packets in IPv4 FWD path are configured to go - # through IPSec outbound SPD policy lookup. - # Multiples rules on multiple interfaces are tested at the same time. - # 3x interfaces are configured, binding the same SPD to each. - # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD). - # On pg0 & pg1, the BYPASS rule is HIGH priority - # On pg2, the DISCARD rule is HIGH priority - # Traffic should be received on pg0 & pg1 and dropped on pg2. - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add rules on all interfaces - policy_01 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_02 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_11 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_12 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_21 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="bypass", - ) - policy_22 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="discard", - ) - - # interfaces bound to an SPD, will by default drop inbound - # traffic with no matching policies. add catch-all inbound - # bypass rule to SPD: - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # check flow cache is empty (0 active elements) before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) - self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) - - # verify captures that matched BYPASS rule - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all packets that were expected to match rules, matched - # pg0 -> pg1 - self.verify_policy_match(pkt_count, policy_01) - self.verify_policy_match(0, policy_02) - # pg1 -> pg2 - self.verify_policy_match(pkt_count, policy_11) - self.verify_policy_match(0, policy_12) - # pg2 -> pg0 - self.verify_policy_match(0, policy_21) - self.verify_policy_match(pkt_count, policy_22) - # check that 3 matching policies in SPD have been cached - self.verify_num_outbound_flow_cache_entries(3) - - -class IPSec4SpdTestCaseOverwriteStaleOutbound(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (overwrite stale entries)""" - - def test_ipsec_spd_outbound_overwrite(self): - # The operation of the flow cache is setup so that the entire cache - # is invalidated when adding or removing an SPD policy rule. - # For performance, old cache entries are not zero'd, but remain - # in the table as "stale" entries. If a flow matches a stale entry, - # and the epoch count does NOT match the current count, the entry - # is overwritten. - # In this test, 3 active rules are created and matched to enter - # them into the flow cache. - # A single entry is removed to invalidate the entire cache. - # We then readd the rule and test that overwriting of the previous - # stale entries occurs as expected, and that the flow cache entry - # counter is updated correctly. - self.create_interfaces(3) - pkt_count = 2 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add output rules on all interfaces - # pg0 -> pg1 - policy_0 = self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - # pg1 -> pg2 - policy_1 = self.spd_add_rem_policy( # outbound - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - # pg2 -> pg0 - policy_2 = self.spd_add_rem_policy( # outbound - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="discard", - ) - - # interfaces bound to an SPD, will by default drop inbound - # traffic with no matching policies. add catch-all inbound - # bypass rule to SPD: - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # check flow cache is empty (0 active elements) before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures from ifs - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rules - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - self.verify_policy_match(pkt_count, policy_2) - # check flow/policy match was cached for: 3x output policies - self.verify_num_outbound_flow_cache_entries(3) - - # adding an inbound policy should not invalidate output flow cache - self.spd_add_rem_policy( # inbound - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - ) - # check flow cache counter has not been reset - self.verify_num_outbound_flow_cache_entries(3) - - # remove a bypass policy - flow cache counter will be reset, and - # there will be 3x stale entries in flow cache - self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - # readd policy - policy_0 = self.spd_add_rem_policy( # outbound - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - # check counter was reset with flow cache invalidation - self.verify_num_outbound_flow_cache_entries(0) - - # resend the same packets - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - for pg in self.pg_interfaces: - pg.enable_capture() # flush previous captures - self.pg_start() - - # get captures from ifs - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD Add - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # verify captures that matched BYPASS rules - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count * 2, policy_1) - self.verify_policy_match(pkt_count * 2, policy_2) - # we are overwriting 3x stale entries - check flow cache counter - # is correct - self.verify_num_outbound_flow_cache_entries(3) - - -class IPSec4SpdTestCaseCollisionOutbound(SpdFlowCacheOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with flow cache \ - (hash collision)""" - - # Override class setup to restrict vector size to 16 elements. - # This forces using only the lower 4 bits of the hash as a key, - # making hash collisions easy to find. - @classmethod - def setUpConstants(cls): - super(SpdFlowCacheOutbound, cls).setUpConstants() - cls.vpp_cmdline.extend( - [ - "ipsec", - "{", - "ipv4-outbound-spd-flow-cache on", - "ipv4-outbound-spd-hash-buckets 16", - "}", - ] - ) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - def test_ipsec_spd_outbound_collision(self): - # The flow cache operation is setup to overwrite an entry - # if a hash collision occurs. - # In this test, 2 packets are configured that result in a - # hash with the same lower 4 bits. - # After the first packet is received, there should be one - # active entry in the flow cache. - # After the second packet with the same lower 4 bit hash - # is received, this should overwrite the same entry. - # Therefore there will still be a total of one (1) entry, - # in the flow cache with two matching policies. - # crc32_supported() method is used to check cpu for crc32 - # intrinsic support for hashing. - # If crc32 is not supported, we fall back to clib_xxhash() - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add rules - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # interfaces bound to an SPD, will by default drop inbound - # traffic with no matching policies. add catch-all inbound - # bypass rule to SPD: - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # check flow cache is empty (0 active elements) before sending traffic - self.verify_num_outbound_flow_cache_entries(0) - - # create the packet streams generating collision on last 4 bits - if self.crc32_supported(): - # packet hashes to: - # 432c99c2 - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count, 1, 1) - # 31f8f3f2 - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count, 6, 6) - else: # clib_xxhash - # ec3a258551bc0306 - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count, 2, 2) - # 61fee526d18d7a6 - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count, 3, 3) - - # add the streams to the source interfaces - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures from ifs - the proper pkt_count of packets was saved by - # create_packet_info() based on dst_if parameter - if_caps = [] - for pg in [self.pg2, self.pg0]: # we are expecting captures on pg2/pg0 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) - self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) - - # verify captures that matched BYPASS rule - self.verify_capture(self.pg1, self.pg2, if_caps[0]) - self.verify_capture(self.pg2, self.pg0, if_caps[1]) - # verify all packets that were expected to match rules, matched - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - # we have matched 2 policies, but due to the hash collision - # one active entry is expected - self.verify_num_outbound_flow_cache_entries(1) - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_ipsec_spd_fp_output.py b/test/asf/test_ipsec_spd_fp_output.py deleted file mode 100644 index a92669a4f3f..00000000000 --- a/test/asf/test_ipsec_spd_fp_output.py +++ /dev/null @@ -1,1418 +0,0 @@ -import socket -import unittest -import ipaddress - -from util import ppp -from asfframework import VppTestRunner -from template_ipsec import IPSecIPv4Fwd -from template_ipsec import IPSecIPv6Fwd - - -class SpdFastPathOutbound(IPSecIPv4Fwd): - # Override setUpConstants to enable outbound fast path in config - @classmethod - def setUpConstants(cls): - super(SpdFastPathOutbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-outbound-spd-fast-path on", "}"]) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - -class SpdFastPathIPv6Outbound(IPSecIPv6Fwd): - # Override setUpConstants to enable outbound fast path in config - @classmethod - def setUpConstants(cls): - super(SpdFastPathIPv6Outbound, cls).setUpConstants() - cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-outbound-spd-fast-path on", "}"]) - cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - - -class IPSec4SpdTestCaseAdd(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1111 - s_port_e = 1111 - d_port_s = 2222 - d_port_e = 2222 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec4SpdTestCaseAddPortRange(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add all ips port range rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1000 - s_port_e = 2023 - d_port_s = 5000 - d_port_e = 6023 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - all_ips=True, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec4SpdTestCaseAddIPRange(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add ips range with any port rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_ip_s = ipaddress.ip_address(self.pg0.remote_ip4) - s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5) - d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4) - d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0) - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec4SpdTestCaseAddIPAndPortRange(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add all ips range rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - # in this test we define ranges of ports and ip addresses. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1000 - s_port_e = 1000 + 1023 - d_port_s = 5000 - d_port_e = 5000 + 1023 - - s_ip_s = ipaddress.ip_address( - int(ipaddress.ip_address(self.pg0.remote_ip4)) - 24 - ) - s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255) - d_ip_s = ipaddress.ip_address(self.pg1.remote_ip4) - d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255) - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec4SpdTestCaseAddAll(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add all ips ports rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # Low priority rule action is set to BYPASS all ips. - # High priority rule action is set to DISCARD all ips. - # Traffic sent on pg0 interface when LOW priority rule is added, - # expect the packet is being sent out to pg1. Then HIGH priority - # rule is added and send the same traffic to pg0, this time expect - # the traffic is dropped. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - - policy_1 = self.spd_add_rem_policy( # outbound, priority 20 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=20, - policy_type="discard", - all_ips=True, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # assert nothing captured on pg0 and pg1 - self.pg0.assert_nothing_captured() - self.pg1.assert_nothing_captured() - - -class IPSec4SpdTestCaseRemove(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (remove rule)""" - - def test_ipsec_spd_outbound_remove(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # High priority rule is then removed. - # Traffic sent on pg0 interface should match low priority - # rule and should be discarded after SPD lookup. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify capture on pg1 - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # now remove the bypass rule - self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # all packets will be dropped by SPD rule - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - - -class IPSec4SpdTestCaseReadd(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (add, remove, re-add)""" - - def test_ipsec_spd_outbound_readd(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - # High priority rule is then removed. - # Traffic sent on pg0 interface should match low priority - # rule and should be discarded after SPD lookup. - # Readd high priority rule. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify capture on pg1 - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # remove the bypass rule, leaving only the discard rule - self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # all packets will be dropped by SPD rule - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - - # now readd the bypass rule - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # get capture - capture = self.pg1.get_capture(pkt_count) - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - - -class IPSec4SpdTestCaseMultiple(SpdFastPathOutbound): - """ IPSec/IPv4 outbound: Policy mode test case with fast path \ - (multiple interfaces, multiple rules)""" - - def test_ipsec_spd_outbound_multiple(self): - # In this test case, packets in IPv4 FWD path are configured to go - # through IPSec outbound SPD policy lookup. - # Multiples rules on multiple interfaces are tested at the same time. - # 3x interfaces are configured, binding the same SPD to each. - # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD). - # On pg0 & pg1, the BYPASS rule is HIGH priority - # On pg2, the DISCARD rule is HIGH priority - # Traffic should be received on pg0 & pg1 and dropped on pg2. - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add rules on all interfaces - policy_01 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_02 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_11 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_12 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_21 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="bypass", - ) - policy_22 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="discard", - ) - - # interfaces bound to an SPD, will by default drop inbound - # traffic with no matching policies. add catch-all inbound - # bypass rule to SPD: - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) - self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) - - # verify captures that matched BYPASS rule - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all packets that were expected to match rules, matched - # pg0 -> pg1 - self.verify_policy_match(pkt_count, policy_01) - self.verify_policy_match(0, policy_02) - # pg1 -> pg2 - self.verify_policy_match(pkt_count, policy_11) - self.verify_policy_match(0, policy_12) - # pg2 -> pg0 - self.verify_policy_match(0, policy_21) - self.verify_policy_match(pkt_count, policy_22) - - -class IPSec6SpdTestCaseAdd(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (add rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1111 - s_port_e = 1111 - d_port_s = 2222 - d_port_e = 2222 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count, s_port_s, d_port_s) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec6SpdTestCaseAddAll(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (add all ips ports rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # Low priority rule action is set to BYPASS all ips. - # High priority rule action is set to DISCARD all ips. - # Traffic sent on pg0 interface when LOW priority rule is added, - # expect the packet is being sent out to pg1. Then HIGH priority - # rule is added and send the same traffic to pg0, this time expect - # the traffic is dropped. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - - policy_1 = self.spd_add_rem_policy( # outbound, priority 20 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=20, - policy_type="discard", - all_ips=True, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # assert nothing captured on pg0 and pg1 - self.pg0.assert_nothing_captured() - self.pg1.assert_nothing_captured() - - -class IPSec6SpdTestCaseAddPortRange(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (add all ips port range rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1000 - s_port_e = 2023 - d_port_s = 5000 - d_port_e = 6023 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - all_ips=True, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - all_ips=True, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count, 1333, 5444) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec6SpdTestCaseAddIPRange(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (add ips range with any port rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - s_ip_s = ipaddress.ip_address(self.pg0.remote_ip6) - s_ip_e = ipaddress.ip_address(int(s_ip_s) + 5) - d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6) - d_ip_e = ipaddress.ip_address(int(d_ip_s) + 0) - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec6SpdTestCaseAddIPAndPortRange(SpdFastPathIPv6Outbound): - """ IPSec/IPvr6 outbound: Policy mode test case with fast path \ - (add all ips range rule)""" - - def test_ipsec_spd_outbound_add(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - # in this test we define ranges of ports and ip addresses. - self.create_interfaces(2) - pkt_count = 5 - s_port_s = 1000 - s_port_e = 1000 + 1023 - d_port_s = 5000 - d_port_e = 5000 + 1023 - - s_ip_s = ipaddress.ip_address( - int(ipaddress.ip_address(self.pg0.remote_ip6)) - 24 - ) - s_ip_e = ipaddress.ip_address(int(s_ip_s) + 255) - d_ip_s = ipaddress.ip_address(self.pg1.remote_ip6) - d_ip_e = ipaddress.ip_address(int(d_ip_s) + 255) - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ip_range=True, - local_ip_start=s_ip_s, - local_ip_stop=s_ip_e, - remote_ip_start=d_ip_s, - remote_ip_stop=d_ip_e, - local_port_start=s_port_s, - local_port_stop=s_port_e, - remote_port_start=d_port_s, - remote_port_stop=d_port_e, - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - - -class IPSec6SpdTestCaseReadd(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (add, remove, re-add)""" - - def test_ipsec_spd_outbound_readd(self): - # In this test case, packets in IPv4 FWD path are configured - # to go through IPSec outbound SPD policy lookup. - # 2 SPD rules (1 HIGH and 1 LOW) are added. - # High priority rule action is set to BYPASS. - # Low priority rule action is set to DISCARD. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - # High priority rule is then removed. - # Traffic sent on pg0 interface should match low priority - # rule and should be discarded after SPD lookup. - # Readd high priority rule. - # Traffic sent on pg0 interface should match high priority - # rule and should be sent out on pg1 interface. - self.create_interfaces(2) - pkt_count = 5 - self.spd_create_and_intf_add(1, [self.pg1]) - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_1 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - # create the packet stream - packets = self.create_stream(self.pg0, self.pg1, pkt_count) - # add the stream to the source interface + enable capture - self.pg0.add_stream(packets) - self.pg0.enable_capture() - self.pg1.enable_capture() - # start the packet generator - self.pg_start() - # get capture - capture = self.pg1.get_capture() - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify capture on pg1 - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(0, policy_1) - # remove the bypass rule, leaving only the discard rule - self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - remove=True, - ) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # all packets will be dropped by SPD rule - self.pg1.assert_nothing_captured() - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - - # now readd the bypass rule - policy_0 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - - # resend the same packets - self.pg0.add_stream(packets) - self.pg0.enable_capture() # flush the old captures - self.pg1.enable_capture() - self.pg_start() - - # get capture - capture = self.pg1.get_capture(pkt_count) - for packet in capture: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(capture.res)) - - # assert nothing captured on pg0 - self.pg0.assert_nothing_captured() - # verify captured packets - self.verify_capture(self.pg0, self.pg1, capture) - # verify all policies matched the expected number of times - self.verify_policy_match(pkt_count, policy_0) - self.verify_policy_match(pkt_count, policy_1) - - -class IPSec6SpdTestCaseMultiple(SpdFastPathIPv6Outbound): - """ IPSec/IPv6 outbound: Policy mode test case with fast path \ - (multiple interfaces, multiple rules)""" - - def test_ipsec_spd_outbound_multiple(self): - # In this test case, packets in IPv4 FWD path are configured to go - # through IPSec outbound SPD policy lookup. - # Multiples rules on multiple interfaces are tested at the same time. - # 3x interfaces are configured, binding the same SPD to each. - # Each interface has 2 SPD rules (1 BYPASS and 1 DISCARD). - # On pg0 & pg1, the BYPASS rule is HIGH priority - # On pg2, the DISCARD rule is HIGH priority - # Traffic should be received on pg0 & pg1 and dropped on pg2. - self.create_interfaces(3) - pkt_count = 5 - # bind SPD to all interfaces - self.spd_create_and_intf_add(1, self.pg_interfaces) - # add rules on all interfaces - policy_01 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_02 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg0, - self.pg1, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_11 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="bypass", - ) - policy_12 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg1, - self.pg2, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="discard", - ) - - policy_21 = self.spd_add_rem_policy( # outbound, priority 5 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=5, - policy_type="bypass", - ) - policy_22 = self.spd_add_rem_policy( # outbound, priority 10 - 1, - self.pg2, - self.pg0, - socket.IPPROTO_UDP, - is_out=1, - priority=10, - policy_type="discard", - ) - - # interfaces bound to an SPD, will by default drop inbound - # traffic with no matching policies. add catch-all inbound - # bypass rule to SPD: - self.spd_add_rem_policy( # inbound, all interfaces - 1, - None, - None, - socket.IPPROTO_UDP, - is_out=0, - priority=10, - policy_type="bypass", - all_ips=True, - ) - - # create the packet streams - packets0 = self.create_stream(self.pg0, self.pg1, pkt_count) - packets1 = self.create_stream(self.pg1, self.pg2, pkt_count) - packets2 = self.create_stream(self.pg2, self.pg0, pkt_count) - # add the streams to the source interfaces - self.pg0.add_stream(packets0) - self.pg1.add_stream(packets1) - self.pg2.add_stream(packets2) - # enable capture on all interfaces - for pg in self.pg_interfaces: - pg.enable_capture() - # start the packet generator - self.pg_start() - - # get captures - if_caps = [] - for pg in [self.pg1, self.pg2]: # we are expecting captures on pg1/pg2 - if_caps.append(pg.get_capture()) - for packet in if_caps[-1]: - try: - self.logger.debug(ppp("SPD - Got packet:", packet)) - except Exception: - self.logger.error(ppp("Unexpected or invalid packet:", packet)) - raise - self.logger.debug("SPD: Num packets: %s", len(if_caps[0].res)) - self.logger.debug("SPD: Num packets: %s", len(if_caps[1].res)) - - # verify captures that matched BYPASS rule - self.verify_capture(self.pg0, self.pg1, if_caps[0]) - self.verify_capture(self.pg1, self.pg2, if_caps[1]) - # verify that traffic to pg0 matched DISCARD rule and was dropped - self.pg0.assert_nothing_captured() - # verify all packets that were expected to match rules, matched - # pg0 -> pg1 - self.verify_policy_match(pkt_count, policy_01) - self.verify_policy_match(0, policy_02) - # pg1 -> pg2 - self.verify_policy_match(pkt_count, policy_11) - self.verify_policy_match(0, policy_12) - # pg2 -> pg0 - self.verify_policy_match(0, policy_21) - self.verify_policy_match(pkt_count, policy_22) - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_lb_api.py b/test/asf/test_lb_api.py index b1e04a93ec3..9608d0473a6 100644 --- a/test/asf/test_lb_api.py +++ b/test/asf/test_lb_api.py @@ -12,13 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import asfframework -import ipaddress +from asfframework import VppAsfTestCase DEFAULT_VIP = "lb_vip_details(_0=978, context=12, vip=vl_api_lb_ip_addr_t(pfx=IPv6Network(u'::/0'), protocol=<vl_api_ip_proto_t.IP_API_PROTO_RESERVED: 255>, port=0), encap=<vl_api_lb_encap_type_t.LB_API_ENCAP_TYPE_GRE4: 0>, dscp=<vl_api_ip_dscp_t.IP_API_DSCP_CS0: 0>, srv_type=<vl_api_lb_srv_type_t.LB_API_SRV_TYPE_CLUSTERIP: 0>, target_port=0, flow_table_length=0)" # noqa -class TestLbEmptyApi(asfframework.VppTestCase): +class TestLbEmptyApi(VppAsfTestCase): """TestLbEmptyApi""" def test_lb_empty_vip_dump(self): @@ -35,7 +34,7 @@ class TestLbEmptyApi(asfframework.VppTestCase): self.assertEqual(rv, [], "Expected: [] Received: %r." % rv) -class TestLbApi(asfframework.VppTestCase): +class TestLbApi(VppAsfTestCase): """TestLbApi""" def test_lb_vip_dump(self): @@ -56,7 +55,7 @@ class TestLbApi(asfframework.VppTestCase): self.vapi.cli("lb vip 2001::/16 del") -class TestLbAsApi(asfframework.VppTestCase): +class TestLbAsApi(VppAsfTestCase): """TestLbAsApi""" def test_lb_as_dump(self): diff --git a/test/asf/test_mactime.py b/test/asf/test_mactime.py index 1becd6f2eb3..215bd132cf0 100644 --- a/test/asf/test_mactime.py +++ b/test/asf/test_mactime.py @@ -3,11 +3,10 @@ import unittest from config import config -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestMactime(VppTestCase): +class TestMactime(VppAsfTestCase): """Mactime Unit Test Cases""" @classmethod diff --git a/test/asf/test_mpcap.py b/test/asf/test_mpcap.py index 854182d84a2..ed8ce1e0ea9 100644 --- a/test/asf/test_mpcap.py +++ b/test/asf/test_mpcap.py @@ -2,12 +2,11 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner import os -class TestMpcap(VppTestCase): +class TestMpcap(VppAsfTestCase): """Mpcap Unit Test Cases""" @classmethod diff --git a/test/asf/test_node_variants.py b/test/asf/test_node_variants.py index 5762664ca93..c0c7cc35658 100644 --- a/test/asf/test_node_variants.py +++ b/test/asf/test_node_variants.py @@ -2,7 +2,7 @@ import re import unittest import platform -from asfframework import VppTestCase +from asfframework import VppAsfTestCase def checkX86(): @@ -19,7 +19,7 @@ def skipVariant(variant): return checkX86() and match is not None -class TestNodeVariant(VppTestCase): +class TestNodeVariant(VppAsfTestCase): """Test Node Variants""" @classmethod diff --git a/test/asf/test_offload.py b/test/asf/test_offload.py index ce5a65d98df..4c800129094 100644 --- a/test/asf/test_offload.py +++ b/test/asf/test_offload.py @@ -2,11 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestOffload(VppTestCase): +class TestOffload(VppAsfTestCase): """Offload Unit Test Cases""" @classmethod diff --git a/test/asf/test_pcap.py b/test/asf/test_pcap.py deleted file mode 100644 index c2ba1384533..00000000000 --- a/test/asf/test_pcap.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python3 - -import os -import unittest - -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP -from scapy.packet import Raw - -from asfframework import VppTestCase, VppTestRunner - - -class TestPcap(VppTestCase): - """Pcap Unit Test Cases""" - - @classmethod - def setUpClass(cls): - super(TestPcap, cls).setUpClass() - - cls.create_pg_interfaces(range(1)) - for i in cls.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - - @classmethod - def tearDownClass(cls): - for i in cls.pg_interfaces: - i.admin_down() - - super(TestPcap, cls).tearDownClass() - - def setUp(self): - super(TestPcap, self).setUp() - - def tearDown(self): - super(TestPcap, self).tearDown() - - # This is a code coverage test, but it only runs for 0.3 seconds - # might as well just run it... - def test_pcap_unittest(self): - """PCAP Capture Tests""" - cmds = [ - "loop create", - "set int ip address loop0 11.22.33.1/24", - "set int state loop0 up", - "loop create", - "set int ip address loop1 11.22.34.1/24", - "set int state loop1 up", - "set ip neighbor loop1 11.22.34.44 03:00:11:22:34:44", - "packet-generator new {\n" - " name s0\n" - " limit 10\n" - " size 128-128\n" - " interface loop0\n" - " tx-interface loop1\n" - " node loop1-output\n" - " buffer-flags ip4 offload\n" - " buffer-offload-flags offload-ip-cksum offload-udp-cksum\n" - " data {\n" - " IP4: 1.2.3 -> dead.0000.0001\n" - " UDP: 11.22.33.44 -> 11.22.34.44\n" - " ttl 2 checksum 13\n" - " UDP: 1234 -> 2345\n" - " checksum 11\n" - " incrementing 114\n" - " }\n" - "}", - "pcap dispatch trace on max 100 buffer-trace pg-input 10", - "pa en", - "pcap dispatch trace off", - "pcap trace rx tx max 1000 intfc any", - "pa en", - "pcap trace status", - "pcap trace rx tx off", - "classify filter pcap mask l3 ip4 src match l3 ip4 src 11.22.33.44", - "pcap trace rx tx max 1000 intfc any file filt.pcap filter", - "show cla t verbose 2", - "show cla t verbose", - "show cla t", - "pa en", - "pcap trace rx tx off", - "classify filter pcap del mask l3 ip4 src", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - self.assertTrue(os.path.exists("/tmp/dispatch.pcap")) - self.assertTrue(os.path.exists("/tmp/rxtx.pcap")) - self.assertTrue(os.path.exists("/tmp/filt.pcap")) - os.remove("/tmp/dispatch.pcap") - os.remove("/tmp/rxtx.pcap") - os.remove("/tmp/filt.pcap") - - def test_pcap_trace_api(self): - """PCAP API Tests""" - - pkt = ( - Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) - / IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, ttl=2) - / UDP(sport=1234, dport=2345) - / Raw(b"\xa5" * 128) - ) - - self.vapi.pcap_trace_on( - capture_rx=True, - capture_tx=True, - max_packets=1000, - sw_if_index=0, - filename="trace_any.pcap", - ) - self.pg_send(self.pg0, pkt * 10) - self.vapi.pcap_trace_off() - - self.vapi.cli( - f"classify filter pcap mask l3 ip4 src match l3 ip4 src {self.pg0.local_ip4}" - ) - self.vapi.pcap_trace_on( - capture_rx=True, - capture_tx=True, - filter=True, - max_packets=1000, - sw_if_index=0, - filename="trace_any_filter.pcap", - ) - self.pg_send(self.pg0, pkt * 10) - self.vapi.pcap_trace_off() - self.vapi.cli("classify filter pcap del mask l3 ip4 src") - - pkt = ( - Ether(src=self.pg0.local_mac, dst=self.pg0.remote_mac) - # wrong destination address - / IP(src=self.pg0.local_ip4, dst=self.pg0.local_ip4, ttl=2) - / UDP(sport=1234, dport=2345) - / Raw(b"\xa5" * 128) - ) - - self.vapi.pcap_trace_on( - capture_drop=True, - max_packets=1000, - sw_if_index=0, - error="{ip4-local}.{spoofed_local_packets}", - filename="trace_drop_err.pcap", - ) - self.pg_send(self.pg0, pkt * 10) - self.vapi.pcap_trace_off() - - self.assertTrue(os.path.exists("/tmp/trace_any.pcap")) - self.assertTrue(os.path.exists("/tmp/trace_any_filter.pcap")) - self.assertTrue(os.path.exists("/tmp/trace_drop_err.pcap")) - os.remove("/tmp/trace_any.pcap") - os.remove("/tmp/trace_any_filter.pcap") - os.remove("/tmp/trace_drop_err.pcap") - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_policer.py b/test/asf/test_policer.py index c23ec00956d..9c01bf0fc1c 100644 --- a/test/asf/test_policer.py +++ b/test/asf/test_policer.py @@ -3,8 +3,8 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_policer import VppPolicer, PolicerAction +from asfframework import VppAsfTestCase, VppTestRunner +from vpp_policer import VppPolicer # Default for the tests is 10s of "Green" packets at 8Mbps, ie. 10M bytes. # The policer helper CLI "sends" 500 byte packets, so default is 20000. @@ -23,7 +23,7 @@ CBURST = 100000 # Committed burst in bytes EBURST = 200000 # Excess burst in bytes -class TestPolicer(VppTestCase): +class TestPolicer(VppAsfTestCase): """Policer Test Case""" def run_policer_test( diff --git a/test/asf/test_quic.py b/test/asf/test_quic.py index 2414186ab13..e453bd5b3e5 100644 --- a/test/asf/test_quic.py +++ b/test/asf/test_quic.py @@ -3,11 +3,9 @@ import unittest import os -import subprocess import signal from config import config -from framework import tag_fixme_vpp_workers -from framework import VppTestCase, VppTestRunner, Worker +from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_vpp_workers from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath @@ -53,7 +51,7 @@ class QUICAppWorker(Worker): @unittest.skipIf("quic" in config.excluded_plugins, "Exclude QUIC plugin tests") -class QUICTestCase(VppTestCase): +class QUICTestCase(VppAsfTestCase): """QUIC Test Case""" timeout = 20 diff --git a/test/asf/test_session.py b/test/asf/test_session.py index 885d66c6863..64f59df5758 100644 --- a/test/asf/test_session.py +++ b/test/asf/test_session.py @@ -2,14 +2,17 @@ import unittest -from asfframework import tag_fixme_vpp_workers -from asfframework import VppTestCase, VppTestRunner -from asfframework import tag_run_solo +from asfframework import ( + VppAsfTestCase, + VppTestRunner, + tag_fixme_vpp_workers, + tag_run_solo, +) from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath @tag_fixme_vpp_workers -class TestSession(VppTestCase): +class TestSession(VppAsfTestCase): """Session Test Case""" @classmethod @@ -106,7 +109,7 @@ class TestSession(VppTestCase): @tag_fixme_vpp_workers -class TestSessionUnitTests(VppTestCase): +class TestSessionUnitTests(VppAsfTestCase): """Session Unit Tests Case""" @classmethod @@ -135,7 +138,7 @@ class TestSessionUnitTests(VppTestCase): @tag_run_solo -class TestSegmentManagerTests(VppTestCase): +class TestSegmentManagerTests(VppAsfTestCase): """SVM Fifo Unit Tests Case""" @classmethod @@ -162,7 +165,7 @@ class TestSegmentManagerTests(VppTestCase): @tag_run_solo -class TestSvmFifoUnitTests(VppTestCase): +class TestSvmFifoUnitTests(VppAsfTestCase): """SVM Fifo Unit Tests Case""" @classmethod diff --git a/test/asf/test_sparse_vec.py b/test/asf/test_sparse_vec.py index 614bc2e94bc..cf0afd8aaf3 100644 --- a/test/asf/test_sparse_vec.py +++ b/test/asf/test_sparse_vec.py @@ -2,11 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestSparseVec(VppTestCase): +class TestSparseVec(VppAsfTestCase): """SparseVec Test Cases""" @classmethod diff --git a/test/asf/test_string.py b/test/asf/test_string.py index 3a861ef97a8..2eeecd7dfd8 100644 --- a/test/asf/test_string.py +++ b/test/asf/test_string.py @@ -2,11 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath +from asfframework import VppAsfTestCase, VppTestRunner -class TestString(VppTestCase): +class TestString(VppAsfTestCase): """String Test Cases""" @classmethod diff --git a/test/asf/test_tap.py b/test/asf/test_tap.py index 1a9d0ac56b8..c436ec6b6ae 100644 --- a/test/asf/test_tap.py +++ b/test/asf/test_tap.py @@ -1,7 +1,7 @@ import unittest import os -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner from vpp_devices import VppTAPInterface @@ -10,7 +10,7 @@ def check_tuntap_driver_access(): @unittest.skip("Requires root") -class TestTAP(VppTestCase): +class TestTAP(VppAsfTestCase): """TAP Test Case""" def test_tap_add_del(self): diff --git a/test/asf/test_tcp.py b/test/asf/test_tcp.py index 4a16d573668..69fc5c472a5 100644 --- a/test/asf/test_tcp.py +++ b/test/asf/test_tcp.py @@ -2,11 +2,11 @@ import unittest -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath -class TestTCP(VppTestCase): +class TestTCP(VppAsfTestCase): """TCP Test Case""" @classmethod @@ -93,7 +93,7 @@ class TestTCP(VppTestCase): ip_t10.remove_vpp_config() -class TestTCPUnitTests(VppTestCase): +class TestTCPUnitTests(VppAsfTestCase): "TCP Unit Tests" @classmethod diff --git a/test/asf/test_tls.py b/test/asf/test_tls.py index e70c63d9a32..d2d1d9a4747 100644 --- a/test/asf/test_tls.py +++ b/test/asf/test_tls.py @@ -5,7 +5,7 @@ import os import re import subprocess -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath @@ -52,7 +52,7 @@ def checkAll(): return ret -class TestTLS(VppTestCase): +class TestTLS(VppAsfTestCase): """TLS Qat Test Case.""" @classmethod diff --git a/test/asf/test_vapi.py b/test/asf/test_vapi.py index 2eb47b59017..10d9411dbb6 100644 --- a/test/asf/test_vapi.py +++ b/test/asf/test_vapi.py @@ -5,10 +5,10 @@ import unittest import os import signal from config import config -from asfframework import VppTestCase, VppTestRunner, Worker +from asfframework import VppAsfTestCase, VppTestRunner, Worker -class VAPITestCase(VppTestCase): +class VAPITestCase(VppAsfTestCase): """VAPI test""" @classmethod diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py index 59c077ee4f7..a1113b863e8 100644 --- a/test/asf/test_vcl.py +++ b/test/asf/test_vcl.py @@ -7,8 +7,8 @@ import subprocess import signal import glob from config import config -from asfframework import VppTestCase, VppTestRunner, Worker -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath, FibPathProto +from asfframework import VppAsfTestCase, VppTestRunner, Worker +from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath iperf3 = "/usr/bin/iperf3" @@ -58,7 +58,7 @@ class VCLAppWorker(Worker): super(VCLAppWorker, self).__init__(self.args, logger, env, *args, **kwargs) -class VCLTestCase(VppTestCase): +class VCLTestCase(VppAsfTestCase): """VCL Test Class""" session_startup = ["poll-main"] @@ -84,7 +84,7 @@ class VCLTestCase(VppTestCase): self.timeout = 20 self.echo_phrase = "Hello, world! Jenny is a friend of mine." self.pre_test_sleep = 0.3 - self.post_test_sleep = 0.2 + self.post_test_sleep = 1 self.sapi_client_sock = "" self.sapi_server_sock = "" diff --git a/test/asf/test_vhost.py b/test/asf/test_vhost.py index eb584633d5a..622716cafe3 100644 --- a/test/asf/test_vhost.py +++ b/test/asf/test_vhost.py @@ -2,12 +2,12 @@ import unittest -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner from vpp_vhost_interface import VppVhostInterface -class TesVhostInterface(VppTestCase): +class TesVhostInterface(VppAsfTestCase): """Vhost User Test Case""" @classmethod diff --git a/test/asf/test_vlib.py b/test/asf/test_vlib.py deleted file mode 100644 index dce08b823b4..00000000000 --- a/test/asf/test_vlib.py +++ /dev/null @@ -1,327 +0,0 @@ -#!/usr/bin/env python3 - -import unittest -import pexpect -import time -import signal -from config import config -from asfframework import VppTestCase, VppTestRunner -from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath -from scapy.layers.inet import IP, ICMP -from scapy.layers.l2 import Ether -from scapy.packet import Raw - - -@unittest.skipUnless(config.gcov, "part of code coverage tests") -class TestVlib(VppTestCase): - """Vlib Unit Test Cases""" - - vpp_worker_count = 1 - - @classmethod - def setUpClass(cls): - super(TestVlib, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestVlib, cls).tearDownClass() - - def setUp(self): - super(TestVlib, self).setUp() - - def tearDown(self): - super(TestVlib, self).tearDown() - - def test_vlib_main_unittest(self): - """Vlib main.c Code Coverage Test""" - - cmds = [ - "loopback create", - "packet-generator new {\n" - " name vlib\n" - " limit 15\n" - " size 128-128\n" - " interface loop0\n" - " node ethernet-input\n" - " data {\n" - " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n" - " ICMP: db00::1 -> db00::2\n" - " incrementing 30\n" - " }\n" - "}\n", - "event-logger trace dispatch", - "event-logger stop", - "event-logger clear", - "event-logger resize 102400", - "event-logger restart", - "pcap dispatch trace on max 100 buffer-trace pg-input 15", - "pa en", - "show event-log 100 all", - "event-log save", - "event-log save foo", - "pcap dispatch trace", - "pcap dispatch trace status", - "pcap dispatch trace off", - "show vlib frame-allocation", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - def test_vlib_node_cli_unittest(self): - """Vlib node_cli.c Code Coverage Test""" - - cmds = [ - "loopback create", - "packet-generator new {\n" - " name vlib\n" - " limit 15\n" - " size 128-128\n" - " interface loop0\n" - " node ethernet-input\n" - " data {\n" - " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n" - " ICMP: db00::1 -> db00::2\n" - " incrementing 30\n" - " }\n" - "}\n", - "show vlib graph", - "show vlib graph ethernet-input", - "show vlib graphviz", - "show vlib graphviz graphviz.dot", - "pa en", - "show runtime ethernet-input", - "show runtime brief verbose max summary", - "clear runtime", - "show node index 1", - "show node ethernet-input", - "show node pg-input", - "set node function", - "set node function no-such-node", - "set node function cdp-input default", - "set node function ethernet-input default", - "set node function ethernet-input bozo", - "set node function ethernet-input", - "show \t", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - def test_vlib_buffer_c_unittest(self): - """Vlib buffer.c Code Coverage Test""" - - cmds = [ - "loopback create", - "packet-generator new {\n" - " name vlib\n" - " limit 15\n" - " size 128-128\n" - " interface loop0\n" - " node ethernet-input\n" - " data {\n" - " IP6: 00:d0:2d:5e:86:85 -> 00:0d:ea:d0:00:00\n" - " ICMP: db00::1 -> db00::2\n" - " incrementing 30\n" - " }\n" - "}\n", - "event-logger trace", - "event-logger trace enable", - "event-logger trace api cli barrier", - "pa en", - "show interface bogus", - "event-logger trace disable api cli barrier", - "event-logger trace circuit-node ethernet-input", - "event-logger trace circuit-node ethernet-input disable", - "clear interfaces", - "test vlib", - "test vlib2", - "show memory api-segment stats-segment main-heap verbose", - "leak-check { show memory }", - "show cpu", - "memory-trace main-heap", - "memory-trace main-heap api-segment stats-segment", - "leak-check { show version }", - "show version ?", - "comment { show version }", - "uncomment { show version }", - "show memory main-heap", - "show memory bogus", - "choices", - "test heap-validate", - "memory-trace main-heap disable", - "show buffers", - "show eve", - "show help", - "show ip ", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - def test_vlib_format_unittest(self): - """Vlib format.c Code Coverage Test""" - - cmds = [ - "loopback create", - "classify filter pcap mask l2 proto match l2 proto 0x86dd", - "classify filter pcap del", - "test format-vlib", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - def test_vlib_main_unittest(self): - """Private Binary API Segment Test (takes 70 seconds)""" - - vat_path = config.vpp + "_api_test" - vat = pexpect.spawn(vat_path, ["socket-name", self.get_api_sock_path()]) - vat.expect("vat# ", timeout=10) - vat.sendline("sock_init_shm") - vat.expect("vat# ", timeout=10) - vat.sendline("sh api cli") - vat.kill(signal.SIGKILL) - vat.wait() - self.logger.info("vat terminated, 70 second wait for the Reaper") - time.sleep(70) - self.logger.info("Reaper should be complete...") - - def test_pool(self): - """Fixed-size Pool Test""" - - cmds = [ - "test pool", - ] - - for cmd in cmds: - r = self.vapi.cli_return_response(cmd) - if r.retval != 0: - if hasattr(r, "reply"): - self.logger.info(cmd + " FAIL reply " + r.reply) - else: - self.logger.info(cmd + " FAIL retval " + str(r.retval)) - - -class TestVlibFrameLeak(VppTestCase): - """Vlib Frame Leak Test Cases""" - - vpp_worker_count = 1 - - @classmethod - def setUpClass(cls): - super(TestVlibFrameLeak, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestVlibFrameLeak, cls).tearDownClass() - - def setUp(self): - super(TestVlibFrameLeak, self).setUp() - # create 1 pg interface - self.create_pg_interfaces(range(1)) - - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - - def tearDown(self): - super(TestVlibFrameLeak, self).tearDown() - for i in self.pg_interfaces: - i.unconfig_ip4() - i.admin_down() - - def test_vlib_mw_refork_frame_leak(self): - """Vlib worker thread refork leak test case""" - icmp_id = 0xB - icmp_seq = 5 - icmp_load = b"\x0a" * 18 - pkt = ( - Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) - / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) - / ICMP(id=icmp_id, seq=icmp_seq) - / Raw(load=icmp_load) - ) - - # Send a packet - self.pg0.add_stream(pkt) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - - rx = self.pg0.get_capture(1) - - self.assertEquals(len(rx), 1) - rx = rx[0] - ether = rx[Ether] - ipv4 = rx[IP] - - self.assertEqual(ether.src, self.pg0.local_mac) - self.assertEqual(ether.dst, self.pg0.remote_mac) - - self.assertEqual(ipv4.src, self.pg0.local_ip4) - self.assertEqual(ipv4.dst, self.pg0.remote_ip4) - - # Save allocated frame count - frame_allocated = {} - for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]: - spl = fs.split() - thread = int(spl[0]) - size = int(spl[1]) - alloc = int(spl[2]) - key = (thread, size) - frame_allocated[key] = alloc - - # cause reforks - _ = self.create_loopback_interfaces(1) - - # send the same packet - self.pg0.add_stream(pkt) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - - rx = self.pg0.get_capture(1) - - self.assertEquals(len(rx), 1) - rx = rx[0] - ether = rx[Ether] - ipv4 = rx[IP] - - self.assertEqual(ether.src, self.pg0.local_mac) - self.assertEqual(ether.dst, self.pg0.remote_mac) - - self.assertEqual(ipv4.src, self.pg0.local_ip4) - self.assertEqual(ipv4.dst, self.pg0.remote_ip4) - - # Check that no frame were leaked during refork - for fs in self.vapi.cli("show vlib frame-allocation").splitlines()[1:]: - spl = fs.split() - thread = int(spl[0]) - size = int(spl[1]) - alloc = int(spl[2]) - key = (thread, size) - self.assertEqual(frame_allocated[key], alloc) - - -if __name__ == "__main__": - unittest.main(testRunner=VppTestRunner) diff --git a/test/asf/test_vpe_api.py b/test/asf/test_vpe_api.py index 426a3878c59..4d866ec906a 100644 --- a/test/asf/test_vpe_api.py +++ b/test/asf/test_vpe_api.py @@ -13,13 +13,12 @@ # limitations under the License. import datetime import time -import unittest -from asfframework import VppTestCase +from asfframework import VppAsfTestCase enable_print = False -class TestVpeApi(VppTestCase): +class TestVpeApi(VppAsfTestCase): """TestVpeApi""" def test_log_dump_default(self): diff --git a/test/asf/test_vppinfra.py b/test/asf/test_vppinfra.py index 4b49628cf58..56391bfb13c 100644 --- a/test/asf/test_vppinfra.py +++ b/test/asf/test_vppinfra.py @@ -2,10 +2,10 @@ import unittest -from asfframework import VppTestCase, VppTestRunner +from asfframework import VppAsfTestCase, VppTestRunner -class TestVppinfra(VppTestCase): +class TestVppinfra(VppAsfTestCase): """Vppinfra Unit Test Cases""" @classmethod |