From 4b089f27b3eda69be2fc8a9ef9f74d39cd00fc7f Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 17 Apr 2018 18:04:57 +0200 Subject: ipsec: support UDP encap/decap for NAT traversal Change-Id: I65c12617ad49e4d5ef242e53988782f0cefa5684 Signed-off-by: Klement Sekera --- test/test_ipsec_ah.py | 8 +- test/test_ipsec_esp.py | 12 +-- test/test_ipsec_nat.py | 260 ++++++++++++++++++++++++++++++++++++++++++++++ test/vpp_papi_provider.py | 11 +- 4 files changed, 276 insertions(+), 15 deletions(-) create mode 100644 test/test_ipsec_nat.py (limited to 'test') diff --git a/test/test_ipsec_ah.py b/test/test_ipsec_ah.py index 042bb88f5be..fc19d781561 100644 --- a/test/test_ipsec_ah.py +++ b/test/test_ipsec_ah.py @@ -99,14 +99,14 @@ class TestIpsecAh(VppTestCase): l_stopaddr, r_startaddr, r_stopaddr, - protocol=51) + protocol=socket.IPPROTO_AH) cls.vapi.ipsec_spd_add_del_entry( spd_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, - protocol=51, + protocol=socket.IPPROTO_AH, is_outbound=0) l_startaddr = l_stopaddr = socket.inet_pton( socket.AF_INET, cls.remote_pg0_lb_addr) @@ -164,14 +164,14 @@ class TestIpsecAh(VppTestCase): l_stopaddr, r_startaddr, r_stopaddr, - protocol=51) + protocol=socket.IPPROTO_AH) cls.vapi.ipsec_spd_add_del_entry( spd_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, - protocol=51, + protocol=socket.IPPROTO_AH, is_outbound=0) l_startaddr = l_stopaddr = cls.pg2.local_ip4n r_startaddr = r_stopaddr = cls.pg2.remote_ip4n diff --git a/test/test_ipsec_esp.py b/test/test_ipsec_esp.py index f6782749148..15ce4a96878 100644 --- a/test/test_ipsec_esp.py +++ b/test/test_ipsec_esp.py @@ -31,11 +31,11 @@ class TestIpsecEsp(VppTestCase): TUNNEL MODE: --- encrypt --- plain --- - |pg0| -------> |VPP| ------> |pg1| + |pg0| <------- |VPP| <------ |pg1| --- --- --- --- decrypt --- plain --- - |pg0| <------- |VPP| <------ |pg1| + |pg0| -------> |VPP| ------> |pg1| --- --- --- Note : IPv6 is not covered @@ -103,14 +103,14 @@ class TestIpsecEsp(VppTestCase): l_stopaddr, r_startaddr, r_stopaddr, - protocol=50) + protocol=socket.IPPROTO_ESP) cls.vapi.ipsec_spd_add_del_entry( spd_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, - protocol=50, + protocol=socket.IPPROTO_ESP, is_outbound=0) l_startaddr = l_stopaddr = socket.inet_pton( socket.AF_INET, cls.remote_pg0_lb_addr) @@ -172,14 +172,14 @@ class TestIpsecEsp(VppTestCase): l_stopaddr, r_startaddr, r_stopaddr, - protocol=50) + protocol=socket.IPPROTO_ESP) cls.vapi.ipsec_spd_add_del_entry( spd_id, l_startaddr, l_stopaddr, r_startaddr, r_stopaddr, - protocol=50, + protocol=socket.IPPROTO_ESP, is_outbound=0) l_startaddr = l_stopaddr = cls.pg2.local_ip4n r_startaddr = r_stopaddr = cls.pg2.remote_ip4n diff --git a/test/test_ipsec_nat.py b/test/test_ipsec_nat.py new file mode 100644 index 00000000000..9c22fbb559a --- /dev/null +++ b/test/test_ipsec_nat.py @@ -0,0 +1,260 @@ +#!/usr/bin/env python + +import socket + +from scapy.layers.l2 import Ether +from scapy.layers.inet import ICMP, IP, TCP, UDP +from scapy.layers.ipsec import SecurityAssociation, ESP +from util import ppp, ppc +from framework import VppTestCase + + +class IPSecNATTestCase(VppTestCase): + """ IPSec/NAT + + TRANSPORT MODE: + + --- encrypt --- + |pg2| <-------> |VPP| + --- decrypt --- + + TUNNEL MODE: + + + public network | private network + --- encrypt --- plain --- + |pg0| <------- |VPP| <------ |pg1| + --- --- --- + + --- decrypt --- plain --- + |pg0| -------> |VPP| ------> |pg1| + --- --- --- + """ + + remote_pg0_client_addr = '1.1.1.1' + + @classmethod + def setUpClass(cls): + super(IPSecNATTestCase, cls).setUpClass() + cls.create_pg_interfaces(range(2)) + for i in cls.pg_interfaces: + i.configure_ipv4_neighbors() + i.admin_up() + i.config_ip4() + i.resolve_arp() + + cls.tcp_port_in = 6303 + cls.tcp_port_out = 6303 + cls.udp_port_in = 6304 + cls.udp_port_out = 6304 + cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.config_esp_tun() + cls.logger.info(cls.vapi.ppcli("show ipsec")) + + def create_stream_plain(self, src_mac, dst_mac, src_ip, dst_ip): + return [ + # TCP + Ether(src=src_mac, dst=dst_mac) / + IP(src=src_ip, dst=dst_ip) / + TCP(sport=self.tcp_port_in, dport=20), + # UDP + Ether(src=src_mac, dst=dst_mac) / + IP(src=src_ip, dst=dst_ip) / + UDP(sport=self.udp_port_in, dport=20), + # ICMP + Ether(src=src_mac, dst=dst_mac) / + IP(src=src_ip, dst=dst_ip) / + ICMP(id=self.icmp_id_in, type='echo-request') + ] + + def create_stream_encrypted(self, src_mac, dst_mac, src_ip, dst_ip, sa): + return [ + # TCP + Ether(src=src_mac, dst=dst_mac) / + sa.encrypt(IP(src=src_ip, dst=dst_ip) / + TCP(dport=self.tcp_port_out, sport=20)), + # UDP + Ether(src=src_mac, dst=dst_mac) / + sa.encrypt(IP(src=src_ip, dst=dst_ip) / + UDP(dport=self.udp_port_out, sport=20)), + # ICMP + Ether(src=src_mac, dst=dst_mac) / + sa.encrypt(IP(src=src_ip, dst=dst_ip) / + ICMP(id=self.icmp_id_out, type='echo-request')) + ] + + def check_checksum(self, pkt, layer): + """ Check checksum of the packet on given layer """ + new = pkt.__class__(str(pkt)) + del new[layer].chksum + new = new.__class__(str(new)) + self.assertEqual(new[layer].chksum, pkt[layer].chksum) + + def check_ip_checksum(self, pkt): + return self.check_checksum(pkt, 'IP') + + def check_tcp_checksum(self, pkt): + return self.check_checksum(pkt, 'TCP') + + def check_udp_checksum(self, pkt): + return self.check_checksum(pkt, 'UDP') + + def check_icmp_checksum(self, pkt): + return self.check_checksum(pkt, 'ICMP') + + def verify_capture_plain(self, capture): + for packet in capture: + try: + self.check_ip_checksum(packet) + self.assert_equal(packet[IP].src, self.pg0.remote_ip4, + "decrypted packet source address") + self.assert_equal(packet[IP].dst, self.pg1.remote_ip4, + "decrypted packet destination address") + if packet.haslayer(TCP): + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet") + self.assert_equal(packet[TCP].dport, self.tcp_port_in, + "decrypted packet TCP destination port") + self.check_tcp_checksum(packet) + elif packet.haslayer(UDP): + if packet[UDP].payload: + self.assertFalse( + packet[UDP][1].haslayer(UDP), + "unexpected UDP header in decrypted packet") + self.assert_equal(packet[UDP].dport, self.udp_port_in, + "decrypted packet UDP destination port") + else: + self.assertFalse( + packet.haslayer(UDP), + "unexpected UDP header in decrypted packet") + self.assert_equal(packet[ICMP].id, self.icmp_id_in, + "decrypted packet ICMP ID") + self.check_icmp_checksum(packet) + except Exception: + self.logger.error( + ppp("Unexpected or invalid plain packet:", packet)) + raise + + def verify_capture_encrypted(self, capture, sa): + for packet in capture: + try: + self.assertIn(ESP, packet[IP]) + decrypt_pkt = sa.decrypt(packet[IP]) + self.assert_equal(decrypt_pkt[IP].src, self.pg1.remote_ip4, + "encrypted packet source address") + self.assert_equal(decrypt_pkt[IP].dst, self.pg0.remote_ip4, + "encrypted packet destination address") + # if decrypt_pkt.haslayer(TCP): + # self.tcp_port_out = decrypt_pkt[TCP].sport + # elif decrypt_pkt.haslayer(UDP): + # self.udp_port_out = decrypt_pkt[UDP].sport + # else: + # self.icmp_id_out = decrypt_pkt[ICMP].id + except Exception: + self.logger.error( + ppp("Unexpected or invalid encrypted packet:", packet)) + raise + + @classmethod + def config_esp_tun(cls): + spd_id = 1 + remote_sa_id = 10 + local_sa_id = 20 + remote_tun_spi = 1001 + local_tun_spi = 1000 + client = socket.inet_pton(socket.AF_INET, cls.remote_pg0_client_addr) + cls.vapi.ip_add_del_route(client, 32, cls.pg0.remote_ip4n) + cls.vapi.ipsec_sad_add_del_entry(remote_sa_id, remote_tun_spi, + cls.pg1.remote_ip4n, + cls.pg0.remote_ip4n, + integrity_key_length=20, + crypto_key_length=16, + protocol=1, udp_encap=1) + cls.vapi.ipsec_sad_add_del_entry(local_sa_id, local_tun_spi, + cls.pg0.remote_ip4n, + cls.pg1.remote_ip4n, + integrity_key_length=20, + crypto_key_length=16, + protocol=1, udp_encap=1) + cls.vapi.ipsec_spd_add_del(spd_id) + cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index) + l_startaddr = r_startaddr = socket.inet_pton(socket.AF_INET, + "0.0.0.0") + l_stopaddr = r_stopaddr = socket.inet_pton(socket.AF_INET, + "255.255.255.255") + cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr, + r_startaddr, r_stopaddr, + protocol=socket.IPPROTO_ESP) + cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr, + r_startaddr, r_stopaddr, + protocol=socket.IPPROTO_ESP, + is_outbound=0) + cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr, + r_startaddr, r_stopaddr, + remote_port_start=4500, + remote_port_stop=4500, + protocol=socket.IPPROTO_UDP) + cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr, + r_startaddr, r_stopaddr, + remote_port_start=4500, + remote_port_stop=4500, + protocol=socket.IPPROTO_UDP, + is_outbound=0) + l_startaddr = l_stopaddr = cls.pg0.remote_ip4n + r_startaddr = r_stopaddr = cls.pg1.remote_ip4n + cls.vapi.ipsec_spd_add_del_entry(spd_id, l_startaddr, l_stopaddr, + r_startaddr, r_stopaddr, + priority=10, policy=3, + is_outbound=0, sa_id=local_sa_id) + cls.vapi.ipsec_spd_add_del_entry(spd_id, r_startaddr, r_stopaddr, + l_startaddr, l_stopaddr, + priority=10, policy=3, + sa_id=remote_sa_id) + + def test_ipsec_nat_tun(self): + """ IPSec/NAT tunnel test case """ + local_tun_sa = SecurityAssociation(ESP, spi=0x000003e9, + crypt_algo='AES-CBC', + crypt_key='JPjyOWBeVEQiMe7h', + auth_algo='HMAC-SHA1-96', + auth_key='C91KUR9GYMm5GfkEvNjX', + tunnel_header=IP( + src=self.pg1.remote_ip4, + dst=self.pg0.remote_ip4), + nat_t_header=UDP( + sport=4500, + dport=4500)) + # in2out - from private network to public + pkts = self.create_stream_plain( + self.pg1.remote_mac, self.pg1.local_mac, + self.pg1.remote_ip4, self.pg0.remote_ip4) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_encrypted(capture, local_tun_sa) + + remote_tun_sa = SecurityAssociation(ESP, spi=0x000003e8, + crypt_algo='AES-CBC', + crypt_key='JPjyOWBeVEQiMe7h', + auth_algo='HMAC-SHA1-96', + auth_key='C91KUR9GYMm5GfkEvNjX', + tunnel_header=IP( + src=self.pg0.remote_ip4, + dst=self.pg1.remote_ip4), + nat_t_header=UDP( + sport=4500, + dport=4500)) + + # out2in - from public network to private + pkts = self.create_stream_encrypted( + self.pg0.remote_mac, self.pg0.local_mac, + self.pg0.remote_ip4, self.pg1.remote_ip4, remote_tun_sa) + self.logger.info(ppc("Sending packets:", pkts)) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_plain(capture) diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py index 53be992aeeb..b3627317d9b 100644 --- a/test/vpp_papi_provider.py +++ b/test/vpp_papi_provider.py @@ -3060,9 +3060,8 @@ class VppPapiProvider(object): :returns: reply from the API """ return self.api( - self.papi.ipsec_interface_add_del_spd, { - 'spd_id': spd_id, - 'sw_if_index': sw_if_index, 'is_add': is_add}) + self.papi.ipsec_interface_add_del_spd, + {'spd_id': spd_id, 'sw_if_index': sw_if_index, 'is_add': is_add}) def ipsec_sad_add_del_entry(self, sad_id, @@ -3077,7 +3076,8 @@ class VppPapiProvider(object): crypto_key_length=0, crypto_key='JPjyOWBeVEQiMe7h', is_add=1, - is_tunnel=1): + is_tunnel=1, + udp_encap=0): """ IPSEC SA add/del Sample CLI : 'ipsec sa add 10 spi 1001 esp \ crypto-key 4a506a794f574265564551694d653768 \ @@ -3130,7 +3130,8 @@ class VppPapiProvider(object): 'crypto_key_length': crypto_key_length, 'crypto_key': crypto_key, 'is_add': is_add, - 'is_tunnel': is_tunnel}) + 'is_tunnel': is_tunnel, + 'udp_encap': udp_encap}) def ipsec_spd_add_del_entry(self, spd_id, -- cgit 1.2.3-korg