aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_snat.py
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2016-12-07 03:38:19 -0800
committerDamjan Marion <dmarion.lists@gmail.com>2016-12-07 14:46:05 +0000
commitde8867535a32f448c94c983633201fe1f9e835b6 (patch)
tree617522cb07d3b3b2833d5275bc506f90db21ebea /test/test_snat.py
parentcd06b728920e7119582de4d9a09ac71034706b2a (diff)
make test: add S-NAT plugin tests
Change-Id: I7859f63c5b28480be1ae587fc923d15212769e66 Signed-off-by: Matus Fabian <matfabia@cisco.com>
Diffstat (limited to 'test/test_snat.py')
-rw-r--r--test/test_snat.py536
1 files changed, 536 insertions, 0 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
new file mode 100644
index 00000000..e90d9c0b
--- /dev/null
+++ b/test/test_snat.py
@@ -0,0 +1,536 @@
+#!/usr/bin/env python
+
+import socket
+import unittest
+from logging import *
+
+from framework import VppTestCase, VppTestRunner
+
+from scapy.layers.inet import IP, TCP, UDP, ICMP
+from scapy.layers.l2 import Ether
+
+
+class TestSNAT(VppTestCase):
+ """ SNAT Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestSNAT, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.snat_addr = '10.0.0.3'
+
+ cls.create_pg_interfaces(range(7))
+ cls.interfaces = list(cls.pg_interfaces[0:4])
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip4()
+ i.resolve_arp()
+
+ cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
+
+ for i in cls.overlapping_interfaces:
+ i._local_ip4 = "172.16.255.1"
+ i._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
+ i._remote_hosts[0]._ip4 = "172.16.255.2"
+ i.set_table_ip4(i.sw_if_index)
+ i.config_ip4()
+ i.admin_up()
+ i.resolve_arp()
+
+ except Exception:
+ super(TestSNAT, cls).tearDownClass()
+ raise
+
+ def create_stream_in(self, in_if, out_if):
+ """
+ Create packet stream for inside network
+
+ :param in_if: Inside interface
+ :param out_if: Outside interface
+ """
+ pkts = []
+ # TCP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ TCP(sport=self.tcp_port_in))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ UDP(sport=self.udp_port_in))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+
+ return pkts
+
+ def create_stream_out(self, out_if, dst_ip=None):
+ """
+ Create packet stream for outside network
+
+ :param out_if: Outside interface
+ :param dst_ip: Destination IP address (Default use global SNAT address)
+ """
+ if dst_ip is None:
+ dst_ip=self.snat_addr
+ pkts = []
+ # TCP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip) /
+ TCP(dport=self.tcp_port_out))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip) /
+ UDP(dport=self.udp_port_out))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IP(src=out_if.remote_ip4, dst=dst_ip) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+
+ return pkts
+
+ def verify_capture_out(self, capture, nat_ip=None, same_port=False,
+ packet_num=3):
+ """
+ Verify captured packets on outside network
+
+ :param capture: Captured packets
+ :param nat_ip: Translated IP address (Default use global SNAT address)
+ :param same_port: Sorce port number is not translated (Default False)
+ :param packet_num: Expected number of packets (Default 3)
+ """
+ if nat_ip is None:
+ nat_ip = self.snat_addr
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, nat_ip)
+ if packet.haslayer(TCP):
+ if same_port:
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+ else:
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ if same_port:
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
+ else:
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ if same_port:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ else:
+ self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ error("Unexpected or invalid packet (outside network):")
+ error(packet.show())
+ raise
+
+ def verify_capture_in(self, capture, in_if, packet_num=3):
+ """
+ Verify captured packets on inside network
+
+ :param capture: Captured packets
+ :param in_if: Inside interface
+ :param packet_num: Expected number of packets (Default 3)
+ """
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].dst, in_if.remote_ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ error("Unexpected or invalid packet (inside network):")
+ error(packet.show())
+ raise
+
+ def clear_snat(self):
+ """
+ Clear SNAT configuration.
+ """
+ interfaces = self.vapi.snat_interface_dump()
+ for intf in interfaces:
+ self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+
+ static_mappings = self.vapi.snat_static_mapping_dump()
+ for sm in static_mappings:
+ self.vapi.snat_add_static_mapping(sm.local_ip_address,
+ sm.external_ip_address,
+ local_port=sm.local_port,
+ external_port=sm.external_port,
+ addr_only=sm.addr_only,
+ vrf_id=sm.vrf_id,
+ is_add=0)
+
+ adresses = self.vapi.snat_address_dump()
+ for addr in adresses:
+ self.vapi.snat_add_address_range(addr.ip_address,
+ addr.ip_address,
+ is_add=0)
+
+ def snat_add_static_mapping(self, local_ip, external_ip, local_port=0,
+ external_port=0, vrf_id=0, is_add=1):
+ """
+ Add/delete S-NAT static mapping
+
+ :param local_ip: Local IP address
+ :param external_ip: External IP address
+ :param local_port: Local port number (Optional)
+ :param external_port: External port number (Optional)
+ :param vrf_id: VRF ID (Default 0)
+ :param is_add: 1 if add, 0 if delete (Default add)
+ """
+ addr_only = 1
+ if local_port and external_port:
+ addr_only = 0
+ l_ip = socket.inet_pton(socket.AF_INET, local_ip)
+ e_ip = socket.inet_pton(socket.AF_INET, external_ip)
+ self.vapi.snat_add_static_mapping(l_ip, e_ip, local_port, external_port,
+ addr_only, vrf_id, is_add)
+
+ def snat_add_address(self, ip, is_add=1):
+ """
+ Add/delete S-NAT address
+
+ :param ip: IP address
+ :param is_add: 1 if add, 0 if delete (Default add)
+ """
+ snat_addr = socket.inet_pton(socket.AF_INET, ip)
+ self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add)
+
+ def test_dynamic(self):
+ """ SNAT dynamic translation test """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ def test_static_in(self):
+ """ SNAT 1:1 NAT initialized from inside network """
+
+ nat_ip = "10.0.0.10"
+ self.tcp_port_out = 6303
+ self.udp_port_out = 6304
+ self.icmp_id_out = 6305
+
+ self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_out(capture, nat_ip, True)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, nat_ip)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ def test_static_out(self):
+ """ SNAT 1:1 NAT initialized from outside network """
+
+ nat_ip = "10.0.0.20"
+ self.tcp_port_out = 6303
+ self.udp_port_out = 6304
+ self.icmp_id_out = 6305
+
+ self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, nat_ip)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_out(capture, nat_ip, True)
+
+ def test_static_with_port_in(self):
+ """ SNAT 1:1 NAT with port initialized from inside network """
+
+ self.tcp_port_out = 3606
+ self.udp_port_out = 3607
+ self.icmp_id_out = 3608
+
+ self.snat_add_address(self.snat_addr)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.tcp_port_in, self.tcp_port_out)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.udp_port_in, self.udp_port_out)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.icmp_id_in, self.icmp_id_out)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ def test_static_with_port_out(self):
+ """ SNAT 1:1 NAT with port initialized from outside network """
+
+ self.tcp_port_out = 30606
+ self.udp_port_out = 30607
+ self.icmp_id_out = 30608
+
+ self.snat_add_address(self.snat_addr)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.tcp_port_in, self.tcp_port_out)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.udp_port_in, self.udp_port_out)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
+ self.icmp_id_in, self.icmp_id_out)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_out(capture)
+
+ def test_static_vrf_aware(self):
+ """ SNAT 1:1 NAT VRF awareness """
+
+ nat_ip1 = "10.0.0.30"
+ nat_ip2 = "10.0.0.40"
+ self.tcp_port_out = 6303
+ self.udp_port_out = 6304
+ self.icmp_id_out = 6305
+
+ self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
+ vrf_id=self.pg4.sw_if_index)
+ self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
+ vrf_id=self.pg4.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
+ is_inside=0)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
+
+ # inside interface VRF match SNAT static mapping VRF
+ pkts = self.create_stream_in(self.pg4, self.pg3)
+ self.pg4.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture, nat_ip1, True)
+
+ # inside interface VRF don't match SNAT static mapping VRF (packets
+ # are dropped)
+ pkts = self.create_stream_in(self.pg0, self.pg3)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture, packet_num=0)
+
+ def test_multiple_inside_interfaces(self):
+ """ SNAT multiple inside interfaces with non-overlapping address space """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
+ is_inside=0)
+
+ # in2out 1st interface
+ pkts = self.create_stream_in(self.pg0, self.pg3)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 1st interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture()
+ self.verify_capture_in(capture, self.pg0)
+
+ # in2out 2nd interface
+ pkts = self.create_stream_in(self.pg1, self.pg3)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 2nd interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture()
+ self.verify_capture_in(capture, self.pg1)
+
+ # in2out 3rd interface
+ pkts = self.create_stream_in(self.pg2, self.pg3)
+ self.pg2.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 3rd interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture()
+ self.verify_capture_in(capture, self.pg2)
+
+ def test_inside_overlapping_interfaces(self):
+ """ SNAT multiple inside interfaces with overlapping address space """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
+ is_inside=0)
+ self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
+
+ # in2out 1st interface
+ pkts = self.create_stream_in(self.pg4, self.pg3)
+ self.pg4.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 1st interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg4.get_capture()
+ self.verify_capture_in(capture, self.pg4)
+
+ # in2out 2nd interface
+ pkts = self.create_stream_in(self.pg5, self.pg3)
+ self.pg5.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 2nd interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg5.get_capture()
+ self.verify_capture_in(capture, self.pg5)
+
+ # in2out 3rd interface
+ pkts = self.create_stream_in(self.pg6, self.pg3)
+ self.pg6.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture()
+ self.verify_capture_out(capture)
+
+ # out2in 3rd interface
+ pkts = self.create_stream_out(self.pg3)
+ self.pg3.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg6.get_capture()
+ self.verify_capture_in(capture, self.pg6)
+
+ def tearDown(self):
+ super(TestSNAT, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show snat verbose"))
+ self.clear_snat()
+
+if __name__ == '__main__':
+ unittest.main(testRunner=VppTestRunner)