From de8867535a32f448c94c983633201fe1f9e835b6 Mon Sep 17 00:00:00 2001 From: Matus Fabian Date: Wed, 7 Dec 2016 03:38:19 -0800 Subject: make test: add S-NAT plugin tests Change-Id: I7859f63c5b28480be1ae587fc923d15212769e66 Signed-off-by: Matus Fabian --- test/test_snat.py | 536 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 536 insertions(+) create mode 100644 test/test_snat.py (limited to 'test/test_snat.py') diff --git a/test/test_snat.py b/test/test_snat.py new file mode 100644 index 00000000..e90d9c0b --- /dev/null +++ b/test/test_snat.py @@ -0,0 +1,536 @@ +#!/usr/bin/env python + +import socket +import unittest +from logging import * + +from framework import VppTestCase, VppTestRunner + +from scapy.layers.inet import IP, TCP, UDP, ICMP +from scapy.layers.l2 import Ether + + +class TestSNAT(VppTestCase): + """ SNAT Test Cases """ + + @classmethod + def setUpClass(cls): + super(TestSNAT, cls).setUpClass() + + try: + cls.tcp_port_in = 6303 + cls.tcp_port_out = 6303 + cls.udp_port_in = 6304 + cls.udp_port_out = 6304 + cls.icmp_id_in = 6305 + cls.icmp_id_out = 6305 + cls.snat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(7)) + cls.interfaces = list(cls.pg_interfaces[0:4]) + + for i in cls.interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) + + for i in cls.overlapping_interfaces: + i._local_ip4 = "172.16.255.1" + i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4) + i._remote_hosts[0]._ip4 = "172.16.255.2" + i.set_table_ip4(i.sw_if_index) + i.config_ip4() + i.admin_up() + i.resolve_arp() + + except Exception: + super(TestSNAT, cls).tearDownClass() + raise + + def create_stream_in(self, in_if, out_if): + """ + Create packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + """ + pkts = [] + # TCP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=self.tcp_port_in)) + pkts.append(p) + + # UDP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + UDP(sport=self.udp_port_in)) + pkts.append(p) + + # ICMP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + ICMP(id=self.icmp_id_in, type='echo-request')) + pkts.append(p) + + return pkts + + def create_stream_out(self, out_if, dst_ip=None): + """ + Create packet stream for outside network + + :param out_if: Outside interface + :param dst_ip: Destination IP address (Default use global SNAT address) + """ + if dst_ip is None: + dst_ip=self.snat_addr + pkts = [] + # TCP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip) / + TCP(dport=self.tcp_port_out)) + pkts.append(p) + + # UDP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip) / + UDP(dport=self.udp_port_out)) + pkts.append(p) + + # ICMP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip) / + ICMP(id=self.icmp_id_out, type='echo-reply')) + pkts.append(p) + + return pkts + + def verify_capture_out(self, capture, nat_ip=None, same_port=False, + packet_num=3): + """ + Verify captured packets on outside network + + :param capture: Captured packets + :param nat_ip: Translated IP address (Default use global SNAT address) + :param same_port: Sorce port number is not translated (Default False) + :param packet_num: Expected number of packets (Default 3) + """ + if nat_ip is None: + nat_ip = self.snat_addr + self.assertEqual(packet_num, len(capture)) + for packet in capture: + try: + self.assertEqual(packet[IP].src, nat_ip) + if packet.haslayer(TCP): + if same_port: + self.assertEqual(packet[TCP].sport, self.tcp_port_in) + else: + self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) + self.tcp_port_out = packet[TCP].sport + elif packet.haslayer(UDP): + if same_port: + self.assertEqual(packet[UDP].sport, self.udp_port_in) + else: + self.assertNotEqual(packet[UDP].sport, self.udp_port_in) + self.udp_port_out = packet[UDP].sport + else: + if same_port: + self.assertEqual(packet[ICMP].id, self.icmp_id_in) + else: + self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) + self.icmp_id_out = packet[ICMP].id + except: + error("Unexpected or invalid packet (outside network):") + error(packet.show()) + raise + + def verify_capture_in(self, capture, in_if, packet_num=3): + """ + Verify captured packets on inside network + + :param capture: Captured packets + :param in_if: Inside interface + :param packet_num: Expected number of packets (Default 3) + """ + self.assertEqual(packet_num, len(capture)) + for packet in capture: + try: + self.assertEqual(packet[IP].dst, in_if.remote_ip4) + if packet.haslayer(TCP): + self.assertEqual(packet[TCP].dport, self.tcp_port_in) + elif packet.haslayer(UDP): + self.assertEqual(packet[UDP].dport, self.udp_port_in) + else: + self.assertEqual(packet[ICMP].id, self.icmp_id_in) + except: + error("Unexpected or invalid packet (inside network):") + error(packet.show()) + raise + + def clear_snat(self): + """ + Clear SNAT configuration. + """ + interfaces = self.vapi.snat_interface_dump() + for intf in interfaces: + self.vapi.snat_interface_add_del_feature(intf.sw_if_index, + intf.is_inside, + is_add=0) + + static_mappings = self.vapi.snat_static_mapping_dump() + for sm in static_mappings: + self.vapi.snat_add_static_mapping(sm.local_ip_address, + sm.external_ip_address, + local_port=sm.local_port, + external_port=sm.external_port, + addr_only=sm.addr_only, + vrf_id=sm.vrf_id, + is_add=0) + + adresses = self.vapi.snat_address_dump() + for addr in adresses: + self.vapi.snat_add_address_range(addr.ip_address, + addr.ip_address, + is_add=0) + + def snat_add_static_mapping(self, local_ip, external_ip, local_port=0, + external_port=0, vrf_id=0, is_add=1): + """ + Add/delete S-NAT static mapping + + :param local_ip: Local IP address + :param external_ip: External IP address + :param local_port: Local port number (Optional) + :param external_port: External port number (Optional) + :param vrf_id: VRF ID (Default 0) + :param is_add: 1 if add, 0 if delete (Default add) + """ + addr_only = 1 + if local_port and external_port: + addr_only = 0 + l_ip = socket.inet_pton(socket.AF_INET, local_ip) + e_ip = socket.inet_pton(socket.AF_INET, external_ip) + self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port, + addr_only, vrf_id, is_add) + + def snat_add_address(self, ip, is_add=1): + """ + Add/delete S-NAT address + + :param ip: IP address + :param is_add: 1 if add, 0 if delete (Default add) + """ + snat_addr = socket.inet_pton(socket.AF_INET, ip) + self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add) + + def test_dynamic(self): + """ SNAT dynamic translation test """ + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_out(capture) + + # out2in + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + def test_static_in(self): + """ SNAT 1:1 NAT initialized from inside network """ + + nat_ip = "10.0.0.10" + self.tcp_port_out = 6303 + self.udp_port_out = 6304 + self.icmp_id_out = 6305 + + self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_out(capture, nat_ip, True) + + # out2in + pkts = self.create_stream_out(self.pg1, nat_ip) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + def test_static_out(self): + """ SNAT 1:1 NAT initialized from outside network """ + + nat_ip = "10.0.0.20" + self.tcp_port_out = 6303 + self.udp_port_out = 6304 + self.icmp_id_out = 6305 + + self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg1, nat_ip) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_out(capture, nat_ip, True) + + def test_static_with_port_in(self): + """ SNAT 1:1 NAT with port initialized from inside network """ + + self.tcp_port_out = 3606 + self.udp_port_out = 3607 + self.icmp_id_out = 3608 + + self.snat_add_address(self.snat_addr) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.tcp_port_in, self.tcp_port_out) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.udp_port_in, self.udp_port_out) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.icmp_id_in, self.icmp_id_out) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_out(capture) + + # out2in + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + def test_static_with_port_out(self): + """ SNAT 1:1 NAT with port initialized from outside network """ + + self.tcp_port_out = 30606 + self.udp_port_out = 30607 + self.icmp_id_out = 30608 + + self.snat_add_address(self.snat_addr) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.tcp_port_in, self.tcp_port_out) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.udp_port_in, self.udp_port_out) + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr, + self.icmp_id_in, self.icmp_id_out) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg1) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_out(capture) + + def test_static_vrf_aware(self): + """ SNAT 1:1 NAT VRF awareness """ + + nat_ip1 = "10.0.0.30" + nat_ip2 = "10.0.0.40" + self.tcp_port_out = 6303 + self.udp_port_out = 6304 + self.icmp_id_out = 6305 + + self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1, + vrf_id=self.pg4.sw_if_index) + self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2, + vrf_id=self.pg4.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, + is_inside=0) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index) + + # inside interface VRF match SNAT static mapping VRF + pkts = self.create_stream_in(self.pg4, self.pg3) + self.pg4.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture, nat_ip1, True) + + # inside interface VRF don't match SNAT static mapping VRF (packets + # are dropped) + pkts = self.create_stream_in(self.pg0, self.pg3) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture, packet_num=0) + + def test_multiple_inside_interfaces(self): + """ SNAT multiple inside interfaces with non-overlapping address space """ + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, + is_inside=0) + + # in2out 1st interface + pkts = self.create_stream_in(self.pg0, self.pg3) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 1st interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture() + self.verify_capture_in(capture, self.pg0) + + # in2out 2nd interface + pkts = self.create_stream_in(self.pg1, self.pg3) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 2nd interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture() + self.verify_capture_in(capture, self.pg1) + + # in2out 3rd interface + pkts = self.create_stream_in(self.pg2, self.pg3) + self.pg2.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 3rd interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg2.get_capture() + self.verify_capture_in(capture, self.pg2) + + def test_inside_overlapping_interfaces(self): + """ SNAT multiple inside interfaces with overlapping address space """ + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index, + is_inside=0) + self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index) + + # in2out 1st interface + pkts = self.create_stream_in(self.pg4, self.pg3) + self.pg4.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 1st interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg4.get_capture() + self.verify_capture_in(capture, self.pg4) + + # in2out 2nd interface + pkts = self.create_stream_in(self.pg5, self.pg3) + self.pg5.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 2nd interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg5.get_capture() + self.verify_capture_in(capture, self.pg5) + + # in2out 3rd interface + pkts = self.create_stream_in(self.pg6, self.pg3) + self.pg6.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg3.get_capture() + self.verify_capture_out(capture) + + # out2in 3rd interface + pkts = self.create_stream_out(self.pg3) + self.pg3.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg6.get_capture() + self.verify_capture_in(capture, self.pg6) + + def tearDown(self): + super(TestSNAT, self).tearDown() + if not self.vpp_dead: + self.logger.info(self.vapi.cli("show snat verbose")) + self.clear_snat() + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg