#!/usr/bin/env python import socket import unittest import struct from framework import VppTestCase, VppTestRunner from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.l2 import Ether, ARP from scapy.data import IP_PROTOS from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder class TestSNAT(VppTestCase): """ SNAT Test Cases """ @classmethod def setUpClass(cls): super(TestSNAT, cls).setUpClass() try: cls.tcp_port_in = 6303 cls.tcp_port_out = 6303 cls.udp_port_in = 6304 cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 cls.snat_addr = '10.0.0.3' cls.create_pg_interfaces(range(8)) cls.interfaces = list(cls.pg_interfaces[0:4]) for i in cls.interfaces: i.admin_up() i.config_ip4() i.resolve_arp() cls.pg0.generate_remote_hosts(2) cls.pg0.configure_ipv4_neighbors() cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) cls.pg4._local_ip4 = "172.16.255.1" cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4) cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2" cls.pg4.set_table_ip4(10) cls.pg5._local_ip4 = "172.16.255.3" cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4) cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4" cls.pg5.set_table_ip4(10) cls.pg6._local_ip4 = "172.16.255.1" cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4) cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2" cls.pg6.set_table_ip4(20) for i in cls.overlapping_interfaces: i.config_ip4() i.admin_up() i.resolve_arp() cls.pg7.admin_up() except Exception: super(TestSNAT, cls).tearDownClass() raise def create_stream_in(self, in_if, out_if, ttl=64): """ Create packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param ttl: TTL of generated packets """ pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / 
TCP(sport=self.tcp_port_in)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / UDP(sport=self.udp_port_in)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def create_stream_out(self, out_if, dst_ip=None, ttl=64): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global SNAT address) :param ttl: TTL of generated packets """ if dst_ip is None: dst_ip = self.snat_addr pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=self.tcp_port_out)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=self.udp_port_out)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_id_out, type='echo-reply')) pkts.append(p) return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, packet_num=3): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global SNAT address) :param same_port: Sorce port number is not translated (Default False) :param packet_num: Expected number of packets (Default 3) """ if nat_ip is None: nat_ip = self.snat_addr self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].src, nat_ip) if packet.haslayer(TCP): if same_port: self.assertEqual(packet[TCP].sport, self.tcp_port_in) else: self.assertNotEqual( packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport elif packet.haslayer(UDP): if same_port: 
self.assertEqual(packet[UDP].sport, self.udp_port_in) else: self.assertNotEqual( packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if same_port: self.assertEqual(packet[ICMP].id, self.icmp_id_in) else: self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP].id except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_in(self, capture, in_if, packet_num=3): """ Verify captured packets on inside network :param capture: Captured packets :param in_if: Inside interface :param packet_num: Expected number of packets (Default 3) """ self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) raise def verify_capture_no_translation(self, capture, ingress_if, egress_if): """ Verify captured packet that don't have to be translated :param capture: Captured packets :param ingress_if: Ingress interface :param egress_if: Egress interface """ for packet in capture: try: self.assertEqual(packet[IP].src, ingress_if.remote_ip4) self.assertEqual(packet[IP].dst, egress_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].sport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].sport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) raise def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, packet_num=3, icmp_type=11): """ Verify captured packets with ICMP errors on outside network :param capture: Captured packets :param src_ip: 
Translated IP address or IP address of VPP (Default use global SNAT address) :param packet_num: Expected number of packets (Default 3) :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ if src_ip is None: src_ip = self.snat_addr self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].src, src_ip) self.assertTrue(packet.haslayer(ICMP)) icmp = packet[ICMP] self.assertEqual(icmp.type, icmp_type) self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out) elif inner_ip.haslayer(UDPerror): self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3, icmp_type=11): """ Verify captured packets with ICMP errors on inside network :param capture: Captured packets :param in_if: Inside interface :param packet_num: Expected number of packets (Default 3) :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ self.assertEqual(packet_num, len(capture)) for packet in capture: try: self.assertEqual(packet[IP].dst, in_if.remote_ip4) self.assertTrue(packet.haslayer(ICMP)) icmp = packet[ICMP] self.assertEqual(icmp.type, icmp_type) self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in) elif inner_ip.haslayer(UDPerror): self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in) except: self.logger.err
/*
 * Copyright (c) 2018 Cisco and/or its affiliates.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <unistd.h>
#include <sys/types.h>
#include <sys/mount.h>
#include <sys/mman.h>
#include <sys/fcntl.h>
#include <sys/stat.h>
#include <unistd.h>

#include <vppinfra/linux/sysfs.h>
#include <vlib/vlib.h>
#include <vlib/physmem.h>
#include <vlib/unix/unix.h>
#include <vlib/pci/pci.h>
#include <vlib/linux/vfio.h>

#if defined(__x86_64__) && !defined(CLIB_SANITIZE_ADDR)
/* we keep physmem in the low 38 bits of the VA address space, as some
   IOMMU implementations cannot map above that range */
#define VLIB_PHYSMEM_DEFAULT_BASE_ADDDR		(1ULL << 36)
#else
/* let kernel decide */
#define VLIB_PHYSMEM_DEFAULT_BASE_ADDDR		0
#endif

/*
 * Create a shared pmalloc arena and register a physmem map describing it.
 *
 * vm           - vlib main; its physmem_main holds the pmalloc state and
 *                the pool of maps.
 * name, size, log2_page_sz, numa_node
 *              - passed through unchanged to
 *                clib_pmalloc_create_shared_arena().
 * map_index    - out: pool index of the newly allocated map.
 *
 * Returns 0 on success. If the arena cannot be created, returns an error
 * built from clib_pmalloc_last_error() and leaves *map_index untouched.
 *
 * The map is expressed in units of arena subpages: n_pages is
 * a->n_pages * a->subpages_per_page and log2_page_size is
 * a->log2_subpage_sz. The page table therefore gets exactly one entry per
 * subpage, so that it can be indexed by any i < map->n_pages.
 */
clib_error_t *
vlib_physmem_shared_map_create (vlib_main_t * vm, char *name, uword size,
				u32 log2_page_sz, u32 numa_node,
				u32 * map_index)
{
  clib_pmalloc_main_t *pm = vm->physmem_main.pmalloc_main;
  vlib_physmem_main_t *vpm = &vm->physmem_main;
  vlib_physmem_map_t *map;
  clib_pmalloc_arena_t *a;
  clib_error_t *error = 0;
  void *va;
  uword i;

  va = clib_pmalloc_create_shared_arena (pm, name, size, log2_page_sz,
					 numa_node);

  if (va == 0)
    return clib_error_return (0, "%U", format_clib_error,
			      clib_pmalloc_last_error (pm));

  a = clib_pmalloc_get_arena (pm, va);

  pool_get (vpm->maps, map);
  *map_index = map->index = map - vpm->maps;
  map->base = va;
  map->fd = a->fd;
  map->n_pages = a->n_pages * a->subpages_per_page;
  map->log2_page_size = a->log2_subpage_sz;
  map->numa_node = a->numa_node;

  /* One entry per subpage: the loop bound must match map->n_pages, not
     a->n_pages, otherwise the table is short when subpages_per_page > 1. */
  for (i = 0; i < map->n_pages; i++)
    {
      u8 *page_va = (u8 *) va + (i << a->log2_subpage_sz);
      uword pa = clib_pmalloc_get_pa (pm, page_va);

      /* maybe iova: no physical address available, so fall back to this
         page's own virtual address (not the arena base) */
      if (pa == 0)
	pa = pointer_to_uword (page_va);

      vec_add1 (map->page_table, pa);
    }

  return error;
}

/*
 * Look up a physmem map by its pool index.
 *
 * vm    - vlib main whose physmem_main owns the pool of maps.
 * index - pool index, as handed out through map_index by
 *         vlib_physmem_shared_map_create().
 *
 * Returns a pointer to the pool element at that index.
 */
vlib_physmem_map_t *
vlib_physmem_get_map (vlib_main_t * vm, u32 index)
{
  vlib_physmem_map_t *maps = vm->physmem_main.maps;

  return pool_elt_at_index (maps, index);
}

/*
 * Initialise the physmem subsystem for this vlib instance.
 *
 * Steps, in order:
 *   1. probe whether the pagemap yields a non-zero physical address and
 *      record that in vpm->flags,
 *   2. run linux_vfio_init() and bail out with its error if it fails,
 *   3. allocate and zero the pmalloc main structure,
 *   4. initialise pmalloc with the configured (or default) base address
 *      and maximum size,
 *   5. write the values pmalloc actually ended up with back into vpm.
 *
 * Returns 0 on success, or the error returned by linux_vfio_init().
 */
clib_error_t *
vlib_physmem_init (vlib_main_t * vm)
{
  vlib_physmem_main_t *vpm = &vm->physmem_main;
  clib_pmalloc_main_t *pm;
  clib_error_t *err;
  u64 *paddrs = 0;

  /* check if pagemap is accessible: the address of a local variable is
     used as the probe address, and a non-zero first entry counts as
     success */
  paddrs = clib_mem_vm_get_paddr (&paddrs,
				  min_log2 (sysconf (_SC_PAGESIZE)), 1);
  if (paddrs && paddrs[0])
    vpm->flags |= VLIB_PHYSMEM_MAIN_F_HAVE_PAGEMAP;
  vec_free (paddrs);

  err = linux_vfio_init (vm);
  if (err)
    return err;

  pm = clib_mem_alloc_aligned (sizeof (clib_pmalloc_main_t),
			       CLIB_CACHE_LINE_BYTES);
  memset (pm, 0, sizeof (clib_pmalloc_main_t));
  vpm->pmalloc_main = pm;

  /* no base address configured: fall back to the build-time default */
  if (vpm->base_addr == 0)
    vpm->base_addr = VLIB_PHYSMEM_DEFAULT_BASE_ADDDR;

  /* NOTE(review): the result of clib_pmalloc_init() is not checked here -
     confirm whether it can report failure */
  clib_pmalloc_init (pm, vpm->base_addr, vpm->max_size);

  /* update base_addr and max_size per actual allocation */
  vpm->base_addr = (uword) pm->base;
  vpm->max_size = (uword) pm->max_pages << pm->def_log2_page_sz;

  return err;
}

