From af897c5e3fa76180fbe0634052bde98b4b3c34d7 Mon Sep 17 00:00:00 2001 From: Nathan Skrzypczak Date: Mon, 21 Sep 2020 19:14:08 +0200 Subject: cnat: Add DHCP support Type: feature Change-Id: I4bd50fd672ac35cf14ebda2b0b10ec0b9a208628 Signed-off-by: Nathan Skrzypczak --- src/plugins/cnat/test/test_cnat.py | 171 +++++++++++++++++++++++++++++++++---- 1 file changed, 153 insertions(+), 18 deletions(-) (limited to 'src/plugins/cnat/test/test_cnat.py') diff --git a/src/plugins/cnat/test/test_cnat.py b/src/plugins/cnat/test/test_cnat.py index 3f8d33cec4e..d46d047e558 100644 --- a/src/plugins/cnat/test/test_cnat.py +++ b/src/plugins/cnat/test/test_cnat.py @@ -3,7 +3,8 @@ import unittest from framework import VppTestCase, VppTestRunner -from vpp_ip import DpoProto +from vpp_ip import DpoProto, INVALID_INDEX +from itertools import product from scapy.packet import Raw from scapy.layers.l2 import Ether @@ -23,25 +24,34 @@ from vpp_papi import VppEnum N_PKTS = 15 -def find_cnat_translation(test, id): - ts = test.vapi.cnat_translation_dump() - for t in ts: - if id == t.translation.id: - return True - return False - - class Ep(object): """ CNat endpoint """ - def __init__(self, ip, port, l4p=TCP): + def __init__(self, ip=None, port=0, l4p=TCP, + sw_if_index=INVALID_INDEX, is_v6=False): self.ip = ip + if ip is None: + self.ip = "::" if is_v6 else "0.0.0.0" self.port = port self.l4p = l4p + self.sw_if_index = sw_if_index + if is_v6: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP6 + else: + self.if_af = VppEnum.vl_api_address_family_t.ADDRESS_IP4 def encode(self): return {'addr': self.ip, - 'port': self.port} + 'port': self.port, + 'sw_if_index': self.sw_if_index, + 'if_af': self.if_af} + + @classmethod + def from_pg(cls, pg, is_v6=False): + if pg is None: + return cls(is_v6=is_v6) + else: + return cls(sw_if_index=pg.sw_if_index, is_v6=is_v6) @property def isV6(self): @@ -77,6 +87,9 @@ class VppCNatTranslation(VppObject): for path in self.paths: self.encoded_paths.append(path.encode()) + def __str__(self): + return ("%s %s %s" % (self.vip, self.iproto, self.paths)) + @property def vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t @@ -85,9 +98,6 @@ class VppCNatTranslation(VppObject): TCP: ip_proto.IP_API_PROTO_TCP, }[self.iproto] - def delete(self): - r = self._test.vapi.cnat_translation_del(id=self.id) - def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( {'vip': self.vip.encode(), @@ -111,10 +121,13 @@ class VppCNatTranslation(VppObject): self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): - self._test.vapi.cnat_translation_del(self.id) + self._test.vapi.cnat_translation_del(id=self.id) def query_vpp_config(self): - return find_cnat_translation(self._test, self.id) + for t in self._test.vapi.cnat_translation_dump(): + if self.id == t.translation.id: + return t.translation + return None def object_id(self): return ("cnat-translation-%s" % (self.vip)) @@ -191,6 +204,7 @@ class TestCNatTranslation(VppTestCase): rxs = self.send_and_expect(self.pg0, p1 * N_PKTS, self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) for rx in rxs: self.assert_packet_checksums_valid(rx) @@ -352,7 +366,7 @@ class TestCNatTranslation(VppTestCase): n_tries += 1 sessions = self.vapi.cnat_session_dump() self.sleep(2) - print(self.vapi.cli("show cnat session verbose")) + self.logger.info(self.vapi.cli("show cnat session verbose")) self.assertTrue(n_tries < 100) self.vapi.cli("test cnat scanner off") @@ -374,7 +388,7 @@ class TestCNatTranslation(VppTestCase): self.pg2) for tr in trs: - tr.delete() + tr.remove_vpp_config() self.assertTrue(self.vapi.cnat_session_dump()) self.vapi.cnat_session_purge() @@ -762,10 +776,13 @@ class TestCNatSourceNAT(VppTestCase): l4p(sport=sports[nbr], dport=dports[nbr]) / Raw()) + self.vapi.cli("trace add pg-input 1") rxs = self.send_and_expect( self.pg0, p1 * N_PKTS, self.pg1) + self.logger.info(self.vapi.cli("show trace max 1")) + for rx in rxs: self.assert_packet_checksums_valid(rx) self.assertEqual(rx[IP46].dst, remote_addr) @@ -825,5 +842,123 @@ class TestCNatSourceNAT(VppTestCase): self.vapi.cnat_session_purge() +class TestCNatDHCP(VppTestCase): + """ CNat Translation """ + extra_vpp_punt_config = ["cnat", "{", + "session-db-buckets", "64", + "session-cleanup-timeout", "0.1", + "session-max-age", "1", + "tcp-max-age", "1", + "scanner", "off", "}"] + + @classmethod + def setUpClass(cls): + super(TestCNatDHCP, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestCNatDHCP, cls).tearDownClass() + + def tearDown(self): + for i in self.pg_interfaces: + i.admin_down() + super(TestCNatDHCP, self).tearDown() + + def create_translation(self, vip_pg, *args, is_v6=False): + vip = Ep(sw_if_index=vip_pg.sw_if_index, is_v6=is_v6) + paths = [] + for (src_pg, dst_pg) in args: + paths.append(EpTuple( + Ep.from_pg(src_pg, is_v6=is_v6), + Ep.from_pg(dst_pg, is_v6=is_v6) + )) + t1 = VppCNatTranslation(self, TCP, vip, paths) + t1.add_vpp_config() + return t1 + + def make_addr(self, sw_if_index, i, is_v6): + if is_v6: + return "fd01:%x::%u" % (sw_if_index, i + 1) + else: + return "172.16.%u.%u" % (sw_if_index, i) + + def make_prefix(self, sw_if_index, i, is_v6): + if is_v6: + return "%s/128" % self.make_addr(sw_if_index, i, is_v6) + else: + return "%s/32" % self.make_addr(sw_if_index, i, is_v6) + + def check_resolved(self, tr, vip_pg, *args, i=0, is_v6=False): + qt1 = tr.query_vpp_config() + self.assertEqual(str(qt1.vip.addr), self.make_addr( + vip_pg.sw_if_index, i, is_v6)) + for (src_pg, dst_pg), path in zip(args, qt1.paths): + if src_pg: + self.assertEqual(str(path.src_ep.addr), self.make_addr( + src_pg.sw_if_index, i, is_v6)) + if dst_pg: + self.assertEqual(str(path.dst_ep.addr), self.make_addr( + dst_pg.sw_if_index, i, is_v6)) + + def config_ips(self, rng, is_add=1, is_v6=False): + for pg, i in product(self.pg_interfaces, rng): + self.vapi.sw_interface_add_del_address( + sw_if_index=pg.sw_if_index, + prefix=self.make_prefix(pg.sw_if_index, i, is_v6), + is_add=is_add) + + def test_dhcp_v4(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist) + self.config_ips([0]) + self.check_resolved(t1, *pglist) + self.config_ips([1]) + self.config_ips([0], is_add=0) + self.check_resolved(t1, *pglist, i=1) + self.config_ips([1], is_add=0) + t1.remove_vpp_config() + + def test_dhcp_v6(self): + self.create_pg_interfaces(range(5)) + for i in self.pg_interfaces: + i.admin_up() + pglist = (self.pg0, (self.pg1, self.pg2), (self.pg1, self.pg4)) + t1 = self.create_translation(*pglist, is_v6=True) + self.config_ips([0], is_v6=True) + self.check_resolved(t1, *pglist, is_v6=True) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=True) + self.check_resolved(t1, *pglist, i=1, is_v6=True) + self.config_ips([1], is_add=0, is_v6=True) + t1.remove_vpp_config() + + def test_dhcp_snat(self): + self.create_pg_interfaces(range(1)) + for i in self.pg_interfaces: + i.admin_up() + self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) + self.config_ips([0], is_v6=False) + self.config_ips([0], is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 0, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 0, True)) + self.config_ips([1], is_v6=False) + self.config_ips([1], is_v6=True) + self.config_ips([0], is_add=0, is_v6=False) + self.config_ips([0], is_add=0, is_v6=True) + r = self.vapi.cnat_get_snat_addresses() + self.assertEqual(str(r.snat_ip4), self.make_addr( + self.pg0.sw_if_index, 1, False)) + self.assertEqual(str(r.snat_ip6), self.make_addr( + self.pg0.sw_if_index, 1, True)) + self.config_ips([1], is_add=0, is_v6=False) + self.config_ips([1], is_add=0, is_v6=True) + + if __name__ == '__main__': unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg