#!/usr/bin/env python3 import unittest from io import BytesIO from random import randint, shuffle, choice import scapy.compat from framework import VppTestCase, VppTestRunner from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE from scapy.layers.inet import IPerror, TCPerror from scapy.layers.l2 import Ether from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppp, ip4_range from vpp_acl import AclRule, VppAcl, VppAclInterface from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_papi import VppEnum class NAT44EDTestCase(VppTestCase): nat_addr = '10.0.0.3' tcp_port_in = 6303 tcp_port_out = 6303 udp_port_in = 6304 udp_port_out = 6304 icmp_id_in = 6305 icmp_id_out = 6305 tcp_external_port = 80 max_sessions = 100 def setUp(self): super(NAT44EDTestCase, self).setUp() self.plugin_enable() def tearDown(self): super(NAT44EDTestCase, self).tearDown() if not self.vpp_dead: self.plugin_disable() def plugin_enable(self): self.vapi.nat44_ed_plugin_enable_disable( sessions=self.max_sessions, enable=1) def plugin_disable(self): self.vapi.nat44_ed_plugin_enable_disable(enable=0) @property def config_flags(self): return VppEnum.vl_api_nat_config_flags_t @property def nat44_config_flags(self): return VppEnum.vl_api_nat44_config_flags_t @property def syslog_severity(self): return VppEnum.vl_api_syslog_severity_t @property def server_addr(self): return self.pg1.remote_hosts[0].ip4 @staticmethod def random_port(): return randint(1025, 65535) @staticmethod def proto2layer(proto): if proto == IP_PROTOS.tcp: return TCP elif proto == IP_PROTOS.udp: return UDP elif proto == IP_PROTOS.icmp: return ICMP else: raise Exception("Unsupported protocol") @classmethod def create_and_add_ip4_table(cls, i, table_id=0): cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id}) i.set_table_ip4(table_id) @classmethod def 
configure_ip4_interface(cls, i, hosts=0, table_id=None): if table_id: cls.create_and_add_ip4_table(i, table_id) i.admin_up() i.config_ip4() i.resolve_arp() if hosts: i.generate_remote_hosts(hosts) i.configure_ipv4_neighbors() @classmethod def nat_add_interface_address(cls, i): cls.vapi.nat44_add_del_interface_addr( sw_if_index=i.sw_if_index, is_add=1) def nat_add_inside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1) def nat_add_outside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1) def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1): flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0 self.vapi.nat44_add_del_address_range(first_ip_address=address, last_ip_address=address, vrf_id=vrf_id, is_add=is_add, flags=flags) def nat_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, proto=0, tag="", flags=0): if not (local_port and external_port): flags |= self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_static_mapping( is_add=is_add, local_ip_address=local_ip, external_ip_address=external_ip, external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, vrf_id=vrf_id, protocol=proto, flags=flags, tag=tag) @classmethod def setUpClass(cls): super(NAT44EDTestCase, cls).setUpClass() cls.create_pg_interfaces(range(12)) cls.interfaces = list(cls.pg_interfaces[:4]) cls.create_and_add_ip4_table(cls.pg2, 10) for i in cls.interfaces: cls.configure_ip4_interface(i, hosts=3) # test specific (test-multiple-vrf) cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1}) # test specific (test-one-armed-nat44-static) cls.pg4.generate_remote_hosts(2) cls.pg4.config_ip4() cls.vapi.sw_interface_add_del_address( sw_if_index=cls.pg4.sw_if_index, 
prefix="10.0.0.1/24") cls.pg4.admin_up() cls.pg4.resolve_arp() cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4 cls.pg4.resolve_arp() # test specific interface (pg5) cls.pg5._local_ip4 = "10.1.1.1" cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2" cls.pg5.set_table_ip4(1) cls.pg5.config_ip4() cls.pg5.admin_up() cls.pg5.resolve_arp() # test specific interface (pg6) cls.pg6._local_ip4 = "10.1.2.1" cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2" cls.pg6.set_table_ip4(1) cls.pg6.config_ip4() cls.pg6.admin_up() cls.pg6.resolve_arp() rl = list() rl.append(VppIpRoute(cls, "0.0.0.0", 0, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=0)], register=False, table_id=1)) rl.append(VppIpRoute(cls, "0.0.0.0", 0, [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)], register=False)) rl.append(VppIpRoute(cls, cls.pg5.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)], register=False, table_id=1)) rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)], register=False, table_id=1)) rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 16, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=1)], register=False, table_id=0)) for r in rl: r.add_vpp_config() def get_err_counter(self, path): return self.statistics.get_err_counter(path) def reass_hairpinning(self, server_addr, server_in_port, server_out_port, host_in_port, proto=IP_PROTOS.tcp, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag(self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) 
self.assertEqual(p[layer].dport, server_in_port) else: if not ignore_port: self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() for i in range(2): # in2out pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, self.pg1.remote_ip4) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.logger.info(self.vapi.cli("show trace")) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) 
self.assertEqual(data, p[Raw].load) def reass_frags_and_verify(self, frags, src, dst): buffer = BytesIO() for p in frags: self.assertEqual(p[IP].src, src) self.assertEqual(p[IP].dst, dst) self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: p = (ip / TCP(buffer.getvalue())) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: p = (ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])) elif ip.proto == IP_PROTOS.icmp: p = (ip / ICMP(buffer.getvalue())) return p def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() # in2out pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, self.pg1.remote_ip4) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, 
dport, data, proto, echo_reply=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) def verify_capture_out(self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False): if nat_ip is None: nat_ip = self.nat_addr for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].src, nat_ip) if dst_ip is not None: self.assertEqual(packet[IP].dst, dst_ip) if packet.haslayer(TCP): if not ignore_port: if same_port: self.assertEqual( packet[TCP].sport, self.tcp_port_in) else: self.assertNotEqual( packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: self.assertEqual( packet[UDP].sport, self.udp_port_in) else: self.assertNotEqual( packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: self.assertEqual( packet[ICMP].id, self.icmp_id_in) else: self.assertNotEqual( packet[ICMP].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP].id self.assert_packet_checksums_valid(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_in(self, capture, in_if): for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or 
invalid packet " "(inside network):", packet)) raise def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64): if dst_ip is None: dst_ip = out_if.remote_ip4 pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: tcp_port = self.tcp_port_out udp_port = self.udp_port_out icmp_id = self.icmp_id_out else: tcp_port = self.tcp_port_in udp_port = self.udp_port_in icmp_id = self.icmp_id_in pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=tcp_port, sport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=udp_port, sport=20)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=icmp_id, type='echo-reply')) pkts.append(p) return pkts def create_tcp_stream(self, in_if, out_if, count): pkts = [] port = 6303 for i in range(count): p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / TCP(sport=port + i, dport=20)) pkts.append(p) return pkts def create_stream_frag(self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False): if proto == IP_PROTOS.tcp: p = (IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / 
Raw(data)) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: proto_header = ICMP(id=sport, type='echo-request') else: proto_header = ICMP(id=sport, type='echo-reply') else: raise Exception("Unsupported protocol") id = self.random_port() pkts = [] if proto == IP_PROTOS.tcp: raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / proto_header / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) / raw) pkts.append(p) return pkts def frag_in_order_in_plus_out(self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 port_in = self.random_port() for i in range(2): # out2in pkts = self.create_stream_frag(self.pg0, out_addr, port_in, out_port, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, port_in) self.assertEqual(p[layer].dport, in_port) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) # in2out if proto != IP_PROTOS.icmp: pkts = self.create_stream_frag(self.pg1, 
self.pg0.remote_ip4, in_port, p[layer].sport, data, proto) else: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, p[layer].id, 0, data, proto, echo_reply=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) def frag_out_of_order_in_plus_out(self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 port_in = self.random_port() for i in range(2): # out2in pkts = self.create_stream_frag(self.pg0, out_addr, port_in, out_port, data, proto) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].dport, in_port) self.assertEqual(p[layer].sport, port_in) self.assertEqual(p[layer].dport, in_port) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) # in2out if proto != IP_PROTOS.icmp: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto) else: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, p[layer].id, 0, data, proto, echo_reply=True) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) else: self.assertEqual(p[layer].id, 
port_in) self.assertEqual(data, p[Raw].load) def init_tcp_session(self, in_if, out_if, in_port, ext_port): # SYN packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=in_port, dport=ext_port, flags="S")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = out_if.get_capture(1) p = capture[0] out_port = p[TCP].sport # SYN + ACK packet out->in p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / IP(src=out_if.remote_ip4, dst=self.nat_addr) / TCP(sport=ext_port, dport=out_port, flags="SA")) out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() in_if.get_capture(1) # ACK packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=in_port, dport=ext_port, flags="A")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() out_if.get_capture(1) return out_port def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False, client_id=None): twice_nat_addr = '10.0.1.3' port_in = 8080 if lb: if not same_pg: port_in1 = port_in port_in2 = port_in else: port_in1 = port_in + 1 port_in2 = port_in + 2 port_out = 80 eh_port_out = 4567 server1 = self.pg0.remote_hosts[0] server2 = self.pg0.remote_hosts[1] if lb and same_pg: server2 = server1 if not lb: server = server1 pg0 = self.pg0 if same_pg: pg1 = self.pg0 else: pg1 = self.pg1 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or client_id == 1) self.nat_add_address(self.nat_addr) self.nat_add_address(twice_nat_addr, twice_nat=1) flags = 0 if self_twice_nat: flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT else: flags |= self.config_flags.NAT_IS_TWICE_NAT if not lb: self.nat_add_static_mapping(pg0.remote_ip4, self.nat_addr, port_in, port_out, proto=IP_PROTOS.tcp, flags=flags) else: locals = [{'addr': server1.ip4, 'port': port_in1, 'probability': 50, 'vrf_id': 0}, {'addr': 
server2.ip4, 'port': port_in2, 'probability': 50, 'vrf_id': 0}] out_addr = self.nat_addr self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags, external_addr=out_addr, external_port=port_out, protocol=IP_PROTOS.tcp, local_num=len(locals), locals=locals) self.nat_add_inside_interface(pg0) self.nat_add_outside_interface(pg1) if same_pg: if not lb: client = server else: assert client_id is not None if client_id == 1: client = self.pg0.remote_hosts[0] elif client_id == 2: client = self.pg0.remote_hosts[1] else: client = pg1.remote_hosts[0] p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) / IP(src=client.ip4, dst=self.nat_addr) / TCP(sport=eh_port_out, dport=port_out)) pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = pg0.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] if lb: if ip.dst == server1.ip4: server = server1 port_in = port_in1 else: server = server2 port_in = port_in2 self.assertEqual(ip.dst, server.ip4) if lb and same_pg: self.assertIn(tcp.dport, [port_in1, port_in2]) else: self.assertEqual(tcp.dport, port_in) if eh_translate: self.assertEqual(ip.src, twice_nat_addr) self.assertNotEqual(tcp.sport, eh_port_out) else: self.assertEqual(ip.src, client.ip4) self.assertEqual(tcp.sport, eh_port_out) eh_addr_in = ip.src eh_port_in = tcp.sport saved_port_in = tcp.dport self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise p = (Ether(src=server.mac, dst=pg0.local_mac) / IP(src=server.ip4, dst=eh_addr_in) / TCP(sport=saved_port_in, dport=eh_port_in)) pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = pg1.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.dst, client.ip4) self.assertEqual(ip.src, self.nat_addr) self.assertEqual(tcp.dport, eh_port_out) self.assertEqual(tcp.sport, port_out) self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) 
raise if eh_translate: sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 1) self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT) self.logger.info(self.vapi.cli("show nat44 sessions")) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, flags=(self.config_flags.NAT_IS_INSIDE | self.config_flags.NAT_IS_EXT_HOST_VALID), ext_host_address=sessions[0].ext_host_nat_address, ext_host_port=sessions[0].ext_host_nat_port) sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 0) def verify_syslog_sess(self, data, is_add=True, is_ip6=False): message = data.decode('utf-8') try: message = SyslogMessage.parse(message) except ParseError as e: self.logger.error(e) raise else: self.assertEqual(message.severity, SyslogSeverity.info) self.assertEqual(message.appname, 'NAT') self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL') sd_params = message.sd.get('nsess') self.assertTrue(sd_params is not None) if is_ip6: self.assertEqual(sd_params.get('IATYP'), 'IPv6') self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6) else: self.assertEqual(sd_params.get('IATYP'), 'IPv4') self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) self.assertTrue(sd_params.get('SSUBIX') is not None) self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) self.assertEqual(sd_params.get('XATYP'), 'IPv4') self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) self.assertEqual(sd_params.get('SVLAN'), '0') self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4) self.assertEqual(sd_params.get('XDPORT'), "%d" % self.tcp_external_port) class TestNAT44ED(NAT44EDTestCase): """ NAT44ED Test Case """ def 
test_icmp_error(self): """ NAT44ED test ICMP error message with inner header""" payload = "H" * 10 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out (initiate connection) p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=21, dport=20) / payload) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1)[0] # out2in (send error message) p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / ICMP(type='dest-unreach', code='port-unreachable') / capture[IP:]) self.pg1.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(1)[0] self.logger.info(ppp("p1 packet:", p1)) self.logger.info(ppp("p2 packet:", p2)) self.logger.info(ppp("capture packet:", capture)) def test_icmp_echo_reply_trailer(self): """ ICMP echo reply with ethernet trailer""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP(type=8, id=0xabcd, seq=0)) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() c = self.pg1.get_capture(1)[0] self.logger.debug(self.vapi.cli("show trace")) # out2in p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xee59) / ICMP(type=0, id=c[ICMP].id, seq=0)) # force checksum calculation p2 = p2.__class__(bytes(p2)) self.logger.debug(ppp("Packet before modification:", p2)) # hex representation of vss monitoring ethernet trailer # this seems to be just added to end of packet without modifying # IP or ICMP lengths / checksums p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18") # 
change it so that IP/ICMP is unaffected p2[IP].len = 28 self.logger.debug(ppp("Packet with added trailer:", p2)) self.pg1.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) def test_users_dump(self): """ NAT44ED API test - nat44_user_dump """ self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=1) local_ip = self.pg0.remote_ip4 external_ip = self.nat_addr self.nat_add_static_mapping(local_ip, external_ip) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 0) # in2out - static mapping match pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, same_port=True) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 1) static_user = users[0] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) # in2out - no static mapping match (forwarding test) host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: pkts = self.create_stream_out(self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True) finally: self.pg0.remote_hosts[0] = host0 users = 
self.vapi.nat44_user_dump() self.assertEqual(len(users), 2) if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: non_static_user = users[1] static_user = users[0] else: non_static_user = users[0] static_user = users[1] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) self.assertEqual(non_static_user.nstaticsessions, 0) self.assertEqual(non_static_user.nsessions, 3) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 2) if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: non_static_user = users[1] static_user = users[0] else: non_static_user = users[0] static_user = users[1] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) self.assertEqual(non_static_user.nstaticsessions, 0) self.assertEqual(non_static_user.nsessions, 3) def test_frag_out_of_order_do_not_translate(self): """ NAT44ED don't translate fragments arriving out of order """ self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=True) self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True) def test_forwarding(self): """ NAT44ED forwarding test """ self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=1) real_ip = self.pg0.remote_ip4 alias_ip = self.nat_addr flags = self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_static_mapping(is_add=1, local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, flags=flags) try: # in2out - static mapping match pkts = self.crea
#!/usr/bin/env python3
"""IP{4,6} over IP{4,6} tunnel functional tests"""
import unittest
from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment, Raw
from scapy.all import fragment, fragment6, RandShort, defragment6
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto
from vpp_ipip_tun_interface import VppIpIpTunInterface
from vpp_teib import VppTeib
from vpp_papi import VppEnum
from socket import AF_INET, AF_INET6, inet_pton
from util import reassemble4
""" TestIPIP is a subclass of the VppTestCase class.
IPIP tests.
"""
def ipip_add_tunnel(test, src, dst, table_id=0, dscp=0x0,
                    flags=0, instance=0xffffffff):
    """ Add an IPIP tunnel through the test's binary API handle.

    :param test: test case object; its ``vapi.ipip_add_tunnel`` is called
    :param src: value for the tunnel's ``src`` field
    :param dst: value for the tunnel's ``dst`` field
    :param table_id: value for the tunnel's ``table_id`` field
    :param dscp: value for the tunnel's ``dscp`` field
    :param flags: value for the tunnel's ``flags`` field
    :param instance: value for the tunnel's ``instance`` field. The
        default, 0xffffffff, is the value this helper always sent before
        the parameter existed (presumably "let VPP choose the instance"
        - confirm against the ipip API definition).
    :returns: whatever ``test.vapi.ipip_add_tunnel`` returns
    """
    tunnel = {
        'src': src,
        'dst': dst,
        'table_id': table_id,
        'instance': instance,
        'dscp': dscp,
        'flags': flags,
    }
    return test.vapi.ipip_add_tunnel(tunnel=tunnel)
# The number of packets to send when injecting traffic.
# It is a multiple of 8 minus one, so that the by-8/4/2/1 loops are
# all tested.
N_PACKETS = 64 - 1
class TestIPIP(VppTestCase):
""" IPIP Test Case """
@classmethod
def setUpClass(cls):
    """ Create the pg interfaces shared by the tests of this class.

    Runs the parent class set-up, creates the interfaces for indices
    0 and 1 and stores a list copy of ``cls.pg_interfaces`` in
    ``cls.interfaces``.
    """
    super(TestIPIP, cls).setUpClass()
    pg_indices = range(2)
    cls.create_pg_interfaces(pg_indices)
    cls.interfaces = [*cls.pg_interfaces]
@classmethod
def tearDownClass(cls):
    """ Delegate class-level clean-up to the parent class.

    Nothing specific to this class is released here.
    """
    super(TestIPIP, cls).tearDownClass()
def setUp(self):
    """ Bring up every interface in ``self.interfaces`` for a test.

    After the parent set-up, each interface is set admin-up, has its
    IPv4 and IPv6 configuration applied, has IPv6 router
    advertisements disabled, and has ARP and NDP resolved.
    """
    super(TestIPIP, self).setUp()
    for pg_if in self.interfaces:
        pg_if.admin_up()
        pg_if.config_ip4()
        pg_if.config_ip6()
        pg_if.disable_ipv6_ra()
        pg_if.resolve_arp()
        pg_if.resolve_ndp()
def tearDown(self):
    """ Undo the per-test interface configuration.

    The parent tear-down runs first. When ``self.vpp_dead`` is set the
    interface clean-up is skipped; otherwise every interface in
    ``self.pg_interfaces`` has its IPv4 and IPv6 configuration removed
    and is set admin-down.
    """
    super(TestIPIP, self).tearDown()
    if self.vpp_dead:
        return
    for pg_if in self.pg_interfaces:
        pg_if.unconfig_ip4()
        pg_if.unconfig_ip6()
        pg_if.admin_down()
def validate(self, rx, expected):
    """ Assert that ``rx`` equals a rebuilt copy of ``expected``.

    ``expected`` is passed back through its own class before the
    comparison (presumably so that scapy fills in computed fields -
    confirm).
    """
    rebuilt = expected.__class__(expected)
    self.assertEqual(rx, rebuilt)
def generate_ip4_frags(self, payload_length, fragment_size):
    """ Build fragments of an IPv4-in-IPv4 packet plus the expected reply.

    :param payload_length: length handed to ``self.payload()`` to
        build the UDP payload
    :param fragment_size: fragment size handed to scapy's ``fragment()``
    :returns: tuple ``(frags, p4_reply)`` - the fragments of the
        encapsulated packet, and the inner IPv4/UDP packet with its
        ``ttl`` lowered by one
    """
    ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
    inner_l4 = UDP(sport=1234, dport=1234) / self.payload(payload_length)
    inner_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
    outer_hdr = IP(src=self.pg1.remote_ip4,
                   id=RandShort(),
                   dst=self.pg0.local_ip4)
    encapsulated = ether / outer_hdr / inner_ip4 / inner_l4
    frags = fragment(encapsulated, fragment_size)

    p4_reply = inner_ip4 / inner_l4
    p4_reply.ttl -= 1
    return frags, p4_reply
def verify_ip4ip4_encaps(self, a, p_ip4s, p_ip4_encaps):
    """ Send IPv4 packets and check their IPv4 encapsulation.

    For each header in ``p_ip4s`` a packet is built from
    ``self.p_ether``, the header and ``self.p_payload``; N_PACKETS
    copies go through ``send_and_expect(self.pg0, ..., self.pg1)``.
    Index 1 of every received packet is compared with the expected
    packet and the received packet's checksums are validated.

    :param a: destination address written into each inner header
    :param p_ip4s: inner IPv4 headers; modified in place (``dst`` is
        overwritten and ``ttl`` is lowered by one)
    :param p_ip4_encaps: expected outer IPv4 headers, matched to
        ``p_ip4s`` by index
    """
    for idx, inner in enumerate(p_ip4s):
        inner.dst = a
        tx = self.p_ether / inner / self.p_payload

        # The inner ttl is lowered only after the packet to send was
        # built. NOTE(review): this relies on ``/`` copying its
        # operands - confirm.
        inner.ttl -= 1
        expected = p_ip4_encaps[idx] / inner / self.p_payload
        expected.ttl -= 1
        expected.id = 0

        for pkt in self.send_and_expect(self.pg0, tx * N_PACKETS,
                                        self.pg1):
            self.validate(pkt[1], expected)
            self.assert_packet_checksums_valid(pkt)
def verify_ip6ip4_encaps(self, a, p_ip6s, p_ip4_encaps):
for i, p_ip6 in enumerate(p_ip6s):
p_ip6.dst = a
p6 = (self.p_ether / p_ip6 / self.p_payload)
p_inner_ip6 = p_ip6
p_inner_ip6.hlim -= 1
p6_reply = (p_ip4_encaps[i] / p_inner_ip6 / self.p_payload)
p6_reply.ttl -= 1
rx = self.send_and_expect(self.pg0, p6 * N_PACKETS, self.pg1)
for p in rx:
self.validate(p[1], p6_reply)
self.assert_packet_checksums_valid(p)
def test_ipip4(self):
""" ip{v4,v6} over ip4 test """
self.pg1.generate_remote_hosts(5)
self.pg1.configure_ipv4_neighbors()
e = VppEnum.vl_api_tunnel_encap_decap_flags_t
d = VppEnum.vl_api_ip_dscp_t
self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)
# create a TOS byte by shifting a DSCP code point 2 bits. those 2 bits
# are for the ECN.
dscp = d.IP_API_DSCP_AF31 << 2
ecn = 3
dscp_ecn = d.IP_API_DSCP_AF31 << 2 | ecn
# IPv4 transport that copies the DCSP from the payload
tun_dscp = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[0].ip4,
flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
tun_dscp.add_vpp_config()
# IPv4 transport that copies the DCSP and ECN from the payload
tun_dscp_ecn = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[1].ip4,
flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP |
e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN))
tun_dscp_ecn.add_vpp_config()
# IPv4 transport that copies the ECN from the payload and sets the
# DF bit on encap. copies the ECN on decap
tun_ecn = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[2].ip4,
flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN |
e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF |
e.TUNNEL_API_ENCAP_DECAP_FLAG_DECAP_COPY_ECN))
tun_ecn.add_vpp_config()
# IPv4 transport that sets a fixed DSCP in the encap and copies
# the DF bit
tun = VppIpIpTunInterface(
self,
self.pg0,
self.pg0.local_ip4,
self.pg1.remote_hosts[3].ip4,
dscp=d.IP_API_DSCP_AF11,
flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF)
tun.add_vpp_config()
# array of all the tunnels
tuns = [tun_dscp, tun_dscp_ecn, tun_ecn, tun]
# addresses for prefixes routed via each tunnel
a4s = ["" for i in range(len(tuns))]
a6s = ["" for i in range(len(tuns))]
# IP headers with each combination of DSCp/ECN tested
p_ip6s = [IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp),
IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp_ecn),
IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=ecn),
IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=0xff)]
p_ip4s = [IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp, flags='DF'),
IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp_ecn),
IP(src="1.2.3.4", dst="130.67.0.1", tos=ecn),
IP(src="1.2.3.4", dst="130.67.0.1", tos=0xff)]
# Configure each tunnel
for i, t in enumerate(tuns):
# Set interface up and enable IP on it
self.vapi.sw_interface_set_flags(t.sw_if_index, 1)
self.vapi.sw_interface_set_unnumbered(
sw_if_index=self.pg0.sw_if_index,
unnumbered_sw_if_index=t.sw_if_index)
# prefix for route / destination address for packets
a4s[i] = "130.67.%d.0" % i
a6s[i] = "dead:%d::" % i
# Add IPv4 and IPv6 routes via tunnel interface
ip4_via_tunnel = VppIpRoute(
self, a4s[i], 24,
[VppRoutePath("0.0.0.0",
t.sw_if_index,
proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
ip4_via_tunnel.add_vpp_config()
ip6_via_tunnel = VppIpRoute(
self, a6s[i], 64,
[VppRoutePath("::",
t.sw_if_index,
proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
ip6_via_tunnel.add_vpp_config()
#
# Encapsulation
#
# tun_dscp copies only the dscp
# expected TC values are thus only the DCSP value is present from the
# inner
exp_tcs = [dscp, dscp, 0, 0xfc]
p_ip44_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_dscp.dst,
tos=tc) for tc in exp_tcs]
p_ip64_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_dscp.dst,
proto='ipv6', id=0, tos=tc) for tc in exp_tcs]
# IPv4 in to IPv4 tunnel
self.verify_ip4ip4_encaps(a4s[0], p_ip4s, p_ip44_encaps)
# IPv6 in to IPv4 tunnel
self.verify_ip6ip4_encaps(a6s[0], p_ip6s, p_ip64_encaps)
# tun_dscp_ecn copies the dscp and the ecn
exp_tcs = [dscp, dscp_ecn, ecn, 0xff]
p_ip44_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_dscp_ecn.dst,
tos=tc) for tc in exp_tcs]
p_ip64_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_dscp_ecn.dst,
proto='ipv6', id=0, tos=tc) for tc in exp_tcs]
self.verify_ip4ip4_encaps(a4s[1], p_ip4s, p_ip44_encaps)
self.verify_ip6ip4_encaps(a6s[1], p_ip6s, p_ip64_encaps)
# tun_ecn copies only the ecn and always sets DF
exp_tcs = [0, ecn, ecn, ecn]
p_ip44_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_ecn.dst,
flags='DF', tos=tc) for tc in exp_tcs]
p_ip64_encaps = [IP(src=self.pg0.local_ip4,
dst=tun_ecn.dst,
flags='DF', proto='ipv6', id=0, tos=tc)
for tc in exp_tcs]
self.verify_ip4ip4_encaps(a4s[2], p_ip4s, p_ip44_encaps)
self.verify_ip6ip4_encaps(a6s[2], p_ip6s, p_ip64_encaps)
# tun sets a fixed dscp and copies DF
fixed_dscp = tun.dscp << 2
flags = ['DF', 0, 0, 0]
p_ip44_encaps = [IP(src=self.pg0.local_ip4,
dst=tun.dst,
flags=f,
tos=fixed_dscp) for f in flags]
p_ip64_encaps = [IP(src=self.pg0.local_ip4,
dst=tun.dst,
proto='ipv6', id=0,
tos=fixed_dscp) for i in range(len(p_ip4s))]
self.verify_ip4ip4_encaps(a4s[3], p_ip4s, p_ip44_encaps)
self.verify_ip6ip4_encaps(a6s[3], p_ip6s, p_ip64_encaps)
#
# Decapsulation
#
n_packets_decapped = 0
self.p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
# IPv4 tunnel to IPv4
tcs = [0, dscp, dscp_ecn, ecn]
# one overlay packet and all combinations of its encap
p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
p_ip4_encaps = [IP(src=tun.dst,
dst=self.pg0.local_ip4,
tos=tc) for tc in tcs]
# for each encap tun will produce the same inner packet because it does
# not copy up fields from the payload
for p_ip4_encap in p_ip4_encaps:
p4 = (self.p_ether / p_ip4_encap / p_ip4 / self.p_payload)
p4_reply = (p_ip4 / self.p_payload)
p4_reply.ttl -= 1
rx = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
n_packets_decapped += N_PACKETS
for p in rx:
self.validate(p[1], p4_reply)
self.assert_packet_checksums_valid(p)
err = self.statistics.get_err_counter(
'/err/ipip4-input/packets decapsulated')
self.assertEqual(err, n_packets_decapped)
# tun_ecn copies the ECN bits from the encap to the inner
p_ip4_encaps = [IP(src=tun_ecn.dst,
dst=self.pg0.local_ip4,
tos=tc) for tc in tcs]
p_ip4_replys = [p_ip4.copy() for i in range(len(p_ip4_encaps))]
p_ip4_replys[2].tos = ecn
p_ip4_replys[3].tos = ecn
for i, p_ip4_encap in enumerate(p_ip4_encaps):
p4 = (self.p_ether / p_ip4_encap / p_ip4 / self.p_payload)
p4_reply = (p_ip4_replys[i] / self.p_payload)
p4_reply.ttl -= 1
rx = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
n_packets_decapped += N_PACKETS
for p in rx:
self.validate(p[1], p4_reply)
self.assert_packet_checksums_valid(p)
err = self.statistics.get_err_counter(
'/err/ipip4-input/packets decapsulated')
self.assertEqual(err, n_packets_decapped)
# IPv4 tunnel to IPv6
# for each encap tun will produce the same inner packet because it does
# not copy up fields from the payload
p_ip4_encaps = [IP(src=tun.dst,
dst=self.pg0.local_ip4,
tos=tc) for tc in tcs]
p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
for p_ip4_encap in p_ip4_encaps:
p6 = (self.p_ether /
p_ip4_encap / p_ip6 /
self.p_payload)
p6_reply = (p_ip6 / self.p_payload)
p6_reply.hlim = 63
rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
n_packets_decapped += N_PACKETS
for p in rx:
self.validate(p[1], p6_reply)
self.assert_packet_checksums_valid(p)
err = self.statistics.get_err_counter(
'/err/ipip4-input/packets decapsulated')
self.assertEqual(err, n_packets_decapped)
# IPv4 tunnel to IPv6
# tun_ecn copies the ECN bits from the encap to the inner
p_ip4_encaps = [IP(src=tun_ecn.dst,
dst=self.pg0.local_ip4,
tos=tc) for tc in tcs]
p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
p_ip6_replys = [p_ip6.copy() for i in range(len(p_ip4_encaps))]
p_ip6_replys[2].tc = ecn
p_ip6_replys[3].tc = ecn
for i, p_ip4_encap in enumerate(p_ip4_encaps):
p6 = (self.p_ether / p_ip4_encap / p_ip6 / self.p_payload)
p6_reply = (p_ip6_replys[i] / self.p_payload)
p6_reply.hlim = 63
rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
n_packets_decapped += N_PACKETS
for p in rx:
self.validate(p[1], p6_reply)
self.assert_packet_checksums_valid(p)
err = self.statistics.get_err_counter(
'/err/ipip4-input/packets decapsulated')
self.assertEqual(err, n_packets_decapped)
#
# Fragmentation / Reassembly and Re-fragmentation
#
rv = self.vapi.ip_reassembly_enable_disable(
sw_if_index=self.pg1.sw_if_index,
enable_ip4=1)
self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=1000,
max_reassembly_length=1000,
expire_walk_interval_ms=10000,
is_ip6=0)
# Send lots of fragments, verify reassembled packet
frags, p4_reply = self.generate_ip4_frags(3131, 1400)
f = []
for i in range(0, 1000):
f.extend(frags)
self.pg1.add_stream(f)
self.pg_enable_capture()
self.pg_start()
rx = self.pg0.get_capture(1000)
n_packets_decapped += 1000
for p in rx:
self.validate(p[1], p4_reply)
err = self.statistics.get_err_counter(
'/err/ipip4-input/packets decapsulated')
self.assertEqual(err, n_packets_decapped)
f = []
r = []
for i in range(1, 90):
frags, p4_reply = self.generate_ip4_frags(i * 100, 1000)
f.extend(frags)
r.extend(p4_reply)
self.pg_enable_capture()
self.pg1.add_stream(f)
self.pg_start()
rx = self.pg0.get_capture(89)
i = 0
for p in rx:
self.validate(p[1], r[i])
i += 1
# Now try with re-fragmentation
#
# Send fragments to tunnel head-end, for the tunnel head end
# to reassemble and then refragment
#
self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
frags, p4_reply = self.generate_ip4_frags(3123, 1200)
self.pg_enable_capture()
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(6)
reass_pkt = reassemble4(rx)
p4_reply.id = 256
self.validate(reass_pkt, p4_reply)
self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1600, 0, 0, 0])
frags, p4_reply = self.generate_ip4_frags(3123, 1200)
self.pg_enable_capture()
self.pg1.add_stream(frags)
self.pg_start()
rx = self.pg0.get_capture(2)
reass_pkt = reassemble4(rx)
p4_reply.id = 512
self.validate(reass_pkt, p4_reply)
# send large packets through the tunnel, expect them to be fragmented
self.vapi.sw_interface_set_mtu(tun_dscp.sw_if_index, [600, 0, 0, 0])
p4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src="1.2.3.4", dst="130.67.0.1", tos=42) /
UDP(sport=1234, dport=1234) / Raw(b'Q' * 1000))
rx = self.send_and_expect(self.pg0, p4 * 15, self.pg1, 30)
inners = []
for p in rx:
inners.append(p[IP].payload)
reass_pkt = reassemble4(inners)
for p in reass_pkt:
self.assert_packet_checksums_valid(p)
self.assertEqual(p[IP].ttl, 63)
def test_ipip_create(self):
""" ipip create / delete interface test """
rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5')
sw_if_index = rv.sw_if_index
self.vapi.ipip_del_tunnel(sw_if_index)
def test_ipip_vrf_create(self):
""" ipip create / delete interface VRF test """
t = VppIpTable(self, 20)
t.add_vpp_config()
rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5', table_id=20)
sw_if_index = rv.sw_if_index
self.vapi.ipip_del_tunnel(sw_if_index)
def payload(self, len):
return 'x' * len
def test_mipip4(self):
""" p2mp IPv4 tunnel Tests """
for itf in self.pg_interfaces:
#
# one underlay nh for each overlay/tunnel peer
#
itf.generate_remote_hosts(4)
itf.configure_ipv4_neighbors()
#
# Create an p2mo IPIP tunnel.
# - set it admin up
# - assign an IP Addres
# - Add a route via the tunnel
#
ipip_if = VppIpIpTunInterface(self, itf,
itf.local_ip4,
"0.0.0.0",
mode=(VppEnum.vl_api_tunnel_mode_t.
TUNNEL_API_MODE_MP))
ipip_if.add_vpp_config()
ipip_if.admin_up()
ipip_if.config_ip4()
ipip_if.generate_remote_hosts(4)
self.logger.info(self.vapi.cli("sh adj"))
self.logger.info(self.vapi.cli("sh ip fib"))
#
# ensure we don't match to the tunnel if the source address
# is all zeros
#
# tx = self.create_tunnel_stream_4o4(self.pg0,
# "0.0.0.0",
# itf.local_ip4,
# self.pg0.local_ip4,
# self.pg0.remote_ip4)
# self.send_and_assert_no_replies(self.pg0, tx)
#
# for-each peer
#
for ii in range(1, 4):
route_addr = "4.4.4.%d" % ii
#
# route traffic via the peer
#
route_via_tun = VppIpRoute(
self, route_addr, 32,
[VppRoutePath(ipip_if._remote_hosts[ii].ip4,
ipip_if.sw_if_index)])
route_via_tun.add_vpp_config()
#
# Add a TEIB entry resolves the peer
#
teib = VppTeib(self, ipip_if,
ipip_if._remote_hosts[ii].ip4,
itf._remote_hosts[ii].ip4)
teib.add_vpp_config()
self.logger.info(self.vapi.cli("sh adj nbr ipip0 %s" %
ipip_if._remote_hosts[ii].ip4))
#
# Send a packet stream that is routed into the tunnel
# - packets are IPIP encapped
#
inner = (IP(dst=route_addr, src="5.5.5.5") /
UDP(sport=1234, dport=1234) /
Raw(b'0x44' * 100))
tx_e = [(Ether(dst=self.pg0.local_mac,
src=self.pg0.remote_mac) /
inner) for x in range(63)]
rxs = self.send_and_expect(self.pg0, tx_e, itf)
for rx in rxs:
self.assertEqual(rx[IP].src, itf.local_ip4)
self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)
tx_i = [(Ether(dst=self.pg0.local_mac,
src=self.pg0.remote_mac) /
IP(src=itf._remote_hosts[ii].ip4,
dst=itf.local_ip4) /
IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
UDP(sport=1234, dport=1234) /
Raw(b'0x44' * 100)) for x in range(63)]
self.logger.info(self.vapi.cli("sh ipip tunnel-hash"))
rx = self.send_and_expect(self.pg0, tx_i, self.pg0)
#
# delete and re-add the TEIB
#
teib.remove_vpp_config()
self.send_and_assert_no_replies(self.pg0, tx_e)
self.send_and_assert_no_replies(self.pg0, tx_i)
teib.add_vpp_config()
rx = self.send_and_expect(self.pg0, tx_e, itf)
for rx in rxs:
self.assertEqual(rx[IP].src, itf.local_ip4)
self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)
rx = self.send_and_expect(self.pg0, tx_i, self.pg0)
ipip_if.admin_down()
ipip_if.unconfig_ip4()
class TestIPIP6(VppTestCase):
""" IPIP6 Test Case """
@classmethod
def setUpClass(cls):
super(TestIPIP6, cls).setUpClass()
cls.create_pg_interfaces(range(2))
cls.interfaces = list(cls.pg_interfaces)
@classmethod
def tearDownClass(cls):
super(TestIPIP6, cls).tearDownClass()
def setUp(self):
super(TestIPIP6, self).setUp()
for i in self.interfaces:
i.admin_up()
i.