From 613b2c3c78fbec12cc87a0095ee5488252449698 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Thu, 10 Sep 2020 17:44:41 +0200 Subject: cnat: Add support for SNat ICMP Type: feature snat supports : * echo request/reply by allocating an identifier when translating echo requests * icmp errors in the same manner as dnat Change-Id: I684e983b0181f95c5eace5a984d40084e5625fa4 Signed-off-by: Nathan Skrzypczak --- src/plugins/cnat/test/test_cnat.py | 271 +++++++++++++++++++++++++++---------- 1 file changed, 201 insertions(+), 70 deletions(-) (limited to 'src/plugins/cnat/test/test_cnat.py') diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py index 34cd8b58240..3f8d33cec4e 100644 --- a/src/plugins/cnat/test/test_cnat.py +++ b/src/plugins/cnat/test/test_cnat.py @@ -10,6 +10,7 @@ from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply import struct @@ -123,34 +124,6 @@ class VppCNatTranslation(VppObject): return c[0][self.id] -class VppCNATSourceNat(VppObject): - - def __init__(self, test, address, exclude_subnets=[]): - self._test = test - self.address = address - self.exclude_subnets = exclude_subnets - - def add_vpp_config(self): - a = ip_address(self.address) - if 4 == a.version: - self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address) - else: - self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address) - for subnet in self.exclude_subnets: - self.cnat_exclude_subnet(subnet, True) - - def cnat_exclude_subnet(self, exclude_subnet, isAdd=True): - add = 1 if isAdd else 0 - self._test.vapi.cnat_add_del_snat_prefix( - prefix=exclude_subnet, is_add=add) - - def query_vpp_config(self): - return False - - def remove_vpp_config(self): - return False - - class TestCNatTranslation(VppTestCase): """ CNat Translation """ extra_vpp_punt_config = ["cnat", "{", @@ -568,49 +541,219 @@ class TestCNatSourceNAT(VppTestCase): i.config_ip6() i.resolve_ndp() + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + self.pg1.generate_remote_hosts(2) + self.pg1.configure_ipv4_neighbors() + self.pg1.configure_ipv6_neighbors() + + self.vapi.cli("test cnat scanner off") + self.vapi.cnat_set_snat_addresses( + snat_ip4=self.pg2.remote_hosts[0].ip4, + snat_ip6=self.pg2.remote_hosts[0].ip6) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip6-unicast", + feature_name="ip6-cnat-snat", + sw_if_index=self.pg0.sw_if_index) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip4-unicast", + feature_name="ip4-cnat-snat", + sw_if_index=self.pg0.sw_if_index) + def tearDown(self): + self.vapi.cnat_session_purge() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatSourceNAT, self).tearDown() - def cnat_set_snat_address(self, srcNatAddr, interface, isV6=False): - t1 = VppCNATSourceNat(self, srcNatAddr) - t1.add_vpp_config() - cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast" - cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat" - self.vapi.feature_enable_disable( - enable=1, - arc_name=cnat_arc_name, - feature_name=cnat_feature_name, - sw_if_index=interface.sw_if_index) + def test_snat_v6(self): + # """ CNat Source Nat v6 """ + self.sourcenat_test_tcp_udp_conf(TCP, isV6=True) + self.sourcenat_test_tcp_udp_conf(UDP, isV6=True) + self.sourcenat_test_icmp_err_conf(isV6=True) + self.sourcenat_test_icmp_echo6_conf() - return t1 + def test_snat_v4(self): + # """ CNat Source Nat v4 """ + self.sourcenat_test_tcp_udp_conf(TCP) + self.sourcenat_test_tcp_udp_conf(UDP) + self.sourcenat_test_icmp_err_conf() + self.sourcenat_test_icmp_echo4_conf() - def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - IP46 = IPv6 if isV6 else IP - sports = [1234, 1235, 1236] - dports = [6661, 6662, 6663] + def sourcenat_test_icmp_echo6_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] - self.pg0.generate_remote_hosts(1) - self.pg0.configure_ipv4_neighbors() - self.pg0.configure_ipv6_neighbors() - self.pg1.generate_remote_hosts(len(sports)) - self.pg1.configure_ipv4_neighbors() - self.pg1.configure_ipv6_neighbors() + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 - self.vapi.cli("test cnat scanner on") - t1 = self.cnat_set_snat_address(srcNatAddr, self.pg0, isV6) + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IPv6(src=client_addr, dst=remote_addr) / + ICMPv6EchoRequest(id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IPv6].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMPv6EchoRequest].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IPv6(src=remote_addr, dst=src_nat_addr) / + ICMPv6EchoReply(id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IPv6].src, remote_addr) + self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed) + + def sourcenat_test_icmp_echo4_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + ICMP(type=8, id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IP46].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMP].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMP(type=0, id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP].id, 0xfeed) + + def sourcenat_test_icmp_err_conf(self, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] for nbr, remote_host in enumerate(self.pg1.remote_hosts): if isV6: + IP46 = IPv6 client_addr = self.pg0.remote_hosts[0].ip6 remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + ICMP46 = ICMPv6DestUnreach + ICMPelem = ICMPv6DestUnreach(code=1) + IP46error = IPerror6 else: + IP46 = IP client_addr = self.pg0.remote_hosts[0].ip4 remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + IP46error = IPerror + ICMP46 = ICMP + ICMPelem = ICMP(type=11) + + # from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + TCP(sport=sports[nbr], dport=dports[nbr]) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[TCP].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, src_nat_addr) + sport = rx[TCP].sport + + InnerIP = rxs[0][IP46] + # from outside to pods, ICMP error + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].sport, sports[nbr]) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].dport, dports[nbr]) + + def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + IP46 = IPv6 + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + exclude_prefix = ip_network( + "%s/100" % remote_addr, strict=False) + else: + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + exclude_prefix = ip_network( + "%s/16" % remote_addr, strict=False) # from pods to outside network p1 = ( Ether(dst=self.pg0.local_mac, @@ -627,14 +770,14 @@ class TestCNatSourceNAT(VppTestCase): self.assert_packet_checksums_valid(rx) self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, srcNatAddr) + self.assertEqual(rx[IP46].src, src_nat_addr) sport = rx[l4p].sport # from outside to pods p2 = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=srcNatAddr) / + IP46(src=remote_addr, dst=src_nat_addr) / l4p(sport=dports[nbr], dport=sport) / Raw()) @@ -651,11 +794,7 @@ class TestCNatSourceNAT(VppTestCase): self.assertEqual(rx[IP46].src, remote_addr) # add remote host to exclude list - subnet_mask = 100 if isV6 else 16 - subnet = "%s/%d" % (remote_addr, subnet_mask) - exclude_subnet = ip_network(subnet, strict=False) - - t1.cnat_exclude_subnet(exclude_subnet) + self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=1) self.vapi.cnat_session_purge() rxs = self.send_and_expect( @@ -669,7 +808,7 @@ class TestCNatSourceNAT(VppTestCase): self.assertEqual(rx[IP46].src, client_addr) # remove remote host from exclude list - t1.cnat_exclude_subnet(exclude_subnet, isAdd=False) + self.vapi.cnat_add_del_snat_prefix(prefix=exclude_prefix, is_add=0) self.vapi.cnat_session_purge() rxs = self.send_and_expect( @@ -681,17 +820,9 @@ class TestCNatSourceNAT(VppTestCase): self.assert_packet_checksums_valid(rx) self.assertEqual(rx[IP46].dst, remote_addr) self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, srcNatAddr) + self.assertEqual(rx[IP46].src, src_nat_addr) - def test_cnat6_sourcenat(self): - # """ CNat Source Nat ipv6 """ - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True) - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True) - - def test_cnat4_sourcenat(self): - # """ CNat Source Nat ipv4 """ - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP) - self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP) + self.vapi.cnat_session_purge() if __name__ == '__main__': -- cgit 1.2.3-korg