diff options
Diffstat (limited to 'src/plugins/cnat/test/test_cnat.py')
-rw-r--r-- | src/plugins/cnat/test/test_cnat.py | 596 |
1 files changed, 596 insertions, 0 deletions
diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py new file mode 100644 index 00000000000..18e3baadbed --- /dev/null +++ b/src/plugins/cnat/test/test_cnat.py @@ -0,0 +1,596 @@ +#!/usr/bin/env python3 + +import unittest + +from framework import VppTestCase, VppTestRunner +from vpp_ip import DpoProto + +from scapy.packet import Raw +from scapy.layers.l2 import Ether +from scapy.layers.inet import IP, UDP, TCP +from scapy.layers.inet6 import IPv6 + +from ipaddress import ip_address, ip_network, \ + IPv4Address, IPv6Address, IPv4Network, IPv6Network + +from vpp_object import VppObject +from vpp_papi import VppEnum + +N_PKTS = 15 + + +def find_cnat_translation(test, id): + ts = test.vapi.cnat_translation_dump() + for t in ts: + if id == t.translation.id: + return True + return False + + +class Ep(object): + """ CNat endpoint """ + + def __init__(self, ip, port, l4p=TCP): + self.ip = ip + self.port = port + self.l4p = l4p + + def encode(self): + return {'addr': self.ip, + 'port': self.port} + + def __str__(self): + return ("%s:%d" % (self.ip, self.port)) + + +class EpTuple(object): + """ CNat endpoint """ + + def __init__(self, src, dst): + self.src = src + self.dst = dst + + def encode(self): + return {'src_ep': self.src.encode(), + 'dst_ep': self.dst.encode()} + + def __str__(self): + return ("%s->%s" % (self.src, self.dst)) + + +class VppCNatTranslation(VppObject): + + def __init__(self, test, iproto, vip, paths): + self._test = test + self.vip = vip + self.iproto = iproto + self.paths = paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) + + @property + def vl4_proto(self): + ip_proto = VppEnum.vl_api_ip_proto_t + return { + UDP: ip_proto.IP_API_PROTO_UDP, + TCP: ip_proto.IP_API_PROTO_TCP, + }[self.iproto] + + def delete(self): + r = self._test.vapi.cnat_translation_del(id=self.id) + + def add_vpp_config(self): + r = self._test.vapi.cnat_translation_update( + {'vip': self.vip.encode(), + 'ip_proto': self.vl4_proto, + 'n_paths': len(self.paths), + 'paths': self.encoded_paths}) + self._test.registry.register(self, self._test.logger) + self.id = r.id + + def modify_vpp_config(self, paths): + self.paths = paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) + + r = self._test.vapi.cnat_translation_update( + {'vip': self.vip.encode(), + 'ip_proto': self.vl4_proto, + 'n_paths': len(self.paths), + 'paths': self.encoded_paths}) + self._test.registry.register(self, self._test.logger) + + def remove_vpp_config(self): + self._test.vapi.cnat_translation_del(self.id) + + def query_vpp_config(self): + return find_cnat_translation(self._test, self.id) + + def object_id(self): + return ("cnat-translation-%s" % (self.vip)) + + def get_stats(self): + c = self._test.statistics.get_counter("/net/cnat-translation") + return c[0][self.id] + + +class VppCNATSourceNat(VppObject): + + def __init__(self, test, address, exclude_subnets=[]): + self._test = test + self.address = address + self.exclude_subnets = exclude_subnets + + def add_vpp_config(self): + a = ip_address(self.address) + if 4 == a.version: + self._test.vapi.cnat_set_snat_addresses(snat_ip4=self.address) + else: + self._test.vapi.cnat_set_snat_addresses(snat_ip6=self.address) + for subnet in self.exclude_subnets: + self.cnat_exclude_subnet(subnet, True) + + def cnat_exclude_subnet(self, exclude_subnet, isAdd=True): + add = 1 if isAdd else 0 + self._test.vapi.cnat_add_del_snat_prefix( + prefix=exclude_subnet, is_add=add) + + def query_vpp_config(self): + return False + + def remove_vpp_config(self): + return False + + +class TestCNatTranslation(VppTestCase): + """ CNat Translation """ + extra_vpp_punt_config = ["cnat", "{", + "session-max-age", "1", + "tcp-max-age", "1", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatTranslation, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatTranslation, cls).tearDownClass() + + def setUp(self): + super(TestCNatTranslation, self).setUp() + + self.create_pg_interfaces(range(3)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + super(TestCNatTranslation, self).tearDown() + + def cnat_create_translation(self, vip, nbr, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + dep = Ep(getattr(self.pg1.remote_hosts[nbr], ip_v), 4000 + nbr) + sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) + t1 = VppCNatTranslation( + self, vip.l4p, vip, + [EpTuple(sep, dep), EpTuple(sep, dep)]) + t1.add_vpp_config() + return t1 + + def cnat_test_translation(self, t1, nbr, sports, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + ip_class = IPv6 if isV6 else IP + vip = t1.vip + + # + # Flows + # + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + self.vapi.cli("trace add pg-input 1") + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(self.pg1.remote_hosts[nbr], ip_v)) + self.assertEqual(rx[vip.l4p].dport, 4000 + nbr) + self.assertEqual( + rx[ip_class].src, + getattr(src, ip_v)) + self.assertEqual(rx[vip.l4p].sport, sport) + + # from vip to client + p1 = (Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_mac) / + ip_class(src=getattr( + self.pg1.remote_hosts[nbr], + ip_v), + dst=getattr(src, ip_v)) / + vip.l4p(sport=4000 + nbr, dport=sport) / + Raw()) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(src, ip_v)) + self.assertEqual(rx[vip.l4p].dport, sport) + self.assertEqual(rx[ip_class].src, vip.ip) + self.assertEqual(rx[vip.l4p].sport, vip.port) + + # + # packets to the VIP that do not match a + # translation are dropped + # + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=6666) / + Raw()) + + self.send_and_assert_no_replies(self.pg0, + p1 * N_PKTS, + self.pg1) + + # + # packets from the VIP that do not match a + # session are forwarded + # + p1 = (Ether(dst=self.pg1.local_mac, + src=self.pg1.remote_mac) / + ip_class(src=getattr( + self.pg1.remote_hosts[nbr], + ip_v), + dst=getattr(src, ip_v)) / + vip.l4p(sport=6666, dport=sport) / + Raw()) + + rxs = self.send_and_expect(self.pg1, + p1 * N_PKTS, + self.pg0) + + self.assertEqual(t1.get_stats()['packets'], + N_PKTS * + len(sports) * + len(self.pg0.remote_hosts)) + + def cnat_test_translation_update(self, t1, sports, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + ip_class = IPv6 if isV6 else IP + vip = t1.vip + + # + # modify the translation to use a different backend + # + dep = Ep(getattr(self.pg2, 'remote_' + ip_v), 5000) + sep = Ep("::", 0) if isV6 else Ep("0.0.0.0", 0) + t1.modify_vpp_config([EpTuple(sep, dep)]) + + # + # existing flows follow the old path + # + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg1) + + # + # new flows go to the new backend + # + for src in self.pg0.remote_hosts: + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=9999, dport=vip.port) / + Raw()) + + rxs = self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg2) + + def cnat_translation(self, vips, isV6=False): + """ CNat Translation """ + + ip_class = IPv6 if isV6 else IP + ip_v = "ip6" if isV6 else "ip4" + sports = [1234, 1233] + + # + # turn the scanner off whilst testing otherwise sessions + # will time out + # + self.vapi.cli("test cnat scanner off") + + sessions = self.vapi.cnat_session_dump() + + trs = [] + for nbr, vip in enumerate(vips): + trs.append(self.cnat_create_translation(vip, nbr, isV6=isV6)) + + self.logger.info(self.vapi.cli("sh cnat client")) + self.logger.info(self.vapi.cli("sh cnat translation")) + + # + # translations + # + for nbr, vip in enumerate(vips): + self.cnat_test_translation(trs[nbr], nbr, sports, isV6=isV6) + self.cnat_test_translation_update(trs[nbr], sports, isV6=isV6) + if isV6: + self.logger.info(self.vapi.cli( + "sh ip6 fib %s" % self.pg0.remote_ip6)) + else: + self.logger.info(self.vapi.cli( + "sh ip fib %s" % self.pg0.remote_ip4)) + self.logger.info(self.vapi.cli("sh cnat session verbose")) + + # + # turn the scanner back on and wait untill the sessions + # all disapper + # + self.vapi.cli("test cnat scanner on") + + n_tries = 0 + sessions = self.vapi.cnat_session_dump() + while (len(sessions) and n_tries < 100): + n_tries += 1 + sessions = self.vapi.cnat_session_dump() + self.sleep(2) + + self.assertTrue(n_tries < 100) + + # + # load some flows again and purge + # + for vip in vips: + for src in self.pg0.remote_hosts: + for sport in sports: + # from client to vip + p1 = (Ether(dst=self.pg0.local_mac, + src=src.mac) / + ip_class(src=getattr(src, ip_v), dst=vip.ip) / + vip.l4p(sport=sport, dport=vip.port) / + Raw()) + self.send_and_expect(self.pg0, + p1 * N_PKTS, + self.pg2) + + for tr in trs: + tr.delete() + + self.assertTrue(self.vapi.cnat_session_dump()) + self.vapi.cnat_session_purge() + self.assertFalse(self.vapi.cnat_session_dump()) + + def test_cnat6(self): + # """ CNat Translation ipv6 """ + vips = [ + Ep("30::1", 5555), + Ep("30::2", 5554), + Ep("30::2", 5553, UDP), + ] + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv6_neighbors() + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv6_neighbors() + + self.cnat_translation(vips, isV6=True) + + def test_cnat4(self): + # """ CNat Translation ipv4 """ + + vips = [ + Ep("30.0.0.1", 5555), + Ep("30.0.0.2", 5554), + Ep("30.0.0.2", 5553, UDP), + ] + + self.pg0.generate_remote_hosts(len(vips)) + self.pg0.configure_ipv4_neighbors() + self.pg1.generate_remote_hosts(len(vips)) + self.pg1.configure_ipv4_neighbors() + + self.cnat_translation(vips) + + +class TestCNatSourceNAT(VppTestCase): + """ CNat Source NAT """ + extra_vpp_punt_config = ["cnat", "{", + "session-max-age", "1", + "tcp-max-age", "1", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatSourceNAT, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatSourceNAT, cls).tearDownClass() + + def setUp(self): + super(TestCNatSourceNAT, self).setUp() + + self.create_pg_interfaces(range(3)) + + for i in self.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + i.config_ip6() + i.resolve_ndp() + + def tearDown(self): + for i in self.pg_interfaces: + i.unconfig_ip4() + i.unconfig_ip6() + i.admin_down() + super(TestCNatSourceNAT, self).tearDown() + + def cnat_create_translation(self, srcNatAddr, interface, isV6=False): + t1 = VppCNATSourceNat(self, srcNatAddr) + t1.add_vpp_config() + cnat_arc_name = "ip6-unicast" if isV6 else "ip4-unicast" + cnat_feature_name = "ip6-cnat-snat" if isV6 else "ip4-cnat-snat" + self.vapi.feature_enable_disable( + enable=1, + arc_name=cnat_arc_name, + feature_name=cnat_feature_name, + sw_if_index=interface.sw_if_index) + + return t1 + + def cnat_test_sourcenat(self, srcNatAddr, l4p=TCP, isV6=False): + ip_v = "ip6" if isV6 else "ip4" + ip_class = IPv6 if isV6 else IP + sports = [1234, 1235, 1236] + dports = [6661, 6662, 6663] + + self.pg0.generate_remote_hosts(1) + self.pg0.configure_ipv4_neighbors() + self.pg0.configure_ipv6_neighbors() + self.pg1.generate_remote_hosts(len(sports)) + self.pg1.configure_ipv4_neighbors() + self.pg1.configure_ipv6_neighbors() + + self.vapi.cli("test cnat scanner on") + t1 = self.cnat_create_translation(srcNatAddr, self.pg0) + + for nbr, remote_host in enumerate(self.pg1.remote_hosts): + # from pods to outside network + p1 = ( + Ether( + dst=self.pg0.local_mac, + src=self.pg0.remote_hosts[0].mac) / + ip_class( + src=getattr(self.pg0.remote_hosts[0], ip_v), + dst=getattr(remote_host, ip_v)) / + l4p(sport=sports[nbr], dport=dports[nbr]) / + Raw()) + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(remote_host, ip_v)) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual( + rx[ip_class].src, + srcNatAddr) + sport = rx[l4p].sport + + # from outside to pods + p2 = ( + Ether( + dst=self.pg1.local_mac, + src=self.pg1.remote_hosts[nbr].mac) / + ip_class(src=getattr(remote_host, ip_v), dst=srcNatAddr) / + l4p(sport=dports[nbr], dport=sport) / + Raw()) + + rxs = self.send_and_expect( + self.pg1, + p2 * N_PKTS, + self.pg0) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(self.pg0.remote_hosts[0], ip_v)) + self.assertEqual(rx[l4p].dport, sports[nbr]) + self.assertEqual(rx[l4p].sport, dports[nbr]) + self.assertEqual( + rx[ip_class].src, + getattr(remote_host, ip_v)) + + # add remote host to exclude list + subnet_mask = 100 if isV6 else 16 + subnet = getattr(remote_host, ip_v) + "/" + str(subnet_mask) + exclude_subnet = ip_network(subnet, strict=False) + + t1.cnat_exclude_subnet(exclude_subnet) + self.vapi.cnat_session_purge() + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(remote_host, ip_v)) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual( + rx[ip_class].src, + getattr(self.pg0.remote_hosts[0], ip_v)) + + # remove remote host from exclude list + t1.cnat_exclude_subnet(exclude_subnet, isAdd=False) + self.vapi.cnat_session_purge() + + rxs = self.send_and_expect( + self.pg0, + p1 * N_PKTS, + self.pg1) + + for rx in rxs: + self.assert_packet_checksums_valid(rx) + self.assertEqual( + rx[ip_class].dst, + getattr(remote_host, ip_v)) + self.assertEqual(rx[l4p].dport, dports[nbr]) + self.assertEqual( + rx[ip_class].src, + srcNatAddr) + + # def test_cnat6_sourcenat(self): + # # """ CNat Source Nat ipv6 """ + # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, TCP, True) + # self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip6, UDP, True) + + def test_cnat4_sourcenat(self): + # """ CNat Source Nat ipv4 """ + self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, TCP) + self.cnat_test_sourcenat(self.pg2.remote_hosts[0].ip4, UDP) + +if __name__ == '__main__': + unittest.main(testRunner=VppTestRunner) |