#!/usr/bin/env python3 import unittest from io import BytesIO from random import randint, choice import re import scapy.compat from framework import VppTestCase, VppLoInterface from asfframework import VppTestRunner, tag_fixme_ubuntu2204, is_distro_ubuntu2204 from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE from scapy.layers.inet import IPerror, TCPerror from scapy.layers.l2 import Ether from scapy.packet import Raw from statistics import variance from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppp, pr, ip4_range from vpp_acl import AclRule, VppAcl, VppAclInterface from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_papi import VppEnum from util import StatsDiff class TestNAT44ED(VppTestCase): """NAT44ED Test Case""" nat_addr = "10.0.10.3" tcp_port_in = 6303 tcp_port_out = 6303 udp_port_in = 6304 udp_port_out = 6304 icmp_id_in = 6305 icmp_id_out = 6305 tcp_external_port = 80 max_sessions = 100 def setUp(self): super().setUp() self.plugin_enable() def tearDown(self): super().tearDown() if not self.vpp_dead: self.plugin_disable() def plugin_enable(self, max_sessions=None): max_sessions = max_sessions or self.max_sessions self.vapi.nat44_ed_plugin_enable_disable(sessions=max_sessions, enable=1) def plugin_disable(self): self.vapi.nat44_ed_plugin_enable_disable(enable=0) @property def config_flags(self): return VppEnum.vl_api_nat_config_flags_t @property def nat44_config_flags(self): return VppEnum.vl_api_nat44_config_flags_t @property def syslog_severity(self): return VppEnum.vl_api_syslog_severity_t @property def server_addr(self): return self.pg1.remote_hosts[0].ip4 @staticmethod def random_port(): return randint(1024, 65535) @staticmethod def proto2layer(proto): if proto == IP_PROTOS.tcp: return TCP elif proto == IP_PROTOS.udp: return UDP elif proto == IP_PROTOS.icmp: return ICMP else: raise Exception("Unsupported protocol") 
@classmethod def create_and_add_ip4_table(cls, i, table_id=0): cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id}) i.set_table_ip4(table_id) @classmethod def configure_ip4_interface(cls, i, hosts=0, table_id=None): if table_id: cls.create_and_add_ip4_table(i, table_id) i.admin_up() i.config_ip4() i.resolve_arp() if hosts: i.generate_remote_hosts(hosts) i.configure_ipv4_neighbors() @classmethod def nat_add_interface_address(cls, i): cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1) def nat_add_inside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1 ) def nat_add_outside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1 ) def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1): flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0 self.vapi.nat44_add_del_address_range( first_ip_address=address, last_ip_address=address, vrf_id=vrf_id, is_add=is_add, flags=flags, ) def nat_add_static_mapping( self, local_ip, external_ip="0.0.0.0", local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, proto=0, tag="", flags=0, ): if not (local_port and external_port): flags |= self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_static_mapping( is_add=is_add, local_ip_address=local_ip, external_ip_address=external_ip, external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, vrf_id=vrf_id, protocol=proto, flags=flags, tag=tag, ) @classmethod def setUpClass(cls): super().setUpClass() if is_distro_ubuntu2204 == True and not hasattr(cls, "vpp"): return cls.create_pg_interfaces(range(12)) cls.interfaces = list(cls.pg_interfaces[:4]) cls.create_and_add_ip4_table(cls.pg2, 10) for i in cls.interfaces: cls.configure_ip4_interface(i, hosts=3) # test specific (test-multiple-vrf) 
cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1}) # test specific (test-one-armed-nat44-static) cls.pg4.generate_remote_hosts(2) cls.pg4.config_ip4() cls.vapi.sw_interface_add_del_address( sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24" ) cls.pg4.admin_up() cls.pg4.resolve_arp() cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4 cls.pg4.resolve_arp() # test specific interface (pg5) cls.pg5._local_ip4 = "10.1.1.1" cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2" cls.pg5.set_table_ip4(1) cls.pg5.config_ip4() cls.pg5.admin_up() cls.pg5.resolve_arp() # test specific interface (pg6) cls.pg6._local_ip4 = "10.1.2.1" cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2" cls.pg6.set_table_ip4(1) cls.pg6.config_ip4() cls.pg6.admin_up() cls.pg6.resolve_arp() rl = list() rl.append( VppIpRoute( cls, "0.0.0.0", 0, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)], register=False, table_id=1, ) ) rl.append( VppIpRoute( cls, "0.0.0.0", 0, [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)], register=False, ) ) rl.append( VppIpRoute( cls, cls.pg5.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)], register=False, table_id=1, ) ) rl.append( VppIpRoute( cls, cls.pg6.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)], register=False, table_id=1, ) ) rl.append( VppIpRoute( cls, cls.pg6.remote_ip4, 16, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)], register=False, table_id=0, ) ) for r in rl: r.add_vpp_config() cls.no_diff = StatsDiff( { pg.sw_if_index: { "/nat44-ed/in2out/fastpath/tcp": 0, "/nat44-ed/in2out/fastpath/udp": 0, "/nat44-ed/in2out/fastpath/icmp": 0, "/nat44-ed/in2out/fastpath/drops": 0, "/nat44-ed/in2out/slowpath/tcp": 0, "/nat44-ed/in2out/slowpath/udp": 0, "/nat44-ed/in2out/slowpath/icmp": 0, "/nat44-ed/in2out/slowpath/drops": 0, "/nat44-ed/in2out/fastpath/tcp": 0, "/nat44-ed/in2out/fastpath/udp": 0, "/nat44-ed/in2out/fastpath/icmp": 0, "/nat44-ed/in2out/fastpath/drops": 0, "/nat44-ed/in2out/slowpath/tcp": 0, 
"/nat44-ed/in2out/slowpath/udp": 0, "/nat44-ed/in2out/slowpath/icmp": 0, "/nat44-ed/in2out/slowpath/drops": 0, } for pg in cls.pg_interfaces } ) def get_err_counter(self, path): return self.statistics.get_err_counter(path) def reass_hairpinning( self, server_addr, server_in_port, server_out_port, host_in_port, proto=IP_PROTOS.tcp, ignore_port=False, ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag( self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) self.assertEqual(p[layer].dport, server_in_port) else: if not ignore_port: self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) def frag_out_of_order( self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() for i in range(2): # in2out pkts = self.create_stream_frag( self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify( frags, self.nat_addr, self.pg1.remote_ip4 ) else: p = self.reass_frags_and_verify( frags, self.pg0.remote_ip4, self.pg1.remote_ip4 ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: 
self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag( self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.logger.info(self.vapi.cli("show trace")) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify( frags, self.pg1.remote_ip4, self.pg0.remote_ip4 ) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) def reass_frags_and_verify(self, frags, src, dst): buffer = BytesIO() for p in frags: self.assertEqual(p[IP].src, src) self.assertEqual(p[IP].dst, dst) self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: p = ip / TCP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:]) elif ip.proto == IP_PROTOS.icmp: p = ip / ICMP(buffer.getvalue()) return p def frag_in_order( self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() # in2out pkts = self.create_stream_frag( self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto ) 
self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify( frags, self.pg0.remote_ip4, self.pg1.remote_ip4 ) if proto != IP_PROTOS.icmp:
"""
BIER Tables and Routes
"""
import socket
from vpp_object import VppObject
from vpp_ip_route import MPLS_LABEL_INVALID, VppRoutePath, VppMplsLabel
class BIER_HDR_PAYLOAD:
    """Integer codes (BIER_HDR_PROTO_*) naming a BIER header payload protocol."""

    # MPLS, in either label-allocation direction
    BIER_HDR_PROTO_MPLS_DOWN_STREAM = 1
    BIER_HDR_PROTO_MPLS_UP_STREAM = 2

    # L2 / L3 payloads
    BIER_HDR_PROTO_ETHERNET = 3
    BIER_HDR_PROTO_IPV4 = 4
    BIER_HDR_PROTO_IPV6 = 5

    # remaining codes
    BIER_HDR_PROTO_VXLAN = 6
    BIER_HDR_PROTO_CTRL = 7
    BIER_HDR_PROTO_OAM = 8
class VppBierTableID:
    """Triple of IDs (set, sub-domain, header length) that names a BIER table."""

    def __init__(self, sub_domain_id, set_id, hdr_len_id):
        # Note the constructor takes the sub-domain first, then the set.
        self.set_id, self.sub_domain_id, self.hdr_len_id = (
            set_id,
            sub_domain_id,
            hdr_len_id,
        )
def find_bier_table(test, bti):
    """Return True if ``test.vapi.bier_table_dump()`` lists the table *bti*.

    A dumped entry matches when its ``bt_tbl_id`` carries the same set,
    sub-domain and header-length IDs as *bti*.
    """

    def _same_table(dumped):
        tbl = dumped.bt_tbl_id
        return (
            bti.set_id == tbl.bt_set
            and bti.sub_domain_id == tbl.bt_sub_domain
            and bti.hdr_len_id == tbl.bt_hdr_len_id
        )

    return any(_same_table(entry) for entry in test.vapi.bier_table_dump())
def find_bier_route(test, bti, bp):
    """Return True if the route dump for table *bti* holds an entry for *bp*.

    Each dumped route is compared on its table ID fields (set, sub-domain,
    header length) and on its ``br_bp`` value.
    """

    def _same_route(dumped):
        route = dumped.br_route
        tbl = route.br_tbl_id
        return (
            bti.set_id == tbl.bt_set
            and bti.sub_domain_id == tbl.bt_sub_domain
            and bti.hdr_len_id == tbl.bt_hdr_len_id
            and bp == route.br_bp
        )

    return any(_same_route(entry) for entry in test.vapi.bier_route_dump(bti))
def find_bier_disp_table(test, bdti):
    """Return True if a dumped disposition table has ``bdt_tbl_id`` equal to *bdti*."""
    dumped_tables = test.vapi.bier_disp_table_dump()
    return any(bdti == table.bdt_tbl_id for table in dumped_tables)
def find_bier_disp_entry(test, bdti, bp):
    """Return True if the disposition-entry dump for *bdti* has an entry for *bp*.

    An entry matches when both its ``bde_bp`` and its ``bde_tbl_id`` agree
    with the requested values.
    """
    dumped_entries = test.vapi.bier_disp_entry_dump(bdti)
    return any(
        bp == entry.bde_bp and bdti == entry.bde_tbl_id
        for entry in dumped_entries
    )
def find_bier_imp(test, bti, bp):
    """Return True if ``test.vapi.bier_imp_dump()`` lists an imp for *bti* / *bp*.

    A dumped imp matches when its ``bi_tbl_id`` fields equal those of *bti*
    and its ``bi_src`` equals *bp*.
    """

    def _same_imp(dumped):
        tbl = dumped.bi_tbl_id
        return (
            bti.set_id == tbl.bt_set
            and bti.sub_domain_id == tbl.bt_sub_domain
            and bti.hdr_len_id == tbl.bt_hdr_len_id
            and bp == dumped.bi_src
        )

    return any(_same_imp(imp) for imp in test.vapi.bier_imp_dump())
class VppBierTable(VppObject):
    """
    BIER Table

    Pairs a table ID with an MPLS label and adds/removes it through
    ``vapi.bier_table_add_del``.
    """

    def __init__(self, test, id, mpls_label):
        self._test = test
        self.id = id
        self.mpls_label = mpls_label

    def add_vpp_config(self):
        """Create the table in VPP, then register this object with the test registry."""
        test = self._test
        test.vapi.bier_table_add_del(self.id, self.mpls_label, is_add=1)
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Delete the table from VPP."""
        vapi = self._test.vapi
        vapi.bier_table_add_del(self.id, self.mpls_label, is_add=0)

    def object_id(self):
        """Return a string built from the table's set, sub-domain and header-length IDs."""
        table_id = self.id
        fields = (table_id.set_id, table_id.sub_domain_id, table_id.hdr_len_id)
        return "bier-table;[%d:%d:%d]" % fields

    def query_vpp_config(self):
        """Return True if the table currently appears in VPP's BIER table dump."""
        return find_bier_table(self._test, self.id)
class VppBierRoute(VppObject):
    """
    BIER route

    Holds a table ID, a bit-position (``bp``) and a list of path objects.
    ``encoded_paths`` mirrors ``paths`` with each path's ``encode()`` result
    and is what gets passed to ``vapi.bier_route_add_del``.
    """

    def __init__(self, test, tbl_id, bp, paths):
        self._test = test
        self.tbl_id = tbl_id
        self.bp = bp
        self.paths = paths
        self.encoded_paths = [path.encode() for path in self.paths]

    def add_vpp_config(self):
        """Add the route with all current paths and register it with the test registry."""
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            self.encoded_paths,
            is_add=1)
        self._test.registry.register(self, self._test.logger)

    def remove_vpp_config(self):
        """Remove the route, passing all currently tracked paths."""
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            self.encoded_paths,
            is_add=0)

    def update_paths(self, paths):
        """Swap the tracked paths for *paths* and send them with ``is_replace=1``."""
        self.paths = paths
        self.encoded_paths = [path.encode() for path in self.paths]
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            self.encoded_paths,
            is_replace=1)

    def add_path(self, path):
        """Add a single *path* to the route and track it locally."""
        encoded = path.encode()
        self.encoded_paths.append(encoded)
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            [encoded],
            is_add=1,
            is_replace=0)
        self.paths.append(path)
        self._test.registry.register(self, self._test.logger)

    def remove_path(self, path):
        """Remove a single *path* from the route and stop tracking it.

        Raises ValueError if *path* (or its encoded form) is not tracked.
        """
        encoded = path.encode()
        self.encoded_paths.remove(encoded)
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            [encoded],
            is_add=0,
            is_replace=0)
        self.paths.remove(path)

    def remove_all_paths(self):
        """Send an empty path list with ``is_add=0, is_replace=1`` and forget all paths."""
        self._test.vapi.bier_route_add_del(
            self.tbl_id,
            self.bp,
            [],
            is_add=0,
            is_replace=1)
        self.paths = []
        # Keep the encoded mirror in sync; otherwise a later add/remove of
        # the route would resend paths this object no longer holds.
        self.encoded_paths = []

    def object_id(self):
        """Return a string built from the table ID fields and the bit-position."""
        return "bier-route;[%d:%d:%d:%d]" % (self.tbl_id.set_id,
                                             self.tbl_id.sub_domain_id,
                                             self.tbl_id.hdr_len_id,
                                             self.bp)

    def query_vpp_config(self):
        """Return True if the route currently appears in VPP's route dump for its table."""
        return find_bier_route(self._test, self.tbl_id, self.bp)
class VppBierImp(VppObject):
    """
    BIER imp object, managed through ``vapi.bier_imp_add`` / ``bier_imp_del``

    ``bi_index`` is only set once ``add_vpp_config`` has run; it is the
    handle later passed to ``bier_imp_del``.
    """

    def __init__(self, test, tbl_id, src, ibytes):
        self._test = test
        self.tbl_id = tbl_id
        self.ibytes = ibytes
        self.src = src

    def add_vpp_config(self):
        """Create the imp, remember the returned index and register with the test registry."""
        test = self._test
        reply = test.vapi.bier_imp_add(self.tbl_id, self.src, self.ibytes)
        self.bi_index = reply.bi_index
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Delete the imp using the index stored by ``add_vpp_config``."""
        self._test.vapi.bier_imp_del(self.bi_index)

    def object_id(self):
        """Return a string built from the table ID fields and the source value."""
        table_id = self.tbl_id
        fields = (
            table_id.set_id,
            table_id.sub_domain_id,
            table_id.hdr_len_id,
            self.src,
        )
        return "bier-imp;[%d:%d:%d:%d]" % fields

    def query_vpp_config(self):
        """Return True if the imp currently appears in VPP's imp dump."""
        return find_bier_imp(self._test, self.tbl_id, self.src)
class VppBierDispTable(VppObject):
    """
    BIER Disposition Table

    Identified by a single numeric ``id`` and managed through
    ``vapi.bier_disp_table_add_del``.
    """

    def __init__(self, test, id):
        self._test = test
        self.id = id

    def add_vpp_config(self):
        """Create the table in VPP and register this object with the test registry."""
        test = self._test
        test.vapi.bier_disp_table_add_del(self.id, is_add=1)
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Delete the table from VPP."""
        vapi = self._test.vapi
        vapi.bier_disp_table_add_del(self.id, is_add=0)

    def object_id(self):
        """Return a string containing the table's numeric ID."""
        return "bier-disp-table;[%d]" % (self.id)

    def query_vpp_config(self):
        """Return True if the table currently appears in VPP's disposition table dump."""
        return find_bier_disp_table(self._test, self.id)
