#!/usr/bin/env python3 import unittest from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto, INVALID_INDEX from itertools import product from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP, TCP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, IPerror6, ICMPv6DestUnreach from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply import struct from ipaddress import ( ip_address, ip_network, IPv4Address, IPv6Address, IPv4Network, IPv6Network, ) from vpp_object import VppObject from vpp_papi import VppEnum N_PKTS = 15 N_REMOTE_HOSTS = 3 SRC = 0 DST = 1 class CnatCommonTestCase(VppTestCase): """CNat common test class""" # # turn the scanner off whilst testing otherwise sessions # will time out # extra_vpp_punt_config = [ "cnat", "{", "session-db-buckets", "64", "session-cleanup-timeout", "0.1", "session-max-age", "1", "tcp-max-age", "1", "scanner", "off", "}", ] @classmethod def setUpClass(cls): super(CnatCommonTestCase, cls).setUpClass() @classmethod def tearDownClass(cls): super(CnatCommonTestCase, cls).tearDownClass() class Endpoint(object): """CNat endpoint""" def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None): self.port = port self.is_v6 = is_v6 self.sw_if_index = INVALID_INDEX if pg is not None and pgi is not None: # pg interface specified and remote index self.ip = self.get_ip46(pg.remote_hosts[pgi]) elif pg is not None: self.ip = None self.sw_if_index = pg.sw_if_index elif ip is not None: self.ip = ip else: self.ip = "::" if self.is_v6 else "0.0.0.0" def get_ip46(self, obj): if self.is_v6: return obj.ip6 return obj.ip4 def udpate(self, **kwargs): self.__init__(**kwargs) def _vpp_if_af(self): if self.is_v6: return VppEnum.vl_api_address_family_t.ADDRESS_IP6 return VppEnum.vl_api_address_family_t.ADDRESS_IP4 def encode(self): return { "addr": self.ip, "port": self.port, "sw_if_index": self.sw_if_index, 
"if_af": self._vpp_if_af(), } def __str__(self): return "%s:%d" % (self.ip, self.port) class Translation(VppObject): def __init__(self, test, iproto, vip, paths): self._test = test self.vip = vip self.iproto = iproto self.paths = paths self.id = None def __str__(self): return "%s %s %s" % (self.vip, self.iproto, self.paths) def _vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t return { UDP: ip_proto.IP_API_PROTO_UDP, TCP: ip_proto.IP_API_PROTO_TCP, }[self.iproto] def _encoded_paths(self): return [ {"src_ep": src.encode(), "dst_ep": dst.encode()} for (src, dst) in self.paths ] def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( { "vip": self.vip.encode(), "ip_proto": self._vl4_proto(), "n_paths": len(self.paths), "paths": self._encoded_paths(), } ) self._test.registry.register(self, self._test.logger) self.id = r.id return self def remove_vpp_config(self): assert self.id is not None self._test.vapi.cnat_translation_del(id=self.id) return self def query_vpp_config(self): for t in self._test.vapi.cnat_translation_dump(): if self.id == t.translation.id: return t.translation return None class CnatTestContext(object): """ Usage : ctx = CnatTestContext(self, TCP, is_v6=True) # send pg0.remote[0]:1234 -> pg1.remote[0]:6661 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) # We expect this to be NATed as # pg2.remote[0]: -> pg1.remote[0]:6661 ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) # After running cnat_expect, we can send back the received packet # and expect it be 'unnated' so that we get the original packet ctx.cnat_send_return().cnat_expect_return() # same thing for ICMP errors ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() """ def __init__(self, test, L4PROTO, is_v6): self.L4PROTO = L4PROTO self.is_v6 = is_v6 self._test = test def get_ip46(self, obj): if self.is_v6: return obj.ip6 return obj.ip4 @property def IP46(self): return IPv6 if self.is_v6 else IP def cnat_send( self, src_pg, src_id, src_port, dst_pg, 
dst_id, dst_port, no_replies=False ): if isinstance(src_id, int): self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id]) else: self.dst_addr = src_id if isinstance(dst_id, int): self.dst_addr = self.get_ip46(dst_pg.remote_hosts[dst_id]) else: self.dst_addr = dst_id self.src_port = src_port # also ICMP id self.dst_port = dst_port # also ICMP type if self.L4PROTO in [TCP, UDP]: l4 = self.L4PROTO(sport=self.src_port, dport=self.dst_port) elif self.L4PROTO in [ICMP] and not self.is_v6: l4 = self.L4PROTO(id=self.src_port, type=self.dst_port) elif self.L4PROTO in [ICMP] and self.is_v6: l4 = ICMPv6EchoRequest(id=self.src_port) p1 = ( Ether(src=src_pg.remote_mac, dst=src_pg.local_mac) / self.IP46(src=self.src_addr, dst=self.dst_addr) / l4 / Raw() ) if
# Copyright (c) 2018 Cisco and/or its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

##############################################################################
# Check for memfd_create headers and libs
##############################################################################
# Probe: a program that calls memfd_create() via <sys/mman.h> with
# _GNU_SOURCE defined builds successfully.
# The snippet is a bracket argument, so it needs no escaping. CMake drops the
# newline right after "[[", hence the empty line that starts the snippet.
check_c_source_compiles([[

  #define _GNU_SOURCE
  #include <sys/mman.h>
  int main() { return memfd_create ("/dev/false", 0); }
]] HAVE_MEMFD_CREATE)

# Pass the probe result on to the C sources as a preprocessor define.
if(HAVE_MEMFD_CREATE)
  add_definitions(-DHAVE_MEMFD_CREATE)
endif()

# Probe: a program that calls getcpu() via <sched.h> with _GNU_SOURCE
# defined builds successfully.
# The snippet is a bracket argument, so it needs no escaping. CMake drops the
# newline right after "[[", hence the empty line that starts the snippet.
check_c_source_compiles([[

  #define _GNU_SOURCE
  #include <sched.h>
  int main() { return getcpu (0, 0); }
]] HAVE_GETCPU)

# Pass the probe result on to the C sources as a preprocessor define.
if(HAVE_GETCPU)
  add_definitions(-DHAVE_GETCPU)
endif()

# Probe: a program that calls fcntl64() via <fcntl.h> with _GNU_SOURCE
# defined builds successfully.
# The snippet is a bracket argument, so it needs no escaping. CMake drops the
# newline right after "[[", hence the empty line that starts the snippet.
check_c_source_compiles([[

  #define _GNU_SOURCE
  #include <fcntl.h>
  int main() { return fcntl64 (0, 0); }
]] HAVE_FCNTL64)

# Pass the probe result on to the C sources as a preprocessor define.
if(HAVE_FCNTL64)
  add_definitions(-DHAVE_FCNTL64)
