aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_nat.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_nat.py')
-rw-r--r--test/test_nat.py131
1 files changed, 131 insertions, 0 deletions
diff --git a/test/test_nat.py b/test/test_nat.py
index e3b44859f61..7ff3f8517ad 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -6185,5 +6185,136 @@ class TestDSliteCE(MethodHolder):
self.logger.info(
self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
+
+class TestNAT66(MethodHolder):
+ """ NAT66 Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT66, cls).setUpClass()
+
+ try:
+ cls.nat_addr = 'fd01:ff::2'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
+
+ cls.create_pg_interfaces(range(2))
+ cls.interfaces = list(cls.pg_interfaces)
+
+ for i in cls.interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.configure_ipv6_neighbors()
+
+ except Exception:
+ super(TestNAT66, cls).tearDownClass()
+ raise
+
+ def test_static(self):
+ """ 1:1 NAT66 test """
+ self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
+ self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
+ self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
+ self.nat_addr_n)
+
+ # in2out
+ pkts = []
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
+ TCP())
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
+ UDP())
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
+ ICMPv6EchoRequest())
+ pkts.append(p)
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
+ GRE() / IP() / TCP())
+ pkts.append(p)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, self.nat_addr)
+ self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
+ if packet.haslayer(TCP):
+ self.check_tcp_checksum(packet)
+ elif packet.haslayer(UDP):
+ self.check_udp_checksum(packet)
+ elif packet.haslayer(ICMPv6EchoRequest):
+ self.check_icmpv6_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ # out2in
+ pkts = []
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
+ TCP())
+ pkts.append(p)
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
+ UDP())
+ pkts.append(p)
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
+ ICMPv6EchoReply())
+ pkts.append(p)
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
+ GRE() / IP() / TCP())
+ pkts.append(p)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for packet in capture:
+ try:
+ self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
+ self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
+ if packet.haslayer(TCP):
+ self.check_tcp_checksum(packet)
+ elif packet.haslayer(UDP):
+ self.check_udp_checksum(packet)
+ elif packet.haslayer(ICMPv6EchoReply):
+ self.check_icmpv6_checksum(packet)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", packet))
+ raise
+
+ sm = self.vapi.nat66_static_mapping_dump()
+ self.assertEqual(len(sm), 1)
+ self.assertEqual(sm[0].total_pkts, 8)
+
+ def clear_nat66(self):
+ """
+ Clear NAT66 configuration.
+ """
+ interfaces = self.vapi.nat66_interface_dump()
+ for intf in interfaces:
+ self.vapi.nat66_add_del_interface(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+
+ static_mappings = self.vapi.nat66_static_mapping_dump()
+ for sm in static_mappings:
+ self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
+ sm.external_ip_address,
+ sm.vrf_id,
+ is_add=0)
+
+ def tearDown(self):
+ super(TestNAT66, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show nat66 interfaces"))
+ self.logger.info(self.vapi.cli("show nat66 static mappings"))
+ self.clear_nat66()
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)