#!/usr/bin/env python3 import ipaddress import random import socket import struct import unittest from io import BytesIO import scapy.compat from config import config from framework import VppTestCase from asfframework import ( tag_fixme_vpp_workers, VppTestRunner, ) from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.inet6 import ( IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6, ) from scapy.layers.l2 import Ether, GRE from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppc, ppp from vpp_papi import VppEnum from config import config @tag_fixme_vpp_workers @unittest.skipIf("nat" in config.excluded_plugins, "Exclude NAT plugin tests") class TestNAT64(VppTestCase): """NAT64 Test Cases""" @property def SYSLOG_SEVERITY(self): return VppEnum.vl_api_syslog_severity_t @property def config_flags(self): return VppEnum.vl_api_nat_config_flags_t @classmethod def setUpClass(cls): super(TestNAT64, cls).setUpClass() cls.tcp_port_in = 6303 cls.tcp_port_out = 6303 cls.udp_port_in = 6304 cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 cls.tcp_external_port = 80 cls.nat_addr = "10.0.0.3" cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) cls.vrf1_id = 10 cls.vrf1_nat_addr = "10.0.10.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.create_pg_interfaces(range(6)) cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) cls.ip6_interfaces.append(cls.pg_interfaces[2]) cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) cls.vapi.ip_table_add_del_v2( is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1} ) cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id) cls.pg0.generate_remote_hosts(2) for i in cls.ip6_interfaces: i.admin_up() i.config_ip6() i.configure_ipv6_neighbors() for i in cls.ip4_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() cls.pg3.admin_up() cls.pg3.config_ip4() cls.pg3.resolve_arp() cls.pg3.config_ip6() cls.pg3.configure_ipv6_neighbors() cls.pg5.admin_up() cls.pg5.config_ip6() @classmethod def tearDownClass(cls): super(TestNAT64, cls).tearDownClass() def setUp(self): super(TestNAT64, self).setUp() self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256) def tearDown(self): super(TestNAT64, self).tearDown() if not self.vpp_dead: self.vapi.nat64_plugin_enable_disable(enable=0) def show_commands_at_teardown(self): self.logger.info(self.vapi.cli("show nat64 pool")) self.logger.info(self.vapi.cli("show nat64 interfaces")) self.logger.info(self.vapi.cli("show nat64 prefix")) self.logger.info(self.vapi.cli("show nat64 bib all")) self.logger.info(self.vapi.cli("show nat64 session table all")) def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0): """ Create IPv6 packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param ttl: Hop Limit of generated packets :param pref: NAT64 prefix :param plen: NAT64 prefix length """ pkts = [] if pref is None: dst = "".join(["64:ff9b::", out_if.remote_ip4]) else: dst = self.compose_ip6(out_if.remote_ip4, pref, plen) # TCP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / TCP(sport=self.tcp_port_in, dport=20) ) pkts.append(p) # UDP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / UDP(sport=self.udp_port_in, dport=20) ) pkts.append(p) # ICMP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / ICMPv6EchoRequest(id=self.icmp_id_in) ) pkts.append(p) return pkts def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param ttl: TTL of generated packets :param use_inside_ports: Use inside NAT ports as destination ports instead of outside ports """ if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: tcp_port = self.tcp_port_out udp_port = self.udp_port_out icmp_id = self.icmp_id_out else: tcp_port = self.tcp_port_in udp_port = self.udp_port_in icmp_id = self.icmp_id_in pkts = [] # TCP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=tcp_port, sport=20) ) pkts.extend([p, p]) # UDP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=udp_port, sport=20) ) pkts.append(p) # ICMP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=icmp_id, type="echo-reply") ) pkts.append(p) return pkts def verify_capture_out( self, capture, nat_ip=None, same_port=False, dst_ip=None, is_ip6=False, ignore_port=False, ): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) :param is_ip6: If L3 protocol is IPv6 (Default False) """ if is_ip6: IP46 = IPv6 ICMP46 = ICMPv6EchoRequest else: IP46 = IP ICMP46 = ICMP if nat_ip is None: nat_ip = self.nat_addr for packet in capture: try: if not is_ip6: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP46].src, nat_ip) if dst_ip is not None: self.assertEqual(packet[IP46].dst, dst_ip) if packet.haslayer(TCP): if not ignore_port: if same_port: self.assertEqual(packet[TCP].sport, self.tcp_port_in) else: self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: self.assertEqual(packet[UDP].sport, self.udp_port_in) else: self.assertNotEqual(packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: self.assertEqual(packet[ICMP46].id, self.icmp_id_in) else: self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: self.logger.error( ppp("Unexpected or invalid packet (outside network):", packet) ) raise def verify_capture_in_ip6(self, capture, src_ip, dst_ip): """ Verify captured IPv6 packets on inside network :param capture: Captured packets :param src_ip: Source IP :param dst_ip: Destination IP address """ for packet in capture: try: self.assertEqual(packet[IPv6].src, src_ip) self.assertEqual(packet[IPv6].dst, dst_ip) self.assert_packet_checksums_valid(packet) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMPv6EchoReply].id, self.icmp_id_in) except: self.logger.error( ppp("Unexpected or invalid packet (inside network):", packet) ) raise def create_stream_frag( self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False ): """ Create fragmented packet stream :param src_if: Source interface :param dst: Destination IPv4 address :param sport: Source port :param dport: Destination port :param data: Payload data :param proto: protocol (TCP, UDP, ICMP) :param echo_reply: use echo_reply if protocol is ICMP :returns: Fragments """ if proto == IP_PROTOS.tcp: p = ( IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / Raw(data) ) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: proto_header = ICMP(id=sport, type="echo-request") else: proto_header = ICMP(id=sport, type="echo-reply") else: raise Exception("Unsupported protocol") id = random.randint(0, 65535) pkts = [] if proto == IP_PROTOS.tcp: raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) p = ( Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / proto_header / raw ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) p = ( Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) / raw ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) p = ( Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) / raw ) pkts.append(p) return pkts def create_stream_frag_ip6( self, src_if, dst, sport, dport, data, pref=None, plen=0, frag_size=128 ): """ Create fragmented packet stream :param src_if: Source interface :param dst: Destination IPv4 address :param sport: Source TCP port :param dport: Destination TCP port :param data: Payload data :param pref: NAT64 prefix :param plen: NAT64 prefix length :param fragsize: size of fragments :returns: Fragments """ if pref is None: dst_ip6 = "".join(["64:ff9b::", dst]) else: dst_ip6 = self.compose_ip6(dst, pref, plen) p = ( Ether(dst=src_if.local_mac, src=src_if.remote_mac) / IPv6(src=src_if.remote_ip6, dst=dst_ip6) / IPv6ExtHdrFragment(id=random.randint(0, 65535)) / TCP(sport=sport, dport=dport) / Raw(data) ) return fragment6(p, frag_size) def reass_frags_and_verify(self, frags, src, dst): """ Reassemble and verify fragmented packet :param frags: Captured fragments :param src: Source IPv4 address to verify :param dst: Destination IPv4 address to verify :returns: Reassembled IPv4 packet """ buffer = BytesIO() for p in frags: self.assertEqual(p[IP].src, src) self.assertEqual(p[IP].dst, dst) self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: p = ip / TCP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:]) elif ip.proto == IP_PROTOS.icmp: p = ip / ICMP(buffer.getvalue()) return p def reass_frags_and_verify_ip6(self, frags, src, dst): """ Reassemble and verify fragmented packet :param frags: Captured fragments :param src: Source IPv6 address to verify :param dst: Destination IPv6 address to verify :returns: Reassembled IPv6 packet """ buffer = BytesIO() for p in frags: self.assertEqual(p[IPv6].src, src) self.assertEqual(p[IPv6].dst, dst) buffer.seek(p[IPv6ExtHdrFragment].offset * 8) buffer.write(bytes(p[IPv6ExtHdrFragment].payload)) ip = IPv6( src=frags[0][IPv6].src, dst=frags[0][IPv6].dst, nh=frags[0][IPv6ExtHdrFragment].nh, ) if ip.nh == IP_PROTOS.tcp: p = ip / TCP(buffer.getvalue()) elif ip.nh == IP_PROTOS.udp: p = ip / UDP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_packet_checksums_valid(p) return p def verify_ipfix_max_bibs(self, data, limit): """ Verify IPFIX maximum BIB entries exceeded event :param data: Decoded IPFIX data records :param limit: Number of maximum BIB entries that can be created. """ self.assertEqual(1, len(data)) record = data[0] # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("!I", 2), record[466]) # maxBIBEntries self.assertEqual(struct.pack("!I", limit), record[472]) return len(data) def verify_ipfix_bib(self, data, is_create, src_addr): """ Verify IPFIX NAT64 BIB create and delete events :param data: Decoded IPFIX data records :param is_create: Create event if nonzero value otherwise delete event :param src_addr: IPv6 source address """ self.assertEqual(1, len(data)) record = data[0] # natEvent if is_create: self.assertEqual(scapy.compat.orb(record[230]), 10) else: self.assertEqual(scapy.compat.orb(record[230]), 11) # sourceIPv6Address self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27]))) # postNATSourceIPv4Address self.assertEqual(self.nat_addr_n, record[225]) # protocolIdentifier self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) # postNAPTSourceTransportPort self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr, dst_port): """ Verify IPFIX NAT64 session create and delete events :param data: Decoded IPFIX data records :param is_create: Create event if nonzero value otherwise delete event :param src_addr: IPv6 source address :param dst_addr: IPv4 destination address :param dst_port: destination TCP port """ self.assertEqual(1, len(data)) record = data[0] # natEvent if is_create: self.assertEqual(scapy.compat.orb(record[230]), 6) else: self.assertEqual(scapy.compat.orb(record[230]), 7) # sourceIPv6Address self.assertEqual(src_addr, str(ipaddress.IPv6Address(record[27]))) # destinationIPv6Address self.assertEqual( socket.inet_pton( socket.AF_INET6, self.compose_ip6(dst_addr, "64:ff9b::", 96) ), record[28], ) # postNATSourceIPv4Address self.assertEqual(self.nat_addr_n, record[225]) # postNATDestinationIPv4Address self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr), record[226]) # protocolIdentifier self.assertEqual(IP_PROTOS.tcp, scapy.compat.orb(record[4])) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # sourceTransportPort self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) # postNAPTSourceTransportPort self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) # destinationTransportPort self.assertEqual(struct.pack("!H", dst_port), record[11]) # postNAPTDestinationTransportPort self.assertEqual(struct.pack("!H", dst_port), record[228]) def verify_syslog_sess(self, data, is_add=True, is_ip6=False): message = data.decode("utf-8") try: message = SyslogMessage.parse(message) except ParseError as e: self.logger.error(e) raise else: self.assertEqual(message.severity, SyslogSeverity.info) self.assertEqual(message.appname, "NAT") self.assertEqual(message.msgid, "SADD" if is_add else "SDEL") sd_params = message.sd.get("nsess") self.assertTrue(sd_params is not None) if is_ip6: self.assertEqual(sd_params.get("IATYP"), "IPv6") self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6) else: self.assertEqual(sd_params.get("IATYP"), "IPv4") self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4) self.assertTrue(sd_params.get("SSUBIX") is not None) self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in) self.assertEqual(sd_params.get("XATYP"), "IPv4") self.assertEqual(sd_params.get("XSADDR"), self.nat_addr) self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out) self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp) self.assertEqual(sd_params.get("SVLAN"), "0") self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4) self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port) def compose_ip6(self, ip4, pref, plen): """ Compose IPv4-embedded IPv6 addresses :param ip4: IPv4 address :param pref: IPv6 prefix :param plen: IPv6 prefix length :returns: IPv4-embedded IPv6 addresses """ pref_n = list(socket.inet_pton(socket.AF_INET6, pref)) ip4_n = list(socket.inet_pton(socket.AF_INET, ip4)) if plen == 32: pref_n[4] = ip4_n[0] pref_n[5] = ip4_n[1] pref_n[6] = ip4_n[2] pref_n[7] = ip4_n[3] elif plen == 40: pref_n[5] = ip4_n[0] pref_n[6] = ip4_n[1] pref_n[7] = ip4_n[2] pref_n[9] = ip4_n[3] elif plen == 48: pref_n[6] = ip4_n[0] pref_n[7] = ip4_n[1] pref_n[9] = ip4_n[2] pref_n[10] = ip4_n[3] elif plen == 56: pref_n[7] = ip4_n[0] pref_n[9] = ip4_n[1] pref_n[10] = ip4_n[2] pref_n[11] = ip4_n[3] elif plen == 64: pref_n[9] = ip4_n[0] pref_n[10] = ip4_n[1] pref_n[11] = ip4_n[2] pref_n[12] = ip4_n[3] elif plen == 96: pref_n[12] = ip4_n[0] pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n]) return socket.inet_ntop(socket.AF_INET6, packed_pref_n) def verify_ipfix_max_sessions(self, data, limit): """ Verify IPFIX maximum session entries exceeded event :param data: Decoded IPFIX data records :param limit: Number of maximum session entries that can be created. """ self.assertEqual(1, len(data)) record = data[0] # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("!I", 1), record[466]) # maxSessionEntries self.assertEqual(struct.pack("!I", limit), record[471]) return len(data) def test_nat64_inside_interface_handles_neighbor_advertisement(self): """NAT64 inside interface handles Neighbor Advertisement""" flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg5.sw_if_index ) # Try to send ping ping = ( Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) / ICMPv6EchoRequest() ) pkts = [ping] self.pg5.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Wait for Neighbor Solicitation capture = self.pg5.get_capture(len(pkts)) packet = capture[0] try: self.assertEqual(packet[IPv6].src, self.pg5.local_ip6_ll) self.assertEqual(packet.haslayer(ICMPv6ND_NS), 1) tgt = packet[ICMPv6ND_NS].tgt except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # Send Neighbor Advertisement p = ( Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) / IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) / ICMPv6ND_NA(tgt=tgt) / ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac) ) pkts = [p] self.pg5.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Try to send ping again pkts = [ping] self.pg5.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Wait for ping reply capture = self.pg5.get_capture(len(pkts)) packet = capture[0] try: self.assertEqual(packet[IPv6].src, self.pg5.local_ip6) self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6) self.assertEqual(packet.haslayer(ICMPv6EchoReply), 1) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise def test_pool(self): """Add/delete address to NAT64 pool""" nat_addr = "1.2.3.4" self.vapi.nat64_add_del_pool_addr_range( start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=1 ) addresses = self.vapi.nat64_pool_addr_dump() self.assertEqual(len(addresses), 1) self.assertEqual(str(addresses[0].address), nat_addr) self.vapi.nat64_add_del_pool_addr_range( start_addr=nat_addr, end_addr=nat_addr, vrf_id=0xFFFFFFFF, is_add=0 ) addresses = self.vapi.nat64_pool_addr_dump() self.assertEqual(len(addresses), 0) def test_interface(self): """Enable/disable NAT64 feature on the interface""" flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) interfaces = self.vapi.nat64_interface_dump() self.assertEqual(len(interfaces), 2) pg0_found = False pg1_found = False for intf in interfaces: if intf.sw_if_index == self.pg0.sw_if_index: self.assertEqual(intf.flags, self.config_flags.NAT_IS_INSIDE) pg0_found = True elif intf.sw_if_index == self.pg1.sw_if_index: self.assertEqual(intf.flags, self.config_flags.NAT_IS_OUTSIDE) pg1_found = True self.assertTrue(pg0_found) self.assertTrue(pg1_found) features = self.vapi.cli("show interface features pg0") self.assertIn("nat64-in2out", features) features = self.vapi.cli("show interface features pg1") self.assertIn("nat64-out2in", features) self.vapi.nat64_add_del_interface( is_add=0, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=0, flags=flags, sw_if_index=self.pg1.sw_if_index ) interfaces = self.vapi.nat64_interface_dump() self.assertEqual(len(interfaces), 0) def test_static_bib(self): """Add/delete static BIB entry""" in_addr = "2001:db8:85a3::8a2e:370:7334" out_addr = "10.1.1.3" in_port = 1234 out_port = 5678 proto = IP_PROTOS.tcp self.vapi.nat64_add_del_static_bib( i_addr=in_addr, o_addr=out_addr, i_port=in_port, o_port=out_port, proto=proto, vrf_id=0, is_add=1, ) bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp) static_bib_num = 0 for bibe in bib: if bibe.flags & self.config_flags.NAT_IS_STATIC: static_bib_num += 1 self.assertEqual(str(bibe.i_addr), in_addr) self.assertEqual(str(bibe.o_addr), out_addr) self.assertEqual(bibe.i_port, in_port) self.assertEqual(bibe.o_port, out_port) self.assertEqual(static_bib_num, 1) bibs = self.statistics.get_counter("/nat64/total-bibs") self.assertEqual(bibs[0][0], 1) self.vapi.nat64_add_del_static_bib( i_addr=in_addr, o_addr=out_addr, i_port=in_port, o_port=out_port, proto=proto, vrf_id=0, is_add=0, ) bib = self.vapi.nat64_bib_dump(proto=IP_PROTOS.tcp) static_bib_num = 0 for bibe in bib: if bibe.flags & self.config_flags.NAT_IS_STATIC: static_bib_num += 1 self.assertEqual(static_bib_num, 0) bibs = self.statistics.get_counter("/nat64/total-bibs") self.assertEqual(bibs[0][0], 0) def test_set_timeouts(self): """Set NAT64 timeouts""" # verify default values timeouts = self.vapi.nat64_get_timeouts() self.assertEqual(timeouts.udp, 300) self.assertEqual(timeouts.icmp, 60) self.assertEqual(timeouts.tcp_transitory, 240) self.assertEqual(timeouts.tcp_established, 7440) # set and verify custom values self.vapi.nat64_set_timeouts( udp=200, tcp_established=7450, tcp_transitory=250, icmp=30 ) timeouts = self.vapi.nat64_get_timeouts() self.assertEqual(timeouts.udp, 200) self.assertEqual(timeouts.icmp, 30) self.assertEqual(timeouts.tcp_transitory, 250) self.assertEqual(timeouts.tcp_established, 7450) def test_dynamic(self): """NAT64 dynamic translation test""" self.tcp_port_in = 6303 self.udp_port_in = 6304 self.icmp_id_in = 6305 ses_num_start = self.nat64_get_ses_num() self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) # in2out tcpn = self.statistics.get_counter("/nat64/in2out/tcp")[0] udpn = self.statistics.get_counter("/nat64/in2out/udp")[0] icmpn = self.statistics.get_counter("/nat64/in2out/icmp")[0] drops = self.statistics.get_counter("/nat64/in2out/drops")[0] pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4 ) if_idx = self.pg0.sw_if_index cnt = self.statistics.get_counter("/nat64/in2out/tcp")[0] self.assertEqual(cnt[if_idx] - tcpn[if_idx], 1) cnt = self.statistics.get_counter("/nat64/in2out/udp")[0] self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) cnt = self.statistics.get_counter("/nat64/in2out/icmp")[0] self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) cnt = self.statistics.get_counter("/nat64/in2out/drops")[0] self.assertEqual(cnt[if_idx] - drops[if_idx], 0) # out2in tcpn = self.statistics.get_counter("/nat64/out2in/tcp")[0] udpn = self.statistics.get_counter("/nat64/out2in/udp")[0] icmpn = self.statistics.get_counter("/nat64/out2in/icmp")[0] drops = self.statistics.get_counter("/nat64/out2in/drops")[0] pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4])) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) if_idx = self.pg1.sw_if_index cnt = self.statistics.get_counter("/nat64/out2in/tcp")[0] self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2) cnt = self.statistics.get_counter("/nat64/out2in/udp")[0] self.assertEqual(cnt[if_idx] - udpn[if_idx], 1) cnt = self.statistics.get_counter("/nat64/out2in/icmp")[0] self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1) cnt = self.statistics.get_counter("/nat64/out2in/drops")[0] self.assertEqual(cnt[if_idx] - drops[if_idx], 0) bibs = self.statistics.get_counter("/nat64/total-bibs") self.assertEqual(bibs[0][0], 3) sessions = self.statistics.get_counter("/nat64/total-sessions") self.assertEqual(sessions[0][0], 3) # in2out pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4 ) # out2in pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) ses_num_end = self.nat64_get_ses_num() self.assertEqual(ses_num_end - ses_num_start, 3) # tenant with specific VRF self.vapi.nat64_add_del_pool_addr_range( start_addr=self.vrf1_nat_addr, end_addr=self.vrf1_nat_addr, vrf_id=self.vrf1_id, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index ) pkts = self.create_stream_in_ip6(self.pg2, self.pg1) self.pg2.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4 ) pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6) def test_static(self): """NAT64 static translation test""" self.tcp_port_in = 60303 self.udp_port_in = 60304 self.icmp_id_in = 60305 self.tcp_port_out = 60303 self.udp_port_out = 60304 self.icmp_id_out = 60305 ses_num_start = self.nat64_get_ses_num() self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.nat64_add_del_static_bib( i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.tcp_port_in, o_port=self.tcp_port_out, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1, ) self.vapi.nat64_add_del_static_bib( i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.udp_port_in, o_port=self.udp_port_out, proto=IP_PROTOS.udp, vrf_id=0, is_add=1, ) self.vapi.nat64_add_del_static_bib( i_addr=self.pg0.remote_ip6, o_addr=self.nat_addr, i_port=self.icmp_id_in, o_port=self.icmp_id_out, proto=IP_PROTOS.icmp, vrf_id=0, is_add=1, ) # in2out pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4, same_port=True ) # out2in pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4])) self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6) ses_num_end = self.nat64_get_ses_num() self.assertEqual(ses_num_end - ses_num_start, 3) def test_session_timeout(self): """NAT64 session timeout""" self.icmp_id_in = 1234 self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.nat64_set_timeouts( udp=300, tcp_established=5, tcp_transitory=5, icmp=5 ) pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) ses_num_before_timeout = self.nat64_get_ses_num() self.virtual_sleep(15) # ICMP and TCP session after timeout ses_num_after_timeout = self.nat64_get_ses_num() self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2) def test_icmp_error(self): """NAT64 ICMP Error message translation""" self.tcp_port_in = 6303 self.udp_port_in = 6304 self.icmp_id_in = 6305 self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) # send some packets to create sessions pkts = self.create_stream_in_ip6(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture_ip4 = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture_ip4, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4 ) pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture_ip6 = self.pg0.get_capture(len(pkts)) ip = IPv6(src="".join(["64:ff9b::", self.pg1.remote_ip4])) self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src, self.pg0.remote_ip6) # in2out pkts = [ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) / ICMPv6DestUnreach(code=1) / packet[IPv6] for packet in capture_ip6 ] self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) for packet in capture: try: self.assertEqual(packet[IP].src, self.nat_addr) self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) self.assertEqual(packet[ICMP].type, 3) self.assertEqual(packet[ICMP].code, 13) inner = packet[IPerror] self.assertEqual(inner.src, self.pg1.remote_ip4) self.assertEqual(inner.dst, self.nat_addr) self.assert_packet_checksums_valid(packet) if inner.haslayer(TCPerror): self.assertEqual(inner[TCPerror].dport, self.tcp_port_out) elif inner.haslayer(UDPerror): self.assertEqual(inner[UDPerror].dport, self.udp_port_out) else: self.assertEqual(inner[ICMPerror].id, self.icmp_id_out) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # out2in pkts = [ Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / ICMP(type=3, code=13) / packet[IP] for packet in capture_ip4 ] self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) for packet in capture: try: self.assertEqual(packet[IPv6].src, ip.src) self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6) icmp = packet[ICMPv6DestUnreach] self.assertEqual(icmp.code, 1) inner = icmp[IPerror6] self.assertEqual(inner.src, self.pg0.remote_ip6) self.assertEqual(inner.dst, ip.src) self.assert_icmpv6_checksum_valid(packet) if inner.haslayer(TCPerror): self.assertEqual(inner[TCPerror].sport, self.tcp_port_in) elif inner.haslayer(UDPerror): self.assertEqual(inner[UDPerror].sport, self.udp_port_in) else: self.assertEqual(inner[ICMPv6EchoRequest].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise def test_hairpinning(self): """NAT64 hairpinning""" client = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] server_tcp_in_port = 22 server_tcp_out_port = 4022 server_udp_in_port = 23 server_udp_out_port = 4023 client_tcp_in_port = 1234 client_udp_in_port = 1235 client_tcp_out_port = 0 client_udp_out_port = 0 ip = IPv6(src="".join(["64:ff9b::", self.nat_addr])) nat_addr_ip6 = ip.src self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.nat64_add_del_static_bib( i_addr=server.ip6n, o_addr=self.nat_addr, i_port=server_tcp_in_port, o_port=server_tcp_out_port, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1, ) self.vapi.nat64_add_del_static_bib( i_addr=server.ip6n, o_addr=self.nat_addr, i_port=server_udp_in_port, o_port=server_udp_out_port, proto=IP_PROTOS.udp, vrf_id=0, is_add=1, ) # client to server pkts = [] p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=client.ip6, dst=nat_addr_ip6) / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port) ) pkts.append(p) p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=client.ip6, dst=nat_addr_ip6) / UDP(sport=client_udp_in_port, dport=server_udp_out_port) ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) for packet in capture: try: self.assertEqual(packet[IPv6].src, nat_addr_ip6) self.assertEqual(packet[IPv6].dst, server.ip6) self.assert_packet_checksums_valid(packet) if packet.haslayer(TCP): self.assertNotEqual(packet[TCP].sport, client_tcp_in_port) self.assertEqual(packet[TCP].dport, server_tcp_in_port) client_tcp_out_port = packet[TCP].sport else: self.assertNotEqual(packet[UDP].sport, client_udp_in_port) self.assertEqual(packet[UDP].dport, server_udp_in_port) client_udp_out_port = packet[UDP].sport except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # server to client pkts = [] p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=server.ip6, dst=nat_addr_ip6) / TCP(sport=server_tcp_in_port, dport=client_tcp_out_port) ) pkts.append(p) p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=server.ip6, dst=nat_addr_ip6) / UDP(sport=server_udp_in_port, dport=client_udp_out_port) ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) for packet in capture: try: self.assertEqual(packet[IPv6].src, nat_addr_ip6) self.assertEqual(packet[IPv6].dst, client.ip6) self.assert_packet_checksums_valid(packet) if packet.haslayer(TCP): self.assertEqual(packet[TCP].sport, server_tcp_out_port) self.assertEqual(packet[TCP].dport, client_tcp_in_port) else: self.assertEqual(packet[UDP].sport, server_udp_out_port) self.assertEqual(packet[UDP].dport, client_udp_in_port) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # ICMP error pkts = [] pkts = [ Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=client.ip6, dst=nat_addr_ip6) / ICMPv6DestUnreach(code=1) / packet[IPv6] for packet in capture ] self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) for packet in capture: try: self.assertEqual(packet[IPv6].src, nat_addr_ip6) self.assertEqual(packet[IPv6].dst, server.ip6) icmp = packet[ICMPv6DestUnreach] self.assertEqual(icmp.code, 1) inner = icmp[IPerror6] self.assertEqual(inner.src, server.ip6) self.assertEqual(inner.dst, nat_addr_ip6) self.assert_packet_checksums_valid(packet) if inner.haslayer(TCPerror): self.assertEqual(inner[TCPerror].sport, server_tcp_in_port) self.assertEqual(inner[TCPerror].dport, client_tcp_out_port) else: self.assertEqual(inner[UDPerror].sport, server_udp_in_port) self.assertEqual(inner[UDPerror].dport, client_udp_out_port) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise def test_prefix(self): """NAT64 Network-Specific Prefix""" self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.vrf1_nat_addr, end_addr=self.vrf1_nat_addr, vrf_id=self.vrf1_id, is_add=1, ) self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg2.sw_if_index ) # Add global prefix global_pref64 = "2001:db8::" global_pref64_len = 32 global_pref64_str = "{}/{}".format(global_pref64, global_pref64_len) self.vapi.nat64_add_del_prefix(prefix=global_pref64_str, vrf_id=0, is_add=1) prefix = self.vapi.nat64_prefix_dump() self.assertEqual(len(prefix), 1) self.assertEqual(str(prefix[0].prefix), global_pref64_str) self.assertEqual(prefix[0].vrf_id, 0) # Add tenant specific prefix vrf1_pref64 = "2001:db8:122:300::" vrf1_pref64_len = 56 vrf1_pref64_str = "{}/{}".format(vrf1_pref64, vrf1_pref64_len) self.vapi.nat64_add_del_prefix( prefix=vrf1_pref64_str, vrf_id=self.vrf1_id, is_add=1 ) prefix = self.vapi.nat64_prefix_dump() self.assertEqual(len(prefix), 2) # Global prefix pkts = self.create_stream_in_ip6( self.pg0, self.pg1, pref=global_pref64, plen=global_pref64_len ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.nat_addr, dst_ip=self.pg1.remote_ip4 ) pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) dst_ip = self.compose_ip6(self.pg1.remote_ip4, global_pref64, global_pref64_len) self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6) # Tenant specific prefix pkts = self.create_stream_in_ip6( self.pg2, self.pg1, pref=vrf1_pref64, plen=vrf1_pref64_len ) self.pg2.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out( capture, nat_ip=self.vrf1_nat_addr, dst_ip=self.pg1.remote_ip4 ) pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg2.get_capture(len(pkts)) dst_ip = self.compose_ip6(self.pg1.remote_ip4, vrf1_pref64, vrf1_pref64_len) self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6) def test_unknown_proto(self): """NAT64 translate packet with unknown protocol""" self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) # in2out p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) / TCP(sport=self.tcp_port_in, dport=20) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg1.get_capture(1) p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) / GRE() / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) / TCP(sport=1234, dport=1234) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg1.get_capture(1) packet = p[0] try: self.assertEqual(packet[IP].src, self.nat_addr) self.assertEqual(packet[IP].dst, self.pg1.remote_ip4) self.assertEqual(packet.haslayer(GRE), 1) self.assert_packet_checksums_valid(packet) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # out2in p = ( Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / GRE() / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) / TCP(sport=1234, dport=1234) ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg0.get_capture(1) packet = p[0] try: self.assertEqual(packet[IPv6].src, remote_ip6) self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6) self.assertEqual(packet[IPv6].nh, 47) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise def test_hairpinning_unknown_proto(self): """NAT64 translate packet with unknown protocol - hairpinning""" client = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] server_tcp_in_port = 22 server_tcp_out_port = 4022 client_tcp_in_port = 1234 client_tcp_out_port = 1235 server_nat_ip = "10.0.0.100" client_nat_ip = "10.0.0.110" server_nat_ip6 = self.compose_ip6(server_nat_ip, "64:ff9b::", 96) client_nat_ip6 = self.compose_ip6(client_nat_ip, "64:ff9b::", 96) self.vapi.nat64_add_del_pool_addr_range( start_addr=server_nat_ip, end_addr=client_nat_ip, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.nat64_add_del_static_bib( i_addr=server.ip6n, o_addr=server_nat_ip, i_port=server_tcp_in_port, o_port=server_tcp_out_port, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1, ) self.vapi.nat64_add_del_static_bib( i_addr=server.ip6n, o_addr=server_nat_ip, i_port=0, o_port=0, proto=IP_PROTOS.gre, vrf_id=0, is_add=1, ) self.vapi.nat64_add_del_static_bib( i_addr=client.ip6n, o_addr=client_nat_ip, i_port=client_tcp_in_port, o_port=client_tcp_out_port, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1, ) # client to server p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=client.ip6, dst=server_nat_ip6) / TCP(sport=client_tcp_in_port, dport=server_tcp_out_port) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg0.get_capture(1) p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) / GRE() / IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) / TCP(sport=1234, dport=1234) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg0.get_capture(1) packet = p[0] try: self.assertEqual(packet[IPv6].src, client_nat_ip6) self.assertEqual(packet[IPv6].dst, server.ip6) self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise # server to client p = ( Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) / GRE() / IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) / TCP(sport=1234, dport=1234) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg0.get_capture(1) packet = p[0] try: self.assertEqual(packet[IPv6].src, server_nat_ip6) self.assertEqual(packet[IPv6].dst, client.ip6) self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre) except: self.logger.error(ppp("Unexpected or invalid packet:", packet)) raise def test_one_armed_nat64(self): """One armed NAT64""" external_port = 0 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4, "64:ff9b::", 96) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg3.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg3.sw_if_index ) # in2out p = ( Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) / IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) / TCP(sport=12345, dport=80) ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg3.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.src, self.nat_addr) self.assertEqual(ip.dst, self.pg3.remote_ip4) self.assertNotEqual(tcp.sport, 12345) external_port = tcp.sport self.assertEqual(tcp.dport, 80) self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise # out2in p = ( Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) / IP(src=self.pg3.remote_ip4, dst=self.nat_addr) / TCP(sport=80, dport=external_port) ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg3.get_capture(1) p = capture[0] try: ip = p[IPv6] tcp = p[TCP] self.assertEqual(ip.src, remote_host_ip6) self.assertEqual(ip.dst, self.pg3.remote_ip6) self.assertEqual(tcp.sport, 80) self.assertEqual(tcp.dport, 12345) self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise def test_frag_in_order(self): """NAT64 translate fragments arriving in order""" self.tcp_port_in = random.randint(1025, 65535) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) # in2out data = b"a" * 200 pkts = self.create_stream_frag_ip6( self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) self.assertEqual(p[TCP].dport, 20) self.assertNotEqual(p[TCP].sport, self.tcp_port_in) self.tcp_port_out = p[TCP].sport self.assertEqual(data, p[Raw].load) # out2in data = b"A" * 4 + b"b" * 16 + b"C" * 3 pkts = self.create_stream_frag( self.pg1, self.nat_addr, 20, self.tcp_port_out, data ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) self.logger.debug(ppc("Captured:", frags)) src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6) self.assertEqual(p[TCP].sport, 20) self.assertEqual(p[TCP].dport, self.tcp_port_in) self.assertEqual(data, p[Raw].load) def test_reass_hairpinning(self): """NAT64 fragments hairpinning""" data = b"a" * 200 server = self.pg0.remote_hosts[1] server_in_port = random.randint(1025, 65535) server_out_port = random.randint(1025, 65535) client_in_port = random.randint(1025, 65535) ip = IPv6(src="".join(["64:ff9b::", self.nat_addr])) nat_addr_ip6 = ip.src self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) # add static BIB entry for server self.vapi.nat64_add_del_static_bib( i_addr=server.ip6n, o_addr=self.nat_addr, i_port=server_in_port, o_port=server_out_port, proto=IP_PROTOS.tcp, vrf_id=0, is_add=1, ) # send packet from host to server pkts = self.create_stream_frag_ip6( self.pg0, self.nat_addr, client_in_port, server_out_port, data ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) self.logger.debug(ppc("Captured:", frags)) p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6) self.assertNotEqual(p[TCP].sport, client_in_port) self.assertEqual(p[TCP].dport, server_in_port) self.assertEqual(data, p[Raw].load) def test_frag_out_of_order(self): """NAT64 translate fragments arriving out of order""" self.tcp_port_in = random.randint(1025, 65535) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) # in2out data = b"a" * 200 pkts = self.create_stream_frag_ip6( self.pg0, self.pg1.remote_ip4, self.tcp_port_in, 20, data ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) self.assertEqual(p[TCP].dport, 20) self.assertNotEqual(p[TCP].sport, self.tcp_port_in) self.tcp_port_out = p[TCP].sport self.assertEqual(data, p[Raw].load) # out2in data = b"A" * 4 + b"B" * 16 + b"C" * 3 pkts = self.create_stream_frag( self.pg1, self.nat_addr, 20, self.tcp_port_out, data ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) src = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6) self.assertEqual(p[TCP].sport, 20) self.assertEqual(p[TCP].dport, self.tcp_port_in) self.assertEqual(data, p[Raw].load) def test_interface_addr(self): """Acquire NAT64 pool addresses from interface""" self.vapi.nat64_add_del_interface_addr( is_add=1, sw_if_index=self.pg4.sw_if_index ) # no address in NAT64 pool addresses = self.vapi.nat44_address_dump() self.assertEqual(0, len(addresses)) # configure interface address and check NAT64 address pool self.pg4.config_ip4() addresses = self.vapi.nat64_pool_addr_dump() self.assertEqual(len(addresses), 1) self.assertEqual(str(addresses[0].address), self.pg4.local_ip4) # remove interface address and check NAT64 address pool self.pg4.unconfig_ip4() addresses = self.vapi.nat64_pool_addr_dump() self.assertEqual(0, len(addresses)) @unittest.skipUnless(config.extended, "part of extended tests") def test_ipfix_max_bibs_sessions(self): """IPFIX logging maximum session and BIB entries exceeded""" max_bibs = 1280 max_sessions = 2560 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) pkts = [] src = "" for i in range(0, max_bibs): src = "fd01:aa::%x" % (i) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=src, dst=remote_host_ip6) / TCP(sport=12345, dport=80) ) pkts.append(p) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=src, dst=remote_host_ip6) / TCP(sport=12345, dport=22) ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(max_sessions) self.vapi.set_ipfix_exporter( collector_address=self.pg3.remote_ip4, src_address=self.pg3.local_ip4, path_mtu=512, template_interval=10, ) self.vapi.nat_ipfix_enable_disable( domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 ) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=src, dst=remote_host_ip6) / TCP(sport=12345, dport=25) ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() self.vapi.ipfix_flush() capture = self.pg3.get_capture(7) ipfix = IPFIXDecoder() # first load template for p in capture: self.assertTrue(p.haslayer(IPFIX)) self.assertEqual(p[IP].src, self.pg3.local_ip4) self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set event_count = 0 for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) event_count += self.verify_ipfix_max_sessions(data, max_sessions) self.assertEqual(event_count, 1) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) / TCP(sport=12345, dport=80) ) * 3 self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() self.vapi.ipfix_flush() capture = self.pg3.get_capture(1) # verify events in data set event_count = 0 for p in capture: self.assertTrue(p.haslayer(IPFIX)) self.assertEqual(p[IP].src, self.pg3.local_ip4) self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) event_count += self.verify_ipfix_max_bibs(data, max_bibs) self.assertEqual(event_count, 1) def test_ipfix_bib_ses(self): """IPFIX logging NAT64 BIB/session create and delete events""" self.tcp_port_in = random.randint(1025, 65535) remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.set_ipfix_exporter( collector_address=self.pg3.remote_ip4, src_address=self.pg3.local_ip4, path_mtu=512, template_interval=10, ) self.vapi.nat_ipfix_enable_disable( domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 ) # Create p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) / TCP(sport=self.tcp_port_in, dport=25) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg1.get_capture(1) self.tcp_port_out = p[0][TCP].sport self.vapi.ipfix_flush() capture = self.pg3.get_capture(8) ipfix = IPFIXDecoder() # first load template for p in capture: self.assertTrue(p.haslayer(IPFIX)) self.assertEqual(p[IP].src, self.pg3.local_ip4) self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set for p in capture: if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) if scapy.compat.orb(data[0][230]) == 10: self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6) elif scapy.compat.orb(data[0][230]) == 6: self.verify_ipfix_nat64_ses( data, 1, self.pg0.remote_ip6, self.pg1.remote_ip4, 25 ) else: self.logger.error(ppp("Unexpected or invalid packet: ", p)) # Delete self.pg_enable_capture(self.pg_interfaces) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=0, ) self.vapi.ipfix_flush() capture = self.pg3.get_capture(2) # verify events in data set for p in capture: self.assertTrue(p.haslayer(IPFIX)) self.assertEqual(p[IP].src, self.pg3.local_ip4) self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Data): data = ipfix.decode_data_set(p.getlayer(Set)) if scapy.compat.orb(data[0][230]) == 11: self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6) elif scapy.compat.orb(data[0][230]) == 7: self.verify_ipfix_nat64_ses( data, 0, self.pg0.remote_ip6, self.pg1.remote_ip4, 25 ) else: self.logger.error(ppp("Unexpected or invalid packet: ", p)) def test_syslog_sess(self): """Test syslog session creation and deletion""" self.tcp_port_in = random.randint(1025, 65535) remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4, "64:ff9b::", 96) self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=1, ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat64_add_del_interface( is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index ) self.vapi.nat64_add_del_interface( is_add=1, flags=0, sw_if_index=self.pg1.sw_if_index ) self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO) self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4) p = ( Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port) ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg1.get_capture(1) self.tcp_port_out = p[0][TCP].sport capture = self.pg3.get_capture(1) self.verify_syslog_sess(capture[0][Raw].load, is_ip6=True) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.vapi.nat64_add_del_pool_addr_range( start_addr=self.nat_addr, end_addr=self.nat_addr, vrf_id=0xFFFFFFFF, is_add=0, ) capture = self.pg3.get_capture(1) self.verify_syslog_sess(capture[0][Raw].load, False, True) def nat64_get_ses_num(self): """ Return number of active NAT64 sessions. """ st = self.vapi.nat64_st_dump(proto=255) return len(st) def clear_nat64(self): """ Clear NAT64 configuration. """ self.vapi.nat_ipfix_enable_disable( domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0 ) self.ipfix_src_port = 4739 self.ipfix_domain_id = 1 self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_EMERG) self.vapi.nat64_set_timeouts( udp=300, tcp_established=7440, tcp_transitory=240, icmp=60 ) interfaces = self.vapi.nat64_interface_dump() for intf in interfaces: self.vapi.nat64_add_del_interface( is_add=0, flags=intf.flags, sw_if_index=intf.sw_if_index ) bib = self.vapi.nat64_bib_dump(proto=255) for bibe in bib: if bibe.flags & self.config_flags.NAT_IS_STATIC: self.vapi.nat64_add_del_static_bib( i_addr=bibe.i_addr, o_addr=bibe.o_addr, i_port=bibe.i_port, o_port=bibe.o_port, proto=bibe.proto, vrf_id=bibe.vrf_id, is_add=0, ) adresses = self.vapi.nat64_pool_addr_dump() for addr in adresses: self.vapi.nat64_add_del_pool_addr_range( start_addr=addr.address, end_addr=addr.address, vrf_id=addr.vrf_id, is_add=0, ) prefixes = self.vapi.nat64_prefix_dump() for prefix in prefixes: self.vapi.nat64_add_del_prefix( prefix=str(prefix.prefix), vrf_id=prefix.vrf_id, is_add=0 ) bibs = self.statistics.get_counter("/nat64/total-bibs") self.assertEqual(bibs[0][0], 0) sessions = self.statistics.get_counter("/nat64/total-sessions") self.assertEqual(sessions[0][0], 0) if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)