summaryrefslogtreecommitdiffstats
path: root/test/test_snat.py
diff options
context:
space:
mode:
authorJuraj Sloboda <jsloboda@cisco.com>2017-02-08 23:54:21 -0800
committerOle Trøan <otroan@employees.org>2017-02-16 10:48:53 +0000
commitb33f413af46ec8dff7f222dbd5bc3bcec1502d3d (patch)
tree6c3118bfce76a0fe65f757b32d3d1d2f4b8d5789 /test/test_snat.py
parent205e934ee7530dac475be14d9054183625171ccc (diff)
Add handling of ICMP error packets in SNAT (VPP-629)
Change-Id: I8d2022b7cb3ef3da736c085bccbb5b9c057a8d76 Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
Diffstat (limited to 'test/test_snat.py')
-rw-r--r--test/test_snat.py195
1 files changed, 187 insertions, 8 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
index 9d2070a078c..500d629dfe5 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -6,6 +6,7 @@ import struct
from framework import VppTestCase, VppTestRunner
from scapy.layers.inet import IP, TCP, UDP, ICMP
+from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.l2 import Ether, ARP
from scapy.data import IP_PROTOS
from util import ppp
@@ -64,59 +65,61 @@ class TestSNAT(VppTestCase):
super(TestSNAT, cls).tearDownClass()
raise
- def create_stream_in(self, in_if, out_if):
+ def create_stream_in(self, in_if, out_if, ttl=64):
"""
Create packet stream for inside network
:param in_if: Inside interface
:param out_if: Outside interface
+ :param ttl: TTL of generated packets
"""
pkts = []
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
TCP(sport=self.tcp_port_in))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
UDP(sport=self.udp_port_in))
pkts.append(p)
# ICMP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
+ IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
ICMP(id=self.icmp_id_in, type='echo-request'))
pkts.append(p)
return pkts
- def create_stream_out(self, out_if, dst_ip=None):
+ def create_stream_out(self, out_if, dst_ip=None, ttl=64):
"""
Create packet stream for outside network
:param out_if: Outside interface
:param dst_ip: Destination IP address (Default use global SNAT address)
+ :param ttl: TTL of generated packets
"""
if dst_ip is None:
dst_ip = self.snat_addr
pkts = []
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(dport=self.tcp_port_out))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
UDP(dport=self.udp_port_out))
pkts.append(p)
# ICMP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
- IP(src=out_if.remote_ip4, dst=dst_ip) /
+ IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
ICMP(id=self.icmp_id_out, type='echo-reply'))
pkts.append(p)
@@ -209,6 +212,75 @@ class TestSNAT(VppTestCase):
"(inside network):", packet))
raise
+ def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
+ packet_num=3, icmp_type=11):
+ """
+ Verify captured packets with ICMP errors on outside network
+
+ :param capture: Captured packets
+ :param src_ip: Translated IP address or IP address of VPP
+ (Default use global SNAT address)
+ :param packet_num: Expected number of packets (Default 3)
+ :param icmp_type: Type of error ICMP packet
+ we are expecting (Default 11)
+ """
+ if src_ip is None:
+ src_ip = self.snat_addr
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].src, src_ip)
+ self.assertTrue(packet.haslayer(ICMP))
+ icmp = packet[ICMP]
+ self.assertEqual(icmp.type, icmp_type)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ if inner_ip.haslayer(TCPerror):
+ self.assertEqual(inner_ip[TCPerror].dport,
+ self.tcp_port_out)
+ elif inner_ip.haslayer(UDPerror):
+ self.assertEqual(inner_ip[UDPerror].dport,
+ self.udp_port_out)
+ else:
+ self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(outside network):", packet))
+ raise
+
+ def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
+ icmp_type=11):
+ """
+ Verify captured packets with ICMP errors on inside network
+
+ :param capture: Captured packets
+ :param in_if: Inside interface
+ :param packet_num: Expected number of packets (Default 3)
+ :param icmp_type: Type of error ICMP packet
+ we are expecting (Default 11)
+ """
+ self.assertEqual(packet_num, len(capture))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IP].dst, in_if.remote_ip4)
+ self.assertTrue(packet.haslayer(ICMP))
+ icmp = packet[ICMP]
+ self.assertEqual(icmp.type, icmp_type)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ if inner_ip.haslayer(TCPerror):
+ self.assertEqual(inner_ip[TCPerror].sport,
+ self.tcp_port_in)
+ elif inner_ip.haslayer(UDPerror):
+ self.assertEqual(inner_ip[UDPerror].sport,
+ self.udp_port_in)
+ else:
+ self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet "
+ "(inside network):", packet))
+ raise
+
def verify_ipfix_nat44_ses(self, data):
"""
Verify IPFIX NAT44 session create/delete event
@@ -367,6 +439,113 @@ class TestSNAT(VppTestCase):
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ def test_dynamic_icmp_errors_in2out_ttl_1(self):
+ """ SNAT handling of client packets with TTL=1 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - generate traffic
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - verify ICMP type 11 packets
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in_with_icmp_errors(capture, self.pg0)
+
+ def test_dynamic_icmp_errors_out2in_ttl_1(self):
+ """ SNAT handling of server packets with TTL=1 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - create sessions
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - generate traffic
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_out(self.pg1, ttl=1)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - verify ICMP type 11 packets
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_with_icmp_errors(capture,
+ src_ip=self.pg1.local_ip4)
+
+ def test_dynamic_icmp_errors_in2out_ttl_2(self):
+ """ SNAT handling of error respones to client packets with TTL=2 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - generate traffic
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - simulate ICMP type 11 response
+ capture = self.pg1.get_capture(len(pkts))
+ pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ ICMP(type=11) / packet[IP] for packet in capture]
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - verify ICMP type 11 packets
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in_with_icmp_errors(capture, self.pg0)
+
+ def test_dynamic_icmp_errors_out2in_ttl_2(self):
+ """ SNAT handling of error respones to server packets with TTL=2 """
+
+ self.snat_add_address(self.snat_addr)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # Client side - create sessions
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - generate traffic
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture)
+ pkts = self.create_stream_out(self.pg1, ttl=2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Client side - simulate ICMP type 11 response
+ capture = self.pg0.get_capture(len(pkts))
+ pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ ICMP(type=11) / packet[IP] for packet in capture]
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ # Server side - verify ICMP type 11 packets
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_with_icmp_errors(capture)
+
def test_static_in(self):
""" SNAT 1:1 NAT initialized from inside network """