#!/usr/bin/env python3 import ipaddress import random import socket import struct import unittest from io import BytesIO import scapy.compat from config import config from framework import tag_fixme_vpp_workers, tag_fixme_ubuntu2204, is_distro_ubuntu2204 from framework import VppTestCase, VppTestRunner from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment from scapy.layers.inet6 import ( IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6, ) from scapy.layers.l2 import Ether, GRE from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppc, ppp from vpp_papi import VppEnum @tag_fixme_vpp_workers @tag_fixme_ubuntu2204 class TestNAT64(VppTestCase): """NAT64 Test Cases""" @property def SYSLOG_SEVERITY(self): return VppEnum.vl_api_syslog_severity_t @property def config_flags(self): return VppEnum.vl_api_nat_config_flags_t @classmethod def setUpClass(cls): super(TestNAT64, cls).setUpClass() if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"): return cls.tcp_port_in = 6303 cls.tcp_port_out = 6303 cls.udp_port_in = 6304 cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 cls.tcp_external_port = 80 cls.nat_addr = "10.0.0.3" cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr) cls.vrf1_id = 10 cls.vrf1_nat_addr = "10.0.10.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.create_pg_interfaces(range(6)) cls.ip6_interfaces = list(cls.pg_interfaces[0:1]) cls.ip6_interfaces.append(cls.pg_interfaces[2]) cls.ip4_interfaces = list(cls.pg_interfaces[1:2]) cls.vapi.ip_table_add_del( is_add=1, table={"table_id": cls.vrf1_id, "is_ip6": 1} ) 
cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id) cls.pg0.generate_remote_hosts(2) for i in cls.ip6_interfaces: i.admin_up() i.config_ip6() i.configure_ipv6_neighbors() for i in cls.ip4_interfaces: i.admin_up() i.config_ip4() i.resolve_arp() cls.pg3.admin_up() cls.pg3.config_ip4() cls.pg3.resolve_arp() cls.pg3.config_ip6() cls.pg3.configure_ipv6_neighbors() cls.pg5.admin_up() cls.pg5.config_ip6() @classmethod def tearDownClass(cls): super(TestNAT64, cls).tearDownClass() def setUp(self): super(TestNAT64, self).setUp() self.vapi.nat64_plugin_enable_disable(enable=1, bib_buckets=128, st_buckets=256) def tearDown(self): super(TestNAT64, self).tearDown() if not self.vpp_dead: self.vapi.nat64_plugin_enable_disable(enable=0) def show_commands_at_teardown(self): self.logger.info(self.vapi.cli("show nat64 pool")) self.logger.info(self.vapi.cli("show nat64 interfaces")) self.logger.info(self.vapi.cli("show nat64 prefix")) self.logger.info(self.vapi.cli("show nat64 bib all")) self.logger.info(self.vapi.cli("show nat64 session table all")) def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0): """ Create IPv6 packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param ttl: Hop Limit of generated packets :param pref: NAT64 prefix :param plen: NAT64 prefix length """ pkts = [] if pref is None: dst = "".join(["64:ff9b::", out_if.remote_ip4]) else: dst = self.compose_ip6(out_if.remote_ip4, pref, plen) # TCP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / TCP(sport=self.tcp_port_in, dport=20) ) pkts.append(p) # UDP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / UDP(sport=self.udp_port_in, dport=20) ) pkts.append(p) # ICMP p = ( Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) / ICMPv6EchoRequest(id=self.icmp_id_in) ) pkts.append(p) return pkts def 
create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param ttl: TTL of generated packets :param use_inside_ports: Use inside NAT ports as destination ports instead of outside ports """ if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: tcp_port = self.tcp_port_out udp_port = self.udp_port_out icmp_id = self.icmp_id_out else: tcp_port = self.tcp_port_in udp_port = self.udp_port_in icmp_id = self.icmp_id_in pkts = [] # TCP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=tcp_port, sport=20) ) pkts.extend([p, p]) # UDP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=udp_port, sport=20) ) pkts.append(p) # ICMP p = ( Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=icmp_id, type="echo-reply") ) pkts.append(p) return pkts def verify_capture_out( self, capture, nat_ip=None, same_port=False, dst_ip=None, is_ip6=False, ignore_port=False, ): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) :param is_ip6: If L3 protocol is IPv6 (Default False) """ if is_ip6: IP46 = IPv6 ICMP46 = ICMPv6EchoRequest else: IP46 = IP ICMP46 = ICMP if nat_ip is None: nat_ip = self.na
"""
Neighbour Entries
object abstractions for ARP and ND
"""
from ipaddress import ip_address
from vpp_object import *
from vpp_papi import mac_pton, VppEnum
def find_nbr(test, sw_if_index, nbr_addr, is_static=0, mac=None):
    """
    Check whether a neighbour entry is present in a neighbour dump.

    Calls ``test.vapi.ip_neighbor_dump`` for one interface and searches
    the returned entries for one whose IP address, static-ness and
    (optionally) MAC address match the arguments.

    :param test: object exposing ``vapi.ip_neighbor_dump``
    :param sw_if_index: interface index handed to the dump call
    :param nbr_addr: neighbour IP address; converted to text and parsed
                     with ``ipaddress.ip_address`` (IPv4 or IPv6)
    :param is_static: truthy to match only entries that have the STATIC
                      flag set, falsy to match only entries without it
    :param mac: when truthy, ``str()`` of the entry's MAC address must
                equal this value; when falsy the MAC is not compared
    :returns: True if a matching entry exists, False otherwise
    """
    # ip_address() needs a text string.  That type is called ``unicode``
    # on Python 2 and ``str`` on Python 3, where ``unicode`` is undefined.
    try:
        text_type = unicode
    except NameError:
        text_type = str
    ip_addr = ip_address(text_type(nbr_addr))

    e = VppEnum.vl_api_ip_neighbor_flags_t
    static_flag = e.IP_API_NEIGHBOR_FLAG_STATIC
    want_static = bool(is_static)

    # Use '==' here: 'is' tests object identity, which is not a reliable
    # way to compare integers.
    nbrs = test.vapi.ip_neighbor_dump(sw_if_index,
                                      is_ipv6=(6 == ip_addr.version))

    for n in nbrs:
        # Static-ness is compared as booleans so the outcome does not
        # depend on the numeric value of the STATIC flag bit.
        if ip_addr == n.neighbor.ip_address and \
           want_static == bool(n.neighbor.flags & static_flag) and \
           (not mac or mac == str(n.neighbor.mac_address)):
            return True
    return False