endif()
Translation( self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True), [ ( Endpoint(is_v6=True), Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True), ) ], ).add_vpp_config() ) self.translations.append( Translation( self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True), [ ( Endpoint(is_v6=True), Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True), ) ], ).add_vpp_config() ) def test_icmp4(self): # """ CNat Translation icmp v4 """ self._make_translations_v4() self._test_icmp() def test_icmp6(self): # """ CNat Translation icmp v6 """ self._make_translations_v6() self._test_icmp() def test_cnat6(self): # """ CNat Translation ipv6 """ self._make_translations_v6() self.cnat_translation() def test_cnat4(self): # """ CNat Translation ipv4 """ self._make_translations_v4() self.cnat_translation() class TestCNatSourceNAT(CnatCommonTestCase): """CNat Source NAT""" @classmethod def setUpClass(cls): super(TestCNatSourceNAT, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestCNatSourceNAT, cls).tearDownClass() def _enable_disable_snat(self, is_enable=True): self.vapi.cnat_set_snat_addresses( snat_ip4=self.pg2.remote_hosts[0].ip4, snat_ip6=self.pg2.remote_hosts[0].ip6, sw_if_index=INVALID_INDEX, ) self.vapi.feature_enable_disable( enable=1 if is_enable else 0, arc_name="ip6-unicast", feature_name="cnat-snat-ip6", sw_if_index=self.pg0.sw_if_index, ) self.vapi.feature_enable_disable( enable=1 if is_enable else 0, arc_name="ip4-unicast", feature_name="cnat-snat-ip4", sw_if_index=self.pg0.sw_if_index, ) policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t self.vapi.cnat_set_snat_policy( policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX ) for i in self.pg_interfaces: self.vapi.cnat_snat_policy_add_del_if( sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, table=policie_tbls.CNAT_POLICY_INCLUDE_V6, ) self.vapi.cnat_snat_policy_add_del_if( sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, table=policie_tbls.CNAT_POLICY_INCLUDE_V4, ) def setUp(self): 
super(TestCNatSourceNAT, self).setUp() self.create_pg_interfaces(range(3)) self.pg1.generate_remote_hosts(2) for i in self.pg_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() i.config_ip6() i.resolve_ndp() i.configure_ipv6_neighbors() i.configure_ipv4_neighbors() self._enable_disable_snat(is_enable=True) def tearDown(self): self._enable_disable_snat(is_enable=True) self.vapi.cnat_session_purge() for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() i.admin_down() super(TestCNatSourceNAT, self).tearDown() def test_snat_v6(self): # """ CNat Source Nat v6 """ self.sourcenat_test_tcp_udp_conf(TCP, is_v6=True) self.sourcenat_test_tcp_udp_conf(UDP, is_v6=True) self.sourcenat_test_icmp_echo_conf(is_v6=True) def test_snat_v4(self): # """ CNat Source Nat v4 """ self.sourcenat_test_tcp_udp_conf(TCP) self.sourcenat_test_tcp_udp_conf(UDP) self.sourcenat_test_icmp_echo_conf() def sourcenat_test_icmp_echo_conf(self, is_v6=False): ctx = CnatTestContext(self, ICMP, is_v6=is_v6) # 8 is ICMP type echo (v4 only) ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8) ctx.cnat_send_return().cnat_expect_return() def sourcenat_test_tcp_udp_conf(self, L4PROTO, is_v6=False): ctx = CnatTestContext(self, L4PROTO, is_v6) # we should source NAT ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) ctx.cnat_send_return().cnat_expect_return() # exclude dst address of pg1.1 from snat if is_v6: exclude_prefix = ip_network( "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False ) else: exclude_prefix = ip_network( "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False ) # add remote host to exclude list self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1) # We should not source NAT the id=1 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) ctx.cnat_expect(self.pg0, 0, 1234, self.pg1, 1, 6661) ctx.cnat_send_return().cnat_expect_return() # But we should source 
NAT the id=0 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 0, 6661) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 6661) ctx.cnat_send_return().cnat_expect_return() # remove remote host from exclude list self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0) self.vapi.cnat_session_purge() # We should source NAT again ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661) ctx.cnat_send_return().cnat_expect_return() # test return ICMP error nating ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 1, 6661) ctx.cnat_send_icmp_return_error().cnat_expect_icmp_error_return() self.vapi.cnat_session_purge() class TestCNatDHCP(CnatCommonTestCase): """CNat Translation""" @classmethod def setUpClass(cls): super(TestCNatDHCP, cls).setUpClass() @classmethod def tearDownClass(cls): super(TestCNatDHCP, cls).tearDownClass() def tearDown(self): for i in self.pg_interfaces: i.admin_down() super(TestCNatDHCP, self).tearDown() def make_addr(self, sw_if_index, addr_id, is_v6): if is_v6: return "fd01:%x::%u" % (sw_if_index, addr_id + 1) return "172.16.%u.%u" % (sw_if_index, addr_id) def make_prefix(self, sw_if_index, addr_id, is_v6): if is_v6: return "%s/128" % self.make_addr(sw_if_index, addr_id, is_v6) return "%s/32" % self.make_addr(sw_if_index, addr_id, is_v6) def check_resolved(self, tr, addr_id, is_v6=False): qt = tr.query_vpp_config() self.assertEqual( str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6) ) self.assertEqual(len(qt.paths), len(tr.paths)) for path_tr, path_qt in zip(tr.paths, qt.paths): src_qt = path_qt.src_ep dst_qt = path_qt.dst_ep src_tr, dst_tr = path_tr self.assertEqual( str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6) ) self.assertEqual( str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6) ) def add_del_address(self, pg, addr_id, is_add=True, is_v6=False): 
self.vapi.sw_interface_add_del_address( sw_if_index=pg.sw_if_index, prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6), is_add=1 if is_add else 0, ) def _test_dhcp_v46(self, is_v6): self.create_pg_interfaces(range(4)) for i in self.pg_interfaces: i.admin_up() paths = [ (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)), (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)), ] ep = Endpoint(pg=self.pg0, is_v6=is_v6) t = Translation(self, TCP, ep, paths).add_vpp_config() # Add an address on every interface # and check it is reflected in the cnat config for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=0, is_add=True, is_v6=is_v6) self.check_resolved(t, addr_id=0, is_v6=is_v6) # Add a new address on every interface, remove the old one # and check it is reflected in the cnat config for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=1, is_add=True, is_v6=is_v6) self.add_del_address(pg, addr_id=0, is_add=False, is_v6=is_v6) self.check_resolved(t, addr_id=1, is_v6=is_v6) # remove the configuration for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=1, is_add=False, is_v6=is_v6) t.remove_vpp_config() def test_dhcp_v4(self): self._test_dhcp_v46(False) def test_dhcp_v6(self): self._test_dhcp_v46(True) def test_dhcp_snat(self): self.create_pg_interfaces(range(1)) for i in self.pg_interfaces: i.admin_up() self.vapi.cnat_set_snat_addresses(sw_if_index=self.pg0.sw_if_index) # Add an address on every interface # and check it is reflected in the cnat config for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False) self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True) r = self.vapi.cnat_get_snat_addresses() self.assertEqual( str(r.snat_ip4), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False), ) self.assertEqual( str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True) ) # Add a new address on every interface, remove the old one # 
and check it is reflected in the cnat config for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=1, is_add=True, is_v6=False) self.add_del_address(pg, addr_id=1, is_add=True, is_v6=True) self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False) self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True) r = self.vapi.cnat_get_snat_addresses() self.assertEqual( str(r.snat_ip4), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False), ) self.assertEqual( str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True) ) # remove the configuration for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False) self.add_del_address(pg, addr_id=1, is_add=False, is_v6=True) self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)