aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/Makefile20
-rw-r--r--test/asf/test_vcl.py42
-rw-r--r--test/template_ipsec.py21
-rw-r--r--test/test_ipsec_spd_fp_input.py152
-rw-r--r--test/test_linux_cp.py134
5 files changed, 356 insertions, 13 deletions
diff --git a/test/Makefile b/test/Makefile
index 79feba9165e..17bf33d0884 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -388,24 +388,32 @@ COV_REM_TODO_NO_TEST="*/vpp-api/client/*" "*/plugins/prom/*" \
"*/vnet/ipsec/esp_format.c" "*/vnet/ethernet/sfp.c" \
"*/vnet/ethernet/ethernet_format_fns.h" \
"*/plugins/ikev2/ikev2_format.c" "*/vnet/bier/bier_types.c"
-
-COV_REM_ALT_TEST="*/plugins/hs_apps/*" "*/plugins/http/*.h"
+ifeq ($(HS-TEST),1)
+COV_REM_HST_UNUSED_FEAT= "*/plugins/ping/*" "*/plugins/unittest/mpcap_node.c" "*/vnet/bfd/*" \
+ "*/vnet/bier/*" "*/vnet/bonding/*" "*/vnet/classify/*" \
+ "*/vnet/gso/*" "*/vnet/ipfix-export/*" "*/vnet/ipip/*" \
+ "*/vnet/ipsec/*" "*/vnet/l2/*" "*/vnet/mpls/*" \
+ "*/vnet/pg/*" "*/vnet/policer/*" "*/vnet/snap/*" \
+ "*/vnet/span/*" "*/vnet/srv6/*" "*/vnet/teib/*" \
+ "*/vnet/tunnel/*" "*/vpp-api/vapi/*" "*/vpp/app/vpe_cli.c" \
+ "*/vppinfra/pcap.c" "*/vppinfra/pcap_funcs.h"
+endif
.PHONY: cov-post
cov-post: wipe-cov $(BUILD_COV_DIR)
- @lcov --ignore-errors --capture \
+ @lcov --ignore-errors unused,empty,mismatch,gcov --capture \
--directory $(VPP_BUILD_DIR) \
--output-file $(BUILD_COV_DIR)/coverage$(HS_TEST).info
@test -z "$(EXTERN_COV_DIR)" || \
- lcov --ignore-errors --capture \
+ lcov --ignore-errors unused,empty,mismatch,gcov --capture \
--directory $(EXTERN_COV_DIR) \
--output-file $(BUILD_COV_DIR)/extern-coverage$(HS_TEST).info
- @lcov --ignore-errors --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \
+ @lcov --ignore-errors unused,empty,mismatch,gcov --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \
$(COV_REM_NOT_CODE) \
$(COV_REM_DRIVERS) \
$(COV_REM_TODO_NO_TEST) \
$(COV_REM_UNUSED_FEAT) \
- $(COV_REM_ALT_TEST) \
+ $(COV_REM_HST_UNUSED_FEAT) \
-o $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info
@genhtml $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info \
--output-directory $(BUILD_COV_DIR)/html
diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py
index 143b46c22ee..ae837fa4065 100644
--- a/test/asf/test_vcl.py
+++ b/test/asf/test_vcl.py
@@ -564,6 +564,48 @@ class VCLThruHostStackEcho(VCLTestCase):
@unittest.skipIf(
"hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin"
)
+class VCLThruHostStackCLUDPEcho(VCLTestCase):
+ """VCL Thru Host Stack CL UDP Echo"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(VCLThruHostStackCLUDPEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(VCLThruHostStackCLUDPEcho, cls).tearDownClass()
+
+ def setUp(self):
+ super(VCLThruHostStackCLUDPEcho, self).setUp()
+
+ self.thru_host_stack_setup()
+ self.pre_test_sleep = 2
+ self.timeout = 5
+
+ def tearDown(self):
+ self.thru_host_stack_tear_down()
+ super(VCLThruHostStackCLUDPEcho, self).tearDown()
+
+ def test_vcl_thru_host_stack_cl_udp_echo(self):
+ """run VCL IPv4 thru host stack CL UDP echo test"""
+ server_args = ["-s", self.loop0.local_ip4]
+ client_args = ["-c", self.loop0.local_ip4]
+ self.thru_host_stack_test(
+ "vcl_test_cl_udp",
+ server_args,
+ "vcl_test_cl_udp",
+ client_args,
+ )
+
+ def show_commands_at_teardown(self):
+ self.logger.debug(self.vapi.cli("show app server"))
+ self.logger.debug(self.vapi.cli("show session verbose"))
+ self.logger.debug(self.vapi.cli("show app mq"))
+
+
+@unittest.skipIf(
+ "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin"
+)
class VCLThruHostStackTLS(VCLTestCase):
"""VCL Thru Host Stack TLS"""
diff --git a/test/template_ipsec.py b/test/template_ipsec.py
index 4e68d44013f..ab5aa9390da 100644
--- a/test/template_ipsec.py
+++ b/test/template_ipsec.py
@@ -16,6 +16,8 @@ from scapy.layers.inet6 import (
IPv6ExtHdrDestOpt,
)
+from scapy.layers.isakmp import ISAKMP
+
from framework import VppTestCase
from asfframework import VppTestRunner
@@ -3246,11 +3248,22 @@ class IPSecIPv6Fwd(VppTestCase):
payload = self.info_to_payload(info)
# create the packet itself
p = (
- Ether(dst=src_if.local_mac, src=src_if.remote_mac)
- / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
- / UDP(sport=src_prt, dport=dst_prt)
- / Raw(payload)
+ (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / ISAKMP()
+ / Raw(payload)
+ )
+ if (src_prt == 500 or src_prt == 4500)
+ else (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6)
+ / UDP(sport=src_prt, dport=dst_prt)
+ / Raw(payload)
+ )
)
+
# store a copy of the packet in the packet info
info.data = p.copy()
# append the packet to the list
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py
index 1953bbe5eaf..ed38a51abdb 100644
--- a/test/test_ipsec_spd_fp_input.py
+++ b/test/test_ipsec_spd_fp_input.py
@@ -835,9 +835,6 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound):
self.verify_policy_match(0, policy_22)
-@unittest.skipIf(
- "ping" in config.excluded_plugins, "Exclude tests requiring Ping plugin"
-)
class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
""" IPSec/IPv6 inbound: Policy mode test case with fast path \
(add protect)"""
@@ -889,6 +886,155 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect):
self.assertEqual(p.tra_sa_in.get_err("lost"), 0)
+class IPSec6SpdTestCaseBypass(SpdFastPathIPv6Inbound):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add bypass)"""
+
+ def test_ipsec_spd_inbound_bypass(self):
+ # In this test case, packets in IPv6 FWD path are configured
+ # to go through IPSec inbound SPD policy lookup.
+ #
+ # 2 inbound SPD rules (1 HIGH and 1 LOW) are added.
+ # - High priority rule action is set to DISCARD.
+ # - Low priority rule action is set to BYPASS.
+ #
+ # Since BYPASS rules take precedence over DISCARD
+ # (the order being PROTECT, BYPASS, DISCARD) we expect the
+ # BYPASS rule to match and traffic to be correctly forwarded.
+ self.create_interfaces(2)
+ pkt_count = 5
+
+ self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
+
+ # create input rules
+ # bypass rule should take precedence over discard rule,
+ # even though it's lower priority, because for input policies
+ # matching PROTECT policies precedes matching BYPASS policies
+ # which preceeds matching for DISCARD policies.
+ # Any hit stops the process.
+ policy_0 = self.spd_add_rem_policy( # inbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="bypass",
+ ip_range=True,
+ local_ip_start=self.pg1.remote_ip6,
+ local_ip_stop=self.pg1.remote_ip6,
+ remote_ip_start=self.pg0.remote_ip6,
+ remote_ip_stop=self.pg0.remote_ip6,
+ )
+ policy_1 = self.spd_add_rem_policy( # inbound, priority 15
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=15,
+ policy_type="discard",
+ ip_range=True,
+ local_ip_start=self.pg1.remote_ip6,
+ local_ip_stop=self.pg1.remote_ip6,
+ remote_ip_start=self.pg0.remote_ip6,
+ remote_ip_stop=self.pg0.remote_ip6,
+ )
+
+ # create output rule so we can capture forwarded packets
+ policy_2 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg0,
+ self.pg1,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+
+ # create the packet stream
+ packets = self.create_stream(
+ self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500
+ )
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # check capture on pg1
+ capture = self.pg1.get_capture()
+ for packet in capture:
+ try:
+ self.logger.debug(ppp("SPD Add - Got packet:", packet))
+ except Exception:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+ self.logger.debug("SPD: Num packets: %s", len(capture.res))
+
+ # verify captured packets
+ self.verify_capture(self.pg0, self.pg1, capture)
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+ self.verify_policy_match(pkt_count, policy_2)
+
+
+class IPSec6SpdTestCaseDiscard(SpdFastPathIPv6Inbound):
+ """ IPSec/IPv6 inbound: Policy mode test case with fast path \
+ (add discard)"""
+
+ def test_ipsec_spd_inbound_discard(self):
+ # In this test case, packets in IPv6 FWD path are configured
+ # to go through IPSec inbound SPD policy lookup.
+ #
+ # Rule action is set to DISCARD.
+
+ self.create_interfaces(2)
+ pkt_count = 5
+
+ self.spd_create_and_intf_add(1, [self.pg1, self.pg0])
+
+ # create input rules
+ # bypass rule should take precedence over discard rule,
+ # even though it's lower priority
+ policy_0 = self.spd_add_rem_policy( # inbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=0,
+ priority=10,
+ policy_type="discard",
+ )
+
+ # create output rule so we can capture forwarded packets
+ policy_1 = self.spd_add_rem_policy( # outbound, priority 10
+ 1,
+ self.pg1,
+ self.pg0,
+ socket.IPPROTO_UDP,
+ is_out=1,
+ priority=10,
+ policy_type="bypass",
+ )
+
+ # create the packet stream
+ packets = self.create_stream(
+ self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500
+ )
+ # add the stream to the source interface
+ self.pg0.add_stream(packets)
+ self.pg1.enable_capture()
+ self.pg_start()
+
+ # check capture on pg1
+ capture = self.pg1.assert_nothing_captured()
+
+ # verify all policies matched the expected number of times
+ self.verify_policy_match(pkt_count, policy_0)
+ self.verify_policy_match(0, policy_1)
+
+
class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect):
"""IPSec/IPv6 inbound: Policy mode test case with fast path"""
diff --git a/test/test_linux_cp.py b/test/test_linux_cp.py
index ff6023cea26..d7116233236 100644
--- a/test/test_linux_cp.py
+++ b/test/test_linux_cp.py
@@ -6,6 +6,14 @@ import socket
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import IPv6, Raw
from scapy.layers.l2 import Ether, ARP
+from scapy.contrib.lacp import LACP
+from scapy.contrib.lldp import (
+ LLDPDUChassisID,
+ LLDPDUPortID,
+ LLDPDUTimeToLive,
+ LLDPDUEndOfLLDPDU,
+ LLDPDU,
+)
from util import reassemble4
from vpp_object import VppObject
@@ -427,5 +435,131 @@ class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4):
self.unconfig_network(p)
+@unittest.skipIf("linux-cp" in config.excluded_plugins, "Exclude linux-cp plugin tests")
+class TestLinuxCPEthertype(VppTestCase):
+ """Linux CP Ethertype"""
+
+ extra_vpp_plugin_config = [
+ "plugin",
+ "linux_cp_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "linux_cp_unittest_plugin.so",
+ "{",
+ "enable",
+ "}",
+ "plugin",
+ "lldp_plugin.so",
+ "{",
+ "disable",
+ "}",
+ ]
+
+ LACP_ETHERTYPE = 0x8809
+ LLDP_ETHERTYPE = 0x88CC
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestLinuxCPEthertype, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestLinuxCPEthertype, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestLinuxCPEthertype, self).setUp()
+ self.create_pg_interfaces(range(2))
+ for i in self.pg_interfaces:
+ i.admin_up()
+
+ self.host = self.pg0
+ self.phy = self.pg1
+
+ self.pair = VppLcpPair(self, self.phy, self.host).add_vpp_config()
+ self.logger.info(self.vapi.cli("sh lcp"))
+
+ def tearDown(self):
+ self.pair.remove_vpp_config()
+
+ for i in self.pg_interfaces:
+ i.admin_down()
+ super(TestLinuxCPEthertype, self).tearDown()
+
+ def send_packet(self, sender, receiver, ethertype, dst, data, expect_copy=True):
+ packet = Ether(src=sender.remote_mac, dst=dst, type=ethertype) / data
+ if expect_copy:
+ rxs = self.send_and_expect(sender, [packet], receiver)
+ for rx in rxs:
+ self.assertEqual(packet.show2(True), rx.show2(True))
+ else:
+ self.send_and_assert_no_replies(sender, [packet])
+
+ def send_lacp_packet(self, sender, receiver, expect_copy=True):
+ data = LACP(
+ actor_system="00:00:00:00:00:01", partner_system="00:00:00:00:00:02"
+ )
+ self.send_packet(
+ sender,
+ receiver,
+ self.LACP_ETHERTYPE,
+ "01:80:c2:00:00:02",
+ data,
+ expect_copy,
+ )
+
+ def send_lldp_packet(self, sender, receiver, expect_copy=True):
+ data = (
+ LLDPDUChassisID(subtype=4, id="01:02:03:04:05:06")
+ / LLDPDUPortID(subtype=3, id="07:08:09:0a:0b:0c")
+ / LLDPDUTimeToLive(ttl=120)
+ / LLDPDUEndOfLLDPDU()
+ )
+ self.send_packet(
+ sender,
+ receiver,
+ self.LLDP_ETHERTYPE,
+ "01:80:c2:00:00:0e",
+ data,
+ expect_copy,
+ )
+
+ def check_ethertype_enabled(self, ethertype, enabled=True):
+ reply = self.vapi.lcp_ethertype_get()
+ output = self.vapi.cli("show lcp ethertype")
+
+ if enabled:
+ self.assertIn(ethertype, reply.ethertypes)
+ self.assertIn(hex(ethertype), output)
+ else:
+ self.assertNotIn(ethertype, reply.ethertypes)
+ self.assertNotIn(hex(ethertype), output)
+
+ def test_linux_cp_lacp(self):
+ """Linux CP LACP Test"""
+ self.check_ethertype_enabled(self.LACP_ETHERTYPE, enabled=False)
+ self.send_lacp_packet(self.phy, self.host, expect_copy=False)
+ self.send_lacp_packet(self.host, self.phy, expect_copy=False)
+
+ self.vapi.cli("lcp ethertype enable " + str(self.LACP_ETHERTYPE))
+
+ self.check_ethertype_enabled(self.LACP_ETHERTYPE, enabled=True)
+ self.send_lacp_packet(self.phy, self.host, expect_copy=True)
+ self.send_lacp_packet(self.host, self.phy, expect_copy=True)
+
+ def test_linux_cp_lldp(self):
+ """Linux CP LLDP Test"""
+ self.check_ethertype_enabled(self.LLDP_ETHERTYPE, enabled=False)
+ self.send_lldp_packet(self.phy, self.host, expect_copy=False)
+ self.send_lldp_packet(self.host, self.phy, expect_copy=False)
+
+ self.vapi.cli("lcp ethertype enable " + str(self.LLDP_ETHERTYPE))
+
+ self.check_ethertype_enabled(self.LLDP_ETHERTYPE, enabled=True)
+ self.send_lldp_packet(self.phy, self.host, expect_copy=True)
+ self.send_lldp_packet(self.host, self.phy, expect_copy=True)
+
+
if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)