From 02950406c49a743f631395ed52073921744e1afd Mon Sep 17 00:00:00 2001 From: Neale Ranns Date: Fri, 20 Dec 2019 00:54:57 +0000 Subject: ipsec: Targeted unit testing Type: fix 1 - big packets; chained buffers and those without enoguh space to add ESP header 2 - IPv6 extension headers in packets that are encrypted/decrypted 3 - Interface protection with SAs that have null algorithms Signed-off-by: Neale Ranns Change-Id: Ie330861fb06a9b248d9dcd5c730e21326ac8e973 --- test/template_ipsec.py | 164 +++++++++++++++++++++- test/test_ipsec_esp.py | 7 +- test/test_ipsec_tun_if_esp.py | 320 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 481 insertions(+), 10 deletions(-) (limited to 'test') diff --git a/test/template_ipsec.py b/test/template_ipsec.py index bcfc0d9253a..398a6bb0a23 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -6,7 +6,9 @@ from scapy.layers.inet import IP, ICMP, TCP, UDP from scapy.layers.ipsec import SecurityAssociation, ESP from scapy.layers.l2 import Ether from scapy.packet import Raw -from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest +from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, IPv6ExtHdrHopByHop, \ + IPv6ExtHdrFragment, IPv6ExtHdrDestOpt + from framework import VppTestCase, VppTestRunner from util import ppp, reassemble4, fragment_rfc791, fragment_rfc8200 @@ -641,6 +643,108 @@ class IpsecTra6(object): self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count) self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count) + def gen_encrypt_pkts_ext_hdrs6(self, sa, sw_intf, src, dst, count=1, + payload_size=54): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IPv6(src=src, dst=dst) / + ICMPv6EchoRequest(id=0, seq=1, + data='X' * payload_size)) + for i in range(count)] + + def gen_pkts_ext_hdrs6(self, sw_intf, src, dst, count=1, payload_size=54): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + IPv6(src=src, dst=dst) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200) + for i in range(count)] + + def verify_tra_encrypted6(self, p, sa, rxs): + decrypted = [] + for rx in rxs: + self.assert_packet_checksums_valid(rx) + try: + decrypt_pkt = p.vpp_tra_sa.decrypt(rx[IPv6]) + decrypted.append(decrypt_pkt) + self.assert_equal(decrypt_pkt.src, self.tra_if.local_ip6) + self.assert_equal(decrypt_pkt.dst, self.tra_if.remote_ip6) + except: + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", decrypt_pkt)) + except: + pass + raise + return decrypted + + def verify_tra_66_ext_hdrs(self, p): + count = 63 + + # + # check we can decrypt with options + # + tx = self.gen_encrypt_pkts_ext_hdrs6(p.scapy_tra_sa, self.tra_if, + src=self.tra_if.remote_ip6, + dst=self.tra_if.local_ip6, + count=count) + self.send_and_expect(self.tra_if, tx, self.tra_if) + + # + # injecting a packet from ourselves to be routed of box is a hack + # but it matches an outbout policy, alors je ne regrette rien + # + + # one extension before ESP + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + # for reasons i'm not going to investigate scapy does not + # created the correct headers after decrypt. but reparsing + # the ipv6 packet fixes it + dc = IPv6(raw(dc[IPv6])) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) + self.assertTrue(dc[IPv6ExtHdrHopByHop]) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + + # two extensions before ESP, one after + tx = (Ether(src=self.pg2.remote_mac, dst=self.pg2.local_mac) / + IPv6(src=self.tra_if.local_ip6, + dst=self.tra_if.remote_ip6) / + IPv6ExtHdrHopByHop() / + IPv6ExtHdrFragment(id=2, offset=200) / + IPv6ExtHdrDestOpt() / + Raw(b'\xff' * 200)) + + rxs = self.send_and_expect(self.pg2, [tx], self.tra_if) + dcs = self.verify_tra_encrypted6(p, p.vpp_tra_sa, rxs) + + for dc in dcs: + dc = IPv6(raw(dc[IPv6])) + self.assertTrue(dc[IPv6ExtHdrDestOpt]) + self.assertTrue(dc[IPv6ExtHdrHopByHop]) + self.assert_equal(dc[IPv6ExtHdrFragment].id, 2) + class IpsecTra6Tests(IpsecTra6): """ UT test methods for Transport v6 """ @@ -653,6 +757,12 @@ class IpsecTra6Tests(IpsecTra6): self.verify_tra_basic6(count=257) +class IpsecTra6ExtTests(IpsecTra6): + def test_tra_ext_hdrs_66(self): + """ ipsec 6o6 tra extension headers test """ + self.verify_tra_66_ext_hdrs(self.params[socket.AF_INET6]) + + class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests): """ UT test methods for Transport v6 and v4""" pass @@ -715,6 +825,7 @@ class IpsecTun4(object): def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None): self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec counters") if not n_rx: n_rx = count try: @@ -736,8 +847,45 @@ class IpsecTun4(object): self.logger.info(self.vapi.ppcli("show error")) self.logger.info(self.vapi.ppcli("show ipsec all")) + self.logger.info(self.vapi.ppcli("show ipsec sa 0")) + self.logger.info(self.vapi.ppcli("show ipsec sa 4")) self.verify_counters4(p, count, n_rx) + """ verify methods for Transport v4 """ + def verify_tun_44_bad_packet_sizes(self, p): + # with a buffer size of 2048, 1989 bytes of payload + # means there isn't space to insert the ESP header + N_PKTS = 63 + for p_siz in [1989, 8500]: + send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4, + count=N_PKTS, + payload_size=p_siz) + self.send_and_assert_no_replies(self.tun_if, send_pkts) + send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host, count=N_PKTS, + payload_size=p_siz) + self.send_and_assert_no_replies(self.pg1, send_pkts, + self.tun_if) + + # both large packets on decrpyt count against chained buffers + # the 9000 bytes one does on encrypt + self.assertEqual(2 * N_PKTS, + self.statistics.get_err_counter( + '/err/%s/chained buffers (packet dropped)' % + self.tun4_decrypt_node_name)) + self.assertEqual(N_PKTS, + self.statistics.get_err_counter( + '/err/%s/chained buffers (packet dropped)' % + self.tun4_encrypt_node_name)) + + # on encrypt the 1989 size is no trailer space + self.assertEqual(N_PKTS, + self.statistics.get_err_counter( + '/err/%s/no trailer space (packet dropped)' % + self.tun4_encrypt_node_name)) + def verify_tun_reass_44(self, p): self.vapi.cli("clear errors") self.vapi.ip_reassembly_enable_disable( @@ -828,6 +976,10 @@ class IpsecTun4Tests(IpsecTun4): def test_tun_basic44(self): """ ipsec 4o4 tunnel basic test """ self.verify_tun_44(self.params[socket.AF_INET], count=1) + self.tun_if.admin_down() + self.tun_if.resolve_arp() + self.tun_if.admin_up() + self.verify_tun_44(self.params[socket.AF_INET], count=1) def test_tun_reass_basic44(self): """ ipsec 4o4 tunnel basic reassembly test """ @@ -835,7 +987,13 @@ class IpsecTun4Tests(IpsecTun4): def test_tun_burst44(self): """ ipsec 4o4 tunnel burst test """ - self.verify_tun_44(self.params[socket.AF_INET], count=257) + self.verify_tun_44(self.params[socket.AF_INET], count=127) + + +class IpsecTunEsp4Tests(IpsecTun4): + def test_tun_bad_packet_sizes(self): + """ ipsec v4 tunnel bad packet size """ + self.verify_tun_44_bad_packet_sizes(self.params[socket.AF_INET]) class IpsecTun6(object): @@ -926,7 +1084,7 @@ class IpsecTun6(object): src=p.remote_tun_if_host, dst=self.pg1.remote_ip6, count=1, - payload_size=1900) + payload_size=1850) send_pkts = fragment_rfc8200(send_pkts[0], 1, 1400, self.logger) recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1, n_rx=1) diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py index 82346d64708..60e5c93ed65 100644 --- a/test/test_ipsec_esp.py +++ b/test/test_ipsec_esp.py @@ -9,7 +9,8 @@ from template_ipsec import IpsecTra46Tests, IpsecTun46Tests, TemplateIpsec, \ IpsecTcpTests, IpsecTun4Tests, IpsecTra4Tests, config_tra_params, \ config_tun_params, IPsecIPv4Params, IPsecIPv6Params, \ IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \ - IpsecTun6HandoffTests, IpsecTun4HandoffTests + IpsecTun6HandoffTests, IpsecTun4HandoffTests, \ + IpsecTra6ExtTests, IpsecTunEsp4Tests from vpp_ipsec import VppIpsecSpd, VppIpsecSpdEntry, VppIpsecSA,\ VppIpsecSpdItfBinding from vpp_ip_route import VppIpRoute, VppRoutePath @@ -286,7 +287,9 @@ class TemplateIpsecEsp(ConfigIpsecESP): super(TemplateIpsecEsp, self).tearDown() -class TestIpsecEsp1(TemplateIpsecEsp, IpsecTra46Tests, IpsecTun46Tests): +class TestIpsecEsp1(TemplateIpsecEsp, IpsecTra46Tests, + IpsecTun46Tests, IpsecTunEsp4Tests, + IpsecTra6ExtTests): """ Ipsec ESP - TUN & TRA tests """ pass diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py index eefd477c71d..469ebc7fc87 100644 --- a/test/test_ipsec_tun_if_esp.py +++ b/test/test_ipsec_tun_if_esp.py @@ -304,6 +304,7 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4): p.scapy_tra_spi = p.scapy_tra_spi + ii p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii p.vpp_tra_spi = p.vpp_tra_spi + ii + p.tun_dst = self.pg0.remote_hosts[ii].ip4 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi, p.scapy_tun_spi, @@ -311,7 +312,7 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4): p.crypt_key, p.crypt_key, p.auth_algo_vpp_id, p.auth_key, p.auth_key, - dst=self.pg0.remote_hosts[ii].ip4) + dst=p.tun_dst) p.tun_if.add_vpp_config() p.tun_if.admin_up() p.tun_if.config_ip4() @@ -334,6 +335,27 @@ class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4): c = p.tun_if.get_tx_stats() self.assertEqual(c['packets'], 127) + def test_tun_rr_44(self): + """ Round-robin packets acrros multiple interface """ + tx = [] + for p in self.multi_params: + tx = tx + self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4) + rxs = self.send_and_expect(self.tun_if, tx, self.pg1) + + for rx, p in zip(rxs, self.multi_params): + self.verify_decrypted(p, [rx]) + + tx = [] + for p in self.multi_params: + tx = tx + self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host) + rxs = self.send_and_expect(self.pg1, tx, self.tun_if) + + for rx, p in zip(rxs, self.multi_params): + self.verify_encrypted(p, p.vpp_tun_sa, [rx]) + class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4): """ IPsec IPv4 Tunnel interface all Algos """ @@ -521,6 +543,69 @@ class TestIpsec4TunIfEspAll(TemplateIpsec, IpsecTun4): p.tun_sa_in.remove_vpp_config() +class TestIpsec4TunIfEspNoAlgo(TemplateIpsec, IpsecTun4): + """ IPsec IPv4 Tunnel interface all Algos """ + + encryption_type = ESP + tun4_encrypt_node_name = "esp4-encrypt-tun" + tun4_decrypt_node_name = "esp4-decrypt-tun" + + def config_network(self, p): + + p.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t. + IPSEC_API_INTEG_ALG_NONE) + p.auth_algo = 'NULL' + p.auth_key = [] + + p.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t. + IPSEC_API_CRYPTO_ALG_NONE) + p.crypt_algo = 'NULL' + p.crypt_key = [] + + p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi, + p.scapy_tun_spi, + p.crypt_algo_vpp_id, + p.crypt_key, p.crypt_key, + p.auth_algo_vpp_id, p.auth_key, + p.auth_key, + salt=p.salt) + p.tun_if.add_vpp_config() + p.tun_if.admin_up() + p.tun_if.config_ip4() + config_tun_params(p, self.encryption_type, p.tun_if) + self.logger.info(self.vapi.cli("sh ipsec sa 0")) + self.logger.info(self.vapi.cli("sh ipsec sa 1")) + + p.route = VppIpRoute(self, p.remote_tun_if_host, 32, + [VppRoutePath(p.tun_if.remote_ip4, + 0xffffffff)]) + p.route.add_vpp_config() + + def unconfig_network(self, p): + p.tun_if.unconfig_ip4() + p.tun_if.remove_vpp_config() + p.route.remove_vpp_config() + + def setUp(self): + super(TestIpsec4TunIfEspNoAlgo, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsec4TunIfEspNoAlgo, self).tearDown() + + def test_tun_44(self): + p = self.ipv4_params + + self.config_network(p) + + tx = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4, + dst=p.remote_tun_if_host) + self.send_and_assert_no_replies(self.pg1, tx) + + self.unconfig_network(p) + + class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6): """ IPsec IPv6 Multi Tunnel interface """ @@ -919,6 +1004,16 @@ class TestIpsecGreIfEspTra(TemplateIpsec, Raw(b'X' * payload_size)) for i in range(count)] + def gen_encrypt_non_ip_pkts(self, sa, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IP(src=self.pg0.remote_ip4, + dst=self.pg0.local_ip4) / + GRE() / + UDP(sport=1144, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=100): return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / @@ -996,6 +1091,115 @@ class TestIpsecGreIfEspTra(TemplateIpsec, p.tun_if.unconfig_ip4() super(TestIpsecGreIfEspTra, self).tearDown() + def test_gre_non_ip(self): + p = self.ipv4_params + tx = self.gen_encrypt_non_ip_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip6) + self.send_and_assert_no_replies(self.tun_if, tx) + node_name = ('/err/%s/unsupported payload' % + self.tun4_decrypt_node_name) + self.assertEqual(1, self.statistics.get_err_counter(node_name)) + + +class TestIpsecGre6IfEspTra(TemplateIpsec, + IpsecTun6Tests): + """ Ipsec GRE ESP - TRA tests """ + tun6_encrypt_node_name = "esp6-encrypt-tun" + tun6_decrypt_node_name = "esp6-decrypt-tun" + encryption_type = ESP + + def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IPv6(src=self.pg0.remote_ip6, + dst=self.pg0.local_ip6) / + GRE() / + IPv6(src=self.pg1.local_ip6, + dst=self.pg1.remote_ip6) / + UDP(sport=1144, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + + def gen_pkts6(self, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + IPv6(src="1::1", dst="1::2") / + UDP(sport=1144, dport=2233) / + Raw(b'X' * payload_size) + for i in range(count)] + + def verify_decrypted6(self, p, rxs): + for rx in rxs: + self.assert_equal(rx[Ether].dst, self.pg1.remote_mac) + self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6) + + def verify_encrypted6(self, p, sa, rxs): + for rx in rxs: + try: + pkt = sa.decrypt(rx[IPv6]) + if not pkt.haslayer(IPv6): + pkt = IPv6(pkt[Raw].load) + self.assert_packet_checksums_valid(pkt) + self.assertTrue(pkt.haslayer(GRE)) + e = pkt[GRE] + self.assertEqual(e[IPv6].dst, "1::2") + except (IndexError, AssertionError): + self.logger.debug(ppp("Unexpected packet:", rx)) + try: + self.logger.debug(ppp("Decrypted packet:", pkt)) + except: + pass + raise + + def setUp(self): + super(TestIpsecGre6IfEspTra, self).setUp() + + self.tun_if = self.pg0 + + p = self.ipv6_params + + bd1 = VppBridgeDomain(self, 1) + bd1.add_vpp_config() + + p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol) + p.tun_sa_out.add_vpp_config() + + p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol) + p.tun_sa_in.add_vpp_config() + + p.tun_if = VppGreInterface(self, + self.pg0.local_ip6, + self.pg0.remote_ip6) + p.tun_if.add_vpp_config() + + p.tun_protect = VppIpsecTunProtect(self, + p.tun_if, + p.tun_sa_out, + [p.tun_sa_in]) + p.tun_protect.add_vpp_config() + + p.tun_if.admin_up() + p.tun_if.config_ip6() + config_tra_params(p, self.encryption_type, p.tun_if) + + r = VppIpRoute(self, "1::2", 128, + [VppRoutePath(p.tun_if.remote_ip6, + 0xffffffff, + proto=DpoProto.DPO_PROTO_IP6)]) + r.add_vpp_config() + + def tearDown(self): + p = self.ipv6_params + p.tun_if.unconfig_ip6() + super(TestIpsecGre6IfEspTra, self).tearDown() + class TemplateIpsec4TunProtect(object): """ IPsec IPv4 Tunnel protect """ @@ -1286,6 +1490,54 @@ class TestIpsec4TunProtectTun(TemplateIpsec, self.unconfig_network(p) +class TestIpsec4TunProtectTunDrop(TemplateIpsec, + TemplateIpsec4TunProtect, + IpsecTun4): + """ IPsec IPv4 Tunnel protect - tunnel mode - drop""" + + encryption_type = ESP + tun4_encrypt_node_name = "esp4-encrypt-tun" + tun4_decrypt_node_name = "esp4-decrypt-tun" + + def setUp(self): + super(TestIpsec4TunProtectTunDrop, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsec4TunProtectTunDrop, self).tearDown() + + def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1, + payload_size=100): + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IP(src=sw_intf.remote_ip4, + dst="5.5.5.5") / + IP(src=src, dst=dst) / + UDP(sport=1144, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + + def test_tun_drop_44(self): + """IPSEC tunnel protect bogus tunnel header """ + + p = self.ipv4_params + + self.config_network(p) + self.config_sa_tun(p) + self.config_protect(p) + + tx = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip4, + count=63) + self.send_and_assert_no_replies(self.tun_if, tx) + + # teardown + self.unconfig_protect(p) + self.unconfig_sa(p) + self.unconfig_network(p) + + class TemplateIpsec6TunProtect(object): """ IPsec IPv6 Tunnel protect """ @@ -1379,7 +1631,7 @@ class TestIpsec6TunProtect(TemplateIpsec, super(TestIpsec6TunProtect, self).tearDown() def test_tun_66(self): - """IPSEC tunnel protect""" + """IPSEC tunnel protect 6o6""" p = self.ipv6_params @@ -1413,6 +1665,15 @@ class TestIpsec6TunProtect(TemplateIpsec, c = p.tun_if.get_tx_stats() self.assertEqual(c['packets'], 254) + # bounce the interface state + p.tun_if.admin_down() + self.verify_drop_tun_66(np, count=127) + node = ('/err/ipsec6-tun-input/%s' % + 'ipsec packets received on disabled interface') + self.assertEqual(127, self.statistics.get_err_counter(node)) + p.tun_if.admin_up() + self.verify_tun_66(np, count=127) + # 3 phase rekey # 1) add two input SAs [old, new] # 2) swap output SA to [new] @@ -1447,9 +1708,9 @@ class TestIpsec6TunProtect(TemplateIpsec, self.verify_drop_tun_66(np, count=127) c = p.tun_if.get_rx_stats() - self.assertEqual(c['packets'], 127*7) + self.assertEqual(c['packets'], 127*9) c = p.tun_if.get_tx_stats() - self.assertEqual(c['packets'], 127*7) + self.assertEqual(c['packets'], 127*8) self.unconfig_sa(np) # teardown @@ -1458,7 +1719,7 @@ class TestIpsec6TunProtect(TemplateIpsec, self.unconfig_network(p) def test_tun_46(self): - """IPSEC tunnel protect""" + """IPSEC tunnel protect 4o6""" p = self.ipv6_params @@ -1581,5 +1842,54 @@ class TestIpsec6TunProtectTun(TemplateIpsec, self.unconfig_network(p) +class TestIpsec6TunProtectTunDrop(TemplateIpsec, + TemplateIpsec6TunProtect, + IpsecTun6): + """ IPsec IPv6 Tunnel protect - tunnel mode - drop""" + + encryption_type = ESP + tun6_encrypt_node_name = "esp6-encrypt-tun" + tun6_decrypt_node_name = "esp6-decrypt-tun" + + def setUp(self): + super(TestIpsec6TunProtectTunDrop, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsec6TunProtectTunDrop, self).tearDown() + + def gen_encrypt_pkts5(self, sa, sw_intf, src, dst, count=1, + payload_size=100): + # the IP destination of the revelaed packet does not match + # that assigned to the tunnel + return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) / + sa.encrypt(IPv6(src=sw_intf.remote_ip6, + dst="5::5") / + IPv6(src=src, dst=dst) / + UDP(sport=1144, dport=2233) / + Raw(b'X' * payload_size)) + for i in range(count)] + + def test_tun_drop_66(self): + """IPSEC 6 tunnel protect bogus tunnel header """ + + p = self.ipv6_params + + self.config_network(p) + self.config_sa_tun(p) + self.config_protect(p) + + tx = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if, + src=p.remote_tun_if_host, + dst=self.pg1.remote_ip6, + count=63) + self.send_and_assert_no_replies(self.tun_if, tx) + + self.unconfig_protect(p) + self.unconfig_sa(p) + self.unconfig_network(p) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg