summaryrefslogtreecommitdiffstats
path: root/test/test_snat.py
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2017-06-08 05:24:28 -0700
committerOle Trøan <otroan@employees.org>2017-06-09 07:30:14 +0000
commit732036d677b84aa8eaea45f8059783e827622b77 (patch)
treebac6a6fccc1ae8500d5d3f54dbede362d0402459 /test/test_snat.py
parentef2a5bf0a31c9c0a94f9f497cb6353f46073e6ec (diff)
NAT64: ICMP error support
Added ICMP error messages translation. Added check for multi thread (not supported yet, so init failed). Added API definition for custom NAT64 refix. Change-Id: Ice2f04631af63e594aecc09087a1cf59f3b676fb Signed-off-by: Matus Fabian <matfabia@cisco.com>
Diffstat (limited to 'test/test_snat.py')
-rw-r--r--test/test_snat.py90
1 files changed, 90 insertions, 0 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
index 64fa3057471..c6344a9ee74 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -8,6 +8,7 @@ from framework import VppTestCase, VppTestRunner, running_extended_tests
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
from scapy.layers.l2 import Ether, ARP
from scapy.data import IP_PROTOS
from scapy.packet import bind_layers
@@ -2635,6 +2636,95 @@ class TestNAT64(MethodHolder):
ses_num_after_timeout = self.nat64_get_ses_num()
self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
+ def test_icmp_error(self):
+ """ NAT64 ICMP Error message translation """
+ self.tcp_port_in = 6303
+ self.udp_port_in = 6304
+ self.icmp_id_in = 6305
+
+ ses_num_start = self.nat64_get_ses_num()
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+
+ # send some packets to create sessions
+ pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture_ip4 = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture_ip4, packet_num=3,
+ nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture_ip6 = self.pg0.get_capture(len(pkts))
+ ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
+ self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
+ self.pg0.remote_ip6)
+
+ # in2out
+ pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
+ ICMPv6DestUnreach(code=1) /
+ packet[IPv6] for packet in capture_ip6]
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, self.nat_addr)
+ self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
+ self.assertEqual(packet[ICMP].type, 3)
+ self.assertEqual(packet[ICMP].code, 13)
+ inner = packet[IPerror]
+ self.assertEqual(inner.src, self.pg1.remote_ip4)
+ self.assertEqual(inner.dst, self.nat_addr)
+ if inner.haslayer(TCPerror):
+ self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
+ elif inner.haslayer(UDPerror):
+ self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
+ else:
+ self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # out2in
+ pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ ICMP(type=3, code=13) /
+ packet[IP] for packet in capture_ip4]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, ip.src)
+ self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
+ icmp = packet[ICMPv6DestUnreach]
+ self.assertEqual(icmp.code, 1)
+ inner = icmp[IPerror6]
+ self.assertEqual(inner.src, self.pg0.remote_ip6)
+ self.assertEqual(inner.dst, ip.src)
+ if inner.haslayer(TCPerror):
+ self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
+ elif inner.haslayer(UDPerror):
+ self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
+ else:
+ self.assertEqual(inner[ICMPv6EchoRequest].id,
+ self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.