diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/Makefile | 20 | ||||
-rw-r--r-- | test/asf/test_vcl.py | 42 | ||||
-rw-r--r-- | test/template_ipsec.py | 21 | ||||
-rw-r--r-- | test/test_ipsec_spd_fp_input.py | 152 | ||||
-rw-r--r-- | test/test_linux_cp.py | 134 |
5 files changed, 356 insertions, 13 deletions
diff --git a/test/Makefile b/test/Makefile index 79feba9165e..17bf33d0884 100644 --- a/test/Makefile +++ b/test/Makefile @@ -388,24 +388,32 @@ COV_REM_TODO_NO_TEST="*/vpp-api/client/*" "*/plugins/prom/*" \ "*/vnet/ipsec/esp_format.c" "*/vnet/ethernet/sfp.c" \ "*/vnet/ethernet/ethernet_format_fns.h" \ "*/plugins/ikev2/ikev2_format.c" "*/vnet/bier/bier_types.c" - -COV_REM_ALT_TEST="*/plugins/hs_apps/*" "*/plugins/http/*.h" +ifeq ($(HS-TEST),1) +COV_REM_HST_UNUSED_FEAT= "*/plugins/ping/*" "*/plugins/unittest/mpcap_node.c" "*/vnet/bfd/*" \ + "*/vnet/bier/*" "*/vnet/bonding/*" "*/vnet/classify/*" \ + "*/vnet/gso/*" "*/vnet/ipfix-export/*" "*/vnet/ipip/*" \ + "*/vnet/ipsec/*" "*/vnet/l2/*" "*/vnet/mpls/*" \ + "*/vnet/pg/*" "*/vnet/policer/*" "*/vnet/snap/*" \ + "*/vnet/span/*" "*/vnet/srv6/*" "*/vnet/teib/*" \ + "*/vnet/tunnel/*" "*/vpp-api/vapi/*" "*/vpp/app/vpe_cli.c" \ + "*/vppinfra/pcap.c" "*/vppinfra/pcap_funcs.h" +endif .PHONY: cov-post cov-post: wipe-cov $(BUILD_COV_DIR) - @lcov --ignore-errors --capture \ + @lcov --ignore-errors unused,empty,mismatch,gcov --capture \ --directory $(VPP_BUILD_DIR) \ --output-file $(BUILD_COV_DIR)/coverage$(HS_TEST).info @test -z "$(EXTERN_COV_DIR)" || \ - lcov --ignore-errors --capture \ + lcov --ignore-errors unused,empty,mismatch,gcov --capture \ --directory $(EXTERN_COV_DIR) \ --output-file $(BUILD_COV_DIR)/extern-coverage$(HS_TEST).info - @lcov --ignore-errors --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \ + @lcov --ignore-errors unused,empty,mismatch,gcov --remove $(BUILD_COV_DIR)/coverage$(HS_TEST).info \ $(COV_REM_NOT_CODE) \ $(COV_REM_DRIVERS) \ $(COV_REM_TODO_NO_TEST) \ $(COV_REM_UNUSED_FEAT) \ - $(COV_REM_ALT_TEST) \ + $(COV_REM_HST_UNUSED_FEAT) \ -o $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info @genhtml $(BUILD_COV_DIR)/coverage-filtered$(HS_TEST).info \ --output-directory $(BUILD_COV_DIR)/html diff --git a/test/asf/test_vcl.py b/test/asf/test_vcl.py index 143b46c22ee..ae837fa4065 100644 --- a/test/asf/test_vcl.py +++ b/test/asf/test_vcl.py @@ -564,6 +564,48 @@ class VCLThruHostStackEcho(VCLTestCase): @unittest.skipIf( "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin" ) +class VCLThruHostStackCLUDPEcho(VCLTestCase): + """VCL Thru Host Stack CL UDP Echo""" + + @classmethod + def setUpClass(cls): + super(VCLThruHostStackCLUDPEcho, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(VCLThruHostStackCLUDPEcho, cls).tearDownClass() + + def setUp(self): + super(VCLThruHostStackCLUDPEcho, self).setUp() + + self.thru_host_stack_setup() + self.pre_test_sleep = 2 + self.timeout = 5 + + def tearDown(self): + self.thru_host_stack_tear_down() + super(VCLThruHostStackCLUDPEcho, self).tearDown() + + def test_vcl_thru_host_stack_cl_udp_echo(self): + """run VCL IPv4 thru host stack CL UDP echo test""" + server_args = ["-s", self.loop0.local_ip4] + client_args = ["-c", self.loop0.local_ip4] + self.thru_host_stack_test( + "vcl_test_cl_udp", + server_args, + "vcl_test_cl_udp", + client_args, + ) + + def show_commands_at_teardown(self): + self.logger.debug(self.vapi.cli("show app server")) + self.logger.debug(self.vapi.cli("show session verbose")) + self.logger.debug(self.vapi.cli("show app mq")) + + +@unittest.skipIf( + "hs_apps" in config.excluded_plugins, "Exclude tests requiring hs_apps plugin" +) class VCLThruHostStackTLS(VCLTestCase): """VCL Thru Host Stack TLS""" diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 4e68d44013f..ab5aa9390da 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -16,6 +16,8 @@ from scapy.layers.inet6 import ( IPv6ExtHdrDestOpt, ) +from scapy.layers.isakmp import ISAKMP + from framework import VppTestCase from asfframework import VppTestRunner @@ -3246,11 +3248,22 @@ class IPSecIPv6Fwd(VppTestCase): payload = self.info_to_payload(info) # create the packet itself p = ( - Ether(dst=src_if.local_mac, src=src_if.remote_mac) - / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) + ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / UDP(sport=src_prt, dport=dst_prt) + / ISAKMP() + / Raw(payload) + ) + if (src_prt == 500 or src_prt == 4500) + else ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) + / UDP(sport=src_prt, dport=dst_prt) + / Raw(payload) + ) ) + # store a copy of the packet in the packet info info.data = p.copy() # append the packet to the list diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py index 1953bbe5eaf..ed38a51abdb 100644 --- a/test/test_ipsec_spd_fp_input.py +++ b/test/test_ipsec_spd_fp_input.py @@ -835,9 +835,6 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): self.verify_policy_match(0, policy_22) -@unittest.skipIf( - "ping" in config.excluded_plugins, "Exclude tests requiring Ping plugin" -) class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect): """ IPSec/IPv6 inbound: Policy mode test case with fast path \ (add protect)""" @@ -889,6 +886,155 @@ class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect): self.assertEqual(p.tra_sa_in.get_err("lost"), 0) +class IPSec6SpdTestCaseBypass(SpdFastPathIPv6Inbound): + """ IPSec/IPv6 inbound: Policy mode test case with fast path \ + (add bypass)""" + + def test_ipsec_spd_inbound_bypass(self): + # In this test case, packets in IPv6 FWD path are configured + # to go through IPSec inbound SPD policy lookup. + # + # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. + # - High priority rule action is set to DISCARD. + # - Low priority rule action is set to BYPASS. + # + # Since BYPASS rules take precedence over DISCARD + # (the order being PROTECT, BYPASS, DISCARD) we expect the + # BYPASS rule to match and traffic to be correctly forwarded. + self.create_interfaces(2) + pkt_count = 5 + + self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) + + # create input rules + # bypass rule should take precedence over discard rule, + # even though it's lower priority, because for input policies + # matching PROTECT policies precedes matching BYPASS policies + # which preceeds matching for DISCARD policies. + # Any hit stops the process. + policy_0 = self.spd_add_rem_policy( # inbound, priority 10 + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="bypass", + ip_range=True, + local_ip_start=self.pg1.remote_ip6, + local_ip_stop=self.pg1.remote_ip6, + remote_ip_start=self.pg0.remote_ip6, + remote_ip_stop=self.pg0.remote_ip6, + ) + policy_1 = self.spd_add_rem_policy( # inbound, priority 15 + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=15, + policy_type="discard", + ip_range=True, + local_ip_start=self.pg1.remote_ip6, + local_ip_stop=self.pg1.remote_ip6, + remote_ip_start=self.pg0.remote_ip6, + remote_ip_stop=self.pg0.remote_ip6, + ) + + # create output rule so we can capture forwarded packets + policy_2 = self.spd_add_rem_policy( # outbound, priority 10 + 1, + self.pg0, + self.pg1, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) + + # create the packet stream + packets = self.create_stream( + self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500 + ) + # add the stream to the source interface + self.pg0.add_stream(packets) + self.pg1.enable_capture() + self.pg_start() + + # check capture on pg1 + capture = self.pg1.get_capture() + for packet in capture: + try: + self.logger.debug(ppp("SPD Add - Got packet:", packet)) + except Exception: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + self.logger.debug("SPD: Num packets: %s", len(capture.res)) + + # verify captured packets + self.verify_capture(self.pg0, self.pg1, capture) + # verify all policies matched the expected number of times + self.verify_policy_match(pkt_count, policy_0) + self.verify_policy_match(0, policy_1) + self.verify_policy_match(pkt_count, policy_2) + + +class IPSec6SpdTestCaseDiscard(SpdFastPathIPv6Inbound): + """ IPSec/IPv6 inbound: Policy mode test case with fast path \ + (add discard)""" + + def test_ipsec_spd_inbound_discard(self): + # In this test case, packets in IPv6 FWD path are configured + # to go through IPSec inbound SPD policy lookup. + # + # Rule action is set to DISCARD. + + self.create_interfaces(2) + pkt_count = 5 + + self.spd_create_and_intf_add(1, [self.pg1, self.pg0]) + + # create input rules + # bypass rule should take precedence over discard rule, + # even though it's lower priority + policy_0 = self.spd_add_rem_policy( # inbound, priority 10 + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=0, + priority=10, + policy_type="discard", + ) + + # create output rule so we can capture forwarded packets + policy_1 = self.spd_add_rem_policy( # outbound, priority 10 + 1, + self.pg1, + self.pg0, + socket.IPPROTO_UDP, + is_out=1, + priority=10, + policy_type="bypass", + ) + + # create the packet stream + packets = self.create_stream( + self.pg0, self.pg1, pkt_count, src_prt=500, dst_prt=500 + ) + # add the stream to the source interface + self.pg0.add_stream(packets) + self.pg1.enable_capture() + self.pg_start() + + # check capture on pg1 + capture = self.pg1.assert_nothing_captured() + + # verify all policies matched the expected number of times + self.verify_policy_match(pkt_count, policy_0) + self.verify_policy_match(0, policy_1) + + class IPSec6SpdTestCaseTunProtect(SpdFastPathIPv6InboundProtect): """IPSec/IPv6 inbound: Policy mode test case with fast path""" diff --git a/test/test_linux_cp.py b/test/test_linux_cp.py index ff6023cea26..d7116233236 100644 --- a/test/test_linux_cp.py +++ b/test/test_linux_cp.py @@ -6,6 +6,14 @@ import socket from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6, Raw from scapy.layers.l2 import Ether, ARP +from scapy.contrib.lacp import LACP +from scapy.contrib.lldp import ( + LLDPDUChassisID, + LLDPDUPortID, + LLDPDUTimeToLive, + LLDPDUEndOfLLDPDU, + LLDPDU, +) from util import reassemble4 from vpp_object import VppObject @@ -427,5 +435,131 @@ class TestLinuxCPIpsec(TemplateIpsec, TemplateIpsecItf4, IpsecTun4): self.unconfig_network(p) +@unittest.skipIf("linux-cp" in config.excluded_plugins, "Exclude linux-cp plugin tests") +class TestLinuxCPEthertype(VppTestCase): + """Linux CP Ethertype""" + + extra_vpp_plugin_config = [ + "plugin", + "linux_cp_plugin.so", + "{", + "enable", + "}", + "plugin", + "linux_cp_unittest_plugin.so", + "{", + "enable", + "}", + "plugin", + "lldp_plugin.so", + "{", + "disable", + "}", + ] + + LACP_ETHERTYPE = 0x8809 + LLDP_ETHERTYPE = 0x88CC + + @classmethod + def setUpClass(cls): + super(TestLinuxCPEthertype, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestLinuxCPEthertype, cls).tearDownClass() + + def setUp(self): + super(TestLinuxCPEthertype, self).setUp() + self.create_pg_interfaces(range(2)) + for i in self.pg_interfaces: + i.admin_up() + + self.host = self.pg0 + self.phy = self.pg1 + + self.pair = VppLcpPair(self, self.phy, self.host).add_vpp_config() + self.logger.info(self.vapi.cli("sh lcp")) + + def tearDown(self): + self.pair.remove_vpp_config() + + for i in self.pg_interfaces: + i.admin_down() + super(TestLinuxCPEthertype, self).tearDown() + + def send_packet(self, sender, receiver, ethertype, dst, data, expect_copy=True): + packet = Ether(src=sender.remote_mac, dst=dst, type=ethertype) / data + if expect_copy: + rxs = self.send_and_expect(sender, [packet], receiver) + for rx in rxs: + self.assertEqual(packet.show2(True), rx.show2(True)) + else: + self.send_and_assert_no_replies(sender, [packet]) + + def send_lacp_packet(self, sender, receiver, expect_copy=True): + data = LACP( + actor_system="00:00:00:00:00:01", partner_system="00:00:00:00:00:02" + ) + self.send_packet( + sender, + receiver, + self.LACP_ETHERTYPE, + "01:80:c2:00:00:02", + data, + expect_copy, + ) + + def send_lldp_packet(self, sender, receiver, expect_copy=True): + data = ( + LLDPDUChassisID(subtype=4, id="01:02:03:04:05:06") + / LLDPDUPortID(subtype=3, id="07:08:09:0a:0b:0c") + / LLDPDUTimeToLive(ttl=120) + / LLDPDUEndOfLLDPDU() + ) + self.send_packet( + sender, + receiver, + self.LLDP_ETHERTYPE, + "01:80:c2:00:00:0e", + data, + expect_copy, + ) + + def check_ethertype_enabled(self, ethertype, enabled=True): + reply = self.vapi.lcp_ethertype_get() + output = self.vapi.cli("show lcp ethertype") + + if enabled: + self.assertIn(ethertype, reply.ethertypes) + self.assertIn(hex(ethertype), output) + else: + self.assertNotIn(ethertype, reply.ethertypes) + self.assertNotIn(hex(ethertype), output) + + def test_linux_cp_lacp(self): + """Linux CP LACP Test""" + self.check_ethertype_enabled(self.LACP_ETHERTYPE, enabled=False) + self.send_lacp_packet(self.phy, self.host, expect_copy=False) + self.send_lacp_packet(self.host, self.phy, expect_copy=False) + + self.vapi.cli("lcp ethertype enable " + str(self.LACP_ETHERTYPE)) + + self.check_ethertype_enabled(self.LACP_ETHERTYPE, enabled=True) + self.send_lacp_packet(self.phy, self.host, expect_copy=True) + self.send_lacp_packet(self.host, self.phy, expect_copy=True) + + def test_linux_cp_lldp(self): + """Linux CP LLDP Test""" + self.check_ethertype_enabled(self.LLDP_ETHERTYPE, enabled=False) + self.send_lldp_packet(self.phy, self.host, expect_copy=False) + self.send_lldp_packet(self.host, self.phy, expect_copy=False) + + self.vapi.cli("lcp ethertype enable " + str(self.LLDP_ETHERTYPE)) + + self.check_ethertype_enabled(self.LLDP_ETHERTYPE, enabled=True) + self.send_lldp_packet(self.phy, self.host, expect_copy=True) + self.send_lldp_packet(self.host, self.phy, expect_copy=True) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |