aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_snat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_snat.py')
-rw-r--r--test/test_snat.py139
1 files changed, 137 insertions, 2 deletions
diff --git a/test/test_snat.py b/test/test_snat.py
index 75445820a2f..d8268770f02 100644
--- a/test/test_snat.py
+++ b/test/test_snat.py
@@ -157,16 +157,65 @@ class MethodHolder(VppTestCase):
return pkts
- def create_stream_in_ip6(self, in_if, out_if, hlim=64):
+ def compose_ip6(self, ip4, pref, plen):
+ """
+ Compose IPv4-embedded IPv6 addresses
+
+ :param ip4: IPv4 address
+ :param pref: IPv6 prefix
+ :param plen: IPv6 prefix length
+ :returns: IPv4-embedded IPv6 addresses
+ """
+ pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
+ ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
+ if plen == 32:
+ pref_n[4] = ip4_n[0]
+ pref_n[5] = ip4_n[1]
+ pref_n[6] = ip4_n[2]
+ pref_n[7] = ip4_n[3]
+ elif plen == 40:
+ pref_n[5] = ip4_n[0]
+ pref_n[6] = ip4_n[1]
+ pref_n[7] = ip4_n[2]
+ pref_n[9] = ip4_n[3]
+ elif plen == 48:
+ pref_n[6] = ip4_n[0]
+ pref_n[7] = ip4_n[1]
+ pref_n[9] = ip4_n[2]
+ pref_n[10] = ip4_n[3]
+ elif plen == 56:
+ pref_n[7] = ip4_n[0]
+ pref_n[9] = ip4_n[1]
+ pref_n[10] = ip4_n[2]
+ pref_n[11] = ip4_n[3]
+ elif plen == 64:
+ pref_n[9] = ip4_n[0]
+ pref_n[10] = ip4_n[1]
+ pref_n[11] = ip4_n[2]
+ pref_n[12] = ip4_n[3]
+ elif plen == 96:
+ pref_n[12] = ip4_n[0]
+ pref_n[13] = ip4_n[1]
+ pref_n[14] = ip4_n[2]
+ pref_n[15] = ip4_n[3]
+ return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
+
+ def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
"""
Create IPv6 packet stream for inside network
:param in_if: Inside interface
:param out_if: Outside interface
:param ttl: Hop Limit of generated packets
+ :param pref: NAT64 prefix
+ :param plen: NAT64 prefix length
"""
pkts = []
- dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+ if pref is None:
+ dst = ''.join(['64:ff9b::', out_if.remote_ip4])
+ else:
+ dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
+
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
@@ -3101,6 +3150,84 @@ class TestNAT64(MethodHolder):
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
+ def test_prefix(self):
+ """ NAT64 Network-Specific Prefix """
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
+ self.vrf1_nat_addr_n,
+ vrf_id=self.vrf1_id)
+ self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
+
+ # Add global prefix
+ global_pref64 = "2001:db8::"
+ global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
+ global_pref64_len = 32
+ self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
+
+ prefix = self.vapi.nat64_prefix_dump()
+ self.assertEqual(len(prefix), 1)
+ self.assertEqual(prefix[0].prefix, global_pref64_n)
+ self.assertEqual(prefix[0].prefix_len, global_pref64_len)
+ self.assertEqual(prefix[0].vrf_id, 0)
+
+ # Add tenant specific prefix
+ vrf1_pref64 = "2001:db8:122:300::"
+ vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
+ vrf1_pref64_len = 56
+ self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
+ vrf1_pref64_len,
+ vrf_id=self.vrf1_id)
+ prefix = self.vapi.nat64_prefix_dump()
+ self.assertEqual(len(prefix), 2)
+
+ # Global prefix
+ pkts = self.create_stream_in_ip6(self.pg0,
+ self.pg1,
+ pref=global_pref64,
+ plen=global_pref64_len)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+ global_pref64,
+ global_pref64_len)
+ self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
+
+ # Tenant specific prefix
+ pkts = self.create_stream_in_ip6(self.pg2,
+ self.pg1,
+ pref=vrf1_pref64,
+ plen=vrf1_pref64_len)
+ self.pg2.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
+ dst_ip=self.pg1.remote_ip4)
+
+ pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg2.get_capture(len(pkts))
+ dst_ip = self.compose_ip6(self.pg1.remote_ip4,
+ vrf1_pref64,
+ vrf1_pref64_len)
+ self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.
@@ -3166,11 +3293,19 @@ class TestNAT64(MethodHolder):
vrf_id=addr.vrf_id,
is_add=0)
+ prefixes = self.vapi.nat64_prefix_dump()
+ for prefix in prefixes:
+ self.vapi.nat64_add_del_prefix(prefix.prefix,
+ prefix.prefix_len,
+ vrf_id=prefix.vrf_id,
+ is_add=0)
+
def tearDown(self):
super(TestNAT64, self).tearDown()
if not self.vpp_dead:
self.logger.info(self.vapi.cli("show nat64 pool"))
self.logger.info(self.vapi.cli("show nat64 interfaces"))
+ self.logger.info(self.vapi.cli("show nat64 prefix"))
self.logger.info(self.vapi.cli("show nat64 bib tcp"))
self.logger.info(self.vapi.cli("show nat64 bib udp"))
self.logger.info(self.vapi.cli("show nat64 bib icmp"))