diff options
author | Neale Ranns <nranns@cisco.com> | 2019-12-20 00:54:57 +0000 |
---|---|---|
committer | Neale Ranns <nranns@cisco.com> | 2020-01-04 04:50:47 +0000 |
commit | 02950406c49a743f631395ed52073921744e1afd (patch) | |
tree | 2891403e2fe8cc879f43d4e46e314a2f412763cb /test/template_ipsec.py | |
parent | 2f04cb9f142abef82cd379432cecdafef9e776db (diff) |
ipsec: Targeted unit testing
Type: fix
1 - big packets; chained buffers and those without enoguh space to add
ESP header
2 - IPv6 extension headers in packets that are encrypted/decrypted
3 - Interface protection with SAs that have null algorithms
Signed-off-by: Neale Ranns <nranns@cisco.com>
Change-Id: Ie330861fb06a9b248d9dcd5c730e21326ac8e973
Diffstat (limited to 'test/template_ipsec.py')
-rw-r--r-- | test/template_ipsec.py | 164 |
1 files changed, 161 insertions, 3 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index bcfc0d9253a..398a6bb0a23 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -6,7 +6,9 @@ from scapy.layers.inet import IP, ICMP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation, ESP from scapy.layers.l2 import Ether from scapy.packet import Raw -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \ + IPv6ExtHdrFragment, IPv6ExtHdrDestOpt + from framework import VppTestCase, VppTestRunner from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 @@ -641,6 +643,108 @@ class IpsecTra6(object): self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count) + def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1, + payload_size=54): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IPv6(src=src, dst=dst) / + ICMPv6EchoRequest(id=0, seq=1, + data='X' * payload_size)) + for i in range(count)] + + def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + IPv6(src=src, dst=dst) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200) + for i in range(count)] + + def verify_tra_encrypted6(self, p, sa, rxs): + decrypted = [] + for rx in rxs: + self.assert_packet_checksums_valid(rx) + try: + decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) + decrypted.append(decrypt_pkt) + self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) + self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) + except: + pass + raise + return decrypted + + def verify_tra_66_ext_hdrs(self, p): + count = 63 + + # + # check we can decrypt with options + # + tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count) + self.send_and_expect(self.tra_if, tx, self.tra_if) + + # + # injecting a packet from ourselves to be routed of box is a hack + # but it matches an outbout policy, alors je ne regrette rien + # + + # one extension before ESP + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + # for reasons i'm not going to investigate scapy does not + # created the correct headers after decrypt. but reparsing + # the ipv6 packet fixes it + dc = IPv6(raw(dc[IPv6])) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) + self.assertTrue(dc[IPv6ExtHdrHopByHop]) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP, one after + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + IPv6ExtHdrDestOpt() / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) + self.assertTrue(dc[IPv6ExtHdrDestOpt]) + self.assertTrue(dc[IPv6ExtHdrHopByHop]) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + class IpsecTra6Tests(IpsecTra6): """ UT test methods for Transport v6 """ @@ -653,6 +757,12 @@ class IpsecTra6Tests(IpsecTra6): self.verify_tra_basic6(count=257) +class IpsecTra6ExtTests(IpsecTra6): + def test_tra_ext_hdrs_66(self): + """ ipsec 6o6 tra extension headers test """ + self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6]) + + class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests): """ UT test methods for Transport v6 and v4""" pass @@ -715,6 +825,7 @@ class IpsecTun4(object): def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None): self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec counters") if not n_rx: n_rx = count try: @@ -736,8 +847,45 @@ class IpsecTun4(object): self.logger.info(self.vapi.ppcli("show error")) self.logger.info(self.vapi.ppcli("show ipsec all")) + self.logger.info(self.vapi.ppcli("show ipsec sa 0")) + self.logger.info(self.vapi.ppcli("show ipsec sa 4")) self.verify_counters4(p, count, n_rx) + """ verify methods for Transport v4 """ + def verify_tun_44_bad_packet_sizes(self, p): + # with a buffer size of 2048, 1989 bytes of payload + # means there isn't space to insert the ESP header + N_PKTS = 63 + for p_siz in [1989, 8500]: + send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4, + count=N_PKTS, + payload_size=p_siz) + self.send_and_assert_no_replies(self.tun_if, send_pkts) + send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host, count=N_PKTS, + payload_size=p_siz) + self.send_and_assert_no_replies(self.pg1, send_pkts, + self.tun_if) + + # both large packets on decrpyt count against chained buffers + # the 9000 bytes one does on encrypt + self.assertEqual(2 * N_PKTS, + self.statistics.get_err_counter( + '/err/%s/chained buffers (packet dropped)' % + self.tun4_decrypt_node_name)) + self.assertEqual(N_PKTS, + self.statistics.get_err_counter( + '/err/%s/chained buffers (packet dropped)' % + self.tun4_encrypt_node_name)) + + # on encrypt the 1989 size is no trailer space + self.assertEqual(N_PKTS, + self.statistics.get_err_counter( + '/err/%s/no trailer space (packet dropped)' % + self.tun4_encrypt_node_name)) + def verify_tun_reass_44(self, p): self.vapi.cli("clear errors") self.vapi.ip_reassembly_enable_disable( @@ -828,6 +976,10 @@ class IpsecTun4Tests(IpsecTun4): def test_tun_basic44(self): """ ipsec 4o4 tunnel basic test """ self.verify_tun_44(self.params[socket.AF_INET], count=1) + self.tun_if.admin_down() + self.tun_if.resolve_arp() + self.tun_if.admin_up() + self.verify_tun_44(self.params[socket.AF_INET], count=1) def test_tun_reass_basic44(self): """ ipsec 4o4 tunnel basic reassembly test """ @@ -835,7 +987,13 @@ class IpsecTun4Tests(IpsecTun4): def test_tun_burst44(self): """ ipsec 4o4 tunnel burst test """ - self.verify_tun_44(self.params[socket.AF_INET], count=257) + self.verify_tun_44(self.params[socket.AF_INET], count=127) + + +class IpsecTunEsp4Tests(IpsecTun4): + def test_tun_bad_packet_sizes(self): + """ ipsec v4 tunnel bad packet size """ + self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET]) class IpsecTun6(object): @@ -926,7 +1084,7 @@ class IpsecTun6(object): src=p.remote_tun_if_host, dst=self.pg1.remote_ip6, count=1, - payload_size=1900) + payload_size=1850) send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger) recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1) |