diff options
Diffstat (limited to 'test/test_ipsec_spd_fp_input.py')
-rw-r--r-- | test/test_ipsec_spd_fp_input.py | 117 |
1 files changed, 77 insertions, 40 deletions
diff --git a/test/test_ipsec_spd_fp_input.py b/test/test_ipsec_spd_fp_input.py index 199fbdf7c5d..9037ed25902 100644 --- a/test/test_ipsec_spd_fp_input.py +++ b/test/test_ipsec_spd_fp_input.py @@ -7,7 +7,6 @@ from framework import VppTestRunner from template_ipsec import IPSecIPv4Fwd from template_ipsec import IPSecIPv6Fwd from test_ipsec_esp import TemplateIpsecEsp -import pdb def debug_signal_handler(signal, frame): @@ -35,30 +34,6 @@ class SpdFastPathInbound(IPSecIPv4Fwd): cls.vpp_cmdline.extend(["ipsec", "{", "ipv4-inbound-spd-fast-path on", "}"]) cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) - @classmethod - def create_enc_stream(self, src_if, dst_if, pkt_count, src_prt=1234, dst_prt=5678): - packets = [] - params = self.params[socket.AF_INET] - for i in range(pkt_count): - # create packet info stored in the test case instance - info = self.create_packet_info(src_if, dst_if) - # convert the info into packet payload - payload = self.info_to_payload(info) - # create the packet itself - p = Ether( - src=self.tra_if.remote_mac, dst=self.tra_if.local_mac - ) / params.scapy_tra_sa.encrypt( - IP(src=self.tra_if.remote_ip4, dst=self.tra_if.local_ip4) - / UDP(sport=src_prt, dport=dst_prt) - / Raw(payload) - ) - # store a copy of the packet in the packet info - info.data = p.copy() - # append the packet to the list - packets.append(p) - # return the created packet list - return packets - class SpdFastPathInboundProtect(TemplateIpsecEsp): @classmethod @@ -98,6 +73,29 @@ class SpdFastPathIPv6Inbound(IPSecIPv6Fwd): cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) +class SpdFastPathIPv6InboundProtect(TemplateIpsecEsp): + @classmethod + def setUpConstants(cls): + super(SpdFastPathIPv6InboundProtect, cls).setUpConstants() + cls.vpp_cmdline.extend(["ipsec", "{", "ipv6-inbound-spd-fast-path on", "}"]) + cls.logger.info("VPP modified cmdline is %s" % " ".join(cls.vpp_cmdline)) + + @classmethod + def setUpClass(cls): + super(SpdFastPathIPv6InboundProtect, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(SpdFastPathIPv6InboundProtect, cls).tearDownClass() + + def setUp(self): + super(SpdFastPathIPv6InboundProtect, self).setUp() + + def tearDown(self): + self.unconfig_network() + super(SpdFastPathIPv6InboundProtect, self).tearDown() + + class IPSec4SpdTestCaseBypass(SpdFastPathInbound): """ IPSec/IPv4 inbound: Policy mode test case with fast path \ (add bypass)""" @@ -206,17 +204,12 @@ class IPSec4SpdTestCaseDiscard(SpdFastPathInbound): # even though it's lower priority policy_0 = self.spd_add_rem_policy( # inbound, priority 10 1, - self.pg1, self.pg0, + self.pg1, socket.IPPROTO_UDP, is_out=0, priority=10, policy_type="discard", - ip_range=True, - local_ip_start=self.pg0.remote_ip4, - local_ip_stop=self.pg0.remote_ip4, - remote_ip_start=self.pg1.remote_ip4, - remote_ip_stop=self.pg1.remote_ip4, ) # create output rule so we can capture forwarded packets @@ -264,16 +257,9 @@ class IPSec4SpdTestCaseProtect(SpdFastPathInboundProtect): super(IPSec4SpdTestCaseProtect, self).tearDown() def test_ipsec_spd_inbound_protect(self): - # In this test case, packets in IPv4 FWD path are configured + # In this test case, encrypted packets in IPv4 + # PROTECT path are configured # to go through IPSec inbound SPD policy lookup. - # - # 2 inbound SPD rules (1 HIGH and 1 LOW) are added. - # - High priority rule action is set to DISCARD. - # - Low priority rule action is set to BYPASS. - # - # Since BYPASS rules take precedence over DISCARD - # (the order being PROTECT, BYPASS, DISCARD) we expect the - # BYPASS rule to match and traffic to be correctly forwarded. pkt_count = 5 payload_size = 64 @@ -840,5 +826,56 @@ class IPSec4SpdTestCaseMultiple(SpdFastPathInbound): self.verify_policy_match(0, policy_22) +class IPSec6SpdTestCaseProtect(SpdFastPathIPv6InboundProtect): + """ IPSec/IPv6 inbound: Policy mode test case with fast path \ + (add protect)""" + + @classmethod + def setUpClass(cls): + super(IPSec6SpdTestCaseProtect, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(IPSec6SpdTestCaseProtect, cls).tearDownClass() + + def setUp(self): + super(IPSec6SpdTestCaseProtect, self).setUp() + + def tearDown(self): + super(IPSec6SpdTestCaseProtect, self).tearDown() + + def test_ipsec6_spd_inbound_protect(self): + pkt_count = 5 + payload_size = 64 + p = self.params[socket.AF_INET6] + send_pkts = self.gen_encrypt_pkts6( + p, + p.scapy_tra_sa, + self.tra_if, + src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6, + count=pkt_count, + payload_size=payload_size, + ) + recv_pkts = self.send_and_expect(self.tra_if, send_pkts, self.tra_if) + + self.logger.info(self.vapi.ppcli("show error")) + self.logger.info(self.vapi.ppcli("show ipsec all")) + pkts = p.tra_sa_in.get_stats()["packets"] + self.assertEqual( + pkts, + pkt_count, + "incorrect SA in counts: expected %d != %d" % (pkt_count, pkts), + ) + pkts = p.tra_sa_out.get_stats()["packets"] + self.assertEqual( + pkts, + pkt_count, + "incorrect SA out counts: expected %d != %d" % (pkt_count, pkts), + ) + self.assertEqual(p.tra_sa_out.get_lost(), 0) + self.assertEqual(p.tra_sa_in.get_lost(), 0) + + if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |