aboutsummaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_snat.py166
1 files changed, 164 insertions, 2 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
index 8d384384222..0eceaab256b 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -324,7 +324,7 @@ class TestSNAT(MethodHolder):
i.config_ip4()
i.resolve_arp()
- cls.pg0.generate_remote_hosts(2)
+ cls.pg0.generate_remote_hosts(3)
cls.pg0.configure_ipv4_neighbors()
cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
@@ -1016,7 +1016,7 @@ class TestSNAT(MethodHolder):
self.icmp_id_in])
def test_hairpinning(self):
- """ SNAT hairpinning """
+ """ SNAT hairpinning - 1:1 NAT with port"""
host = self.pg0.remote_hosts[0]
server = self.pg0.remote_hosts[1]
@@ -1075,6 +1075,168 @@ class TestSNAT(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
+ def test_hairpinning2(self):
+ """ SNAT hairpinning - 1:1 NAT"""
+
+ server1_nat_ip = "10.0.0.10"
+ server2_nat_ip = "10.0.0.11"
+ host = self.pg0.remote_hosts[0]
+ server1 = self.pg0.remote_hosts[1]
+ server2 = self.pg0.remote_hosts[2]
+ server_tcp_port = 22
+ server_udp_port = 20
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # add static mapping for servers
+ self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
+ self.snat_add_static_mapping(server2.ip4, server2_nat_ip)
+
+ # host to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=host.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, self.snat_addr)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to host
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=self.snat_addr) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, host.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server2 to server1
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ TCP(sport=self.tcp_port_in, dport=server_tcp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ UDP(sport=self.udp_port_in, dport=server_udp_port))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server2.ip4, dst=server1_nat_ip) /
+ ICMP(id=self.icmp_id_in, type='echo-request'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server2_nat_ip)
+ self.assertEqual(packet[IP].dst, server1.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].sport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].dport, server_tcp_port)
+ self.tcp_port_out = packet[TCP].sport
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].sport, self.udp_port_in)
+ self.assertEqual(packet[UDP].dport, server_udp_port)
+ self.udp_port_out = packet[UDP].sport
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP].id
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # server1 to server2
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ TCP(sport=server_tcp_port, dport=self.tcp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ UDP(sport=server_udp_port, dport=self.udp_port_out))
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=server1.ip4, dst=server2_nat_ip) /
+ ICMP(id=self.icmp_id_out, type='echo-reply'))
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, server1_nat_ip)
+ self.assertEqual(packet[IP].dst, server2.ip4)
+ if packet.haslayer(TCP):
+ self.assertEqual(packet[TCP].dport, self.tcp_port_in)
+ self.assertEqual(packet[TCP].sport, server_tcp_port)
+ elif packet.haslayer(UDP):
+ self.assertEqual(packet[UDP].dport, self.udp_port_in)
+ self.assertEqual(packet[UDP].sport, server_udp_port)
+ else:
+ self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def test_max_translations_per_user(self):
""" MAX translations per user - recycle the least recently used """