summaryrefslogtreecommitdiffstats
path: root/test
diff options
context:
space:
mode:
authorMatus Fabian <matfabia@cisco.com>2017-11-06 05:04:53 -0800
committerOle Trøan <otroan@employees.org>2017-11-07 11:52:39 +0000
commit8ebe62536223e5a8d827b2b870cbd57aa34fd7ef (patch)
treef4ab30aa41a7a26bb6068e7f0f441930cc7774bb /test
parent5917939256af392914d8a648de0c3287042ddbf6 (diff)
NAT: DS-Lite (VPP-1040)
Dual-Stack Lite enables a broadband service provider to share IPv4 addresses among customers by combining two well-known technologies: IPv4-in-IPv6 and NAT. Change-Id: I039740f8548c623cd1ac89b8ecda1a6cc4aafb9c Signed-off-by: Matus Fabian <matfabia@cisco.com>
Diffstat (limited to 'test')
-rw-r--r--test/test_nat.py148
-rw-r--r--test/vpp_papi_provider.py28
2 files changed, 176 insertions, 0 deletions
diff --git a/test/test_nat.py b/test/test_nat.py
index 37e1b1e7b75..e420baffd1d 100644
--- a/test/test_nat.py
+++ b/test/test_nat.py
@@ -5,6 +5,7 @@ import unittest
import struct
from framework import VppTestCase, VppTestRunner, running_extended_tests
+from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
from scapy.layers.inet import IP, TCP, UDP, ICMP
from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
@@ -4007,5 +4008,152 @@ class TestNAT64(MethodHolder):
self.logger.info(self.vapi.cli("show nat64 session table all"))
self.clear_nat64()
+
+class TestDSlite(MethodHolder):
+ """ DS-Lite Test Cases """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDSlite, cls).setUpClass()
+
+ try:
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+
+ cls.create_pg_interfaces(range(2))
+ cls.pg0.admin_up()
+ cls.pg0.config_ip4()
+ cls.pg0.resolve_arp()
+ cls.pg1.admin_up()
+ cls.pg1.config_ip6()
+ cls.pg1.generate_remote_hosts(2)
+ cls.pg1.configure_ipv6_neighbors()
+
+ except Exception:
+ super(TestDSlite, cls).tearDownClass()
+ raise
+
+ def test_dslite(self):
+ """ Test DS-Lite """
+ self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ aftr_ip4 = '192.0.0.1'
+ aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
+ aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
+ aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
+ self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
+
+ # UDP
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
+ IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
+ UDP(sport=20000, dport=10000))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ capture = capture[0]
+ self.assertFalse(capture.haslayer(IPv6))
+ self.assertEqual(capture[IP].src, self.nat_addr)
+ self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
+ self.assertNotEqual(capture[UDP].sport, 20000)
+ self.assertEqual(capture[UDP].dport, 10000)
+ self.check_ip_checksum(capture)
+ out_port = capture[UDP].sport
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
+ UDP(sport=10000, dport=out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ capture = capture[0]
+ self.assertEqual(capture[IPv6].src, aftr_ip6)
+ self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
+ self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
+ self.assertEqual(capture[IP].dst, '192.168.1.1')
+ self.assertEqual(capture[UDP].sport, 10000)
+ self.assertEqual(capture[UDP].dport, 20000)
+ self.check_ip_checksum(capture)
+
+ # TCP
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
+ IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
+ TCP(sport=20001, dport=10001))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ capture = capture[0]
+ self.assertFalse(capture.haslayer(IPv6))
+ self.assertEqual(capture[IP].src, self.nat_addr)
+ self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
+ self.assertNotEqual(capture[TCP].sport, 20001)
+ self.assertEqual(capture[TCP].dport, 10001)
+ self.check_ip_checksum(capture)
+ self.check_tcp_checksum(capture)
+ out_port = capture[TCP].sport
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
+ TCP(sport=10001, dport=out_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ capture = capture[0]
+ self.assertEqual(capture[IPv6].src, aftr_ip6)
+ self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
+ self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
+ self.assertEqual(capture[IP].dst, '192.168.1.1')
+ self.assertEqual(capture[TCP].sport, 10001)
+ self.assertEqual(capture[TCP].dport, 20001)
+ self.check_ip_checksum(capture)
+ self.check_tcp_checksum(capture)
+
+ # ICMP
+ p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
+ IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
+ ICMP(id=4000, type='echo-request'))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ capture = capture[0]
+ self.assertFalse(capture.haslayer(IPv6))
+ self.assertEqual(capture[IP].src, self.nat_addr)
+ self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
+ self.assertNotEqual(capture[ICMP].id, 4000)
+ self.check_ip_checksum(capture)
+ self.check_icmp_checksum(capture)
+ out_id = capture[ICMP].id
+
+ p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
+ IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
+ ICMP(id=out_id, type='echo-reply'))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ capture = capture[0]
+ self.assertEqual(capture[IPv6].src, aftr_ip6)
+ self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
+ self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
+ self.assertEqual(capture[IP].dst, '192.168.1.1')
+ self.assertEqual(capture[ICMP].id, 4000)
+ self.check_ip_checksum(capture)
+ self.check_icmp_checksum(capture)
+
+ def tearDown(self):
+ super(TestDSlite, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show dslite pool"))
+ self.logger.info(
+ self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
+ self.logger.info(self.vapi.cli("show dslite sessions"))
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
diff --git a/test/vpp_papi_provider.py b/test/vpp_papi_provider.py
index d84012b2d2b..468cf83d57b 100644
--- a/test/vpp_papi_provider.py
+++ b/test/vpp_papi_provider.py
@@ -1647,6 +1647,34 @@ class VppPapiProvider(object):
"""
return self.api(self.papi.nat64_prefix_dump, {})
+ def dslite_set_aftr_addr(self, ip6, ip4):
+ """Set DS-Lite AFTR addresses
+
+ :param ip4: IPv4 address
+ :param ip6: IPv6 address
+ """
+ return self.api(
+ self.papi.dslite_set_aftr_addr,
+ {'ip4_addr': ip4,
+ 'ip6_addr': ip6})
+
+ def dslite_add_del_pool_addr_range(
+ self,
+ start_addr,
+ end_addr,
+ is_add=1):
+ """Add/del address range to DS-Lite pool
+
+ :param start_addr: First IP address
+ :param end_addr: Last IP address
+ :param is_add: 1 if add, 0 if delete (Default value = 1)
+ """
+ return self.api(
+ self.papi.dslite_add_del_pool_addr_range,
+ {'start_addr': start_addr,
+ 'end_addr': end_addr,
+ 'is_add': is_add})
+
def control_ping(self):
self.api(self.papi.control_ping)