/*
 * CLI handler for "show physmem".
 *
 * Accepted keywords on the command line:
 *   verbose | v  - print with verbosity 1
 *   detail  | d  - print with verbosity 2
 *   map          - print the pmalloc map instead of the summary
 *
 * Parsing stops silently at the first unrecognised token. Always
 * returns 0.
 */
static clib_error_t *
show_physmem (vlib_main_t * vm,
	      unformat_input_t * input, vlib_cli_command_t * cmd)
{
  vlib_physmem_main_t *vpm = &vm->physmem_main;
  unformat_input_t line_storage, *line = &line_storage;
  u32 verbosity = 0, show_map = 0;

  if (unformat_user (input, unformat_line_input, line))
    {
      while (unformat_check_input (line) != UNFORMAT_END_OF_INPUT)
	{
	  /* long form is tried before the one-letter alias */
	  if (unformat (line, "verbose") || unformat (line, "v"))
	    verbosity = 1;
	  else if (unformat (line, "detail") || unformat (line, "d"))
	    verbosity = 2;
	  else if (unformat (line, "map"))
	    show_map = 1;
	  else
	    break;
	}
      unformat_free (line);
    }

  if (show_map)
    {
      vlib_cli_output (vm, " %U", format_pmalloc_map, vpm->pmalloc_main);
      return 0;
    }

  vlib_cli_output (vm, " %U", format_pmalloc, vpm->pmalloc_main, verbosity);
  return 0;
}

/* CLI registration: binds the "show physmem" path to show_physmem().
   The short help lists the keywords that handler parses. */
/* *INDENT-OFF* */
VLIB_CLI_COMMAND (show_physmem_command, static) = {
  .path = "show physmem",
  .short_help = "show physmem [verbose | detail | map]",
  .function = show_physmem,
};
/* *INDENT-ON* */

/*
 * Parse the "physmem" configuration input.
 *
 * Recognised settings:
 *   base-addr 0x<hex>   - stored in vpm->base_addr
 *   max-size <size>     - parsed by unformat_memory_size into vpm->max_size
 *
 * Returns 0 once all input has been consumed, or a parse error on the
 * first token that matches neither setting.
 */
static clib_error_t *
vlib_physmem_config (vlib_main_t * vm, unformat_input_t * input)
{
  vlib_physmem_main_t *vpm = &vm->physmem_main;

  while (unformat_check_input (input) != UNFORMAT_END_OF_INPUT)
    {
      if (unformat (input, "base-addr 0x%lx", &vpm->base_addr))
	continue;

      if (unformat (input, "max-size %U",
		    unformat_memory_size, &vpm->max_size))
	continue;

      /* neither setting matched */
      return unformat_parse_error (input);
    }

  unformat_free (input);
  return 0;
}

/* Register vlib_physmem_config() under the name "physmem" through
   VLIB_EARLY_CONFIG_FUNCTION. */
VLIB_EARLY_CONFIG_FUNCTION (vlib_physmem_config, "physmem");

/*
 * fd.io coding-style-patch-verification: ON
 *
 * Local Variables:
 * eval: (c-set-style "gnu")
 * End:
 */
_ip4n, path_mtu=512, template_interval=10) self.vapi.snat_ipfix() pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) self.snat_add_address(self.snat_addr, is_add=0) self.vapi.cli("ipfix flush") # FIXME this should be an API call capture = self.pg3.get_capture(3) ipfix = IPFIXDecoder() # first load template for p in capture: self.assertTrue(p.haslayer(IPFIX)) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) self.verify_ipfix_nat44_ses(data) def test_ipfix_addr_exhausted(self): """ S-NAT IPFIX logging NAT addresses exhausted """ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n, src_address=self.pg3.local_ip4n, path_mtu=512, template_interval=10) self.vapi.snat_ipfix() p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / TCP(sport=3025)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(0) self.vapi.cli("ipfix flush") # FIXME this should be an API call capture = self.pg3.get_capture(3) ipfix = IPFIXDecoder() # first load template for p in capture: self.assertTrue(p.haslayer(IPFIX)) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) self.verify_ipfix_addr_exhausted(data) def test_pool_addr_fib(self): """ S-NAT add pool addresses to FIB """ static_addr = '10.0.0.10' self.snat_add_address(self.snat_addr) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) 
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, is_inside=0) self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr) # SNAT address p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / ARP(op=ARP.who_has, pdst=self.snat_addr, psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) self.assertTrue(capture[0].haslayer(ARP)) self.assertTrue(capture[0][ARP].op, ARP.is_at) # 1:1 NAT address p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / ARP(op=ARP.who_has, pdst=static_addr, psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) self.assertTrue(capture[0].haslayer(ARP)) self.assertTrue(capture[0][ARP].op, ARP.is_at) # send ARP to non-SNAT interface p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') / ARP(op=ARP.who_has, pdst=self.snat_addr, psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac)) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(0) # remove addresses and verify self.snat_add_address(self.snat_addr, is_add=0) self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0) p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / ARP(op=ARP.who_has, pdst=self.snat_addr, psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(0) p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / ARP(op=ARP.who_has, pdst=static_addr, psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(0) def test_vrf_mode(self): """ S-NAT tenant VRF aware address pool mode """ vrf_id1 = 1 vrf_id2 = 2 
nat_ip1 = "10.0.0.10" nat_ip2 = "10.0.0.11" self.pg0.unconfig_ip4() self.pg1.unconfig_ip4() self.pg0.set_table_ip4(vrf_id1) self.pg1.set_table_ip4(vrf_id2) self.pg0.config_ip4() self.pg1.config_ip4() self.snat_add_address(nat_ip1, vrf_id=vrf_id1) self.snat_add_address(nat_ip2, vrf_id=vrf_id2) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index, is_inside=0) # first VRF pkts = self.create_stream_in(self.pg0, self.pg2) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip1) # second VRF pkts = self.create_stream_in(self.pg1, self.pg2) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip2) def test_vrf_feature_independent(self): """ S-NAT tenant VRF independent address pool mode """ nat_ip1 = "10.0.0.10" nat_ip2 = "10.0.0.11" self.snat_add_address(nat_ip1) self.snat_add_address(nat_ip2) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index) self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index, is_inside=0) # first VRF pkts = self.create_stream_in(self.pg0, self.pg2) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip1) # second VRF pkts = self.create_stream_in(self.pg1, self.pg2) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip1) def tearDown(self): super(TestSNAT, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show snat verbose")) self.clear_snat() class 
TestDeterministicNAT(VppTestCase): """ Deterministic NAT Test Cases """ @classmethod def setUpConstants(cls): super(TestDeterministicNAT, cls).setUpConstants() cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"]) @classmethod def setUpClass(cls): super(TestDeterministicNAT, cls).setUpClass() try: cls.create_pg_interfaces(range(2)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: i.admin_up() i.config_ip4() i.resolve_arp() except Exception: super(TestDeterministicNAT, cls).tearDownClass() raise def test_deterministic_mode(self): """ S-NAT run deterministic mode """ in_addr = '172.16.255.0' out_addr = '172.17.255.50' in_addr_t = '172.16.255.20' in_addr_n = socket.inet_aton(in_addr) out_addr_n = socket.inet_aton(out_addr) in_addr_t_n = socket.inet_aton(in_addr_t) in_plen = 24 out_plen = 32 snat_config = self.vapi.snat_show_config() self.assertEqual(1, snat_config.deterministic) self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen) rep1 = self.vapi.snat_det_forward(in_addr_t_n) self.assertEqual(rep1.out_addr[:4], out_addr_n) rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi) self.assertEqual(rep2.in_addr[:4], in_addr_t_n) deterministic_mappings = self.vapi.snat_det_map_dump() self.assertEqual(len(deterministic_mappings), 1) dsm = deterministic_mappings[0] self.assertEqual(in_addr_n, dsm.in_addr[:4]) self.assertEqual(in_plen, dsm.in_plen) self.assertEqual(out_addr_n, dsm.out_addr[:4]) self.assertEqual(out_plen, dsm.out_plen) self.clear_snat() deterministic_mappings = self.vapi.snat_det_map_dump() self.assertEqual(len(deterministic_mappings), 0) def clear_snat(self): """ Clear SNAT configuration. 
""" deterministic_mappings = self.vapi.snat_det_map_dump() for dsm in deterministic_mappings: self.vapi.snat_add_det_map(dsm.in_addr, dsm.in_plen, dsm.out_addr, dsm.out_plen, is_add=0) def tearDown(self): super(TestDeterministicNAT, self).tearDown() if not self.vpp_dead: self.logger.info(self.vapi.cli("show snat detail")) self.clear_snat() if __name__ == '__main__': unittest.main(testRunner=VppTestRunner)