diff options
Diffstat (limited to 'src/plugins/cnat/test/test_cnat.py')
-rw-r--r-- | src/plugins/cnat/test/test_cnat.py | 975 |
1 files changed, 0 insertions, 975 deletions
diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py deleted file mode 100644 index ff4c44033cb..00000000000 --- a/src/plugins/cnat/test/test_cnat.py +++ /dev/null @@ -1,975 +0,0 @@ -#!/usr/bin/env python3 - -import unittest - -from framework import VppTestCase, VppTestRunner -from vpp_ip import DpoProto, INVALID_INDEX -from itertools import product - -from scapy.packet import Raw -from scapy.layers.l2 import Ether -from scapy.layers.inet import IP, UDP, TCP, ICMP -from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror -from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach -from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply - -import struct - -from ipaddress import ip_address, ip_network, \ - IPv4Address, IPv6Address, IPv4Network, IPv6Network - -from vpp_object import VppObject -from vpp_papi import VppEnum - -N_PKTS = 15 - - -class Ep(object): - """ CNat endpoint """ - - def __init__(self, ip=None, port=0, l4p=TCP, - sw_if_index=INVALID_INDEX, is_v6=False): - self.ip = ip - if ip is None: - self.ip = "::" if is_v6 else "0.0.0.0" - self.port = port - self.l4p = l4p - self.sw_if_index = sw_if_index - if is_v6: - self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6 - else: - self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4 - - def encode(self): - return {'addr': self.ip, - 'port': self.port, - 'sw_if_index': self.sw_if_index, - 'if_af': self.if_af} - - @classmethod - def from_pg(cls, pg, is_v6=False): - if pg is None: - return cls(is_v6=is_v6) - else: - return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6) - - @property - def isV6(self): - return ":" in self.ip - - def __str__(self): - return ("%s:%d" % (self.ip, self.port)) - - -class EpTuple(object): - """ CNat endpoint """ - - def __init__(self, src, dst): - self.src = src - self.dst = dst - - def encode(self): - return {'src_ep': self.src.encode(), - 'dst_ep': self.dst.encode()} - - def __str__(self): - return ("%s->%s" % (self.src, self.dst)) - - -class VppCNatTranslation(VppObject): - - def __init__(self, test, iproto, vip, paths): - self._test = test - self.vip = vip - self.iproto = iproto - self.paths = paths - self.encoded_paths = [] - for path in self.paths: - self.encoded_paths.append(path.encode()) - - def __str__(self): - return ("%s %s %s" % (self.vip, self.iproto, self.paths)) - - @property - def vl4_proto(self): - ip_proto = VppEnum.vl_api_ip_proto_t - return { - UDP: ip_proto.IP_API_PROTO_UDP, - TCP: ip_proto.IP_API_PROTO_TCP, - }[self.iproto] - - def add_vpp_config(self): - r = self._test.vapi.cnat_translation_update( - {'vip': self.vip.encode(), - 'ip_proto': self.vl4_proto, - 'n_paths': len(self.paths), - 'paths': self.encoded_paths}) - self._test.registry.register(self, self._test.logger) - self.id = r.id - - def modify_vpp_config(self, paths): - self.paths = paths - self.encoded_paths = [] - for path in self.paths: - self.encoded_paths.append(path.encode()) - - r = self._test.vapi.cnat_translation_update( - {'vip': self.vip.encode(), - 'ip_proto': self.vl4_proto, - 'n_paths': len(self.paths), - 'paths': self.encoded_paths}) - self._test.registry.register(self, self._test.logger) - - def remove_vpp_config(self): - self._test.vapi.cnat_translation_del(id=self.id) - - def query_vpp_config(self): - for t in self._test.vapi.cnat_translation_dump(): - if self.id == t.translation.id: - return t.translation - return None - - def object_id(self): - return ("cnat-translation-%s" % (self.vip)) - - def get_stats(self): - c = self._test.statistics.get_counter("/net/cnat-translation") - return c[0][self.id] - - -class TestCNatTranslation(VppTestCase): - """ CNat Translation """ - extra_vpp_punt_config = ["cnat", "{", - "session-db-buckets", "64", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] - - @classmethod - def setUpClass(cls): - super(TestCNatTranslation, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestCNatTranslation, cls).tearDownClass() - - def setUp(self): - super(TestCNatTranslation, self).setUp() - - self.create_pg_interfaces(range(3)) - - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - - def tearDown(self): - for i in self.pg_interfaces: - i.unconfig_ip4() - i.unconfig_ip6() - i.admin_down() - super(TestCNatTranslation, self).tearDown() - - def cnat_create_translation(self, vip, nbr): - ip_v = "ip6" if vip.isV6 else "ip4" - dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) - sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0) - t1 = VppCNatTranslation( - self, vip.l4p, vip, - [EpTuple(sep, dep), EpTuple(sep, dep)]) - t1.add_vpp_config() - return t1 - - def cnat_test_translation(self, t1, nbr, sports, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP - vip = t1.vip - - # - # Flows - # - for src in self.pg0.remote_hosts: - for sport in sports: - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - self.vapi.cli("trace add pg-input 1") - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - self.logger.info(self.vapi.cli("show trace max 1")) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(self.pg1.remote_hosts[nbr], ip_v)) - self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) - self.assertEqual( - rx[ip_class].src, - getattr(src, ip_v)) - self.assertEqual(rx[vip.l4p].sport, sport) - - # from vip to client - p1 = (Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_mac) / - ip_class(src=getattr( - self.pg1.remote_hosts[nbr], - ip_v), - dst=getattr(src, ip_v)) / - vip.l4p(sport=4000 + nbr, dport=sport) / - Raw()) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual( - rx[ip_class].dst, - getattr(src, ip_v)) - self.assertEqual(rx[vip.l4p].dport, sport) - self.assertEqual(rx[ip_class].src, vip.ip) - self.assertEqual(rx[vip.l4p].sport, vip.port) - - # - # packets to the VIP that do not match a - # translation are dropped - # - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=6666) / - Raw()) - - self.send_and_assert_no_replies(self.pg0, - p1 * N_PKTS, - self.pg1) - - # - # packets from the VIP that do not match a - # session are forwarded - # - p1 = (Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_mac) / - ip_class(src=getattr( - self.pg1.remote_hosts[nbr], - ip_v), - dst=getattr(src, ip_v)) / - vip.l4p(sport=6666, dport=sport) / - Raw()) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - def cnat_test_translation_update(self, t1, sports, isV6=False): - ip_v = "ip6" if isV6 else "ip4" - ip_class = IPv6 if isV6 else IP - vip = t1.vip - - # - # modify the translation to use a different backend - # - dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000) - sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) - t1.modify_vpp_config([EpTuple(sep, dep)]) - - # - # existing flows follow the old path - # - for src in self.pg0.remote_hosts: - for sport in sports: - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - - # - # new flows go to the new backend - # - for src in self.pg0.remote_hosts: - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=9999, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg2) - - def cnat_translation(self, vips, isV6=False): - """ CNat Translation """ - - ip_class = IPv6 if isV6 else IP - ip_v = "ip6" if isV6 else "ip4" - sports = [1234, 1233] - - # - # turn the scanner off whilst testing otherwise sessions - # will time out - # - self.vapi.cli("test cnat scanner off") - - sessions = self.vapi.cnat_session_dump() - - trs = [] - for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr)) - - self.logger.info(self.vapi.cli("sh cnat client")) - self.logger.info(self.vapi.cli("sh cnat translation")) - - # - # translations - # - for nbr, vip in enumerate(vips): - self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6) - self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6) - if isV6: - self.logger.info(self.vapi.cli( - "sh ip6 fib %s" % self.pg0.remote_ip6)) - else: - self.logger.info(self.vapi.cli( - "sh ip fib %s" % self.pg0.remote_ip4)) - self.logger.info(self.vapi.cli("sh cnat session verbose")) - - # - # turn the scanner back on and wait until the sessions - # all disapper - # - self.vapi.cli("test cnat scanner on") - - n_tries = 0 - sessions = self.vapi.cnat_session_dump() - while (len(sessions) and n_tries < 100): - n_tries += 1 - sessions = self.vapi.cnat_session_dump() - self.sleep(2) - self.logger.info(self.vapi.cli("show cnat session verbose")) - - self.assertTrue(n_tries < 100) - self.vapi.cli("test cnat scanner off") - - # - # load some flows again and purge - # - for vip in vips: - for src in self.pg0.remote_hosts: - for sport in sports: - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=src.mac) / - ip_class(src=getattr(src, ip_v), dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg2) - - for tr in trs: - tr.remove_vpp_config() - - self.assertTrue(self.vapi.cnat_session_dump()) - self.vapi.cnat_session_purge() - self.assertFalse(self.vapi.cnat_session_dump()) - - def test_icmp(self): - vips = [ - Ep("30.0.0.1", 5555), - Ep("30.0.0.2", 5554), - Ep("30.0.0.2", 5553, UDP), - Ep("30::1", 6666), - Ep("30::2", 5553, UDP), - ] - sport = 1234 - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv6_neighbors() - self.pg0.configure_ipv4_neighbors() - - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv6_neighbors() - self.pg1.configure_ipv4_neighbors() - - self.vapi.cli("test cnat scanner off") - trs = [] - for nbr, vip in enumerate(vips): - trs.append(self.cnat_create_translation(vip, nbr)) - - self.logger.info(self.vapi.cli("sh cnat client")) - self.logger.info(self.vapi.cli("sh cnat translation")) - - for nbr, vip in enumerate(vips): - if vip.isV6: - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - remote2_addr = self.pg2.remote_hosts[0].ip6 - else: - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - remote2_addr = self.pg2.remote_hosts[0].ip4 - IP46 = IPv6 if vip.isV6 else IP - # from client to vip - p1 = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=vip.ip) / - vip.l4p(sport=sport, dport=vip.port) / - Raw()) - - rxs = self.send_and_expect(self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) - self.assertEqual(rx[IP46].src, client_addr) - self.assertEqual(rx[vip.l4p].sport, sport) - - InnerIP = rxs[0][IP46] - - ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP - ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11) - # from vip to client, ICMP error - p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP46(src=remote_addr, dst=client_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror - IP46error = IPerror6 if vip.isV6 else IPerror - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, vip.ip) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].sport, sport) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].dport, vip.port) - - # from other remote to client, ICMP error - # outside shouldn't be NAT-ed - p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) / - IP46(src=remote2_addr, dst=client_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect(self.pg1, - p1 * N_PKTS, - self.pg0) - - TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror - IP46error = IPerror6 if vip.isV6 else IPerror - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote2_addr) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].sport, sport) - self.assertEqual(rx[ICMP46][IP46error] - [TCPUDPError].dport, vip.port) - - self.vapi.cnat_session_purge() - - def test_cnat6(self): - # """ CNat Translation ipv6 """ - vips = [ - Ep("30::1", 5555), - Ep("30::2", 5554), - Ep("30::2", 5553, UDP), - ] - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv6_neighbors() - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv6_neighbors() - - self.cnat_translation(vips, isV6=True) - - def test_cnat4(self): - # """ CNat Translation ipv4 """ - - vips = [ - Ep("30.0.0.1", 5555), - Ep("30.0.0.2", 5554), - Ep("30.0.0.2", 5553, UDP), - ] - - self.pg0.generate_remote_hosts(len(vips)) - self.pg0.configure_ipv4_neighbors() - self.pg1.generate_remote_hosts(len(vips)) - self.pg1.configure_ipv4_neighbors() - - self.cnat_translation(vips) - - -class TestCNatSourceNAT(VppTestCase): - """ CNat Source NAT """ - extra_vpp_punt_config = ["cnat", "{", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] - - @classmethod - def setUpClass(cls): - super(TestCNatSourceNAT, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestCNatSourceNAT, cls).tearDownClass() - - def setUp(self): - super(TestCNatSourceNAT, self).setUp() - - self.create_pg_interfaces(range(3)) - - for i in self.pg_interfaces: - i.admin_up() - i.config_ip4() - i.resolve_arp() - i.config_ip6() - i.resolve_ndp() - - self.pg0.configure_ipv6_neighbors() - self.pg0.configure_ipv4_neighbors() - self.pg1.generate_remote_hosts(2) - self.pg1.configure_ipv4_neighbors() - self.pg1.configure_ipv6_neighbors() - - self.vapi.cnat_set_snat_addresses( - snat_ip4=self.pg2.remote_hosts[0].ip4, - snat_ip6=self.pg2.remote_hosts[0].ip6, - sw_if_index=INVALID_INDEX) - self.vapi.feature_enable_disable( - enable=1, - arc_name="ip6-unicast", - feature_name="cnat-snat-ip6", - sw_if_index=self.pg0.sw_if_index) - self.vapi.feature_enable_disable( - enable=1, - arc_name="ip4-unicast", - feature_name="cnat-snat-ip4", - sw_if_index=self.pg0.sw_if_index) - - policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t - self.vapi.cnat_set_snat_policy( - policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX) - for i in self.pg_interfaces: - self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1, - table=policie_tbls.CNAT_POLICY_INCLUDE_V6) - self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1, - table=policie_tbls.CNAT_POLICY_INCLUDE_V4) - - def tearDown(self): - self.vapi.cnat_session_purge() - for i in self.pg_interfaces: - i.unconfig_ip4() - i.unconfig_ip6() - i.admin_down() - super(TestCNatSourceNAT, self).tearDown() - - def test_snat_v6(self): - # """ CNat Source Nat v6 """ - self.sourcenat_test_tcp_udp_conf(TCP, isV6=True) - self.sourcenat_test_tcp_udp_conf(UDP, isV6=True) - self.sourcenat_test_icmp_err_conf(isV6=True) - self.sourcenat_test_icmp_echo6_conf() - - def test_snat_v4(self): - # """ CNat Source Nat v4 """ - self.sourcenat_test_tcp_udp_conf(TCP) - self.sourcenat_test_tcp_udp_conf(UDP) - self.sourcenat_test_icmp_err_conf() - self.sourcenat_test_icmp_echo4_conf() - - def sourcenat_test_icmp_echo6_conf(self): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - - # ping from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IPv6(src=client_addr, dst=remote_addr) / - ICMPv6EchoRequest(id=0xfeed) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assertEqual(rx[IPv6].src, src_nat_addr) - self.assert_packet_checksums_valid(rx) - - received_id = rx[0][ICMPv6EchoRequest].id - # ping reply from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IPv6(src=remote_addr, dst=src_nat_addr) / - ICMPv6EchoReply(id=received_id)) - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IPv6].src, remote_addr) - self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed) - - def sourcenat_test_icmp_echo4_conf(self): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - - # ping from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - ICMP(type=8, id=0xfeed) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assertEqual(rx[IP46].src, src_nat_addr) - self.assert_packet_checksums_valid(rx) - - received_id = rx[0][ICMP].id - # ping reply from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - ICMP(type=0, id=received_id)) - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote_addr) - self.assertEqual(rx[ICMP].id, 0xfeed) - - def sourcenat_test_icmp_err_conf(self, isV6=False): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - if isV6: - IP46 = IPv6 - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - ICMP46 = ICMPv6DestUnreach - ICMPelem = ICMPv6DestUnreach(code=1) - IP46error = IPerror6 - else: - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - IP46error = IPerror - ICMP46 = ICMP - ICMPelem = ICMP(type=11) - - # from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - TCP(sport=sports[nbr], dport=dports[nbr]) / - Raw()) - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[TCP].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - sport = rx[TCP].sport - - InnerIP = rxs[0][IP46] - # from outside to pods, ICMP error - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - ICMPelem / InnerIP) - - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].src, remote_addr) - self.assertEqual(rx[ICMP46][IP46error].src, client_addr) - self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr) - self.assertEqual(rx[ICMP46][IP46error] - [TCPerror].sport, sports[nbr]) - self.assertEqual(rx[ICMP46][IP46error] - [TCPerror].dport, dports[nbr]) - - def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False): - sports = [1234, 1235] - dports = [6661, 6662] - - for nbr, remote_host in enumerate(self.pg1.remote_hosts): - if isV6: - IP46 = IPv6 - client_addr = self.pg0.remote_hosts[0].ip6 - remote_addr = self.pg1.remote_hosts[nbr].ip6 - src_nat_addr = self.pg2.remote_hosts[0].ip6 - exclude_prefix = ip_network( - "%s/100" % remote_addr, strict=False) - else: - IP46 = IP - client_addr = self.pg0.remote_hosts[0].ip4 - remote_addr = self.pg1.remote_hosts[nbr].ip4 - src_nat_addr = self.pg2.remote_hosts[0].ip4 - exclude_prefix = ip_network( - "%s/16" % remote_addr, strict=False) - # from pods to outside network - p1 = ( - Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_hosts[0].mac) / - IP46(src=client_addr, dst=remote_addr) / - l4p(sport=sports[nbr], dport=dports[nbr]) / - Raw()) - - self.vapi.cli("trace add pg-input 1") - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - self.logger.info(self.vapi.cli("show trace max 1")) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - sport = rx[l4p].sport - - # from outside to pods - p2 = ( - Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_hosts[nbr].mac) / - IP46(src=remote_addr, dst=src_nat_addr) / - l4p(sport=dports[nbr], dport=sport) / - Raw()) - - rxs = self.send_and_expect( - self.pg1, - p2 * N_PKTS, - self.pg0) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, client_addr) - self.assertEqual(rx[l4p].dport, sports[nbr]) - self.assertEqual(rx[l4p].sport, dports[nbr]) - self.assertEqual(rx[IP46].src, remote_addr) - - # add remote host to exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=1) - self.vapi.cnat_session_purge() - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, client_addr) - - # remove remote host from exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=0) - self.vapi.cnat_session_purge() - - rxs = self.send_and_expect( - self.pg0, - p1 * N_PKTS, - self.pg1) - - for rx in rxs: - self.assert_packet_checksums_valid(rx) - self.assertEqual(rx[IP46].dst, remote_addr) - self.assertEqual(rx[l4p].dport, dports[nbr]) - self.assertEqual(rx[IP46].src, src_nat_addr) - - self.vapi.cnat_session_purge() - - -class TestCNatDHCP(VppTestCase): - """ CNat Translation """ - extra_vpp_punt_config = ["cnat", "{", - "session-db-buckets", "64", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] - - @classmethod - def setUpClass(cls): - super(TestCNatDHCP, cls).setUpClass() - - @classmethod - def tearDownClass(cls): - super(TestCNatDHCP, cls).tearDownClass() - - def tearDown(self): - for i in self.pg_interfaces: - i.admin_down() - super(TestCNatDHCP, self).tearDown() - - def create_translation(self, vip_pg, *args, is_v6=False): - vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6) - paths = [] - for (src_pg, dst_pg) in args: - paths.append(EpTuple( - Ep.from_pg(src_pg, is_v6=is_v6), - Ep.from_pg(dst_pg, is_v6=is_v6) - )) - t1 = VppCNatTranslation(self, TCP, vip, paths) - t1.add_vpp_config() - return t1 - - def make_addr(self, sw_if_index, i, is_v6): - if is_v6: - return "fd01:%x::%u" % (sw_if_index, i + 1) - else: - return "172.16.%u.%u" % (sw_if_index, i) - - def make_prefix(self, sw_if_index, i, is_v6): - if is_v6: - return "%s/128" % self.make_addr(sw_if_index, i, is_v6) - else: - return "%s/32" % self.make_addr(sw_if_index, i, is_v6) - - def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False): - qt1 = tr.query_vpp_config() - self.assertEqual(str(qt1.vip.addr), self.make_addr( - vip_pg.sw_if_index, i, is_v6)) - for (src_pg, dst_pg), path in zip(args, qt1.paths): - if src_pg: - self.assertEqual(str(path.src_ep.addr), self.make_addr( - src_pg.sw_if_index, i, is_v6)) - if dst_pg: - self.assertEqual(str(path.dst_ep.addr), self.make_addr( - dst_pg.sw_if_index, i, is_v6)) - - def config_ips(self, rng, is_add=1, is_v6=False): - for pg, i in product(self.pg_interfaces, rng): - self.vapi.sw_interface_add_del_address( - sw_if_index=pg.sw_if_index, - prefix=self.make_prefix(pg.sw_if_index, i, is_v6), - is_add=is_add) - - def test_dhcp_v4(self): - self.create_pg_interfaces(range(5)) - for i in self.pg_interfaces: - i.admin_up() - pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) - t1 = self.create_translation(*pglist) - self.config_ips([0]) - self.check_resolved(t1, *pglist) - self.config_ips([1]) - self.config_ips([0], is_add=0) - self.check_resolved(t1, *pglist, i=1) - self.config_ips([1], is_add=0) - t1.remove_vpp_config() - - def test_dhcp_v6(self): - self.create_pg_interfaces(range(5)) - for i in self.pg_interfaces: - i.admin_up() - pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) - t1 = self.create_translation(*pglist, is_v6=True) - self.config_ips([0], is_v6=True) - self.check_resolved(t1, *pglist, is_v6=True) - self.config_ips([1], is_v6=True) - self.config_ips([0], is_add=0, is_v6=True) - self.check_resolved(t1, *pglist, i=1, is_v6=True) - self.config_ips([1], is_add=0, is_v6=True) - t1.remove_vpp_config() - - def test_dhcp_snat(self): - self.create_pg_interfaces(range(1)) - for i in self.pg_interfaces: - i.admin_up() - self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) - self.config_ips([0], is_v6=False) - self.config_ips([0], is_v6=True) - r = self.vapi.cnat_get_snat_addresses() - self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, 0, False)) - self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, 0, True)) - self.config_ips([1], is_v6=False) - self.config_ips([1], is_v6=True) - self.config_ips([0], is_add=0, is_v6=False) - self.config_ips([0], is_add=0, is_v6=True) - r = self.vapi.cnat_get_snat_addresses() - self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, 1, False)) - self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, 1, True)) - self.config_ips([1], is_add=0, is_v6=False) - self.config_ips([1], is_add=0, is_v6=True) - self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) - - -if __name__ == '__main__': - unittest.main(testRunner=VppTestRunner) |