diff options
Diffstat (limited to 'test/test_cnat.py')
-rw-r--r-- | test/test_cnat.py | 975 |
1 files changed, 975 insertions, 0 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py new file mode 100644 index 00000000000..ff4c44033cb --- /dev/null +++ b/test/test_cnat.py @@ -0,0 +1,975 @@ +#!/usr/bin/env python3 + +import unittest + +from framework import VppTestCase, VppTestRunner +from vpp_ip import DpoProto, INVALID_INDEX +from itertools import product + +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP, TCP, ICMP +from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror +from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach +from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply + +import struct + +from ipaddress import ip_address, ip_network, \ + IPv4Address, IPv6Address, IPv4Network, IPv6Network + +from vpp_object import VppObject +from vpp_papi import VppEnum + +N_PKTS = 15 + + +class Ep(object): + """ CNat endpoint """ + + def __init__(self, ip=None, port=0, l4p=TCP, + sw_if_index=INVALID_INDEX, is_v6=False): + self.ip = ip + if ip is None: + self.ip = "::" if is_v6 else "0.0.0.0" + self.port = port + self.l4p = l4p + self.sw_if_index = sw_if_index + if is_v6: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6 + else: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4 + + def encode(self): + return {'addr': self.ip, + 'port': self.port, + 'sw_if_index': self.sw_if_index, + 'if_af': self.if_af} + + @classmethod + def from_pg(cls, pg, is_v6=False): + if pg is None: + return cls(is_v6=is_v6) + else: + return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6) + + @property + def isV6(self): + return ":" in self.ip + + def __str__(self): + return ("%s:%d" % (self.ip, self.port)) + + +class EpTuple(object): + """ CNat endpoint """ + + def __init__(self, src, dst): + self.src = src + self.dst = dst + + def encode(self): + return {'src_ep': self.src.encode(), + 'dst_ep': self.dst.encode()} + + def __str__(self): + return ("%s->%s" % (self.src, self.dst)) + + +class VppCNatTranslation(VppObject): + + def __init__(self, test, iproto, vip, paths): + self._test = test + self.vip = vip + self.iproto = iproto + self.paths = paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) + + def __str__(self): + return ("%s %s %s" % (self.vip, self.iproto, self.paths)) + + @property + def vl4_proto(self): + ip_proto = VppEnum.vl_api_ip_proto_t + return { + UDP: ip_proto.IP_API_PROTO_UDP, + TCP: ip_proto.IP_API_PROTO_TCP, + }[self.iproto] + + def add_vpp_config(self): + r = self._test.vapi.cnat_translation_update( + {'vip': self.vip.encode(), + 'ip_proto': self.vl4_proto, + 'n_paths': len(self.paths), + 'paths': self.encoded_paths}) + self._test.registry.register(self, self._test.logger) + self.id = r.id + + def modify_vpp_config(self, paths): + self.paths = paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) + + r = self._test.vapi.cnat_translation_update( + {'vip': self.vip.encode(), + 'ip_proto': self.vl4_proto, + 'n_paths': len(self.paths), + 'paths': self.encoded_paths}) + self._test.registry.register(self, self._test.logger) + + def remove_vpp_config(self): + self._test.vapi.cnat_translation_del(id=self.id) + + def query_vpp_config(self): + for t in self._test.vapi.cnat_translation_dump(): + if self.id == t.translation.id: + return t.translation + return None + + def object_id(self): + return ("cnat-translation-%s" % (self.vip)) + + def get_stats(self): + c = self._test.statistics.get_counter("/net/cnat-translation") + return c[0][self.id] + + +class TestCNatTranslation(VppTestCase): + """ CNat Translation """ + extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatTranslation, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatTranslation, cls).tearDownClass() + + def setUp(self): + super(TestCNatTranslation, self).setUp() + + self.create_pg_interfaces(range(3)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + super(TestCNatTranslation, self).tearDown() + + def cnat_create_translation(self, vip, nbr): + ip_v = "ip6" if vip.isV6 else "ip4" + dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) + sep = Ep("::", 0) if vip.isV6 else Ep("0.0.0.0", 0) + t1 = VppCNatTranslation( + self, vip.l4p, vip, + [EpTuple(sep, dep), EpTuple(sep, dep)]) + t1.add_vpp_config() + return t1 + + def cnat_test_translation(self, t1, nbr, sports, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + ip_class = IPv6 if isV6 else IP + vip = t1.vip + + # + # Flows + # + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + self.vapi.cli("trace add pg-input 1") + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(self.pg1.remote_hosts[nbr], ip_v)) + self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) + self.assertEqual( + rx[ip_class].src, + getattr(src, ip_v)) + self.assertEqual(rx[vip.l4p].sport, sport) + + # from vip to client + p1 = (Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_mac) / + ip_class(src=getattr( + self.pg1.remote_hosts[nbr], + ip_v), + dst=getattr(src, ip_v)) / + vip.l4p(sport=4000 + nbr, dport=sport) / + Raw()) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(src, ip_v)) + self.assertEqual(rx[vip.l4p].dport, sport) + self.assertEqual(rx[ip_class].src, vip.ip) + self.assertEqual(rx[vip.l4p].sport, vip.port) + + # + # packets to the VIP that do not match a + # translation are dropped + # + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=6666) / + Raw()) + + self.send_and_assert_no_replies(self.pg0, + p1 * N_PKTS, + self.pg1) + + # + # packets from the VIP that do not match a + # session are forwarded + # + p1 = (Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_mac) / + ip_class(src=getattr( + self.pg1.remote_hosts[nbr], + ip_v), + dst=getattr(src, ip_v)) / + vip.l4p(sport=6666, dport=sport) / + Raw()) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + def cnat_test_translation_update(self, t1, sports, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + ip_class = IPv6 if isV6 else IP + vip = t1.vip + + # + # modify the translation to use a different backend + # + dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000) + sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) + t1.modify_vpp_config([EpTuple(sep, dep)]) + + # + # existing flows follow the old path + # + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + # + # new flows go to the new backend + # + for src in self.pg0.remote_hosts: + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=9999, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg2) + + def cnat_translation(self, vips, isV6=False): + """ CNat Translation """ + + ip_class = IPv6 if isV6 else IP + ip_v = "ip6" if isV6 else "ip4" + sports = [1234, 1233] + + # + # turn the scanner off whilst testing otherwise sessions + # will time out + # + self.vapi.cli("test cnat scanner off") + + sessions = self.vapi.cnat_session_dump() + + trs = [] + for nbr, vip in enumerate(vips): + trs.append(self.cnat_create_translation(vip, nbr)) + + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + # + # translations + # + for nbr, vip in enumerate(vips): + self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6) + self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6) + if isV6: + self.logger.info(self.vapi.cli( + "sh ip6 fib %s" % self.pg0.remote_ip6)) + else: + self.logger.info(self.vapi.cli( + "sh ip fib %s" % self.pg0.remote_ip4)) + self.logger.info(self.vapi.cli("sh cnat session verbose")) + + # + # turn the scanner back on and wait until the sessions + # all disapper + # + self.vapi.cli("test cnat scanner on") + + n_tries = 0 + sessions = self.vapi.cnat_session_dump() + while (len(sessions) and n_tries < 100): + n_tries += 1 + sessions = self.vapi.cnat_session_dump() + self.sleep(2) + self.logger.info(self.vapi.cli("show cnat session verbose")) + + self.assertTrue(n_tries < 100) + self.vapi.cli("test cnat scanner off") + + # + # load some flows again and purge + # + for vip in vips: + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg2) + + for tr in trs: + tr.remove_vpp_config() + + self.assertTrue(self.vapi.cnat_session_dump()) + self.vapi.cnat_session_purge() + self.assertFalse(self.vapi.cnat_session_dump()) + + def test_icmp(self): + vips = [ + Ep("30.0.0.1", 5555), + Ep("30.0.0.2", 5554), + Ep("30.0.0.2", 5553, UDP), + Ep("30::1", 6666), + Ep("30::2", 5553, UDP), + ] + sport = 1234 + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv6_neighbors() + self.pg1.configure_ipv4_neighbors() + + self.vapi.cli("test cnat scanner off") + trs = [] + for nbr, vip in enumerate(vips): + trs.append(self.cnat_create_translation(vip, nbr)) + + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + for nbr, vip in enumerate(vips): + if vip.isV6: + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + remote2_addr = self.pg2.remote_hosts[0].ip6 + else: + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + remote2_addr = self.pg2.remote_hosts[0].ip4 + IP46 = IPv6 if vip.isV6 else IP + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) + self.assertEqual(rx[IP46].src, client_addr) + self.assertEqual(rx[vip.l4p].sport, sport) + + InnerIP = rxs[0][IP46] + + ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP + ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11) + # from vip to client, ICMP error + p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP46(src=remote_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, vip.ip) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + # from other remote to client, ICMP error + # outside shouldn't be NAT-ed + p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) / + IP46(src=remote2_addr, dst=client_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror + IP46error = IPerror6 if vip.isV6 else IPerror + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote2_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].sport, sport) + self.assertEqual(rx[ICMP46][IP46error] + [TCPUDPError].dport, vip.port) + + self.vapi.cnat_session_purge() + + def test_cnat6(self): + # """ CNat Translation ipv6 """ + vips = [ + Ep("30::1", 5555), + Ep("30::2", 5554), + Ep("30::2", 5553, UDP), + ] + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv6_neighbors() + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv6_neighbors() + + self.cnat_translation(vips, isV6=True) + + def test_cnat4(self): + # """ CNat Translation ipv4 """ + + vips = [ + Ep("30.0.0.1", 5555), + Ep("30.0.0.2", 5554), + Ep("30.0.0.2", 5553, UDP), + ] + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv4_neighbors() + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv4_neighbors() + + self.cnat_translation(vips) + + +class TestCNatSourceNAT(VppTestCase): + """ CNat Source NAT """ + extra_vpp_punt_config = ["cnat", "{", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatSourceNAT, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatSourceNAT, cls).tearDownClass() + + def setUp(self): + super(TestCNatSourceNAT, self).setUp() + + self.create_pg_interfaces(range(3)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + self.pg0.configure_ipv6_neighbors() + self.pg0.configure_ipv4_neighbors() + self.pg1.generate_remote_hosts(2) + self.pg1.configure_ipv4_neighbors() + self.pg1.configure_ipv6_neighbors() + + self.vapi.cnat_set_snat_addresses( + snat_ip4=self.pg2.remote_hosts[0].ip4, + snat_ip6=self.pg2.remote_hosts[0].ip6, + sw_if_index=INVALID_INDEX) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip6-unicast", + feature_name="cnat-snat-ip6", + sw_if_index=self.pg0.sw_if_index) + self.vapi.feature_enable_disable( + enable=1, + arc_name="ip4-unicast", + feature_name="cnat-snat-ip4", + sw_if_index=self.pg0.sw_if_index) + + policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t + self.vapi.cnat_set_snat_policy( + policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX) + for i in self.pg_interfaces: + self.vapi.cnat_snat_policy_add_del_if( + sw_if_index=i.sw_if_index, is_add=1, + table=policie_tbls.CNAT_POLICY_INCLUDE_V6) + self.vapi.cnat_snat_policy_add_del_if( + sw_if_index=i.sw_if_index, is_add=1, + table=policie_tbls.CNAT_POLICY_INCLUDE_V4) + + def tearDown(self): + self.vapi.cnat_session_purge() + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + super(TestCNatSourceNAT, self).tearDown() + + def test_snat_v6(self): + # """ CNat Source Nat v6 """ + self.sourcenat_test_tcp_udp_conf(TCP, isV6=True) + self.sourcenat_test_tcp_udp_conf(UDP, isV6=True) + self.sourcenat_test_icmp_err_conf(isV6=True) + self.sourcenat_test_icmp_echo6_conf() + + def test_snat_v4(self): + # """ CNat Source Nat v4 """ + self.sourcenat_test_tcp_udp_conf(TCP) + self.sourcenat_test_tcp_udp_conf(UDP) + self.sourcenat_test_icmp_err_conf() + self.sourcenat_test_icmp_echo4_conf() + + def sourcenat_test_icmp_echo6_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IPv6(src=client_addr, dst=remote_addr) / + ICMPv6EchoRequest(id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IPv6].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMPv6EchoRequest].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IPv6(src=remote_addr, dst=src_nat_addr) / + ICMPv6EchoReply(id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IPv6].src, remote_addr) + self.assertEqual(rx[ICMPv6EchoReply].id, 0xfeed) + + def sourcenat_test_icmp_echo4_conf(self): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + + # ping from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + ICMP(type=8, id=0xfeed) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assertEqual(rx[IP46].src, src_nat_addr) + self.assert_packet_checksums_valid(rx) + + received_id = rx[0][ICMP].id + # ping reply from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMP(type=0, id=received_id)) + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP].id, 0xfeed) + + def sourcenat_test_icmp_err_conf(self, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + IP46 = IPv6 + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + ICMP46 = ICMPv6DestUnreach + ICMPelem = ICMPv6DestUnreach(code=1) + IP46error = IPerror6 + else: + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + IP46error = IPerror + ICMP46 = ICMP + ICMPelem = ICMP(type=11) + + # from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + TCP(sport=sports[nbr], dport=dports[nbr]) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[TCP].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, src_nat_addr) + sport = rx[TCP].sport + + InnerIP = rxs[0][IP46] + # from outside to pods, ICMP error + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + ICMPelem / InnerIP) + + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].src, remote_addr) + self.assertEqual(rx[ICMP46][IP46error].src, client_addr) + self.assertEqual(rx[ICMP46][IP46error].dst, remote_addr) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].sport, sports[nbr]) + self.assertEqual(rx[ICMP46][IP46error] + [TCPerror].dport, dports[nbr]) + + def sourcenat_test_tcp_udp_conf(self, l4p, isV6=False): + sports = [1234, 1235] + dports = [6661, 6662] + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + if isV6: + IP46 = IPv6 + client_addr = self.pg0.remote_hosts[0].ip6 + remote_addr = self.pg1.remote_hosts[nbr].ip6 + src_nat_addr = self.pg2.remote_hosts[0].ip6 + exclude_prefix = ip_network( + "%s/100" % remote_addr, strict=False) + else: + IP46 = IP + client_addr = self.pg0.remote_hosts[0].ip4 + remote_addr = self.pg1.remote_hosts[nbr].ip4 + src_nat_addr = self.pg2.remote_hosts[0].ip4 + exclude_prefix = ip_network( + "%s/16" % remote_addr, strict=False) + # from pods to outside network + p1 = ( + Ether(dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + IP46(src=client_addr, dst=remote_addr) / + l4p(sport=sports[nbr], dport=dports[nbr]) / + Raw()) + + self.vapi.cli("trace add pg-input 1") + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, src_nat_addr) + sport = rx[l4p].sport + + # from outside to pods + p2 = ( + Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + IP46(src=remote_addr, dst=src_nat_addr) / + l4p(sport=dports[nbr], dport=sport) / + Raw()) + + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, client_addr) + self.assertEqual(rx[l4p].dport, sports[nbr]) + self.assertEqual(rx[l4p].sport, dports[nbr]) + self.assertEqual(rx[IP46].src, remote_addr) + + # add remote host to exclude list + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=1) + self.vapi.cnat_session_purge() + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, client_addr) + + # remove remote host from exclude list + self.vapi.cnat_snat_policy_add_del_exclude_pfx( + prefix=exclude_prefix, is_add=0) + self.vapi.cnat_session_purge() + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual(rx[IP46].dst, remote_addr) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual(rx[IP46].src, src_nat_addr) + + self.vapi.cnat_session_purge() + + +class TestCNatDHCP(VppTestCase): + """ CNat Translation """ + extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatDHCP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatDHCP, cls).tearDownClass() + + def tearDown(self): + for i in self.pg_interfaces: + i.admin_down() + super(TestCNatDHCP, self).tearDown() + + def create_translation(self, vip_pg, *args, is_v6=False): + vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6) + paths = [] + for (src_pg, dst_pg) in args: + paths.append(EpTuple( + Ep.from_pg(src_pg, is_v6=is_v6), + Ep.from_pg(dst_pg, is_v6=is_v6) + )) + t1 = VppCNatTranslation(self, TCP, vip, paths) + t1.add_vpp_config() + return t1 + + def make_addr(self, sw_if_index, i, is_v6): + if is_v6: + return "fd01:%x::%u" % (sw_if_index, i + 1) + else: + return "172.16.%u.%u" % (sw_if_index, i) + + def make_prefix(self, sw_if_index, i, is_v6): + if is_v6: + return "%s/128" % self.make_addr(sw_if_index, i, is_v6) + else: + return "%s/32" % self.make_addr(sw_if_index, i, is_v6) + + def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False): + qt1 = tr.query_vpp_config() + self.assertEqual(str(qt1.vip.addr), self.make_addr( + vip_pg.sw_if_index, i, is_v6)) + for (src_pg, dst_pg), path in zip(args, qt1.paths): + if src_pg: + self.assertEqual(str(path.src_ep.addr), self.make_addr( + src_pg.sw_if_index, i, is_v6)) + if dst_pg: + self.assertEqual(str(path.dst_ep.addr), self.make_addr( + dst_pg.sw_if_index, i, is_v6)) + + def config_ips(self, rng, is_add=1, is_v6=False): + for pg, i in product(self.pg_interfaces, rng): + self.vapi.sw_interface_add_del_address( + sw_if_index=pg.sw_if_index, + prefix=self.make_prefix(pg.sw_if_index, i, is_v6), + is_add=is_add) + + def test_dhcp_v4(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist) + self.config_ips([0]) + self.check_resolved(t1, *pglist) + self.config_ips([1]) + self.config_ips([0], is_add=0) + self.check_resolved(t1, *pglist, i=1) + self.config_ips([1], is_add=0) + t1.remove_vpp_config() + + def test_dhcp_v6(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist, is_v6=True) + self.config_ips([0], is_v6=True) + self.check_resolved(t1, *pglist, is_v6=True) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=True) + self.check_resolved(t1, *pglist, i=1, is_v6=True) + self.config_ips([1], is_add=0, is_v6=True) + t1.remove_vpp_config() + + def test_dhcp_snat(self): + self.create_pg_interfaces(range(1)) + for i in self.pg_interfaces: + i.admin_up() + self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) + self.config_ips([0], is_v6=False) + self.config_ips([0], is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 0, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 0, True)) + self.config_ips([1], is_v6=False) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=False) + self.config_ips([0], is_add=0, is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 1, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 1, True)) + self.config_ips([1], is_add=0, is_v6=False) + self.config_ips([1], is_add=0, is_v6=True) + self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) + + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |