aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_nat.py')
-rw-r--r--test/test_nat.py241
1 files changed, 229 insertions, 12 deletions
diff --git a/test/test_nat.py b/test/test_nat.py
index aeeb5aa2bd5..4ced0af46e0 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -134,30 +134,34 @@ class MethodHolder(VppTestCase):
self.assertEqual(new['ICMPv6EchoReply'].cksum,
pkt['ICMPv6EchoReply'].cksum)
- def create_stream_in(self, in_if, out_if, ttl=64):
+ def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
"""
Create packet stream for inside network
:param in_if: Inside interface
:param out_if: Outside interface
+ :param dst_ip: Destination address
:param ttl: TTL of generated packets
"""
+ if dst_ip is None:
+ dst_ip = out_if.remote_ip4
+
pkts = []
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
TCP(sport=self.tcp_port_in, dport=20))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
UDP(sport=self.udp_port_in, dport=20))
pkts.append(p)
# ICMP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
- IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
+ IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
ICMP(id=self.icmp_id_in, type='echo-request'))
pkts.append(p)
@@ -206,6 +210,48 @@ class MethodHolder(VppTestCase):
pref_n[15] = ip4_n[3]
return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+ def extract_ip4(self, ip6, plen):
+ """
+ Extract IPv4 address embedded in IPv6 addresses
+
+ :param ip6: IPv6 address
+ :param plen: IPv6 prefix length
+ :returns: extracted IPv4 address
+ """
+ ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
+ ip4_n = [None] * 4
+ if plen == 32:
+ ip4_n[0] = ip6_n[4]
+ ip4_n[1] = ip6_n[5]
+ ip4_n[2] = ip6_n[6]
+ ip4_n[3] = ip6_n[7]
+ elif plen == 40:
+ ip4_n[0] = ip6_n[5]
+ ip4_n[1] = ip6_n[6]
+ ip4_n[2] = ip6_n[7]
+ ip4_n[3] = ip6_n[9]
+ elif plen == 48:
+ ip4_n[0] = ip6_n[6]
+ ip4_n[1] = ip6_n[7]
+ ip4_n[2] = ip6_n[9]
+ ip4_n[3] = ip6_n[10]
+ elif plen == 56:
+ ip4_n[0] = ip6_n[7]
+ ip4_n[1] = ip6_n[9]
+ ip4_n[2] = ip6_n[10]
+ ip4_n[3] = ip6_n[11]
+ elif plen == 64:
+ ip4_n[0] = ip6_n[9]
+ ip4_n[1] = ip6_n[10]
+ ip4_n[2] = ip6_n[11]
+ ip4_n[3] = ip6_n[12]
+ elif plen == 96:
+ ip4_n[0] = ip6_n[12]
+ ip4_n[1] = ip6_n[13]
+ ip4_n[2] = ip6_n[14]
+ ip4_n[3] = ip6_n[15]
+ return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
+
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
"""
Create IPv6 packet stream for inside network
@@ -284,8 +330,37 @@ class MethodHolder(VppTestCase):
return pkts
+ def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
+ """
+ Create packet stream for outside network
+
+ :param out_if: Outside interface
+ :param dst_ip: Destination IP address (Default use global NAT address)
+ :param hl: HL of generated packets
+ """
+ pkts = []
+ # TCP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+ TCP(dport=self.tcp_port_out, sport=20))
+ pkts.append(p)
+
+ # UDP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+ UDP(dport=self.udp_port_out, sport=20))
+ pkts.append(p)
+
+ # ICMP
+ p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
+ IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
+ ICMPv6EchoReply(id=self.icmp_id_out))
+ pkts.append(p)
+
+ return pkts
+
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
- packet_num=3, dst_ip=None):
+ packet_num=3, dst_ip=None, is_ip6=False):
"""
Verify captured packets on outside network
@@ -294,16 +369,24 @@ class MethodHolder(VppTestCase):
:param same_port: Sorce port number is not translated (Default False)
:param packet_num: Expected number of packets (Default 3)
:param dst_ip: Destination IP address (Default do not verify)
+ :param is_ip6: If L3 protocol is IPv6 (Default False)
"""
+ if is_ip6:
+ IP46 = IPv6
+ ICMP46 = ICMPv6EchoRequest
+ else:
+ IP46 = IP
+ ICMP46 = ICMP
if nat_ip is None:
nat_ip = self.nat_addr
self.assertEqual(packet_num, len(capture))
for packet in capture:
try:
- self.check_ip_checksum(packet)
- self.assertEqual(packet[IP].src, nat_ip)
+ if not is_ip6:
+ self.check_ip_checksum(packet)
+ self.assertEqual(packet[IP46].src, nat_ip)
if dst_ip is not None:
- self.assertEqual(packet[IP].dst, dst_ip)
+ self.assertEqual(packet[IP46].dst, dst_ip)
if packet.haslayer(TCP):
if same_port:
self.assertEqual(packet[TCP].sport, self.tcp_port_in)
@@ -321,16 +404,33 @@ class MethodHolder(VppTestCase):
self.udp_port_out = packet[UDP].sport
else:
if same_port:
- self.assertEqual(packet[ICMP].id, self.icmp_id_in)
+ self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
else:
- self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
- self.icmp_id_out = packet[ICMP].id
- self.check_icmp_checksum(packet)
+ self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
+ self.icmp_id_out = packet[ICMP46].id
+ if is_ip6:
+ self.check_icmpv6_checksum(packet)
+ else:
+ self.check_icmp_checksum(packet)
except:
self.logger.error(ppp("Unexpected or invalid packet "
"(outside network):", packet))
raise
+ def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
+ packet_num=3, dst_ip=None):
+ """
+ Verify captured packets on outside network
+
+ :param capture: Captured packets
+ :param nat_ip: Translated IP address
+ :param same_port: Sorce port number is not translated (Default False)
+ :param packet_num: Expected number of packets (Default 3)
+ :param dst_ip: Destination IP address (Default do not verify)
+ """
+ return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
+ dst_ip, True)
+
def verify_capture_in(self, capture, in_if, packet_num=3):
"""
Verify captured packets on inside network
@@ -3105,6 +3205,123 @@ class TestNAT44(MethodHolder):
self.clear_nat44()
+class TestNAT44Out2InDPO(MethodHolder):
+ """ NAT44 Test Cases using out2in DPO """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44Out2InDPO, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44Out2InDPO, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+ cls.dst_ip4 = '192.168.70.1'
+
+ cls.create_pg_interfaces(range(2))
+
+ cls.pg0.admin_up()
+ cls.pg0.config_ip4()
+ cls.pg0.resolve_arp()
+
+ cls.pg1.admin_up()
+ cls.pg1.config_ip6()
+ cls.pg1.resolve_ndp()
+
+ cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
+ dst_address_length=0,
+ next_hop_address=cls.pg1.remote_ip6n,
+ next_hop_sw_if_index=cls.pg1.sw_if_index)
+
+ except Exception:
+ super(TestNAT44Out2InDPO, cls).tearDownClass()
+ raise
+
+ def configure_xlat(self):
+ self.dst_ip6_pfx = '1:2:3::'
+ self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
+ self.dst_ip6_pfx)
+ self.dst_ip6_pfx_len = 96
+ self.src_ip6_pfx = '4:5:6::'
+ self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
+ self.src_ip6_pfx)
+ self.src_ip6_pfx_len = 96
+ self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
+ self.src_ip6_pfx_n, self.src_ip6_pfx_len,
+ '\x00\x00\x00\x00', 0, is_translation=1,
+ is_rfc6052=1)
+
+ def test_464xlat_ce(self):
+ """ Test 464XLAT CE with NAT44 """
+
+ self.configure_xlat()
+
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
+
+ out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
+ self.dst_ip6_pfx_len)
+ out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
+ self.src_ip6_pfx_len)
+
+ try:
+ pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
+ dst_ip=out_src_ip6)
+
+ pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
+ out_dst_ip6)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+ finally:
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
+ is_add=0)
+ self.vapi.nat44_add_del_address_range(self.nat_addr_n,
+ self.nat_addr_n, is_add=0)
+
+ def test_464xlat_ce_no_nat(self):
+ """ Test 464XLAT CE without NAT44 """
+
+ self.configure_xlat()
+
+ out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
+ self.dst_ip6_pfx_len)
+ out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
+ self.src_ip6_pfx_len)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
+ nat_ip=out_dst_ip6, same_port=True)
+
+ pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+
class TestDeterministicNAT(MethodHolder):
""" Deterministic NAT Test Cases """