diff options
author | Juraj Sloboda <jsloboda@cisco.com> | 2018-04-10 13:51:54 +0200 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2018-04-19 10:35:13 +0000 |
commit | 1e5c07d379a092f4829e7081aa962d013b31fdfc (patch) | |
tree | 1b1a3c386663b3356773cf374b55a64f14d43709 /test/test_nat.py | |
parent | b14826e459302556febb391456ea90644278645a (diff) |
Add special Twice-NAT feature (VPP-1221)
When enabled then Twice-NAT is applied only when
source IP equals destination IP after DNAT
Change-Id: I58a9d1d222b2a10c83eafffb2107f32c1b4aa3a8
Signed-off-by: Juraj Sloboda <jsloboda@cisco.com>
Diffstat (limited to 'test/test_nat.py')
-rw-r--r-- | test/test_nat.py | 221 |
1 files changed, 125 insertions, 96 deletions
diff --git a/test/test_nat.py b/test/test_nat.py index 7c126199072..47f3b8c7299 100644 --- a/test/test_nat.py +++ b/test/test_nat.py @@ -922,6 +922,9 @@ class TestNAT44(MethodHolder): cls.pg0.generate_remote_hosts(3) cls.pg0.configure_ipv4_neighbors() + cls.pg1.generate_remote_hosts(1) + cls.pg1.configure_ipv4_neighbors() + cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) cls.vapi.ip_table_add_del(10, is_add=1) cls.vapi.ip_table_add_del(20, is_add=1) @@ -1029,6 +1032,7 @@ class TestNAT44(MethodHolder): vrf_id=sm.vrf_id, protocol=sm.protocol, twice_nat=sm.twice_nat, + self_twice_nat=sm.self_twice_nat, out2in_only=sm.out2in_only, tag=sm.tag, external_sw_if_index=sm.external_sw_if_index, @@ -1042,6 +1046,7 @@ class TestNAT44(MethodHolder): lb_sm.protocol, vrf_id=lb_sm.vrf_id, twice_nat=lb_sm.twice_nat, + self_twice_nat=lb_sm.self_twice_nat, out2in_only=lb_sm.out2in_only, tag=lb_sm.tag, is_add=0, @@ -1072,7 +1077,8 @@ class TestNAT44(MethodHolder): def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0, twice_nat=0, out2in_only=0, tag=""): + proto=0, twice_nat=0, self_twice_nat=0, + out2in_only=0, tag=""): """ Add/delete NAT44 static mapping @@ -1085,6 +1091,9 @@ class TestNAT44(MethodHolder): :param external_sw_if_index: External interface instead of IP address :param proto: IP protocol (Mandatory if port specified) :param twice_nat: 1 if translate external host address and port + :param self_twice_nat: 1 if translate external host address and port + whenever external host address equals + local address of internal host :param out2in_only: if 1 rule is matching only out2in direction :param tag: Opaque string tag """ @@ -1103,6 +1112,7 @@ class TestNAT44(MethodHolder): vrf_id, proto, twice_nat, + self_twice_nat, out2in_only, tag, is_add) @@ -3844,56 +3854,128 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - def test_twice_nat(self): - """ Twice NAT44 """ + def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False, + client_id=None): twice_nat_addr = '10.0.1.3' + port_in = 8080 + if lb: + if not same_pg: + port_in1 = port_in + port_in2 = port_in + else: + port_in1 = port_in+1 + port_in2 = port_in+2 + port_out = 80 eh_port_out = 4567 - eh_port_in = 0 + + server1 = self.pg0.remote_hosts[0] + server2 = self.pg0.remote_hosts[1] + if lb and same_pg: + server2 = server1 + if not lb: + server = server1 + + pg0 = self.pg0 + if same_pg: + pg1 = self.pg0 + else: + pg1 = self.pg1 + + eh_translate = ((not self_twice_nat) or (not lb and same_pg) or + client_id == 1) + self.nat44_add_address(self.nat_addr) self.nat44_add_address(twice_nat_addr, twice_nat=1) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - port_in, port_out, proto=IP_PROTOS.tcp, - twice_nat=1) - self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) - self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, + if not lb: + self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr, + port_in, port_out, + proto=IP_PROTOS.tcp, + twice_nat=int(not self_twice_nat), + self_twice_nat=int(self_twice_nat)) + else: + locals = [{'addr': server1.ip4n, + 'port': port_in1, + 'probability': 50}, + {'addr': server2.ip4n, + 'port': port_in2, + 'probability': 50}] + out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr) + self.vapi.nat44_add_del_lb_static_mapping(out_addr_n, + port_out, + IP_PROTOS.tcp, + twice_nat=int( + not self_twice_nat), + self_twice_nat=int( + self_twice_nat), + local_num=len(locals), + locals=locals) + self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index) + self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index, is_inside=0) - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + if same_pg: + if not lb: + client = server + else: + assert client_id is not None + if client_id == 1: + client = self.pg0.remote_hosts[0] + elif client_id == 2: + client = self.pg0.remote_hosts[1] + else: + client = pg1.remote_hosts[0] + p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) / + IP(src=client.ip4, dst=self.nat_addr) / TCP(sport=eh_port_out, dport=port_out)) - self.pg1.add_stream(p) + pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg0.get_capture(1) + capture = pg0.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] - self.assertEqual(ip.dst, self.pg0.remote_ip4) - self.assertEqual(ip.src, twice_nat_addr) - self.assertEqual(tcp.dport, port_in) - self.assertNotEqual(tcp.sport, eh_port_out) + if lb: + if ip.dst == server1.ip4: + server = server1 + port_in = port_in1 + else: + server = server2 + port_in = port_in2 + self.assertEqual(ip.dst, server.ip4) + if lb and same_pg: + self.assertIn(tcp.dport, [port_in1, port_in2]) + else: + self.assertEqual(tcp.dport, port_in) + if eh_translate: + self.assertEqual(ip.src, twice_nat_addr) + self.assertNotEqual(tcp.sport, eh_port_out) + else: + self.assertEqual(ip.src, client.ip4) + self.assertEqual(tcp.sport, eh_port_out) + eh_addr_in = ip.src eh_port_in = tcp.sport + saved_port_in = tcp.dport self.check_tcp_checksum(p) self.check_ip_checksum(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) / - TCP(sport=port_in, dport=eh_port_in)) - self.pg0.add_stream(p) + p = (Ether(src=server.mac, dst=pg0.local_mac) / + IP(src=server.ip4, dst=eh_addr_in) / + TCP(sport=saved_port_in, dport=eh_port_in)) + pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.get_capture(1) + capture = pg1.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] - self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(ip.dst, client.ip4) self.assertEqual(ip.src, self.nat_addr) self.assertEqual(tcp.dport, eh_port_out) self.assertEqual(tcp.sport, port_out) @@ -3903,84 +3985,31 @@ class TestNAT44(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - def test_twice_nat_lb(self): - """ Twice NAT44 local service load balancing """ - external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr) - twice_nat_addr = '10.0.1.3' - local_port = 8080 - external_port = 80 - eh_port_out = 4567 - eh_port_in = 0 - server1 = self.pg0.remote_hosts[0] - server2 = self.pg0.remote_hosts[1] + def test_twice_nat(self): + """ Twice NAT44 """ + self.twice_nat_common() - locals = [{'addr': server1.ip4n, - 'port': local_port, - 'probability': 50}, - {'addr': server2.ip4n, - 'port': local_port, - 'probability': 50}] + def test_self_twice_nat_positive(self): + """ Self Twice NAT44 (positive test) """ + self.twice_nat_common(self_twice_nat=True, same_pg=True) - self.nat44_add_address(self.nat_addr) - self.nat44_add_address(twice_nat_addr, twice_nat=1) + def test_self_twice_nat_negative(self): + """ Self Twice NAT44 (negative test) """ + self.twice_nat_common(self_twice_nat=True) - self.vapi.nat44_add_del_lb_static_mapping(external_addr_n, - external_port, - IP_PROTOS.tcp, - twice_nat=1, - local_num=len(locals), - locals=locals) - self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index) - self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index, - is_inside=0) + def test_twice_nat_lb(self): + """ Twice NAT44 local service load balancing """ + self.twice_nat_common(lb=True) - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=eh_port_out, dport=external_port)) - self.pg1.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg0.get_capture(1) - p = capture[0] - server = None - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, twice_nat_addr) - self.assertIn(ip.dst, [server1.ip4, server2.ip4]) - if ip.dst == server1.ip4: - server = server1 - else: - server = server2 - self.assertNotEqual(tcp.sport, eh_port_out) - eh_port_in = tcp.sport - self.assertEqual(tcp.dport, local_port) - self.check_tcp_checksum(p) - self.check_ip_checksum(p) - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise + def test_self_twice_nat_lb_positive(self): + """ Self Twice NAT44 local service load balancing (positive test) """ + self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, + client_id=1) - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=twice_nat_addr) / - TCP(sport=local_port, dport=eh_port_in)) - self.pg0.add_stream(p) - self.pg_enable_capture(self.pg_interfaces) - self.pg_start() - capture = self.pg1.get_capture(1) - p = capture[0] - try: - ip = p[IP] - tcp = p[TCP] - self.assertEqual(ip.src, self.nat_addr) - self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.sport, external_port) - self.assertEqual(tcp.dport, eh_port_out) - self.check_tcp_checksum(p) - self.check_ip_checksum(p) - except: - self.logger.error(ppp("Unexpected or invalid packet:", p)) - raise + def test_self_twice_nat_lb_negative(self): + """ Self Twice NAT44 local service load balancing (negative test) """ + self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, + client_id=2) def test_twice_nat_interface_addr(self): """ Acquire twice NAT44 addresses from interface """ |