class VppBierDispEntry(VppObject):
    """
    BIER Disposition Entry

    The next hop *nh* is given as an IPv4 address string and stored in its
    packed binary form (``socket.inet_pton`` with ``AF_INET``).
    """

    def __init__(self, test, tbl_id, bp, payload_proto, nh_proto,
                 nh, nh_tbl, rpf_id=~0):
        self._test = test
        self.tbl_id = tbl_id
        self.nh_tbl = nh_tbl
        self.nh_proto = nh_proto
        self.bp = bp
        self.payload_proto = payload_proto
        self.rpf_id = rpf_id
        self.nh = socket.inet_pton(socket.AF_INET, nh)

    def _api_args(self):
        # Positional arguments shared by the add and delete API calls.
        return (
            self.tbl_id,
            self.bp,
            self.payload_proto,
            self.nh_proto,
            self.nh,
            self.nh_tbl,
            self.rpf_id,
        )

    def add_vpp_config(self):
        """Create the entry in VPP and register this object with the test registry."""
        test = self._test
        test.vapi.bier_disp_entry_add_del(*self._api_args(), is_add=1)
        test.registry.register(self, test.logger)

    def remove_vpp_config(self):
        """Delete the entry from VPP using the same arguments it was added with."""
        self._test.vapi.bier_disp_entry_add_del(*self._api_args(), is_add=0)

    def object_id(self):
        """Return a string built from the table ID and the bit-position."""
        fields = (self.tbl_id, self.bp)
        return "bier-disp-entry;[%d:%d]" % fields

    def query_vpp_config(self):
        """Return True if the entry currently appears in VPP's disposition entry dump."""
        return find_bier_disp_entry(self._test, self.tbl_id, self.bp)