diff options
author | Neale Ranns <nranns@cisco.com> | 2020-06-30 07:47:14 +0000 |
---|---|---|
committer | Dave Barach <openvpp@barachs.net> | 2020-07-21 18:42:25 +0000 |
commit | dd4ccf2623b547654d215ffcf42f9813e42aa90c (patch) | |
tree | 827d8a6086160b70e5f490dde520be6bc577e1d5 /test | |
parent | 0c65f52bb9395526613493aa9c042ea4f6dbc1fc (diff) |
ipsec: Dedicated IPSec interface type
Type: feature
Signed-off-by: Neale Ranns <nranns@cisco.com>
Change-Id: Ie8bd50df163aea2798e9f9d35a13dcadc4a4a4b2
Diffstat (limited to 'test')
-rw-r--r-- | test/template_ipsec.py | 2 | ||||
-rw-r--r-- | test/test_ipsec_tun_if_esp.py | 306 | ||||
-rw-r--r-- | test/vpp_ipsec.py | 39 |
3 files changed, 343 insertions, 4 deletions
diff --git a/test/template_ipsec.py b/test/template_ipsec.py index 1caed0dd932..c3484e1ad0b 100644 --- a/test/template_ipsec.py +++ b/test/template_ipsec.py @@ -914,6 +914,7 @@ class IpsecTun4(object): def verify_tun_64(self, p, count=1): self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec sa") try: send_pkts = self.gen_encrypt_pkts6(p, p.scapy_tun_sa, self.tun_if, src=p.remote_tun_if_host6, @@ -1104,6 +1105,7 @@ class IpsecTun6(object): def verify_tun_46(self, p, count=1): """ ipsec 4o6 tunnel basic test """ self.vapi.cli("clear errors") + self.vapi.cli("clear ipsec sa") try: send_pkts = self.gen_encrypt_pkts(p, p.scapy_tun_sa, self.tun_if, src=p.remote_tun_if_host4, diff --git a/test/test_ipsec_tun_if_esp.py b/test/test_ipsec_tun_if_esp.py index a59baf1dfbf..183012608fe 100644 --- a/test/test_ipsec_tun_if_esp.py +++ b/test/test_ipsec_tun_if_esp.py @@ -15,7 +15,7 @@ from vpp_ipsec_tun_interface import VppIpsecTunInterface from vpp_gre_interface import VppGreInterface from vpp_ipip_tun_interface import VppIpIpTunInterface from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto -from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect +from vpp_ipsec import VppIpsecSA, VppIpsecTunProtect, VppIpsecInterface from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint from vpp_teib import VppTeib @@ -24,13 +24,18 @@ from vpp_papi import VppEnum from vpp_acl import AclRule, VppAcl, VppAclInterface -def config_tun_params(p, encryption_type, tun_if): +def config_tun_params(p, encryption_type, tun_if, src=None, dst=None): ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6} esn_en = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t. IPSEC_API_SAD_FLAG_USE_ESN)) crypt_key = mk_scapy_crypt_key(p) - p.tun_dst = tun_if.remote_ip - p.tun_src = tun_if.local_ip + if tun_if: + p.tun_dst = tun_if.remote_ip + p.tun_src = tun_if.local_ip + else: + p.tun_dst = dst + p.tun_src = src + p.scapy_tun_sa = SecurityAssociation( encryption_type, spi=p.vpp_tun_spi, crypt_algo=p.crypt_algo, @@ -2473,5 +2478,298 @@ class TestIpsec6TunProtectTunDrop(TemplateIpsec, self.unconfig_network(p) +class TemplateIpsecItf4(object): + """ IPsec Interface IPv4 """ + + encryption_type = ESP + tun4_encrypt_node_name = "esp4-encrypt-tun" + tun4_decrypt_node_name = "esp4-decrypt-tun" + tun4_input_node = "ipsec4-tun-input" + + def config_sa_tun(self, p, src, dst): + config_tun_params(p, self.encryption_type, None, src, dst) + + p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol, + src, dst, + flags=p.flags) + p.tun_sa_out.add_vpp_config() + + p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol, + dst, src, + flags=p.flags) + p.tun_sa_in.add_vpp_config() + + def config_protect(self, p): + p.tun_protect = VppIpsecTunProtect(self, + p.tun_if, + p.tun_sa_out, + [p.tun_sa_in]) + p.tun_protect.add_vpp_config() + + def config_network(self, p): + p.tun_if = VppIpsecInterface(self) + + p.tun_if.add_vpp_config() + p.tun_if.admin_up() + p.tun_if.config_ip4() + p.tun_if.config_ip6() + + p.route = VppIpRoute(self, p.remote_tun_if_host, 32, + [VppRoutePath(p.tun_if.remote_ip4, + 0xffffffff)]) + p.route.add_vpp_config() + r = VppIpRoute(self, p.remote_tun_if_host6, 128, + [VppRoutePath(p.tun_if.remote_ip6, + 0xffffffff, + proto=DpoProto.DPO_PROTO_IP6)]) + r.add_vpp_config() + + def unconfig_network(self, p): + p.route.remove_vpp_config() + p.tun_if.remove_vpp_config() + + def unconfig_protect(self, p): + p.tun_protect.remove_vpp_config() + + def unconfig_sa(self, p): + p.tun_sa_out.remove_vpp_config() + p.tun_sa_in.remove_vpp_config() + + +class TestIpsecItf4(TemplateIpsec, + TemplateIpsecItf4, + IpsecTun4): + """ IPsec Interface IPv4 """ + + def setUp(self): + super(TestIpsecItf4, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsecItf4, self).tearDown() + + def test_tun_44(self): + """IPSEC interface IPv4""" + + n_pkts = 127 + p = self.ipv4_params + + self.config_network(p) + self.config_sa_tun(p, + self.pg0.local_ip4, + self.pg0.remote_ip4) + self.config_protect(p) + + self.verify_tun_44(p, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], n_pkts) + + p.tun_if.admin_down() + self.verify_tun_dropped_44(p, count=n_pkts) + p.tun_if.admin_up() + self.verify_tun_44(p, count=n_pkts) + + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], 3*n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], 2*n_pkts) + + # it's a v6 packet when its encrypted + self.tun4_encrypt_node_name = "esp6-encrypt-tun" + + self.verify_tun_64(p, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], 4*n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], 3*n_pkts) + + self.tun4_encrypt_node_name = "esp4-encrypt-tun" + + self.vapi.cli("clear interfaces") + + # rekey - create new SAs and update the tunnel protection + np = copy.copy(p) + np.crypt_key = b'X' + p.crypt_key[1:] + np.scapy_tun_spi += 100 + np.scapy_tun_sa_id += 1 + np.vpp_tun_spi += 100 + np.vpp_tun_sa_id += 1 + np.tun_if.local_spi = p.vpp_tun_spi + np.tun_if.remote_spi = p.scapy_tun_spi + + self.config_sa_tun(np, + self.pg0.local_ip4, + self.pg0.remote_ip4) + self.config_protect(np) + self.unconfig_sa(p) + + self.verify_tun_44(np, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], n_pkts) + + # teardown + self.unconfig_protect(np) + self.unconfig_sa(np) + self.unconfig_network(p) + + +class TemplateIpsecItf6(object): + """ IPsec Interface IPv6 """ + + encryption_type = ESP + tun6_encrypt_node_name = "esp6-encrypt-tun" + tun6_decrypt_node_name = "esp6-decrypt-tun" + tun6_input_node = "ipsec6-tun-input" + + def config_sa_tun(self, p, src, dst): + config_tun_params(p, self.encryption_type, None, src, dst) + + p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol, + src, dst, + flags=p.flags) + p.tun_sa_out.add_vpp_config() + + p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi, + p.auth_algo_vpp_id, p.auth_key, + p.crypt_algo_vpp_id, p.crypt_key, + self.vpp_esp_protocol, + dst, src, + flags=p.flags) + p.tun_sa_in.add_vpp_config() + + def config_protect(self, p): + p.tun_protect = VppIpsecTunProtect(self, + p.tun_if, + p.tun_sa_out, + [p.tun_sa_in]) + p.tun_protect.add_vpp_config() + + def config_network(self, p): + p.tun_if = VppIpsecInterface(self) + + p.tun_if.add_vpp_config() + p.tun_if.admin_up() + p.tun_if.config_ip4() + p.tun_if.config_ip6() + + r = VppIpRoute(self, p.remote_tun_if_host4, 32, + [VppRoutePath(p.tun_if.remote_ip4, + 0xffffffff)]) + r.add_vpp_config() + + p.route = VppIpRoute(self, p.remote_tun_if_host, 128, + [VppRoutePath(p.tun_if.remote_ip6, + 0xffffffff, + proto=DpoProto.DPO_PROTO_IP6)]) + p.route.add_vpp_config() + + def unconfig_network(self, p): + p.route.remove_vpp_config() + p.tun_if.remove_vpp_config() + + def unconfig_protect(self, p): + p.tun_protect.remove_vpp_config() + + def unconfig_sa(self, p): + p.tun_sa_out.remove_vpp_config() + p.tun_sa_in.remove_vpp_config() + + +class TestIpsecItf6(TemplateIpsec, + TemplateIpsecItf6, + IpsecTun6): + """ IPsec Interface IPv6 """ + + def setUp(self): + super(TestIpsecItf6, self).setUp() + + self.tun_if = self.pg0 + + def tearDown(self): + super(TestIpsecItf6, self).tearDown() + + def test_tun_44(self): + """IPSEC interface IPv6""" + + n_pkts = 127 + p = self.ipv6_params + + self.config_network(p) + self.config_sa_tun(p, + self.pg0.local_ip6, + self.pg0.remote_ip6) + self.config_protect(p) + + self.verify_tun_66(p, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], n_pkts) + + p.tun_if.admin_down() + self.verify_drop_tun_66(p, count=n_pkts) + p.tun_if.admin_up() + self.verify_tun_66(p, count=n_pkts) + + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], 3*n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], 2*n_pkts) + + # it's a v4 packet when its encrypted + self.tun6_encrypt_node_name = "esp4-encrypt-tun" + + self.verify_tun_46(p, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], 4*n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], 3*n_pkts) + + self.tun6_encrypt_node_name = "esp6-encrypt-tun" + + self.vapi.cli("clear interfaces") + + # rekey - create new SAs and update the tunnel protection + np = copy.copy(p) + np.crypt_key = b'X' + p.crypt_key[1:] + np.scapy_tun_spi += 100 + np.scapy_tun_sa_id += 1 + np.vpp_tun_spi += 100 + np.vpp_tun_sa_id += 1 + np.tun_if.local_spi = p.vpp_tun_spi + np.tun_if.remote_spi = p.scapy_tun_spi + + self.config_sa_tun(np, + self.pg0.local_ip6, + self.pg0.remote_ip6) + self.config_protect(np) + self.unconfig_sa(p) + + self.verify_tun_66(np, count=n_pkts) + c = p.tun_if.get_rx_stats() + self.assertEqual(c['packets'], n_pkts) + c = p.tun_if.get_tx_stats() + self.assertEqual(c['packets'], n_pkts) + + # teardown + self.unconfig_protect(np) + self.unconfig_sa(np) + self.unconfig_network(p) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) diff --git a/test/vpp_ipsec.py b/test/vpp_ipsec.py index 985f6d4dc4e..f012a4a1e84 100644 --- a/test/vpp_ipsec.py +++ b/test/vpp_ipsec.py @@ -1,6 +1,7 @@ from vpp_object import VppObject from ipaddress import ip_address from vpp_papi import VppEnum +from vpp_interface import VppInterface try: text_type = unicode @@ -368,3 +369,41 @@ class VppIpsecTunProtect(VppObject): self.nh == str(b.tun.nh): return True return False + + +class VppIpsecInterface(VppInterface): + """ + VPP IPSec interface + """ + + def __init__(self, test, mode=None): + super(VppIpsecInterface, self).__init__(test) + + # only p2p mode is supported currently + self.mode = (VppEnum.vl_api_tunnel_mode_t. + TUNNEL_API_MODE_P2P) + + def add_vpp_config(self): + r = self.test.vapi.ipsec_itf_create(itf={ + 'user_instance': 0xffffffff, + 'mode': self.mode, + }) + self.set_sw_if_index(r.sw_if_index) + self.test.registry.register(self, self.test.logger) + return self + + def remove_vpp_config(self): + self.test.vapi.ipsec_itf_delete(sw_if_index=self._sw_if_index) + + def query_vpp_config(self): + ts = self.test.vapi.ipsec_itf_dump(sw_if_index=0xffffffff) + for t in ts: + if t.tunnel.sw_if_index == self._sw_if_index: + return True + return False + + def __str__(self): + return self.object_id() + + def object_id(self): + return "ipsec-%d" % self._sw_if_index |