class VppNeighbor(VppObject):
    """
    Neighbour entry (IP address to MAC address binding) on one interface.

    The entry is described by an interface index, a MAC address, a
    neighbour IP address and a set of ``vl_api_ip_neighbor_flags_t``
    flags built from the constructor's boolean arguments.
    """

    def __init__(self, test, sw_if_index, mac_addr, nbr_addr,
                 is_static=False, is_no_fib_entry=False):
        """
        Record the entry's parameters; nothing is sent to VPP here.

        :param test: test case object used for API calls, the object
                     registry, the logger and statistics
        :param sw_if_index: interface index the entry belongs to
        :param mac_addr: MAC address of the neighbour
        :param nbr_addr: IP address of the neighbour
        :param is_static: set the STATIC flag on the entry
        :param is_no_fib_entry: set the NO_FIB_ENTRY flag on the entry
        """
        flag_enum = VppEnum.vl_api_ip_neighbor_flags_t
        flags = flag_enum.IP_API_NEIGHBOR_FLAG_NONE
        if is_static:
            flags |= flag_enum.IP_API_NEIGHBOR_FLAG_STATIC
        if is_no_fib_entry:
            flags |= flag_enum.IP_API_NEIGHBOR_FLAG_NO_FIB_ENTRY

        self._test = test
        self.sw_if_index = sw_if_index
        self.mac_addr = mac_addr
        self.nbr_addr = nbr_addr
        self.flags = flags

    def add_vpp_config(self):
        """
        Add the entry through the API and register it with the test.

        The ``stats_index`` from the reply is stored on the object; it
        is what :meth:`get_stats` later uses as an index.
        """
        test = self._test
        reply = test.vapi.ip_neighbor_add_del(
            self.sw_if_index,
            self.mac_addr,
            self.nbr_addr,
            is_add=1,
            flags=self.flags)
        self.stats_index = reply.stats_index
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Issue the same API call as the add, with ``is_add=0``."""
        self._test.vapi.ip_neighbor_add_del(
            self.sw_if_index,
            self.mac_addr,
            self.nbr_addr,
            is_add=0,
            flags=self.flags)

    def is_static(self):
        """
        Return the STATIC bit of this entry's flags.

        The result is the masked flag value (zero when not static), not
        a ``bool``.
        """
        flag_enum = VppEnum.vl_api_ip_neighbor_flags_t
        return (self.flags & flag_enum.IP_API_NEIGHBOR_FLAG_STATIC)

    def query_vpp_config(self):
        """
        Return whether ``find_nbr`` locates this entry.

        The lookup uses the interface index, the neighbour address and
        the static flag; the MAC address is not passed on.
        """
        static = self.is_static()
        return find_nbr(self._test, self.sw_if_index, self.nbr_addr, static)

    def __str__(self):
        """Return the same text as :meth:`object_id`."""
        return self.object_id()

    def object_id(self):
        """Return ``"<sw_if_index>:<nbr_addr>"`` identifying this entry."""
        ident = (self.sw_if_index, self.nbr_addr)
        return ("%d:%s" % ident)

    def get_stats(self):
        """
        Return this entry's counter from ``/net/adjacency``.

        Reads element ``[0][self.stats_index]`` of the counter returned
        by the test's statistics object, so :meth:`add_vpp_config` must
        have run first to set ``stats_index``.
        """
        counters = self._test.statistics.get_counter("/net/adjacency")
        return counters[0][self.stats_index]