aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/Makefile9
-rw-r--r--test/asf/asfframework.py63
-rw-r--r--test/asf/test_auto_sdl.py593
-rw-r--r--test/asf/test_session.py155
-rw-r--r--test/asf/test_session_sdl.py2
-rw-r--r--test/asf/test_vcl.py6
-rw-r--r--test/test_bpf_trace_filter.py60
-rw-r--r--test/test_cnat.py126
-rw-r--r--test/test_dhcp.py69
-rw-r--r--test/test_flowprobe.py1
-rw-r--r--test/test_ikev2.py20
-rw-r--r--test/test_ip4.py7
-rw-r--r--test/test_ip6.py7
-rw-r--r--test/test_ipsec_api.py63
-rw-r--r--test/test_ipsec_spd_fp_input.py46
-rw-r--r--test/test_nat44_ed.py2
-rw-r--r--test/test_nat64.py1
-rw-r--r--test/test_neighbor.py5
-rw-r--r--test/test_pcap.py27
-rw-r--r--test/test_pg_stream.py241
-rw-r--r--test/test_sflow.py212
-rw-r--r--test/test_snort.py54
-rw-r--r--test/vpp_papi_provider.py5
23 files changed, 1618 insertions, 156 deletions
diff --git a/test/Makefile b/test/Makefile
index 79feba9165e..1d22832edb7 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -389,23 +389,20 @@ COV_REM_TODO_NO_TEST="*/vpp-api/client/*" "*/plugins/prom/*" \
"*/vnet/ethernet/ethernet_format_fns.h" \
"*/plugins/ikev2/ikev2_format.c" "*/vnet/bier/bier_types.c"
-COV_REM_ALT_TEST="*/plugins/hs_apps/*" "*/plugins/http/*.h"
-
.PHONY: cov-post
cov-post: wipe-cov $(BUILD_COV_DIR)
- @lcov --ignore-errors --capture \
+ @lcov --ignore-errors unused,empty,mismatch,gcov --capture \
--directory $(VPP_BUILD_DIR) \
--output-file $(BUILD_COV_DIR)/coverage$(HS_TEST).info
@test -z "$(EXTERN_COV_DIR)" || \
- lcov --ignore-errors --capture \
+ lcov --ignore-errors unused,empty,mismatch,gcov --capture \
--directory $(EXTERN_COV_DIR) \
--output-file $(BUILD_COV_DIR)/extern-coverage$(HS_TEST).info
- @lcov --ignore-errors --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \
+ @lcov --ignore-errors unused,empty,mismatch,gcov --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \
$(COV_REM_NOT_CODE) \
$(COV_REM_DRIVERS) \
$(COV_REM_TODO_NO_TEST) \
$(COV_REM_UNUSED_FEAT) \
- $(COV_REM_ALT_TEST) \
-o $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info
@genhtml $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info \
--output-directory $(BUILD_COV_DIR)/html
diff --git a/test/asf/asfframework.py b/test/asf/asfframework.py
index bd1b45c6476..7670a0753d1 100644
--- a/test/asf/asfframework.py
+++ b/test/asf/asfframework.py
@@ -155,28 +155,6 @@ def _is_platform_aarch64():
is_platform_aarch64 = _is_platform_aarch64()
-def _is_distro_ubuntu2204():
- with open("/etc/os-release") as f:
- for line in f.readlines():
- if "jammy" in line:
- return True
- return False
-
-
-is_distro_ubuntu2204 = _is_distro_ubuntu2204()
-
-
-def _is_distro_ubuntu2404():
- with open("/etc/os-release") as f:
- for line in f.readlines():
- if "noble" in line:
- return True
- return False
-
-
-is_distro_ubuntu2404 = _is_distro_ubuntu2404()
-
-
def _is_distro_debian11():
with open("/etc/os-release") as f:
for line in f.readlines():
@@ -233,14 +211,10 @@ class TestCaseTag(Enum):
FIXME_VPP_WORKERS = 2
# marks the suites broken when ASan is enabled
FIXME_ASAN = 3
- # marks suites broken on Ubuntu-22.04
- FIXME_UBUNTU2204 = 4
# marks suites broken on Debian-11
- FIXME_DEBIAN11 = 5
+ FIXME_DEBIAN11 = 4
# marks suites broken on debug vpp image
- FIXME_VPP_DEBUG = 6
- # marks suites broken on Ubuntu-24.04
- FIXME_UBUNTU2404 = 7
+ FIXME_VPP_DEBUG = 5
def create_tag_decorator(e):
@@ -257,10 +231,8 @@ def create_tag_decorator(e):
tag_run_solo = create_tag_decorator(TestCaseTag.RUN_SOLO)
tag_fixme_vpp_workers = create_tag_decorator(TestCaseTag.FIXME_VPP_WORKERS)
tag_fixme_asan = create_tag_decorator(TestCaseTag.FIXME_ASAN)
-tag_fixme_ubuntu2204 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2204)
tag_fixme_debian11 = create_tag_decorator(TestCaseTag.FIXME_DEBIAN11)
tag_fixme_vpp_debug = create_tag_decorator(TestCaseTag.FIXME_VPP_DEBUG)
-tag_fixme_ubuntu2404 = create_tag_decorator(TestCaseTag.FIXME_UBUNTU2404)
class DummyVpp:
@@ -323,18 +295,6 @@ class VppAsfTestCase(CPUInterface, unittest.TestCase):
cls = unittest.skip("Skipping @tag_fixme_asan tests")(cls)
@classmethod
- def skip_fixme_ubuntu2204(cls):
- """if @tag_fixme_ubuntu2204 & is Ubuntu22.04 - mark for skip"""
- if cls.has_tag(TestCaseTag.FIXME_UBUNTU2204) and is_distro_ubuntu2204 == True:
- cls = unittest.skip("Skipping @tag_fixme_ubuntu2204 tests")(cls)
-
- @classmethod
- def skip_fixme_ubuntu2404(cls):
- """if @tag_fixme_ubuntu2404 & is Ubuntu24.04 - mark for skip"""
- if cls.has_tag(TestCaseTag.FIXME_UBUNTU2404) and is_distro_ubuntu2404 == True:
- cls = unittest.skip("Skipping @tag_fixme_ubuntu2404 tests")(cls)
-
- @classmethod
def instance(cls):
"""Return the instance of this testcase"""
return cls.test_instance
@@ -479,6 +439,11 @@ class VppAsfTestCase(CPUInterface, unittest.TestCase):
"{",
"enable",
"}",
+ "plugin",
+ "auto_sdl_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
]
+ cls.extra_vpp_plugin_config
+ [
@@ -1381,20 +1346,6 @@ class VppTestResult(unittest.TestResult):
test_title = colorize(f"FIXME with ASAN: {test_title}", RED)
test.skip_fixme_asan()
- if (
- test.has_tag(TestCaseTag.FIXME_UBUNTU2204)
- and is_distro_ubuntu2204 == True
- ):
- test_title = colorize(f"FIXME with Ubuntu 22.04: {test_title}", RED)
- test.skip_fixme_ubuntu2204()
-
- if (
- test.has_tag(TestCaseTag.FIXME_UBUNTU2404)
- and is_distro_ubuntu2404 == True
- ):
- test_title = colorize(f"FIXME with Ubuntu 24.04: {test_title}", RED)
- test.skip_fixme_ubuntu2404()
-
if hasattr(test, "vpp_worker_count"):
if test.vpp_worker_count == 0:
test_title += " [main thread only]"
diff --git a/test/asf/test_auto_sdl.py b/test/asf/test_auto_sdl.py
new file mode 100644
index 00000000000..6b101c54e5c
--- /dev/null
+++ b/test/asf/test_auto_sdl.py
@@ -0,0 +1,593 @@
+#!/usr/bin/env python3
+
+import subprocess
+import socket
+
+import unittest
+
+from asfframework import (
+ VppAsfTestCase,
+ VppTestRunner,
+ tag_fixme_vpp_workers,
+ tag_run_solo,
+)
+from config import config
+from ipaddress import IPv4Network, IPv6Network
+from vpp_acl import AclRule, VppAcl, VppAclInterface
+
+from vpp_ip_route import (
+ VppIpRoute,
+ VppRoutePath,
+ VppIpTable,
+)
+
+from vpp_papi import VppEnum
+
+
+VPP_TAP_IP4 = "8.8.8.1"
+VPP_TAP_IP6 = "2001::1"
+
+HOST_TAP_IP4 = "8.8.8.2"
+HOST_TAP_IP6 = "2001::2"
+SCALE_COUNT = 10
+
+
+@tag_fixme_vpp_workers
+class TestAutoSDLUnitTests(VppAsfTestCase):
+ """Auto SDL Unit Tests Case"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestAutoSDLUnitTests, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestAutoSDLUnitTests, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestAutoSDLUnitTests, self).setUp()
+ self.vapi.session_enable_disable(is_enable=1)
+
+ def test_session(self):
+ """Auto SDL Unit Tests"""
+ error = self.vapi.cli("test auto-sdl all")
+
+ if error:
+ self.logger.critical(error)
+ self.assertNotIn("failed", error)
+
+ def tearDown(self):
+ super(TestAutoSDLUnitTests, self).tearDown()
+ self.vapi.session_enable_disable(is_enable=0)
+
+
+@tag_fixme_vpp_workers
+@unittest.skipUnless(config.extended, "part of extended tests")
+class TestAutoSDL(VppAsfTestCase):
+ """Auto SDL Baasic Test Case"""
+
+ tcp_startup = ["syn-rcvd-time 1"]
+
+ @classmethod
+ def setUpClass(cls):
+ if cls.tcp_startup:
+ conf = "tcp {" + " ".join(cls.tcp_startup) + "}"
+ cls.extra_vpp_config = [conf]
+ super(TestAutoSDL, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestAutoSDL, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestAutoSDL, self).setUp()
+
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_SDL
+ )
+
+ # self.logger.info(self.vapi.cli("create tap host-ip4-addr HOST_TAP_IP/24"))
+ self.tap0 = self.vapi.tap_create_v3(
+ id=0,
+ host_ip4_prefix=HOST_TAP_IP4 + "/24",
+ host_ip4_prefix_set=True,
+ host_ip6_prefix=HOST_TAP_IP6 + "/64",
+ host_ip6_prefix_set=True,
+ )
+
+ # self.logger.info(self.vapi.cli("set interface state tap0 up"))
+ self.vapi.sw_interface_set_flags(sw_if_index=self.tap0.sw_if_index, flags=1)
+
+ def tearDown(self):
+ self.logger.info(
+ self.vapi.sw_interface_add_del_address(
+ is_add=0,
+ sw_if_index=self.tap0.sw_if_index,
+ prefix=VPP_TAP_IP4 + "/24",
+ del_all=1,
+ )
+ )
+ # self.logger.info(self.vapi.cli("set interface ip address tap0 VPP_TAP_IP6/64"))
+ self.logger.info(
+ self.vapi.sw_interface_add_del_address(
+ is_add=0,
+ sw_if_index=self.tap0.sw_if_index,
+ prefix=VPP_TAP_IP6 + "/64",
+ del_all=1,
+ )
+ )
+ self.logger.info(self.vapi.tap_delete_v2(self.tap0.sw_if_index))
+ self.logger.info(
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_DISABLE
+ )
+ )
+ dump = self.vapi.session_sdl_v3_dump()
+ self.assertTrue(len(dump) == 0)
+ super(TestAutoSDL, self).tearDown()
+
+ def test_auto_sdl(self):
+ """Auto SDL test"""
+
+ # self.logger.info(self.vapi.cli("set interface ip address tap0 VPP_TAP_IP/24"))
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.tap0.sw_if_index, prefix=VPP_TAP_IP4 + "/24"
+ )
+ # self.logger.info(self.vapi.cli("set interface ip address tap0 VPP_TAP_IP6/64"))
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.tap0.sw_if_index, prefix=VPP_TAP_IP6 + "/64"
+ )
+
+ # start the cli server
+ self.logger.info("Starting cli sever")
+ self.logger.info(self.vapi.cli("http cli server"))
+
+ self.logger.info(
+ self.vapi.cli("http cli server uri http://::0/80 listener add")
+ )
+
+ # Test 1. No ACL. curl should work.
+ self.logger.info("Starting test 1")
+ for i in range(10):
+ try:
+ process = subprocess.run(
+ [
+ "curl",
+ "--noproxy",
+ "'*'",
+ "http://" + VPP_TAP_IP4 + ":80/sh/version",
+ ],
+ capture_output=True,
+ timeout=2,
+ )
+ except:
+ self.logger.info("timeout")
+ else:
+ break
+ self.assertEqual(0, process.returncode)
+ self.logger.info("Test 1 passed")
+
+ # Test 2. Add ACL to block the source.
+ rule = AclRule(
+ is_permit=0,
+ proto=6,
+ src_prefix=IPv4Network("8.8.0.0/16"),
+ dst_prefix=IPv4Network(VPP_TAP_IP4 + "/32"),
+ ports=80,
+ )
+ acl = VppAcl(self, rules=[rule])
+ acl.add_vpp_config()
+
+ # Apply the ACL on the interface output
+ # Auto SDL entry should be created and timed out accordingly
+ acl_if_e = VppAclInterface(
+ self, sw_if_index=self.tap0.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if_e.add_vpp_config()
+
+ self.vapi.auto_sdl_config(threshold=2, remove_timeout=3, enable=True)
+
+ for i in range(2):
+ try:
+ process = subprocess.run(
+ [
+ "curl",
+ "--noproxy",
+ "'*'",
+ "http://" + VPP_TAP_IP4 + ":80/sh/version",
+ ],
+ capture_output=True,
+ timeout=2,
+ )
+ except:
+ self.logger.info("curl timeout -- as exepcted")
+
+ # check for the SDL entry
+ for i in range(10):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) == 0:
+ self.sleep(1)
+ self.assertTrue(len(dump) > 0)
+ self.assertEqual(dump[0].rmt, IPv4Network(HOST_TAP_IP4 + "/32"))
+
+ # verify entry is timed out and removed eventually
+ for i in range(10):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) > 0:
+ self.sleep(1)
+ self.assertTrue(len(dump) == 0)
+
+ acl_if_e.remove_vpp_config()
+ acl.remove_vpp_config()
+ self.logger.info("Test 2 passed")
+
+ # Test 3: Do the same for IPv6
+ process = subprocess.run(
+ [
+ "curl",
+ "-6",
+ "--noproxy",
+ "'*'",
+ "http://" + "[" + VPP_TAP_IP6 + "]" + ":80/sh/version",
+ ],
+ capture_output=True,
+ )
+ self.assertEqual(0, process.returncode)
+
+ rule = AclRule(
+ is_permit=0,
+ proto=6,
+ src_prefix=IPv6Network(HOST_TAP_IP6 + "/128"),
+ dst_prefix=IPv6Network(VPP_TAP_IP6 + "/128"),
+ ports=80,
+ )
+ acl = VppAcl(self, rules=[rule])
+ acl.add_vpp_config()
+
+ # Apply the ACL on the interface output
+ acl_if_e = VppAclInterface(
+ self, sw_if_index=self.tap0.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if_e.add_vpp_config()
+
+ for i in range(2):
+ try:
+ process = subprocess.run(
+ [
+ "curl",
+ "-6",
+ "--noproxy",
+ "'*'",
+ "http://" + "[" + VPP_TAP_IP6 + "]" + ":80/sh/version",
+ ],
+ capture_output=True,
+ timeout=2,
+ )
+ except:
+ self.logger.info("curl timeout -- as exepcted")
+
+ acl_if_e.remove_vpp_config()
+ acl.remove_vpp_config()
+
+ # verify the SDL entry is added
+ for i in range(5):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) == 0:
+ self.sleep(1)
+ self.assertTrue(len(dump) > 0)
+ self.assertEqual(dump[0].rmt, IPv6Network(HOST_TAP_IP6 + "/128"))
+
+ # verify the entry is removed after timeout
+ for i in range(10):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) > 0:
+ self.sleep(1)
+ self.assertEqual(len(dump), 0)
+ self.logger.info("Test 3 passed")
+
+ self.vapi.auto_sdl_config(enable=False)
+
+ # bring down the cli server
+ self.logger.info(self.vapi.cli("http cli server listener del"))
+ self.logger.info(
+ self.vapi.cli("http cli server uri http://::0/80 listener del")
+ )
+
+ def test_auto_sdl_appns_ip4(self):
+ """Auto SDL with appns test -- ip4"""
+
+ # Test tap0 in appns 1
+ table_id = 1
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ # place tap0 to vrf 1
+ self.vapi.sw_interface_set_table(
+ self.tap0.sw_if_index, is_ipv6=0, vrf_id=table_id
+ )
+
+ # place tap0 to appns 1
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", secret=1, sw_if_index=self.tap0.sw_if_index
+ )
+ # configure ip4 address on tap0
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.tap0.sw_if_index, prefix=VPP_TAP_IP4 + "/24"
+ )
+
+ # start http cli server in appns 1
+ self.logger.info(self.vapi.cli("http cli server appns 1 secret 1"))
+
+ process = subprocess.run(
+ [
+ "curl",
+ "--noproxy",
+ "'*'",
+ "http://" + VPP_TAP_IP4 + ":80/sh/version",
+ ],
+ timeout=1,
+ capture_output=True,
+ )
+ self.assertEqual(0, process.returncode)
+
+ # Apply the ACL on the interface output
+ rule = AclRule(
+ is_permit=0,
+ proto=6,
+ src_prefix=IPv4Network("8.8.0.0/16"),
+ dst_prefix=IPv4Network(VPP_TAP_IP4 + "/32"),
+ ports=80,
+ )
+ acl = VppAcl(self, rules=[rule])
+ acl.add_vpp_config()
+
+ acl_if_e = VppAclInterface(
+ self, sw_if_index=self.tap0.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if_e.add_vpp_config()
+
+ self.vapi.auto_sdl_config(threshold=1, remove_timeout=60, enable=True)
+ for i in range(10):
+ try:
+ process = subprocess.run(
+ [
+ "curl",
+ "--noproxy",
+ "'*'",
+ "http://" + VPP_TAP_IP4 + ":80/sh/version",
+ ],
+ capture_output=True,
+ timeout=1,
+ )
+ except:
+ self.logger.info("connect timeout -- as expected")
+ else:
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) == 0:
+ self.sleep(1)
+ else:
+ break
+
+ for i in range(60):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) != 1:
+ self.sleep(1)
+ self.assertEqual(len(dump), 1)
+ self.assertEqual(dump[0].rmt, IPv4Network(HOST_TAP_IP4 + "/32"))
+ self.logger.info("Test 6 passed")
+
+ acl_if_e.remove_vpp_config()
+ acl.remove_vpp_config()
+
+ # bring down the cli server
+ self.logger.info(self.vapi.cli("http cli server listener del"))
+
+ # delete namespace
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1",
+ secret=1,
+ sw_if_index=self.tap0.sw_if_index,
+ is_add=0,
+ )
+
+ # Disable auto sdl -- quicker than waiting the entry to timeout
+ self.vapi.auto_sdl_config(enable=False)
+
+ # disable session sdl
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_DISABLE
+ )
+
+ dump = self.vapi.session_sdl_v3_dump()
+ self.assertTrue(len(dump) == 0)
+
+ def test_auto_sdl_appns_ip6(self):
+ """Auto SDL with appns test -- ip6"""
+
+ # Test tap0 in appns 1
+ table_id = 1
+ tbl = VppIpTable(self, table_id, is_ip6=1)
+ tbl.add_vpp_config()
+
+ # place tap0 to vrf 1
+ self.vapi.sw_interface_set_table(
+ self.tap0.sw_if_index, is_ipv6=1, vrf_id=table_id
+ )
+
+ # place tap0 to appns 1
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1", secret=1, sw_if_index=self.tap0.sw_if_index
+ )
+ # configure ip6 address on tap0
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.tap0.sw_if_index, prefix=VPP_TAP_IP6 + "/64"
+ )
+
+ # start http cli server in appns 1
+ self.logger.info(
+ self.vapi.cli("http cli server appns 1 secret 1 uri http://::0/80")
+ )
+
+ self.sleep(3)
+ process = subprocess.run(
+ [
+ "curl",
+ "-6",
+ "--noproxy",
+ "'*'",
+ "http://" + "[" + VPP_TAP_IP6 + "]" + ":80/sh/version",
+ ],
+ timeout=5,
+ capture_output=True,
+ )
+ self.assertEqual(0, process.returncode)
+
+ # Apply the ACL on the interface output
+ rule = AclRule(
+ is_permit=0,
+ proto=6,
+ src_prefix=IPv6Network(HOST_TAP_IP6 + "/128"),
+ dst_prefix=IPv6Network(VPP_TAP_IP6 + "/128"),
+ ports=80,
+ )
+ acl = VppAcl(self, rules=[rule])
+ acl.add_vpp_config()
+
+ acl_if_e = VppAclInterface(
+ self, sw_if_index=self.tap0.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if_e.add_vpp_config()
+
+ self.vapi.auto_sdl_config(threshold=1, remove_timeout=60, enable=True)
+ for i in range(10):
+ try:
+ process = subprocess.run(
+ [
+ "curl",
+ "-6",
+ "--noproxy",
+ "'*'",
+ "http://" + "[" + VPP_TAP_IP6 + "]" + ":80/sh/version",
+ ],
+ capture_output=True,
+ timeout=1,
+ )
+ except:
+ self.logger.info("connect timeout -- as expected")
+ else:
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) == 0:
+ self.sleep(1)
+ else:
+ break
+
+ for i in range(60):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) != 1:
+ self.sleep(1)
+ self.assertEqual(len(dump), 1)
+ self.assertEqual(dump[0].rmt, IPv6Network(HOST_TAP_IP6 + "/128"))
+ self.logger.info("Test 6 passed")
+
+ acl_if_e.remove_vpp_config()
+ acl.remove_vpp_config()
+
+ # bring down the cli server
+ self.logger.info(
+ self.vapi.cli("http cli server uri http://::0/80 listener del")
+ )
+
+ # delete namespace
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="1",
+ secret=1,
+ sw_if_index=self.tap0.sw_if_index,
+ is_add=0,
+ )
+
+ @unittest.skip("test disabled for auto sdl")
+ def test_auto_sdl_scale(self):
+ """Auto SDL scale test"""
+
+ # Test 4: Scale
+ # Send 250 packets from different sources. Should create 250 auto-SDL
+ # and SDL entries
+ # self.logger.info(self.vapi.cli("set interface ip address tap0 VPP_TAP_IP/24"))
+
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.tap0.sw_if_index, prefix=VPP_TAP_IP4 + "/24"
+ )
+
+ # start the cli server
+ self.logger.info("Starting cli sever")
+ self.logger.info(self.vapi.cli("http cli server"))
+
+ rule = AclRule(
+ is_permit=0,
+ proto=6,
+ src_prefix=IPv4Network("8.8.0.0/16"),
+ dst_prefix=IPv4Network(VPP_TAP_IP4 + "/32"),
+ ports=80,
+ )
+ acl = VppAcl(self, rules=[rule])
+ acl.add_vpp_config()
+
+ # Apply the ACL on the interface output
+ acl_if_e = VppAclInterface(
+ self, sw_if_index=self.tap0.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if_e.add_vpp_config()
+
+ # set the remove_timeout to a large value. Otherwise, some entries may
+ # get timed out before we accumulate all of them for verification
+ self.vapi.auto_sdl_config(threshold=1, remove_timeout=300, enable=True)
+
+ for i in range(SCALE_COUNT):
+ prefix = "8.8.8.{0}".format(i + 3)
+ prefix_mask = "8.8.8.{0}/24".format(i + 3)
+ prefix_port = (prefix, 5000)
+ process = subprocess.run(
+ [
+ "ip",
+ "address",
+ "add",
+ prefix_mask,
+ "dev",
+ "tap0",
+ ],
+ capture_output=True,
+ )
+ self.assertEqual(process.returncode, 0)
+ for i in range(2):
+ try:
+ s = socket.create_connection(
+ (VPP_TAP_IP4, 80), timeout=0.5, source_address=prefix_port
+ )
+ except:
+ self.logger.info("connect timeout -- as exepcted")
+
+ # check for the SDL entry
+ for i in range(60):
+ dump = self.vapi.session_sdl_v3_dump()
+ if len(dump) != SCALE_COUNT:
+ self.sleep(1)
+
+ self.assertEqual(len(dump), SCALE_COUNT)
+ self.logger.info("Test 4 passed")
+
+ # Test 5: Disable auto-sdl
+ # It should clean up the Auto SDL and SDL entries immediately
+ self.logger.info(self.vapi.auto_sdl_config(enable=False))
+ dump = self.vapi.session_sdl_v3_dump()
+ self.assertEqual(len(dump), 0)
+
+ acl_if_e.remove_vpp_config()
+ acl.remove_vpp_config()
+ self.logger.info("Test 5 passed")
+
+ # bring down the cli server
+ self.vapi.cli("http cli server listener del")
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/asf/test_session.py b/test/asf/test_session.py
index fe8da126195..79bbbd63b35 100644
--- a/test/asf/test_session.py
+++ b/test/asf/test_session.py
@@ -122,7 +122,6 @@ class TestSession(VppAsfTestCase):
ip_t10.remove_vpp_config()
-@tag_fixme_vpp_workers
class TestApplicationNamespace(VppAsfTestCase):
"""Application Namespacee"""
@@ -219,6 +218,160 @@ class TestApplicationNamespace(VppAsfTestCase):
)
self.assertEqual(rv.retval, -1)
+ def test_application_namespace_binding(self):
+ """Application Namespace Interface Binding"""
+
+ self.vapi.session_enable_disable_v2(
+ rt_engine_type=VppEnum.vl_api_rt_backend_engine_t.RT_BACKEND_ENGINE_API_RULE_TABLE
+ )
+
+ table_id = 99
+
+ # Bad ip4_fib_id
+ with self.vapi.assert_negative_api_retval():
+ rv = self.vapi.app_namespace_add_del_v4(
+ is_add=1, namespace_id="2", ip4_fib_id=table_id, ip6_fib_id=0
+ )
+ self.assertEqual(rv.retval, -19)
+
+ # Bad ip6_fib_id
+ with self.vapi.assert_negative_api_retval():
+ rv = self.vapi.app_namespace_add_del_v4(
+ is_add=1, namespace_id="2", ip4_fib_id=0, ip6_fib_id=table_id
+ )
+ self.assertEqual(rv.retval, -19)
+
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+
+ tbl6 = VppIpTable(self, table_id, is_ip6=1)
+ tbl6.add_vpp_config()
+
+ # Not expecting an error with valid table_id's
+ self.vapi.app_namespace_add_del_v4(
+ is_add=1, namespace_id="2", ip4_fib_id=table_id, ip6_fib_id=table_id
+ )
+ # delete
+ self.vapi.app_namespace_add_del_v4(
+ is_add=0, namespace_id="2", ip4_fib_id=table_id, ip6_fib_id=table_id
+ )
+
+ # ip4 only
+ self.vapi.app_namespace_add_del_v4(
+ is_add=1, namespace_id="2", ip4_fib_id=table_id, ip6_fib_id=0xFFFFFFFF
+ )
+ # delete
+ self.vapi.app_namespace_add_del_v4(
+ is_add=0, namespace_id="2", ip4_fib_id=table_id, ip6_fib_id=0xFFFFFFFF
+ )
+
+ # ip6 only
+ self.vapi.app_namespace_add_del_v4(
+ is_add=1, namespace_id="2", ip4_fib_id=0xFFFFFFFF, ip6_fib_id=table_id
+ )
+ # delete
+ self.vapi.app_namespace_add_del_v4(
+ is_add=0, namespace_id="2", ip4_fib_id=0xFFFFFFFF, ip6_fib_id=table_id
+ )
+
+ app0 = self.vapi.app_namespace_add_del_v4(
+ namespace_id="0", sw_if_index=self.loop0.sw_if_index, is_add=1
+ )
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl="172.100.1.1/32",
+ rmt="172.100.1.2/32",
+ lcl_port=5000,
+ rmt_port=5000,
+ action_index=1,
+ appns_index=app0.appns_index,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ is_add=1,
+ )
+ dump = self.vapi.session_rules_v2_dump()
+ self.assertEqual(len(dump[1].appns_index), 2)
+ self.assertEqual(dump[1].count, 2)
+
+ # move the interface to vrf 99
+ self.vapi.sw_interface_set_table(
+ sw_if_index=self.loop0.sw_if_index,
+ is_ipv6=0,
+ vrf_id=table_id,
+ )
+ dump = self.vapi.session_rules_v2_dump()
+ self.assertEqual(len(dump[1].appns_index), 1)
+ self.assertEqual(dump[1].count, 1)
+
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl="172.100.1.1/32",
+ rmt="172.100.1.2/32",
+ lcl_port=5000,
+ rmt_port=5000,
+ action_index=1,
+ appns_index=0,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ is_add=0,
+ )
+
+ # move the interface to vrf 0
+ self.vapi.sw_interface_set_table(
+ sw_if_index=self.loop0.sw_if_index,
+ is_ipv6=0,
+ vrf_id=0,
+ )
+
+ # try it with ip6
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl="2001::1/128",
+ rmt="2002::1/128",
+ lcl_port=5000,
+ rmt_port=5000,
+ action_index=1,
+ appns_index=app0.appns_index,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ is_add=1,
+ )
+ dump = self.vapi.session_rules_v2_dump()
+ self.assertEqual(len(dump[1].appns_index), 2)
+ self.assertEqual(dump[1].count, 2)
+
+ self.vapi.sw_interface_set_table(
+ sw_if_index=self.loop0.sw_if_index,
+ is_ipv6=1,
+ vrf_id=table_id,
+ )
+ dump = self.vapi.session_rules_v2_dump()
+ self.assertEqual(len(dump[1].appns_index), 2)
+ self.assertEqual(dump[1].count, 2)
+
+ # move back to 0
+ self.vapi.sw_interface_set_table(
+ sw_if_index=self.loop0.sw_if_index,
+ is_ipv6=1,
+ vrf_id=0,
+ )
+
+ self.vapi.session_rule_add_del(
+ transport_proto=VppEnum.vl_api_transport_proto_t.TRANSPORT_PROTO_API_TCP,
+ lcl="2001::1/128",
+ rmt="2002::1/128",
+ lcl_port=5000,
+ rmt_port=5000,
+ action_index=1,
+ appns_index=0,
+ scope=VppEnum.vl_api_session_rule_scope_t.SESSION_RULE_SCOPE_API_GLOBAL,
+ is_add=0,
+ )
+
+ self.vapi.app_namespace_add_del_v4(
+ namespace_id="0", sw_if_index=self.loop0.sw_if_index, is_add=0
+ )
+
+ tbl.remove_vpp_config()
+ tbl6.remove_vpp_config()
+
@tag_fixme_vpp_workers
class TestSessionUnitTests(VppAsfTestCase):
diff --git a/test/asf/test_session_sdl.py b/test/asf/test_session_sdl.py
index 952ad10bb79..53301f7bd6c 100644
--- a/test/asf/test_session_sdl.py
+++ b/test/asf/test_session_sdl.py
@@ -234,7 +234,7 @@ class TestSessionSDL(VppTestCase):
self.create_rule(rmt=self.loop1.local_ip6 + "/128", action_index=0, tag="")
)
self.apply_rules(rules, is_add=1, appns_index=0)
- filter = self.vapi.session_sdl_v2_dump()
+ filter = self.vapi.session_sdl_v3_dump()
self.assertEqual(filter[0].rmt, IPv6Network(self.loop1.local_ip6 + "/128"))
error = self.vapi.cli_return_response(client_cmd)
diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py
index 124ea14089b..143b46c22ee 100644
--- a/test/asf/test_vcl.py
+++ b/test/asf/test_vcl.py
@@ -7,7 +7,7 @@ import subprocess
import signal
import glob
from config import config
-from asfframework import VppAsfTestCase, VppTestRunner, Worker, tag_fixme_ubuntu2404
+from asfframework import VppAsfTestCase, VppTestRunner, Worker
from vpp_ip_route import VppIpTable, VppIpRoute, VppRoutePath
iperf3 = "/usr/bin/iperf3"
@@ -311,7 +311,6 @@ class VCLTestCase(VppAsfTestCase):
self.assert_equal(worker_client.result, 0, "Binary test return code")
-@tag_fixme_ubuntu2404
class LDPCutThruTestCase(VCLTestCase):
"""LDP Cut Thru Tests"""
@@ -1024,7 +1023,6 @@ class VCLThruHostStackNsock(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPThruHostStackIperf(VCLTestCase):
"""LDP Thru Host Stack Iperf"""
@@ -1072,7 +1070,6 @@ class LDPThruHostStackIperf(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPThruHostStackIperfUdp(VCLTestCase):
"""LDP Thru Host Stack Iperf UDP"""
@@ -1118,7 +1115,6 @@ class LDPThruHostStackIperfUdp(VCLTestCase):
)
-@tag_fixme_ubuntu2404
class LDPIpv6CutThruTestCase(VCLTestCase):
"""LDP IPv6 Cut Thru Tests"""
diff --git a/test/test_bpf_trace_filter.py b/test/test_bpf_trace_filter.py
index 6958caa6b37..d90088a5412 100644
--- a/test/test_bpf_trace_filter.py
+++ b/test/test_bpf_trace_filter.py
@@ -1,5 +1,7 @@
from framework import VppTestCase
from asfframework import VppTestRunner
+from vpp_papi_provider import CliFailedCommandError
+import os
import unittest
from config import config
from scapy.layers.l2 import Ether
@@ -77,6 +79,17 @@ class TestBpfTraceFilter(VppTestCase):
"Unexpected packets in the trace buffer",
)
+ # verify that bpf trace filter has been selected for pcap
+ self.vapi.cli("set pcap filter function bpf_trace_filter")
+ self.vapi.cli("pcap trace rx tx max 1000 intfc any filter")
+
+ # verify that packets are filtered in not captured in pcap
+ self.pg0.add_stream(packets)
+ self.pg_start()
+
+ with self.assertRaises(CliFailedCommandError):
+ self.vapi.cli("pcap trace rx tx off")
+
reply = self.vapi.cli("show bpf trace filter")
self.assertIn("(000)", reply, "Unexpected bpf filter dump")
@@ -103,6 +116,29 @@ class TestBpfTraceFilter(VppTestCase):
"Unexpected packets in the trace buffer",
)
+ # verify that bpf trace filter has been selected for pcap
+ self.vapi.pcap_set_filter_function(filter_function_name="bpf_trace_filter")
+ reply = self.vapi.cli("show pcap filter function")
+ self.assertIn(
+ "(*) name:bpf_trace_filter",
+ reply,
+ "BPF Trace filter is not selected for Pcap",
+ )
+
+ # verify that packets are filtered in not captured in pcap
+ self.vapi.pcap_trace_on(
+ capture_rx=True,
+ capture_tx=True,
+ max_packets=1000,
+ filter=True,
+ sw_if_index=0,
+ filename="bpf_trace.pcap",
+ )
+ self.pg0.add_stream(packets)
+ self.pg_start()
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.pcap_trace_off()
+
def test_bpf_trace_filter_vapi_v2(self):
"""BPF Trace filter test [VAPI v2]"""
self.vapi.bpf_trace_filter_set_v2(filter="tcp or dst port 5678")
@@ -131,6 +167,30 @@ class TestBpfTraceFilter(VppTestCase):
"Unexpected packets in the trace buffer",
)
+ # verify that bpf trace filter has been selected for pcap
+ self.vapi.pcap_set_filter_function(filter_function_name="bpf_trace_filter")
+ reply = self.vapi.cli("show pcap filter function")
+ self.assertIn(
+ "(*) name:bpf_trace_filter",
+ reply,
+ "BPF Trace filter is not selected for Pcap",
+ )
+
+ # verify that packets are captured in pcap with filter
+ self.vapi.pcap_trace_on(
+ capture_rx=True,
+ capture_tx=True,
+ max_packets=1000,
+ filter=True,
+ sw_if_index=0,
+ filename="bpf_trace.pcap",
+ )
+ self.pg0.add_stream(packets)
+ self.pg_start()
+ self.vapi.pcap_trace_off()
+ self.assertTrue(os.path.exists("/tmp/bpf_trace.pcap"))
+ os.remove("/tmp/bpf_trace.pcap")
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_cnat.py b/test/test_cnat.py
index 9e979a4e09e..8d8f3210577 100644
--- a/test/test_cnat.py
+++ b/test/test_cnat.py
@@ -11,9 +11,10 @@ from config import config
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP, ICMP
-from scapy.layers.inet import IPerror, TCPerror, UDPerror
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach
from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6TimeExceeded
from ipaddress import ip_network
@@ -760,12 +761,14 @@ class TestCNatSourceNAT(CnatCommonTestCase):
self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True)
self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True)
self.sourcenat_test_icmp_echo_conf(is_v6=True)
+ self.sourcenat_test_icmp_traceroute_conf(is_v6=True)
def test_snat_v4(self):
# """ CNat Source Nat v4 """
self.sourcenat_test_tcp_udp_conf(TCP)
self.sourcenat_test_tcp_udp_conf(UDP)
self.sourcenat_test_icmp_echo_conf()
+ self.sourcenat_test_icmp_traceroute_conf()
def sourcenat_test_icmp_echo_conf(self, is_v6=False):
ctx = CnatTestContext(self, ICMP, is_v6=is_v6)
@@ -774,6 +777,125 @@ class TestCNatSourceNAT(CnatCommonTestCase):
ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8)
ctx.cnat_send_return().cnat_expect_return()
+ def sourcenat_test_icmp_traceroute_conf(self, is_v6=False):
+ # IPv4 ICMP
+ if not is_v6:
+ # Create an ICMP traceroute packet with TTL set to 1.
+ # The CNAT translates the packet, but the NATted packet is dropped
+ # due to the TTL of 1. An ICMP Time Exceeded message is sent
+ # to the source (which is the NATted address).
+ # The packet will be translated once more to the original
+ # source IP address.
+
+ icmp = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(
+ ttl=1,
+ src=self.pg0.remote_hosts[0].ip4,
+ dst=self.pg1.remote_hosts[0].ip4,
+ )
+ / ICMP(id=0xFEED, type=8) # ICMP Type Echo Request
+ / Raw()
+ )
+
+ self.rxs = self.send_and_expect(self.pg0, icmp, self.pg0)
+
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(rx[IP].src, "172.16.1.1")
+ self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMP].code, 0
+ ) # ICMP Code 0 (TTL Zero During Transit)
+ inner = rx[ICMP].payload
+ self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
+ self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
+ self.assertEqual(inner[ICMPerror].id, 0xFEED)
+
+ # source ---> NATted Transit ---> Transit 2 ... ---> Transit N ---> Destination
+ # Simulate an ICMP Time Exceeded message arriving at the NATted Transit
+ # from the Transit N-2 node. This occurs because the NATted packet
+ # is dropped due to a TTL of 1.
+ # An ICMP Time Exceeded message is sent back to the source
+ # (initially the NATted address). The CNAT then translates the message
+ # back to the original source IP address.
+
+ # For ICMP based traffic, snat session uses identifier for session key.
+ # snat allocates a new identifier. To hit the snat session from Transit N-2
+ # to NATed Transit, packet should use snat allocated identifier. To get the
+ # snat allocated identifier, echo request will be sent and captured at the
+ # destination, taken out the identifier from the packet and use it to set
+ # the identifier in the ICMP time exceed packet
+ icmp[IP].ttl = 64
+ rxs = self.send_and_expect(self.pg0, icmp, self.pg1)
+
+ icmp_error = (
+ Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
+ / IP(src="172.16.1.1", dst=self.pg2.remote_hosts[0].ip4)
+ / ICMP(type=11, code=0)
+ / IPerror(
+ src=self.pg2.remote_hosts[0].ip4, dst=self.pg1.remote_hosts[0].ip4
+ )
+ / ICMPerror(id=rxs[0][ICMP].id, type=8)
+ / Raw()
+ )
+
+ self.rxs = self.send_and_expect(self.pg1, icmp_error, self.pg0)
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP].dst, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(rx[IP].src, "172.16.1.1")
+ self.assertEqual(rx[ICMP].type, 11) # ICMP Type 11 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMP].code, 0
+ ) # ICMP Code 0 (TTL Zero During Transit)
+ inner = rx[ICMP].payload
+ self.assertEqual(inner[IPerror].src, self.pg0.remote_hosts[0].ip4)
+ self.assertEqual(inner[IPerror].dst, self.pg1.remote_hosts[0].ip4)
+ self.assertEqual(inner[ICMPerror].type, 8) # ICMP Echo Request
+ self.assertEqual(inner[ICMPerror].id, 0xFEED)
+
+ # IPv6 ICMPv6
+ if is_v6:
+
+ # Create an ICMPv6 traceroute packet with Hop Limit set to 1.
+ # The CNAT translates the packet, but the NATted packet is dropped
+ # due to the Hop Limit of 1. An ICMPv6 Time Exceeded message is sent
+ # back to the source (which is the NATted address).
+ # The CNAT translates the message once more to restore
+ # the original source IPv6 address.
+ icmp6 = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(
+ hlim=1,
+ src=self.pg0.remote_hosts[0].ip6,
+ dst=self.pg1.remote_hosts[0].ip6,
+ )
+ / ICMPv6EchoRequest(id=0xFEED)
+ / Raw()
+ )
+ self.rxs = self.send_and_expect(self.pg0, icmp6, self.pg0)
+
+ for rx in self.rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IPv6].dst, self.pg0.remote_hosts[0].ip6)
+ self.assertEqual(rx[IPv6].src, "fd01:1::1")
+ self.assertEqual(
+ rx[ICMPv6TimeExceeded].type, 3
+ ) # ICMPv6 Type 3 (Time Exceeded)
+ self.assertEqual(
+ rx[ICMPv6TimeExceeded].code, 0
+ ) # ICMPv6 Code 0 (TTL Zero During Transit)
+ inner = rx[ICMPv6TimeExceeded].payload
+ self.assertEqual(inner[IPerror6].src, self.pg0.remote_hosts[0].ip6)
+ self.assertEqual(inner[IPerror6].dst, self.pg1.remote_hosts[0].ip6)
+ self.assertEqual(
+ inner[ICMPv6EchoRequest].type, 128
+ ) # ICMPv6 Echo Request
+ self.assertEqual(inner[ICMPv6EchoRequest].id, 0xFEED)
+
def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False):
ctx = CnatTestContext(self, L4PROTO, is_v6)
# we should source NAT
@@ -823,7 +945,7 @@ class TestCNatSourceNAT(CnatCommonTestCase):
@unittest.skipIf("cnat" in config.excluded_plugins, "Exclude CNAT plugin tests")
class TestCNatDHCP(CnatCommonTestCase):
- """CNat Translation"""
+ """CNat DHCP"""
@classmethod
def setUpClass(cls):
diff --git a/test/test_dhcp.py b/test/test_dhcp.py
index 15af323454f..f93bbcea165 100644
--- a/test/test_dhcp.py
+++ b/test/test_dhcp.py
@@ -2,7 +2,6 @@
import unittest
import socket
-import six
from framework import VppTestCase
from asfframework import VppTestRunner, tag_run_solo
@@ -140,12 +139,12 @@ class TestDHCP(VppTestCase):
# The ID space is VPP internal - so no matching value
# scapy
#
- self.assertEqual(six.byte2int(data[0:1]), 1)
- self.assertEqual(six.byte2int(data[1:2]), 4)
- self.assertEqual(six.byte2int(data[2:3]), 0)
- self.assertEqual(six.byte2int(data[3:4]), 0)
- self.assertEqual(six.byte2int(data[4:5]), 0)
- self.assertEqual(six.byte2int(data[5:6]), intf._sw_if_index)
+ self.assertEqual(data[0], 1)
+ self.assertEqual(data[1], 4)
+ self.assertEqual(data[2], 0)
+ self.assertEqual(data[3], 0)
+ self.assertEqual(data[4], 0)
+ self.assertEqual(data[5], intf._sw_if_index)
#
# next sub-option is the IP address of the client side
@@ -154,8 +153,8 @@ class TestDHCP(VppTestCase):
#
claddr = socket.inet_pton(AF_INET, ip_addr)
- self.assertEqual(six.byte2int(data[6:7]), 5)
- self.assertEqual(six.byte2int(data[7:8]), 4)
+ self.assertEqual(data[6], 5)
+ self.assertEqual(data[7], 4)
self.assertEqual(data[8], claddr[0])
self.assertEqual(data[9], claddr[1])
self.assertEqual(data[10], claddr[2])
@@ -165,37 +164,33 @@ class TestDHCP(VppTestCase):
# sub-option 151 encodes vss_type 1,
# the 3 byte oui and the 4 byte fib_id
self.assertEqual(id_len, 0)
- self.assertEqual(six.byte2int(data[12:13]), 151)
- self.assertEqual(six.byte2int(data[13:14]), 8)
- self.assertEqual(six.byte2int(data[14:15]), 1)
- self.assertEqual(six.byte2int(data[15:16]), 0)
- self.assertEqual(six.byte2int(data[16:17]), 0)
- self.assertEqual(six.byte2int(data[17:18]), oui)
- self.assertEqual(six.byte2int(data[18:19]), 0)
- self.assertEqual(six.byte2int(data[19:20]), 0)
- self.assertEqual(six.byte2int(data[20:21]), 0)
- self.assertEqual(six.byte2int(data[21:22]), fib_id)
+ self.assertEqual(data[12], 151)
+ self.assertEqual(data[13], 8)
+ self.assertEqual(data[14], 1)
+ self.assertEqual(data[15], 0)
+ self.assertEqual(data[16], 0)
+ self.assertEqual(data[17], oui)
+ self.assertEqual(data[18], 0)
+ self.assertEqual(data[19], 0)
+ self.assertEqual(data[20], 0)
+ self.assertEqual(data[21], fib_id)
# VSS control sub-option
- self.assertEqual(six.byte2int(data[22:23]), 152)
- self.assertEqual(six.byte2int(data[23:24]), 0)
+ self.assertEqual(data[22], 152)
+ self.assertEqual(data[23], 0)
if id_len > 0:
# sub-option 151 encode vss_type of 0
# followerd by vpn_id in ascii
self.assertEqual(oui, 0)
- self.assertEqual(six.byte2int(data[12:13]), 151)
- self.assertEqual(six.byte2int(data[13:14]), id_len + 1)
- self.assertEqual(six.byte2int(data[14:15]), 0)
+ self.assertEqual(data[12], 151)
+ self.assertEqual(data[13], id_len + 1)
+ self.assertEqual(data[14], 0)
self.assertEqual(data[15 : 15 + id_len].decode("ascii"), vpn_id)
# VSS control sub-option
- self.assertEqual(
- six.byte2int(data[15 + len(vpn_id) : 16 + len(vpn_id)]), 152
- )
- self.assertEqual(
- six.byte2int(data[16 + len(vpn_id) : 17 + len(vpn_id)]), 0
- )
+ self.assertEqual(data[15 + len(vpn_id)], 152)
+ self.assertEqual(data[16 + len(vpn_id)], 0)
found = 1
self.assertTrue(found)
@@ -381,13 +376,13 @@ class TestDHCP(VppTestCase):
self.assertEqual(vss.type, 1)
# the OUI and FIB-id are really 3 and 4 bytes resp.
# but the tested range is small
- self.assertEqual(six.byte2int(vss.data[0:1]), 0)
- self.assertEqual(six.byte2int(vss.data[1:2]), 0)
- self.assertEqual(six.byte2int(vss.data[2:3]), oui)
- self.assertEqual(six.byte2int(vss.data[3:4]), 0)
- self.assertEqual(six.byte2int(vss.data[4:5]), 0)
- self.assertEqual(six.byte2int(vss.data[5:6]), 0)
- self.assertEqual(six.byte2int(vss.data[6:7]), fib_id)
+ self.assertEqual(vss.data[0], 0)
+ self.assertEqual(vss.data[1], 0)
+ self.assertEqual(vss.data[2], oui)
+ self.assertEqual(vss.data[3], 0)
+ self.assertEqual(vss.data[4], 0)
+ self.assertEqual(vss.data[5], 0)
+ self.assertEqual(vss.data[6], fib_id)
if id_len > 0:
self.assertEqual(oui, 0)
diff --git a/test/test_flowprobe.py b/test/test_flowprobe.py
index 18a2e3a36c2..bf77f154907 100644
--- a/test/test_flowprobe.py
+++ b/test/test_flowprobe.py
@@ -18,7 +18,6 @@ from asfframework import (
tag_fixme_vpp_workers,
tag_fixme_debian11,
tag_run_solo,
- is_distro_ubuntu2204,
is_distro_debian11,
VppTestRunner,
)
diff --git a/test/test_ikev2.py b/test/test_ikev2.py
index 4bff829c51b..51f1405ffbe 100644
--- a/test/test_ikev2.py
+++ b/test/test_ikev2.py
@@ -22,8 +22,6 @@ from scapy.packet import raw, Raw
from scapy.utils import long_converter
from framework import VppTestCase
from asfframework import (
- tag_fixme_vpp_workers,
- tag_fixme_ubuntu2404,
VppTestRunner,
)
from vpp_ikev2 import Profile, IDType, AuthMethod
@@ -2322,7 +2320,6 @@ class TestResponderRekey(TestResponderPsk):
self.assertEqual(r[0].sa.stats.n_rekey_req, 1)
-@tag_fixme_ubuntu2404
class TestResponderRekeyRepeat(TestResponderRekey):
"""test ikev2 responder - rekey repeat"""
@@ -2330,6 +2327,22 @@ class TestResponderRekeyRepeat(TestResponderRekey):
def test_responder(self):
super(TestResponderRekeyRepeat, self).test_responder()
+
+ # The sleep interval for this test is set to 0.1 seconds instead of the default 2 seconds.
+ # This change is necessary because the test verifies the expiration of old IPsec SAs
+ # (self.fail("old IPsec SA not expired")) within a strict timeframe. A longer sleep
+ # interval, such as 2 seconds, would significantly delay the loop iterations, reducing
+ # the granularity of checks for SA expiration and increasing the risk of false failures.
+ #
+ # By setting the sleep interval to 0.1 seconds:
+ # - The test can perform frequent checks for the status of IPsec SAs, ensuring timely
+ # detection of their expiration.
+ # - It reduces the likelihood of the test prematurely failing due to missing an SA
+ # expiration event caused by coarse-grained timing checks.
+ #
+ # This adjustment enhances test stability and ensures accurate validation of the
+ # expiration behavior under the conditions specified by the test.
+ self.vapi.ikev2_plugin_set_sleep_interval(timeout=0.1)
# rekey request is not accepted until old IPsec SA is expired
capture = self.send_rekey_from_initiator()
ih = self.get_ike_header(capture[0])
@@ -2357,7 +2370,6 @@ class TestResponderRekeyKEX(TestResponderRekey):
vpp_worker_count = 2
-@tag_fixme_ubuntu2404
class TestResponderRekeyRepeatKEX(TestResponderRekeyRepeat):
"""test ikev2 responder - rekey repeat with key exchange"""
diff --git a/test/test_ip4.py b/test/test_ip4.py
index 150ea629308..9d5db0d38cb 100644
--- a/test/test_ip4.py
+++ b/test/test_ip4.py
@@ -9,7 +9,6 @@ from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.inet6 import IPv6
from scapy.layers.l2 import Ether, Dot1Q, ARP
from scapy.packet import Raw
-from six import moves
from framework import VppTestCase
from asfframework import VppTestRunner, tag_fixme_vpp_workers
@@ -142,13 +141,11 @@ class TestIPv4(VppTestCase):
pkts = [
self.modify_packet(src_if, i, pkt_tmpl)
- for i in moves.range(
- self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
- )
+ for i in range(self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10)
]
pkts_b = [
self.modify_packet(src_if, i, pkt_tmpl)
- for i in moves.range(
+ for i in range(
self.pg_if_packet_sizes[1] + hdr_ext,
self.pg_if_packet_sizes[2] + hdr_ext,
50,
diff --git a/test/test_ip6.py b/test/test_ip6.py
index 25f2c623a0b..66e94f7f302 100644
--- a/test/test_ip6.py
+++ b/test/test_ip6.py
@@ -34,7 +34,6 @@ from scapy.utils6 import (
in6_ptop,
in6_islladdr,
)
-from six import moves
from framework import VppTestCase
from asfframework import VppTestRunner, tag_run_solo
@@ -319,13 +318,11 @@ class TestIPv6(TestIPv6ND):
pkts = [
self.modify_packet(src_if, i, pkt_tmpl)
- for i in moves.range(
- self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10
- )
+ for i in range(self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10)
]
pkts_b = [
self.modify_packet(src_if, i, pkt_tmpl)
- for i in moves.range(
+ for i in range(
self.pg_if_packet_sizes[1] + hdr_ext,
self.pg_if_packet_sizes[2] + hdr_ext,
50,
diff --git a/test/test_ipsec_api.py b/test/test_ipsec_api.py
index 7208d2887b5..158cb6b9df5 100644
--- a/test/test_ipsec_api.py
+++ b/test/test_ipsec_api.py
@@ -4,6 +4,7 @@ from framework import VppTestCase
from asfframework import VppTestRunner
from template_ipsec import IPsecIPv4Params
from vpp_papi import VppEnum
+from ipaddress import IPv4Address
from vpp_ipsec import VppIpsecSA
@@ -120,20 +121,15 @@ class IpsecApiTestCase(VppTestCase):
)
self.vapi.ipsec_select_backend(protocol=self.vpp_ah_protocol, index=0)
- def __check_sa_binding(self, sa_id, thread_index):
- found_sa = False
+ def __sa_dump(self, sa):
sa_dumps = self.vapi.ipsec_sa_v5_dump()
for dump in sa_dumps:
- if dump.entry.sad_id == sa_id:
- self.assertEqual(dump.thread_index, thread_index)
- found_sa = True
- break
+ if dump.entry.sad_id == sa.id:
+ return dump
+ self.fail("SA not found in VPP")
- if not found_sa:
- self.fail("SA not found in VPP")
-
- def test_sa_worker_bind(self):
- """Bind an SA to a worker"""
+ def test_sa_basic(self):
+ """basic SA API tests"""
sa = VppIpsecSA(
self,
self.ipv4_params.scapy_tun_sa_id,
@@ -143,14 +139,51 @@ class IpsecApiTestCase(VppTestCase):
self.ipv4_params.crypt_algo_vpp_id,
self.ipv4_params.crypt_key,
VppEnum.vl_api_ipsec_proto_t.IPSEC_API_PROTO_ESP,
+ flags=VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY
+ | VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_IS_INBOUND,
)
sa.add_vpp_config()
- self.__check_sa_binding(sa.id, 0xFFFF)
-
+ # check general SA dump
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.entry.sad_id, sa.id)
+ self.assertEqual(dump.entry.spi, sa.spi)
+ self.assertEqual(dump.entry.protocol, sa.proto)
+ self.assertEqual(dump.entry.crypto_algorithm, sa.crypto_alg)
+ self.assertEqual(
+ dump.entry.crypto_key.data[: dump.entry.crypto_key.length], sa.crypto_key
+ )
+ self.assertEqual(dump.entry.integrity_algorithm, sa.integ_alg)
+ self.assertEqual(
+ dump.entry.integrity_key.data[: dump.entry.integrity_key.length],
+ sa.integ_key,
+ )
+ self.assertEqual(dump.entry.flags, sa.flags)
+ self.assertEqual(dump.entry.tunnel.instance, 0)
+ self.assertEqual(dump.entry.tunnel.src, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.dst, IPv4Address("0.0.0.0"))
+ self.assertEqual(dump.entry.tunnel.sw_if_index, 0)
+ self.assertEqual(dump.entry.tunnel.table_id, sa.table_id)
+ self.assertEqual(dump.entry.tunnel.encap_decap_flags, sa.tun_flags)
+ self.assertEqual(dump.entry.tunnel.mode, 0)
+ self.assertEqual(dump.entry.tunnel.flags, 0)
+ self.assertEqual(dump.entry.tunnel.dscp, 0)
+ self.assertEqual(dump.entry.tunnel.hop_limit, 0)
+ self.assertEqual(dump.entry.salt, 0)
+ self.assertEqual(dump.entry.udp_src_port, 0)
+ self.assertEqual(dump.entry.udp_dst_port, 0)
+ self.assertEqual(dump.entry.anti_replay_window_size, 64)
+ self.assertEqual(dump.sw_if_index, 0xFFFFFFFF)
+ self.assertEqual(dump.seq_outbound, 0)
+ self.assertEqual(dump.last_seq_inbound, 0)
+ self.assertEqual(dump.replay_window, 0xFFFFFFFFFFFFFFFF)
+ self.assertEqual(dump.thread_index, 0xFFFF)
+ self.assertEqual(dump.stat_index, 0)
+
+ # check SA binding API
self.vapi.ipsec_sad_bind(sa_id=sa.id, worker=1)
-
- self.__check_sa_binding(sa.id, 2)
+ dump = self.__sa_dump(sa)
+ self.assertEqual(dump.thread_index, 2)
sa.remove_vpp_config()
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index eb04df49244..1953bbe5eaf 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -9,6 +9,7 @@ from template_ipsec import IPSecIPv6Fwd
from test_ipsec_esp import TemplateIpsecEsp
from template_ipsec import SpdFastPathTemplate
from config import config
+import pdb
def debug_signal_handler(signal, frame):
@@ -888,5 +889,50 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect):
+ """IPSec/IPv6 inbound: Policy mode test case with fast path"""
+
+ # In this test sa_in defines a tunnel. Matching should be
+ # done based on the sa tunnel header.
+
+ @classmethod
+ def setUpClass(cls):
+ super(IPSec6SpdTestCaseTunProtect, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(IPSec6SpdTestCaseTunProtect, cls).tearDownClass()
+
+ def setUp(self):
+ super(IPSec6SpdTestCaseTunProtect, self).setUp()
+
+ def tearDown(self):
+ super(IPSec6SpdTestCaseTunProtect, self).tearDown()
+
+ def test_ipsec6_spd_inbound_tun_protect(self):
+ pkt_count = 5
+ payload_size = 64
+ p = self.params[socket.AF_INET6]
+ send_pkts = self.gen_encrypt_pkts6(
+ p,
+ p.scapy_tun_sa,
+ self.tun_if,
+ src=p.remote_tun_if_host,
+ dst=self.pg1.remote_ip6,
+ count=pkt_count,
+ payload_size=payload_size,
+ )
+ recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
+ self.logger.info(self.vapi.ppcli("show error"))
+ self.logger.info(self.vapi.ppcli("show ipsec all"))
+ pkts = p.tun_sa_in.get_stats()["packets"]
+ self.assertEqual(
+ pkts,
+ pkt_count,
+ "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts),
+ )
+ self.assertEqual(p.tun_sa_in.get_err("lost"), 0)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_nat44_ed.py b/test/test_nat44_ed.py
index 83629bb185b..233881424d4 100644
--- a/test/test_nat44_ed.py
+++ b/test/test_nat44_ed.py
@@ -7,7 +7,7 @@ from random import randint, choice
import re
import scapy.compat
from framework import VppTestCase, VppLoInterface
-from asfframework import VppTestRunner, is_distro_ubuntu2204
+from asfframework import VppTestRunner
from scapy.data import IP_PROTOS
from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE
from scapy.layers.inet import IPerror, TCPerror
diff --git a/test/test_nat64.py b/test/test_nat64.py
index 0ddce3d23ec..0eb57ee9cce 100644
--- a/test/test_nat64.py
+++ b/test/test_nat64.py
@@ -12,7 +12,6 @@ from config import config
from framework import VppTestCase
from asfframework import (
tag_fixme_vpp_workers,
- is_distro_ubuntu2204,
VppTestRunner,
)
from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
diff --git a/test/test_neighbor.py b/test/test_neighbor.py
index d11d4abac14..731005b5d65 100644
--- a/test/test_neighbor.py
+++ b/test/test_neighbor.py
@@ -3,8 +3,9 @@
import unittest
import os
+from config import config
from framework import VppTestCase
-from asfframework import VppTestRunner, tag_fixme_vpp_workers, tag_fixme_ubuntu2204
+from asfframework import VppTestRunner, tag_fixme_vpp_workers
from vpp_neighbor import VppNeighbor, find_nbr
from vpp_ip_route import (
VppIpRoute,
@@ -2330,7 +2331,7 @@ class NeighborStatsTestCase(VppTestCase):
self.assertEqual(NUM_PKTS + 16, nd1.get_stats()["packets"])
-@tag_fixme_ubuntu2204
+@unittest.skipUnless(config.extended, "part of extended tests")
class NeighborAgeTestCase(VppTestCase):
"""ARP/ND Aging"""
diff --git a/test/test_pcap.py b/test/test_pcap.py
index b73a601bcc8..94418081ace 100644
--- a/test/test_pcap.py
+++ b/test/test_pcap.py
@@ -75,11 +75,13 @@ class TestPcap(VppTestCase):
"pa en",
"pcap dispatch trace off",
"pcap trace rx tx max 1000 intfc any",
+ "pcap trace status",
"pa en",
"pcap trace status",
"pcap trace rx tx off",
"classify filter pcap mask l3 ip4 src match l3 ip4 src 11.22.33.44",
"pcap trace rx tx max 1000 intfc any file filt.pcap filter",
+ "pcap trace status",
"show cla t verbose 2",
"show cla t verbose",
"show cla t",
@@ -123,6 +125,21 @@ class TestPcap(VppTestCase):
self.pg_send(self.pg0, pkt * 10)
self.vapi.pcap_trace_off()
+ # Launching trace with filter & no classifier table specified
+ # should result in no packet capture
+ self.vapi.pcap_trace_on(
+ capture_rx=True,
+ capture_tx=True,
+ max_packets=1000,
+ filter=True,
+ sw_if_index=0,
+ filename="trace_any_invalid_filter.pcap",
+ )
+ self.pg_send(self.pg0, pkt * 10)
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.pcap_trace_off()
+ self.assertFalse(os.path.exists("/tmp/trace_any_invalid_filter.pcap"))
+
self.vapi.cli(
f"classify filter pcap mask l3 ip4 src match l3 ip4 src {self.pg0.local_ip4}"
)
@@ -163,6 +180,16 @@ class TestPcap(VppTestCase):
os.remove("/tmp/trace_any_filter.pcap")
os.remove("/tmp/trace_drop_err.pcap")
+ # Attempting to start a trace with no filename should return an error
+ with self.vapi.assert_negative_api_retval():
+ self.vapi.pcap_trace_on(
+ capture_rx=True,
+ capture_tx=True,
+ filter=True,
+ max_packets=1000,
+ sw_if_index=0,
+ )
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_pg_stream.py b/test/test_pg_stream.py
new file mode 100644
index 00000000000..471c85c43f0
--- /dev/null
+++ b/test/test_pg_stream.py
@@ -0,0 +1,241 @@
+#!/usr/bin/env python3
+
+import unittest
+import time
+import re
+
+from scapy.packet import Raw
+from scapy.layers.l2 import Ether
+from scapy.layers.inet import IP, UDP
+from scapy.layers.inet6 import IPv6
+
+from framework import VppTestCase
+from asfframework import VppTestRunner
+
+
+class TestPgStream(VppTestCase):
+ """PG Stream Test Case"""
+
+ def __init__(self, *args):
+ VppTestCase.__init__(self, *args)
+
+ def setUp(self):
+ super(TestPgStream, self).setUp()
+
+ # Create 3 pg interfaces - one each for ethernet, IPv4, and IPv6.
+ self.create_pg_interfaces(range(0, 1))
+ self.pg_interfaces += self.create_pg_ip4_interfaces(range(1, 2))
+ self.pg_interfaces += self.create_pg_ip6_interfaces(range(2, 3))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+
+ for i in [self.pg0, self.pg1]:
+ i.config_ip4()
+
+ for i in [self.pg0, self.pg2]:
+ i.config_ip6()
+
+ self.pg0.resolve_arp()
+ self.pg0.resolve_ndp()
+
+ def tearDown(self):
+ super(TestPgStream, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+ i.remove_vpp_config()
+
+ def pg_stream(self, count=100, rate=1e6, packet_size=700):
+ rate = str(rate)
+ packet_size = str(packet_size)
+ count = str(count)
+
+ cmds = [
+ "clear trace",
+ "trace add pg-input 1000",
+ "packet-generator new {{\n"
+ " name pg0-pg1-stream\n"
+ " limit {count}\n"
+ " node ethernet-input\n"
+ " source pg0\n"
+ " rate {rate}\n"
+ " size {packet_size}+{packet_size}\n"
+ " buffer-flags ip4 offload\n"
+ " buffer-offload-flags offload-ip-cksum offload-udp-cksum\n"
+ " data {{\n"
+ " IP4: {src_mac} -> {dst_mac}\n"
+ " UDP: {src} -> {dst}\n"
+ " UDP: 1234 -> 4321\n"
+ " incrementing 100\n"
+ " }}\n"
+ "}}\n".format(
+ count=count,
+ rate=rate,
+ packet_size=packet_size,
+ src_mac=self.pg0.remote_mac,
+ dst_mac=self.pg0.local_mac,
+ src=self.pg0.remote_ip4,
+ dst=self.pg1.remote_ip4,
+ ),
+ "packet-generator new {{\n"
+ " name pg0-pg2-stream\n"
+ " limit {count}\n"
+ " node ethernet-input\n"
+ " source pg0\n"
+ " rate {rate}\n"
+ " size {packet_size}+{packet_size}\n"
+ " buffer-flags ip6 offload\n"
+ " buffer-offload-flags offload-udp-cksum\n"
+ " data {{\n"
+ " IP6: {src_mac} -> {dst_mac}\n"
+ " UDP: {src} -> {dst}\n"
+ " UDP: 1234 -> 4321\n"
+ " incrementing 100\n"
+ " }}\n"
+ "}}\n".format(
+ count=count,
+ rate=rate,
+ packet_size=packet_size,
+ src_mac=self.pg0.remote_mac,
+ dst_mac=self.pg0.local_mac,
+ src=self.pg0.remote_ip6,
+ dst=self.pg2.remote_ip6,
+ ),
+ "packet-generator new {{\n"
+ " name pg1-pg0-stream\n"
+ " limit {count}\n"
+ " node ip4-input\n"
+ " source pg1\n"
+ " rate {rate}\n"
+ " size {packet_size}+{packet_size}\n"
+ " buffer-flags ip4 offload\n"
+ " buffer-offload-flags offload-ip-cksum offload-udp-cksum\n"
+ " data {{\n"
+ " UDP: {src} -> {dst}\n"
+ " UDP: 1234 -> 4321\n"
+ " incrementing 100\n"
+ " }}\n"
+ "}}\n".format(
+ count=count,
+ rate=rate,
+ packet_size=packet_size,
+ src=self.pg1.remote_ip4,
+ dst=self.pg0.remote_ip4,
+ ),
+ "packet-generator new {{\n"
+ " name pg2-pg0-stream\n"
+ " limit {count}\n"
+ " node ip6-input\n"
+ " source pg2\n"
+ " rate {rate}\n"
+ " size {packet_size}+{packet_size}\n"
+ " buffer-flags ip6 offload\n"
+ " buffer-offload-flags offload-udp-cksum\n"
+ " data {{\n"
+ " UDP: {src} -> {dst}\n"
+ " UDP: 1234 -> 4321\n"
+ " incrementing 100\n"
+ " }}\n"
+ "}}\n".format(
+ count=count,
+ rate=rate,
+ packet_size=packet_size,
+ src=self.pg2.remote_ip6,
+ dst=self.pg0.remote_ip6,
+ ),
+ "packet-generator enable",
+ "show error",
+ ]
+
+ for cmd in cmds:
+ r = self.vapi.cli_return_response(cmd)
+ if r.retval != 0:
+ if hasattr(r, "reply"):
+ self.logger.info(cmd + " FAIL reply " + r.reply)
+ else:
+ self.logger.info(cmd + " FAIL retval " + str(r.retval))
+ self.assertTrue(r.retval == 0)
+
+ deadline = time.time() + 30
+ while self.vapi.cli("show packet-generator").find("Yes") != -1:
+ self.sleep(0.01) # yield
+ if time.time() > deadline:
+ self.logger.error("Timeout waiting for pg to stop")
+ break
+
+ r = self.vapi.cli_return_response("show trace")
+ self.assertTrue(r.retval == 0)
+ self.assertTrue(hasattr(r, "reply"))
+ rv = r.reply
+ packets = rv.split("\nPacket ")
+ for packet in enumerate(packets, start=1):
+ match = re.search(r"stream\s+([\w-]+)", packet[1])
+ if match:
+ stream_name = match.group(1)
+ else:
+ continue
+ if stream_name == "pg0-pg1-stream":
+ look_here = packet[1].find("ethernet-input")
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip4 offload-ip-cksum offload-udp-cksum l2-hdr-offset 0 l3-hdr-offset 14 l4-hdr-offset 34"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip4 l2-hdr-offset 0 l3-hdr-offset 14 l4-hdr-offset 34"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ elif stream_name == "pg0-pg2-stream":
+ look_here = packet[1].find("ethernet-input")
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip6 offload-udp-cksum l2-hdr-offset 0 l3-hdr-offset 14 l4-hdr-offset 54"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip6 l2-hdr-offset 0 l3-hdr-offset 14 l4-hdr-offset 54"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ elif stream_name == "pg1-pg0-stream":
+ look_here = packet[1].find("ethernet-input")
+ self.assertEqual(look_here, -1)
+ look_here = packet[1].find("ip4-input")
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip4 offload-ip-cksum offload-udp-cksum l2-hdr-offset 0 l3-hdr-offset 0 l4-hdr-offset 20"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip4 l2-hdr-offset 0 l3-hdr-offset 0 l4-hdr-offset 20"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ elif stream_name == "pg2-pg0-stream":
+ look_here = packet[1].find("ethernet-input")
+ self.assertEqual(look_here, -1)
+ look_here = packet[1].find("ip6-input")
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip6 offload-udp-cksum l2-hdr-offset 0 l3-hdr-offset 0 l4-hdr-offset 40"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+ search_string = "ip6 l2-hdr-offset 0 l3-hdr-offset 0 l4-hdr-offset 40"
+ look_here = packet[1].find(search_string)
+ self.assertNotEqual(look_here, -1)
+
+ self.logger.info(self.vapi.cli("packet-generator disable"))
+ self.logger.info(self.vapi.cli("packet-generator delete pg0-pg1-stream"))
+ self.logger.info(self.vapi.cli("packet-generator delete pg0-pg2-stream"))
+ self.logger.info(self.vapi.cli("packet-generator delete pg1-pg0-stream"))
+ self.logger.info(self.vapi.cli("packet-generator delete pg2-pg0-stream"))
+
+ r = self.vapi.cli_return_response("show buffers")
+ self.assertTrue(r.retval == 0)
+ self.assertTrue(hasattr(r, "reply"))
+ rv = r.reply
+ used = int(rv.strip().split("\n")[-1].split()[-1])
+ self.assertEqual(used, 0)
+
+ def test_pg_stream(self):
+ """PG Stream testing"""
+ self.pg_stream(rate=100, packet_size=64)
+ self.pg_stream(count=1000, rate=1000)
+ self.pg_stream(count=100000, rate=10000, packet_size=1500)
+ self.pg_stream(packet_size=4000)
+
+
+if __name__ == "__main__":
+ unittest.main(testRunner=VppTestRunner)
diff --git a/test/test_sflow.py b/test/test_sflow.py
new file mode 100644
index 00000000000..d37ed84f252
--- /dev/null
+++ b/test/test_sflow.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python3
+
+import unittest
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from scapy.layers.l2 import Ether
+from scapy.packet import Raw
+from scapy.layers.inet import IP, UDP
+from random import randint
+import re # for finding counters in "sh errors" output
+
+
+class SFlowTestCase(VppTestCase):
+ """sFlow test case"""
+
+ @classmethod
+ def setUpClass(self):
+ super(SFlowTestCase, self).setUpClass()
+
+ @classmethod
+ def teadDownClass(cls):
+ super(SFlowTestCase, cls).tearDownClass()
+
+ def setUp(self):
+ self.create_pg_interfaces(range(2)) # create pg0 and pg1
+ for i in self.pg_interfaces:
+ i.admin_up() # put the interface up
+ i.config_ip4() # configure IPv4 address on the interface
+ i.resolve_arp() # resolve ARP, so that we know VPP MAC
+
+ def tearDown(self):
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig()
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
+
+ def is_hw_interface_in_dump(self, dump, hw_if_index):
+ for i in dump:
+ if i.hw_if_index == hw_if_index:
+ return True
+ else:
+ return False
+
+ def enable_sflow_via_api(self):
+ ## TEST: Enable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+
+ ## TEST: Disable one interface
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=False)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all after enable + disable
+ ret = self.vapi.sflow_interface_dump()
+ self.assertEqual(len(ret), 0)
+
+ ## TEST: Enable both interfaces
+ ret = self.vapi.sflow_enable_disable(hw_if_index=1, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+ ret = self.vapi.sflow_enable_disable(hw_if_index=2, enable_disable=True)
+ self.assertEqual(ret.retval, 0)
+
+ ## TEST: interface dump all
+ ret = self.vapi.sflow_interface_dump()
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 1))
+ self.assertTrue(self.is_hw_interface_in_dump(ret, 2))
+
+ ## TEST: the default sampling rate
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 10000)
+
+ ## TEST: sflow_sampling_rate_set()
+ self.vapi.sflow_sampling_rate_set(sampling_N=1)
+ ret = self.vapi.sflow_sampling_rate_get()
+ self.assert_equal(ret.sampling_N, 1)
+
+ ## TEST: the default polling interval
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 20)
+
+ ## TEST: sflow_polling_interval_set()
+ self.vapi.sflow_polling_interval_set(polling_S=10)
+ ret = self.vapi.sflow_polling_interval_get()
+ self.assert_equal(ret.polling_S, 10)
+
+ ## TEST: the default header bytes
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 128)
+
+ ## TEST: sflow_header_bytes_set()
+ self.vapi.sflow_header_bytes_set(header_B=96)
+ ret = self.vapi.sflow_header_bytes_get()
+ self.assert_equal(ret.header_B, 96)
+
+ def create_stream(self, src_if, dst_if, count):
+ packets = []
+ for i in range(count):
+ # create packet info stored in the test case instance
+ info = self.create_packet_info(src_if, dst_if)
+ # convert the info into packet payload
+ payload = self.info_to_payload(info)
+ # create the packet itself
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4)
+ / UDP(sport=randint(49152, 65535), dport=5678)
+ / Raw(payload)
+ )
+ # store a copy of the packet in the packet info
+ info.data = p.copy()
+ # append the packet to the list
+ packets.append(p)
+ # return the created packet list
+ return packets
+
+ def verify_capture(self, src_if, dst_if, capture):
+ packet_info = None
+ for packet in capture:
+ try:
+ ip = packet[IP]
+ udp = packet[UDP]
+ # convert the payload to packet info object
+ payload_info = self.payload_to_info(packet[Raw])
+ # make sure the indexes match
+ self.assert_equal(
+ payload_info.src, src_if.sw_if_index, "source sw_if_index"
+ )
+ self.assert_equal(
+ payload_info.dst, dst_if.sw_if_index, "destination sw_if_index"
+ )
+ packet_info = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ # make sure we didn't run out of saved packets
+ self.assertIsNotNone(packet_info)
+ self.assert_equal(
+ payload_info.index, packet_info.index, "packet info index"
+ )
+ saved_packet = packet_info.data # fetch the saved packet
+ # assert the values match
+ self.assert_equal(ip.src, saved_packet[IP].src, "IP source address")
+ self.assert_equal(udp.sport, saved_packet[UDP].sport, "UDP source port")
+ except:
+ self.logger.error("Unexpected or invalid packet:", packet)
+ raise
+ remaining_packet = self.get_next_packet_info_for_interface2(
+ src_if.sw_if_index, dst_if.sw_if_index, packet_info
+ )
+ self.assertIsNone(
+ remaining_packet,
+ "Interface %s: Packet expected from interface "
+ "%s didn't arrive" % (dst_if.name, src_if.name),
+ )
+
+ def get_sflow_counter(self, counter):
+ counters = self.vapi.cli("sh errors").split("\n")
+ for i in range(1, len(counters) - 1):
+ results = counters[i].split()
+ if results[1] == "sflow":
+ if re.search(counter, counters[i]) is not None:
+ return int(results[0])
+ return None
+
+ def verify_sflow(self, count):
+ ctr_processed = "sflow packets processed"
+ ctr_sampled = "sflow packets sampled"
+ ctr_dropped = "sflow packets dropped"
+ ctr_ps_sent = "sflow PSAMPLE sent"
+ ctr_ps_fail = "sflow PSAMPLE send failed"
+ processed = self.get_sflow_counter(ctr_processed)
+ sampled = self.get_sflow_counter(ctr_sampled)
+ dropped = self.get_sflow_counter(ctr_dropped)
+ ps_sent = self.get_sflow_counter(ctr_ps_sent)
+ ps_fail = self.get_sflow_counter(ctr_ps_fail)
+ self.assert_equal(processed, count, ctr_processed)
+ self.assert_equal(sampled, count, ctr_sampled)
+ self.assert_equal(dropped, None, ctr_dropped)
+ # TODO decide how to warn if PSAMPLE is not working
+ # It requires a prior "sudo modprobe psample", but
+ # that should probably be done at system boot time
+ # or maybe in a systemctl startup script, so we
+ # should only warn here.
+ self.logger.info(ctr_ps_sent + "=" + str(ps_sent))
+ self.logger.info(ctr_ps_fail + "=" + str(ps_fail))
+
+ def test_basic(self):
+ self.enable_sflow_via_api()
+ count = 7
+ # create the packet stream
+ packets = self.create_stream(self.pg0, self.pg1, count)
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ # enable capture on both interfaces
+ self.pg0.enable_capture()
+ self.pg1.enable_capture()
+ # start the packet generator
+ self.pg_start()
+ # get capture - the proper count of packets was saved by
+ # create_packet_info() based on dst_if parameter
+ capture = self.pg1.get_capture()
+ # assert nothing captured on pg0 (always do this last, so that
+ # some time has already passed since pg_start())
+ self.pg0.assert_nothing_captured()
+ # verify capture
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify sflow counters
+ self.verify_sflow(count)
diff --git a/test/test_snort.py b/test/test_snort.py
index 19401cb7b85..5335091dba7 100644
--- a/test/test_snort.py
+++ b/test/test_snort.py
@@ -12,10 +12,10 @@ class TestSnort(VppTestCase):
def setUpClass(cls):
super(TestSnort, cls).setUpClass()
try:
- cls.create_pg_interfaces(range(2))
+ cls.create_pg_interfaces(range(4))
for i in cls.pg_interfaces:
i.config_ip4().resolve_arp()
- i.admin_up()
+ i.admin_down()
except Exception:
cls.tearDownClass()
raise
@@ -24,26 +24,28 @@ class TestSnort(VppTestCase):
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
- i.admin_down()
super(TestSnort, cls).tearDownClass()
def test_snort_cli(self):
# TODO: add a test with packets
# { cli command : part of the expected reply }
- print("TEST SNORT CLI")
commands_replies = {
"snort create-instance name snortTest queue-size 16 on-disconnect drop": "",
"snort create-instance name snortTest2 queue-size 16 on-disconnect pass": "",
"snort attach instance snortTest interface pg0 output": "",
"snort attach instance snortTest2 interface pg1 input": "",
+ "snort attach all-instances interface pg2 inout": "",
+ "snort attach instance snortTest instance snortTest2 interface pg3 inout": "",
"show snort instances": "snortTest",
"show snort interfaces": "pg0",
"show snort clients": "number of clients",
"show snort mode": "input mode: interrupt",
"snort mode polling": "",
"snort mode interrupt": "",
- "snort detach interface pg0": "",
- "snort detach interface pg1": "",
+ "snort detach instance snortTest interface pg0": "",
+ "snort detach instance snortTest2 interface pg1": "",
+ "snort detach all-instances interface pg2": "",
+ "snort detach instance snortTest instance snortTest2 interface pg3": "",
"snort delete instance snortTest": "",
}
@@ -64,7 +66,7 @@ class TestSnortVapi(VppTestCase):
for i in cls.pg_interfaces:
i.config_ip4()
i.resolve_arp()
- i.admin_up()
+ i.admin_down()
except Exception:
cls.tearDownClass()
raise
@@ -73,7 +75,6 @@ class TestSnortVapi(VppTestCase):
def tearDownClass(cls):
for i in cls.pg_interfaces:
i.unconfig_ip4()
- i.admin_down()
super(TestSnortVapi, cls).tearDownClass()
def test_snort_01_modes_set_interrupt(self):
@@ -109,14 +110,20 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.snort_interface_attach(
instance_index=0, sw_if_index=1, snort_dir=1
)
+ reply = self.vapi.snort_interface_attach(
+ instance_index=0, sw_if_index=2, snort_dir=2
+ )
+ # verify attaching with an invalid direction is rejected
try:
reply = self.vapi.snort_interface_attach(
- instance_index=1, sw_if_index=1, snort_dir=1
+ instance_index=1, sw_if_index=2, snort_dir=4
)
except:
pass
else:
self.assertNotEqual(reply.retval, 0)
+ reply = self.vapi.cli("show snort interfaces")
+ self.assertNotIn("snortTest1", reply)
reply = self.vapi.snort_interface_attach(
instance_index=1, sw_if_index=2, snort_dir=3
@@ -124,6 +131,31 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.cli("show snort interfaces")
self.assertIn("snortTest0", reply)
self.assertIn("snortTest1", reply)
+ self.assertIn("input", reply)
+ self.assertIn("inout", reply)
+ self.assertIn("output", reply)
+
+ # verify attaching a previously attached interface is rejected
+ try:
+ reply = self.vapi.snort_interface_attach(
+ instance_index=1, sw_if_index=2, snort_dir=2
+ )
+ except:
+ pass
+ else:
+ self.assertNotEqual(reply.retval, 0)
+
+ # verify attaching an invalid sw_if_index is rejected
+ try:
+ reply = self.vapi.snort_interface_attach(
+ instance_index=1, sw_if_index=3, snort_dir=2
+ )
+ except:
+ pass
+ else:
+ self.assertNotEqual(reply.retval, 0)
+ reply = self.vapi.cli("show snort interfaces")
+ self.assertIn("snortTest1", reply)
def test_snort_05_delete_instance(self):
"""Instances can be deleted"""
@@ -131,14 +163,14 @@ class TestSnortVapi(VppTestCase):
reply = self.vapi.cli("show snort interfaces")
self.assertNotIn("snortTest0", reply)
self.assertIn("snortTest1", reply)
- reply = self.vapi.cli("show snort interfaces")
self.assertNotIn("pg0", reply)
self.assertIn("pg1", reply)
def test_snort_06_detach_if(self):
"""Interfaces can be detached"""
+ # verify detaching an invalid sw_if_index is rejected
try:
- reply = self.vapi.snort_interface_detach(sw_if_index=1)
+ reply = self.vapi.snort_interface_detach(sw_if_index=3)
except:
pass
else:
diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py
index 2d49bf2a8f0..1822e4d9d19 100644
--- a/test/vpp_papi_provider.py
+++ b/test/vpp_papi_provider.py
@@ -9,7 +9,6 @@ import os
import socket
import time
import queue
-from six import moves, iteritems
from config import config
from vpp_papi import VPPApiClient
from hook import Hook
@@ -333,7 +332,7 @@ class VppPapiProvider(object):
# Default override
if name in defaultmapping:
- for k, v in iteritems(defaultmapping[name]):
+ for k, v in defaultmapping[name].items():
if k in d:
continue
d[k] = v
@@ -389,7 +388,7 @@ class VppPapiProvider(object):
api_fn.__name__,
as_fn_signature(api_args),
reply.retval,
- moves.reprlib.repr(reply),
+ reprlib.repr(reply),
)
)
self.test_class.logger.info(msg)