#!/usr/bin/env python3 import unittest from io import BytesIO from random import randint, shuffle, choice import scapy.compat from framework import VppTestCase, VppTestRunner from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP, GRE from scapy.layers.inet import IPerror, TCPerror from scapy.layers.l2 import Ether from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppp, ip4_range from vpp_acl import AclRule, VppAcl, VppAclInterface from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_papi import VppEnum class NAT44EDTestCase(VppTestCase): nat_addr = '10.0.0.3' tcp_port_in = 6303 tcp_port_out = 6303 udp_port_in = 6304 udp_port_out = 6304 icmp_id_in = 6305 icmp_id_out = 6305 tcp_external_port = 80 max_sessions = 100 def setUp(self): super(NAT44EDTestCase, self).setUp() self.plugin_enable() def tearDown(self): super(NAT44EDTestCase, self).tearDown() if not self.vpp_dead: self.plugin_disable() def plugin_enable(self): self.vapi.nat44_ed_plugin_enable_disable( sessions=self.max_sessions, enable=1) def plugin_disable(self): self.vapi.nat44_ed_plugin_enable_disable(enable=0) @property def config_flags(self): return VppEnum.vl_api_nat_config_flags_t @property def nat44_config_flags(self): return VppEnum.vl_api_nat44_config_flags_t @property def syslog_severity(self): return VppEnum.vl_api_syslog_severity_t @property def server_addr(self): return self.pg1.remote_hosts[0].ip4 @staticmethod def random_port(): return randint(1025, 65535) @staticmethod def proto2layer(proto): if proto == IP_PROTOS.tcp: return TCP elif proto == IP_PROTOS.udp: return UDP elif proto == IP_PROTOS.icmp: return ICMP else: raise Exception("Unsupported protocol") @classmethod def create_and_add_ip4_table(cls, i, table_id=0): cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id}) i.set_table_ip4(table_id) @classmethod def 
configure_ip4_interface(cls, i, hosts=0, table_id=None): if table_id: cls.create_and_add_ip4_table(i, table_id) i.admin_up() i.config_ip4() i.resolve_arp() if hosts: i.generate_remote_hosts(hosts) i.configure_ipv4_neighbors() @classmethod def nat_add_interface_address(cls, i): cls.vapi.nat44_add_del_interface_addr( sw_if_index=i.sw_if_index, is_add=1) def nat_add_inside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1) def nat_add_outside_interface(self, i): self.vapi.nat44_interface_add_del_feature( flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1) def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1): flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0 self.vapi.nat44_add_del_address_range(first_ip_address=address, last_ip_address=address, vrf_id=vrf_id, is_add=is_add, flags=flags) def nat_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, proto=0, tag="", flags=0): if not (local_port and external_port): flags |= self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_static_mapping( is_add=is_add, local_ip_address=local_ip, external_ip_address=external_ip, external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, vrf_id=vrf_id, protocol=proto, flags=flags, tag=tag) @classmethod def setUpClass(cls): super(NAT44EDTestCase, cls).setUpClass() cls.create_pg_interfaces(range(12)) cls.interfaces = list(cls.pg_interfaces[:4]) cls.create_and_add_ip4_table(cls.pg2, 10) for i in cls.interfaces: cls.configure_ip4_interface(i, hosts=3) # test specific (test-multiple-vrf) cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1}) # test specific (test-one-armed-nat44-static) cls.pg4.generate_remote_hosts(2) cls.pg4.config_ip4() cls.vapi.sw_interface_add_del_address( sw_if_index=cls.pg4.sw_if_index, 
prefix="10.0.0.1/24") cls.pg4.admin_up() cls.pg4.resolve_arp() cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4 cls.pg4.resolve_arp() # test specific interface (pg5) cls.pg5._local_ip4 = "10.1.1.1" cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2" cls.pg5.set_table_ip4(1) cls.pg5.config_ip4() cls.pg5.admin_up() cls.pg5.resolve_arp() # test specific interface (pg6) cls.pg6._local_ip4 = "10.1.2.1" cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2" cls.pg6.set_table_ip4(1) cls.pg6.config_ip4() cls.pg6.admin_up() cls.pg6.resolve_arp() rl = list() rl.append(VppIpRoute(cls, "0.0.0.0", 0, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=0)], register=False, table_id=1)) rl.append(VppIpRoute(cls, "0.0.0.0", 0, [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)], register=False)) rl.append(VppIpRoute(cls, cls.pg5.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)], register=False, table_id=1)) rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 32, [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)], register=False, table_id=1)) rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 16, [VppRoutePath("0.0.0.0", 0xffffffff, nh_table_id=1)], register=False, table_id=0)) for r in rl: r.add_vpp_config() def get_err_counter(self, path): return self.statistics.get_err_counter(path) def reass_hairpinning(self, server_addr, server_in_port, server_out_port, host_in_port, proto=IP_PROTOS.tcp, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag(self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) 
self.assertEqual(p[layer].dport, server_in_port) else: if not ignore_port: self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() for i in range(2): # in2out pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, self.pg1.remote_ip4) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.logger.info(self.vapi.cli("show trace")) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) 
self.assertEqual(data, p[Raw].load) def reass_frags_and_verify(self, frags, src, dst): buffer = BytesIO() for p in frags: self.assertEqual(p[IP].src, src) self.assertEqual(p[IP].dst, dst) self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: p = (ip / TCP(buffer.getvalue())) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: p = (ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])) elif ip.proto == IP_PROTOS.icmp: p = (ip / ICMP(buffer.getvalue())) return p def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = self.random_port() # in2out pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, self.pg1.remote_ip4) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, 
dport, data, proto, echo_reply=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) def verify_capture_out(self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False): if nat_ip is None: nat_ip = self.nat_addr for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].src, nat_ip) if dst_ip is not None: self.assertEqual(packet[IP].dst, dst_ip) if packet.haslayer(TCP): if not ignore_port: if same_port: self.assertEqual( packet[TCP].sport, self.tcp_port_in) else: self.assertNotEqual( packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: self.assertEqual( packet[UDP].sport, self.udp_port_in) else: self.assertNotEqual( packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: self.assertEqual( packet[ICMP].id, self.icmp_id_in) else: self.assertNotEqual( packet[ICMP].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP].id self.assert_packet_checksums_valid(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_in(self, capture, in_if): for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or 
invalid packet " "(inside network):", packet)) raise def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64): if dst_ip is None: dst_ip = out_if.remote_ip4 pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: tcp_port = self.tcp_port_out udp_port = self.udp_port_out icmp_id = self.icmp_id_out else: tcp_port = self.tcp_port_in udp_port = self.udp_port_in icmp_id = self.icmp_id_in pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=tcp_port, sport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=udp_port, sport=20)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=icmp_id, type='echo-reply')) pkts.append(p) return pkts def create_tcp_stream(self, in_if, out_if, count): pkts = [] port = 6303 for i in range(count): p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / TCP(sport=port + i, dport=20)) pkts.append(p) return pkts def create_stream_frag(self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False): if proto == IP_PROTOS.tcp: p = (IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / 
Raw(data)) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: proto_header = ICMP(id=sport, type='echo-request') else: proto_header = ICMP(id=sport, type='echo-reply') else: raise Exception("Unsupported protocol") id = self.random_port() pkts = [] if proto == IP_PROTOS.tcp: raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / proto_header / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) / raw) pkts.append(p) return pkts def frag_in_order_in_plus_out(self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 port_in = self.random_port() for i in range(2): # out2in pkts = self.create_stream_frag(self.pg0, out_addr, port_in, out_port, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, port_in) self.assertEqual(p[layer].dport, in_port) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) # in2out if proto != IP_PROTOS.icmp: pkts = self.create_stream_frag(self.pg1, 
self.pg0.remote_ip4, in_port, p[layer].sport, data, proto) else: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, p[layer].id, 0, data, proto, echo_reply=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) def frag_out_of_order_in_plus_out(self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 port_in = self.random_port() for i in range(2): # out2in pkts = self.create_stream_frag(self.pg0, out_addr, port_in, out_port, data, proto) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].dport, in_port) self.assertEqual(p[layer].sport, port_in) self.assertEqual(p[layer].dport, in_port) else: self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) # in2out if proto != IP_PROTOS.icmp: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto) else: pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, p[layer].id, 0, data, proto, echo_reply=True) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) else: self.assertEqual(p[layer].id, 
port_in) self.assertEqual(data, p[Raw].load) def init_tcp_session(self, in_if, out_if, in_port, ext_port): # SYN packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=in_port, dport=ext_port, flags="S")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = out_if.get_capture(1) p = capture[0] out_port = p[TCP].sport # SYN + ACK packet out->in p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / IP(src=out_if.remote_ip4, dst=self.nat_addr) / TCP(sport=ext_port, dport=out_port, flags="SA")) out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() in_if.get_capture(1) # ACK packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / TCP(sport=in_port, dport=ext_port, flags="A")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() out_if.get_capture(1) return out_port def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False, client_id=None): twice_nat_addr = '10.0.1.3' port_in = 8080 if lb: if not same_pg: port_in1 = port_in port_in2 = port_in else: port_in1 = port_in + 1 port_in2 = port_in + 2 port_out = 80 eh_port_out = 4567 server1 = self.pg0.remote_hosts[0] server2 = self.pg0.remote_hosts[1] if lb and same_pg: server2 = server1 if not lb: server = server1 pg0 = self.pg0 if same_pg: pg1 = self.pg0 else: pg1 = self.pg1 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or client_id == 1) self.nat_add_address(self.nat_addr) self.nat_add_address(twice_nat_addr, twice_nat=1) flags = 0 if self_twice_nat: flags |= self.config_flags.NAT_IS_SELF_TWICE_NAT else: flags |= self.config_flags.NAT_IS_TWICE_NAT if not lb: self.nat_add_static_mapping(pg0.remote_ip4, self.nat_addr, port_in, port_out, proto=IP_PROTOS.tcp, flags=flags) else: locals = [{'addr': server1.ip4, 'port': port_in1, 'probability': 50, 'vrf_id': 0}, {'addr': 
server2.ip4, 'port': port_in2, 'probability': 50, 'vrf_id': 0}] out_addr = self.nat_addr self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags, external_addr=out_addr, external_port=port_out, protocol=IP_PROTOS.tcp, local_num=len(locals), locals=locals) self.nat_add_inside_interface(pg0) self.nat_add_outside_interface(pg1) if same_pg: if not lb: client = server else: assert client_id is not None if client_id == 1: client = self.pg0.remote_hosts[0] elif client_id == 2: client = self.pg0.remote_hosts[1] else: client = pg1.remote_hosts[0] p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) / IP(src=client.ip4, dst=self.nat_addr) / TCP(sport=eh_port_out, dport=port_out)) pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = pg0.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] if lb: if ip.dst == server1.ip4: server = server1 port_in = port_in1 else: server = server2 port_in = port_in2 self.assertEqual(ip.dst, server.ip4) if lb and same_pg: self.assertIn(tcp.dport, [port_in1, port_in2]) else: self.assertEqual(tcp.dport, port_in) if eh_translate: self.assertEqual(ip.src, twice_nat_addr) self.assertNotEqual(tcp.sport, eh_port_out) else: self.assertEqual(ip.src, client.ip4) self.assertEqual(tcp.sport, eh_port_out) eh_addr_in = ip.src eh_port_in = tcp.sport saved_port_in = tcp.dport self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise p = (Ether(src=server.mac, dst=pg0.local_mac) / IP(src=server.ip4, dst=eh_addr_in) / TCP(sport=saved_port_in, dport=eh_port_in)) pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = pg1.get_capture(1) p = capture[0] try: ip = p[IP] tcp = p[TCP] self.assertEqual(ip.dst, client.ip4) self.assertEqual(ip.src, self.nat_addr) self.assertEqual(tcp.dport, eh_port_out) self.assertEqual(tcp.sport, port_out) self.assert_packet_checksums_valid(p) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) 
raise if eh_translate: sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 1) self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT) self.logger.info(self.vapi.cli("show nat44 sessions")) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, flags=(self.config_flags.NAT_IS_INSIDE | self.config_flags.NAT_IS_EXT_HOST_VALID), ext_host_address=sessions[0].ext_host_nat_address, ext_host_port=sessions[0].ext_host_nat_port) sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 0) def verify_syslog_sess(self, data, is_add=True, is_ip6=False): message = data.decode('utf-8') try: message = SyslogMessage.parse(message) except ParseError as e: self.logger.error(e) raise else: self.assertEqual(message.severity, SyslogSeverity.info) self.assertEqual(message.appname, 'NAT') self.assertEqual(message.msgid, 'SADD' if is_add else 'SDEL') sd_params = message.sd.get('nsess') self.assertTrue(sd_params is not None) if is_ip6: self.assertEqual(sd_params.get('IATYP'), 'IPv6') self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6) else: self.assertEqual(sd_params.get('IATYP'), 'IPv4') self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) self.assertTrue(sd_params.get('SSUBIX') is not None) self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) self.assertEqual(sd_params.get('XATYP'), 'IPv4') self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) self.assertEqual(sd_params.get('SVLAN'), '0') self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4) self.assertEqual(sd_params.get('XDPORT'), "%d" % self.tcp_external_port) class TestNAT44ED(NAT44EDTestCase): """ NAT44ED Test Case """ def 
test_icmp_error(self): """ NAT44ED test ICMP error message with inner header""" payload = "H" * 10 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out (initiate connection) p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / UDP(sport=21, dport=20) / payload) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1)[0] # out2in (send error message) p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / ICMP(type='dest-unreach', code='port-unreachable') / capture[IP:]) self.pg1.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(1)[0] self.logger.info(ppp("p1 packet:", p1)) self.logger.info(ppp("p2 packet:", p2)) self.logger.info(ppp("capture packet:", capture)) def test_icmp_echo_reply_trailer(self): """ ICMP echo reply with ethernet trailer""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / ICMP(type=8, id=0xabcd, seq=0)) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) self.pg_start() c = self.pg1.get_capture(1)[0] self.logger.debug(self.vapi.cli("show trace")) # out2in p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xee59) / ICMP(type=0, id=c[ICMP].id, seq=0)) # force checksum calculation p2 = p2.__class__(bytes(p2)) self.logger.debug(ppp("Packet before modification:", p2)) # hex representation of vss monitoring ethernet trailer # this seems to be just added to end of packet without modifying # IP or ICMP lengths / checksums p2 = p2 / Raw("\x00\x00\x52\x54\x00\x46\xab\x04\x84\x18") # 
change it so that IP/ICMP is unaffected p2[IP].len = 28 self.logger.debug(ppp("Packet with added trailer:", p2)) self.pg1.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) def test_users_dump(self): """ NAT44ED API test - nat44_user_dump """ self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=1) local_ip = self.pg0.remote_ip4 external_ip = self.nat_addr self.nat_add_static_mapping(local_ip, external_ip) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 0) # in2out - static mapping match pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, same_port=True) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 1) static_user = users[0] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) # in2out - no static mapping match (forwarding test) host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: pkts = self.create_stream_out(self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True) finally: self.pg0.remote_hosts[0] = host0 users = 
self.vapi.nat44_user_dump() self.assertEqual(len(users), 2) if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: non_static_user = users[1] static_user = users[0] else: non_static_user = users[0] static_user = users[1] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) self.assertEqual(non_static_user.nstaticsessions, 0) self.assertEqual(non_static_user.nsessions, 3) users = self.vapi.nat44_user_dump() self.assertEqual(len(users), 2) if str(users[0].ip_address) == self.pg0.remote_hosts[0].ip4: non_static_user = users[1] static_user = users[0] else: non_static_user = users[0] static_user = users[1] self.assertEqual(static_user.nstaticsessions, 3) self.assertEqual(static_user.nsessions, 0) self.assertEqual(non_static_user.nstaticsessions, 0) self.assertEqual(non_static_user.nsessions, 3) def test_frag_out_of_order_do_not_translate(self): """ NAT44ED don't translate fragments arriving out of order """ self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=True) self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True) def test_forwarding(self): """ NAT44ED forwarding test """ self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=1) real_ip = self.pg0.remote_ip4 alias_ip = self.nat_addr flags = self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_static_mapping(is_add=1, local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, flags=flags) try: # in2out - static mapping match pkts = self.crea
#!/usr/bin/env python3
"""IP{4,6} over IP{v,6} tunnel functional tests"""

import unittest
from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment, Raw
from scapy.all import fragment, fragment6, RandShort, defragment6
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto
from vpp_ipip_tun_interface import VppIpIpTunInterface
from vpp_teib import VppTeib
from vpp_papi import VppEnum
from socket import AF_INET, AF_INET6, inet_pton
from util import reassemble4

""" Testipip is a subclass of  VPPTestCase classes.

IPIP tests.

"""


def ipip_add_tunnel(test, src, dst, table_id=0, dscp=0x0,
                    flags=0):
    """ Add an IPIP tunnel through the test's VPP API handle.

    The tunnel description is assembled from the arguments and passed as
    the ``tunnel`` keyword to ``test.vapi.ipip_add_tunnel``.

    :param test: object exposing the API as ``test.vapi``
    :param src: value sent as the tunnel's 'src'
    :param dst: value sent as the tunnel's 'dst'
    :param table_id: value sent as 'table_id' (default 0)
    :param dscp: value sent as 'dscp' (default 0x0)
    :param flags: value sent as 'flags' (default 0)
    :returns: whatever ``test.vapi.ipip_add_tunnel`` returns, unchanged
    """
    # NOTE(review): 'instance' is always 0xffffffff; presumably this lets
    # VPP choose the instance number - confirm against the ipip API.
    tunnel = dict(
        src=src,
        dst=dst,
        table_id=table_id,
        instance=0xffffffff,
        dscp=dscp,
        flags=flags,
    )
    return test.vapi.ipip_add_tunnel(tunnel=tunnel)

# How many packets each traffic injection sends.
# Deliberately one less than a multiple of 8, so that the by-8, by-4,
# by-2 and by-1 loops all get tested.
N_PACKETS = 8 * 8 - 1


class TestIPIP(VppTestCase):
    """ IPIP Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then create two pg interfaces."""
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        # keep a separate list holding the entries of cls.pg_interfaces
        cls.interfaces = [*cls.pg_interfaces]

    @classmethod
    def tearDownClass(cls):
        """Hand class-level teardown over to the base class."""
        super().tearDownClass()

    def setUp(self):
        """Per-test setup: bring every interface up with IPv4 and IPv6.

        For each interface in ``self.interfaces`` this sets it admin up,
        configures IPv4 and IPv6, disables IPv6 RA and resolves ARP and NDP.
        """
        super().setUp()
        for pg_if in self.interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.config_ip6()
            pg_if.disable_ipv6_ra()
            pg_if.resolve_arp()
            pg_if.resolve_ndp()

    def tearDown(self):
        """Per-test teardown: unconfigure and shut every pg interface.

        The interface cleanup is skipped when ``self.vpp_dead`` is set.
        """
        super().tearDown()
        if self.vpp_dead:
            return
        for pg_if in self.pg_interfaces:
            pg_if.unconfig_ip4()
            pg_if.unconfig_ip6()
            pg_if.admin_down()

    def validate(self, rx, expected):
        self.assertEqual(rx, expected.__class__(expected))

    def generate_ip4_frags(self, payload_length, fragment_size):
        """Build a fragmented IPv4-in-IPv4 packet and its expected inner.

        The inner packet is IP (1.2.3.4 -> pg0 remote) / UDP / payload of
        ``payload_length`` characters. It is wrapped in an outer IP header
        (pg1 remote -> pg0 local, random id) behind an Ethernet header and
        then split with scapy's ``fragment`` using ``fragment_size``.

        :returns: tuple of (fragments, inner packet with ttl lowered by one)
        """
        inner_hdr = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        inner_data = UDP(sport=1234, dport=1234) / self.payload(payload_length)
        eth_hdr = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        outer_hdr = IP(src=self.pg1.remote_ip4,
                       id=RandShort(),
                       dst=self.pg0.local_ip4)
        tunnelled = eth_hdr / outer_hdr / inner_hdr / inner_data
        pieces = fragment(tunnelled, fragment_size)

        # the expected packet is the bare inner one with its ttl reduced by 1
        expected_inner = inner_hdr / inner_data
        expected_inner.ttl -= 1
        return pieces, expected_inner

    def verify_ip4ip4_encaps(self, a, p_ip4s, p_ip4_encaps):
        """Send IPv4 packets towards ``a`` and check their IPv4 encap.

        For each header in ``p_ip4s`` a burst of N_PACKETS packets
        (``self.p_ether`` / header / ``self.p_payload``) is sent on pg0.
        Every packet received on pg1 must match the entry of
        ``p_ip4_encaps`` at the same index stacked on the inner packet,
        where the inner ttl and the outer ttl are each lowered by one and
        the outer IP id is set to 0. Packet checksums are verified too.

        ``self.p_ether`` and ``self.p_payload`` must be set by the caller.

        :param a: destination address written into each inner header
        :param p_ip4s: inner IPv4 headers to send; left unmodified
        :param p_ip4_encaps: expected outer IPv4 headers, index-aligned
            with ``p_ip4s``
        """
        for i, p_ip4 in enumerate(p_ip4s):
            # Work on private copies. Editing the caller's header in place
            # made its ttl drop by one more on every call that reused the
            # same list of headers.
            tx_ip4 = p_ip4.copy()
            tx_ip4.dst = a
            p4 = (self.p_ether / tx_ip4 / self.p_payload)

            # expected inner header: what was sent, with ttl lowered by one
            rx_ip4 = tx_ip4.copy()
            rx_ip4.ttl -= 1
            p4_reply = (p_ip4_encaps[i] / rx_ip4 / self.p_payload)
            # these two act on the outermost (encap) header
            p4_reply.ttl -= 1
            p4_reply.id = 0
            rx = self.send_and_expect(self.pg0, p4 * N_PACKETS, self.pg1)
            for p in rx:
                # p[1] skips the received Ethernet header
                self.validate(p[1], p4_reply)
                self.assert_packet_checksums_valid(p)

    def verify_ip6ip4_encaps(self, a, p_ip6s, p_ip4_encaps):
        """Send IPv6 packets towards ``a`` and check their IPv4 encap.

        For each header in ``p_ip6s`` a burst of N_PACKETS packets
        (``self.p_ether`` / header / ``self.p_payload``) is sent on pg0.
        Every packet received on pg1 must match the entry of
        ``p_ip4_encaps`` at the same index stacked on the inner packet,
        where the inner hop limit and the outer ttl are each lowered by
        one. Packet checksums are verified too.

        ``self.p_ether`` and ``self.p_payload`` must be set by the caller.

        :param a: destination address written into each inner header
        :param p_ip6s: inner IPv6 headers to send; left unmodified
        :param p_ip4_encaps: expected outer IPv4 headers, index-aligned
            with ``p_ip6s``
        """
        for i, p_ip6 in enumerate(p_ip6s):
            # Work on private copies. Editing the caller's header in place
            # made its hop limit drop by one more on every call that reused
            # the same list of headers.
            tx_ip6 = p_ip6.copy()
            tx_ip6.dst = a
            p6 = (self.p_ether / tx_ip6 / self.p_payload)

            # expected inner header: what was sent, hop limit lowered by one
            rx_ip6 = tx_ip6.copy()
            rx_ip6.hlim -= 1
            p6_reply = (p_ip4_encaps[i] / rx_ip6 / self.p_payload)
            # acts on the outermost (IPv4 encap) header
            p6_reply.ttl -= 1
            rx = self.send_and_expect(self.pg0, p6 * N_PACKETS, self.pg1)
            for p in rx:
                # p[1] skips the received Ethernet header
                self.validate(p[1], p6_reply)
                self.assert_packet_checksums_valid(p)

    def test_ipip4(self):
        """ ip{v4,v6} over ip4 test """

        # The test runs in three stages: encapsulation through four tunnels
        # with different DSCP/ECN/DF handling, decapsulation (checked against
        # the cumulative 'packets decapsulated' counter), and finally
        # fragmentation / reassembly / re-fragmentation.

        self.pg1.generate_remote_hosts(5)
        self.pg1.configure_ipv4_neighbors()
        e = VppEnum.vl_api_tunnel_encap_decap_flags_t
        d = VppEnum.vl_api_ip_dscp_t
        # read by verify_ip4ip4_encaps() / verify_ip6ip4_encaps()
        self.p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        self.p_payload = UDP(sport=1234, dport=1234) / Raw(b'X' * 100)

        # create a TOS byte by shifting a DSCP code point 2 bits. those 2 bits
        # are for the ECN.
        dscp = d.IP_API_DSCP_AF31 << 2
        ecn = 3
        dscp_ecn = dscp | ecn

        # IPv4 transport that copies the DSCP from the payload
        tun_dscp = VppIpIpTunInterface(
            self,
            self.pg0,
            self.pg0.local_ip4,
            self.pg1.remote_hosts[0].ip4,
            flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
        tun_dscp.add_vpp_config()
        # IPv4 transport that copies the DSCP and ECN from the payload
        tun_dscp_ecn = VppIpIpTunInterface(
            self,
            self.pg0,
            self.pg0.local_ip4,
            self.pg1.remote_hosts[1].ip4,
            flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP |
                   e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN))
        tun_dscp_ecn.add_vpp_config()
        # IPv4 transport that copies the ECN from the payload and sets the
        # DF bit on encap. copies the ECN on decap
        tun_ecn = VppIpIpTunInterface(
            self,
            self.pg0,
            self.pg0.local_ip4,
            self.pg1.remote_hosts[2].ip4,
            flags=(e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN |
                   e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_SET_DF |
                   e.TUNNEL_API_ENCAP_DECAP_FLAG_DECAP_COPY_ECN))
        tun_ecn.add_vpp_config()
        # IPv4 transport that sets a fixed DSCP in the encap and copies
        # the DF bit
        tun = VppIpIpTunInterface(
            self,
            self.pg0,
            self.pg0.local_ip4,
            self.pg1.remote_hosts[3].ip4,
            dscp=d.IP_API_DSCP_AF11,
            flags=e.TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DF)
        tun.add_vpp_config()

        # array of all the tunnels
        tuns = [tun_dscp, tun_dscp_ecn, tun_ecn, tun]

        # addresses for prefixes routed via each tunnel
        a4s = [""] * len(tuns)
        a6s = [""] * len(tuns)

        # IP headers with each combination of DSCP/ECN tested
        p_ip6s = [IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp),
                  IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=dscp_ecn),
                  IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=ecn),
                  IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=0xff)]
        p_ip4s = [IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp, flags='DF'),
                  IP(src="1.2.3.4", dst="130.67.0.1", tos=dscp_ecn),
                  IP(src="1.2.3.4", dst="130.67.0.1", tos=ecn),
                  IP(src="1.2.3.4", dst="130.67.0.1", tos=0xff)]

        # Configure each tunnel
        for i, t in enumerate(tuns):
            # Set interface up and enable IP on it
            self.vapi.sw_interface_set_flags(t.sw_if_index, 1)
            self.vapi.sw_interface_set_unnumbered(
                sw_if_index=self.pg0.sw_if_index,
                unnumbered_sw_if_index=t.sw_if_index)

            # prefix for route / destination address for packets
            a4s[i] = "130.67.%d.0" % i
            a6s[i] = "dead:%d::" % i

            # Add IPv4 and IPv6 routes via tunnel interface
            ip4_via_tunnel = VppIpRoute(
                self, a4s[i], 24,
                [VppRoutePath("0.0.0.0",
                              t.sw_if_index,
                              proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
            ip4_via_tunnel.add_vpp_config()

            ip6_via_tunnel = VppIpRoute(
                self, a6s[i], 64,
                [VppRoutePath("::",
                              t.sw_if_index,
                              proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
            ip6_via_tunnel.add_vpp_config()

        #
        # Encapsulation
        #

        # tun_dscp copies only the dscp, so in the expected TC values only
        # the DSCP bits of the inner header are present
        exp_tcs = [dscp, dscp, 0, 0xfc]
        p_ip44_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_dscp.dst,
                            tos=tc) for tc in exp_tcs]
        p_ip64_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_dscp.dst,
                            proto='ipv6', id=0, tos=tc) for tc in exp_tcs]

        # IPv4 in to IPv4 tunnel
        self.verify_ip4ip4_encaps(a4s[0], p_ip4s, p_ip44_encaps)
        # IPv6 in to IPv4 tunnel
        self.verify_ip6ip4_encaps(a6s[0], p_ip6s, p_ip64_encaps)

        # tun_dscp_ecn copies the dscp and the ecn
        exp_tcs = [dscp, dscp_ecn, ecn, 0xff]
        p_ip44_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_dscp_ecn.dst,
                            tos=tc) for tc in exp_tcs]
        p_ip64_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_dscp_ecn.dst,
                            proto='ipv6', id=0, tos=tc) for tc in exp_tcs]

        self.verify_ip4ip4_encaps(a4s[1], p_ip4s, p_ip44_encaps)
        self.verify_ip6ip4_encaps(a6s[1], p_ip6s, p_ip64_encaps)

        # tun_ecn copies only the ecn and always sets DF
        exp_tcs = [0, ecn, ecn, ecn]
        p_ip44_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_ecn.dst,
                            flags='DF', tos=tc) for tc in exp_tcs]
        p_ip64_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun_ecn.dst,
                            flags='DF', proto='ipv6', id=0, tos=tc)
                         for tc in exp_tcs]

        self.verify_ip4ip4_encaps(a4s[2], p_ip4s, p_ip44_encaps)
        self.verify_ip6ip4_encaps(a6s[2], p_ip6s, p_ip64_encaps)

        # tun sets a fixed dscp and copies DF
        fixed_dscp = tun.dscp << 2
        # only the first entry of p_ip4s has DF set
        flags = ['DF', 0, 0, 0]
        p_ip44_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun.dst,
                            flags=f,
                            tos=fixed_dscp) for f in flags]
        p_ip64_encaps = [IP(src=self.pg0.local_ip4,
                            dst=tun.dst,
                            proto='ipv6', id=0,
                            tos=fixed_dscp) for _ in p_ip4s]

        self.verify_ip4ip4_encaps(a4s[3], p_ip4s, p_ip44_encaps)
        self.verify_ip6ip4_encaps(a6s[3], p_ip6s, p_ip64_encaps)

        #
        # Decapsulation
        #
        # running total, compared against the VPP error counter after each
        # batch of decapsulated packets
        n_packets_decapped = 0
        self.p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)

        # IPv4 tunnel to IPv4
        tcs = [0, dscp, dscp_ecn, ecn]

        # one overlay packet and all combinations of its encap
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        p_ip4_encaps = [IP(src=tun.dst,
                           dst=self.pg0.local_ip4,
                           tos=tc) for tc in tcs]

        # for each encap tun will produce the same inner packet because it does
        # not copy up fields from the payload
        for p_ip4_encap in p_ip4_encaps:
            p4 = (self.p_ether / p_ip4_encap / p_ip4 / self.p_payload)
            p4_reply = (p_ip4 / self.p_payload)
            p4_reply.ttl -= 1
            rx = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
            n_packets_decapped += N_PACKETS
            for p in rx:
                self.validate(p[1], p4_reply)
                self.assert_packet_checksums_valid(p)

        err = self.statistics.get_err_counter(
            '/err/ipip4-input/packets decapsulated')
        self.assertEqual(err, n_packets_decapped)

        # tun_ecn copies the ECN bits from the encap to the inner
        p_ip4_encaps = [IP(src=tun_ecn.dst,
                           dst=self.pg0.local_ip4,
                           tos=tc) for tc in tcs]
        p_ip4_replys = [p_ip4.copy() for _ in p_ip4_encaps]
        # tcs[2] and tcs[3] are the encaps that carry ECN bits
        p_ip4_replys[2].tos = ecn
        p_ip4_replys[3].tos = ecn
        for i, p_ip4_encap in enumerate(p_ip4_encaps):
            p4 = (self.p_ether / p_ip4_encap / p_ip4 / self.p_payload)
            p4_reply = (p_ip4_replys[i] / self.p_payload)
            p4_reply.ttl -= 1
            rx = self.send_and_expect(self.pg1, p4 * N_PACKETS, self.pg0)
            n_packets_decapped += N_PACKETS
            for p in rx:
                self.validate(p[1], p4_reply)
                self.assert_packet_checksums_valid(p)

        err = self.statistics.get_err_counter(
            '/err/ipip4-input/packets decapsulated')
        self.assertEqual(err, n_packets_decapped)

        # IPv4 tunnel to IPv6
        # for each encap tun will produce the same inner packet because it does
        # not copy up fields from the payload
        p_ip4_encaps = [IP(src=tun.dst,
                           dst=self.pg0.local_ip4,
                           tos=tc) for tc in tcs]
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        for p_ip4_encap in p_ip4_encaps:
            p6 = (self.p_ether /
                  p_ip4_encap / p_ip6 /
                  self.p_payload)
            p6_reply = (p_ip6 / self.p_payload)
            p6_reply.hlim = 63
            rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
            n_packets_decapped += N_PACKETS
            for p in rx:
                self.validate(p[1], p6_reply)
                self.assert_packet_checksums_valid(p)

        err = self.statistics.get_err_counter(
            '/err/ipip4-input/packets decapsulated')
        self.assertEqual(err, n_packets_decapped)

        # IPv4 tunnel to IPv6
        # tun_ecn copies the ECN bits from the encap to the inner
        p_ip4_encaps = [IP(src=tun_ecn.dst,
                           dst=self.pg0.local_ip4,
                           tos=tc) for tc in tcs]
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        p_ip6_replys = [p_ip6.copy() for _ in p_ip4_encaps]
        p_ip6_replys[2].tc = ecn
        p_ip6_replys[3].tc = ecn
        for i, p_ip4_encap in enumerate(p_ip4_encaps):
            p6 = (self.p_ether / p_ip4_encap / p_ip6 / self.p_payload)
            p6_reply = (p_ip6_replys[i] / self.p_payload)
            p6_reply.hlim = 63
            rx = self.send_and_expect(self.pg1, p6 * N_PACKETS, self.pg0)
            n_packets_decapped += N_PACKETS
            for p in rx:
                self.validate(p[1], p6_reply)
                self.assert_packet_checksums_valid(p)

        err = self.statistics.get_err_counter(
            '/err/ipip4-input/packets decapsulated')
        self.assertEqual(err, n_packets_decapped)

        #
        # Fragmentation / Reassembly and Re-fragmentation
        #
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.pg1.sw_if_index,
            enable_ip4=1)

        self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=1000,
                                    max_reassembly_length=1000,
                                    expire_walk_interval_ms=10000,
                                    is_ip6=0)

        # Send lots of fragments, verify reassembled packet
        frags, p4_reply = self.generate_ip4_frags(3131, 1400)
        f = []
        for _ in range(0, 1000):
            f.extend(frags)
        self.pg1.add_stream(f)
        self.pg_enable_capture()
        self.pg_start()
        rx = self.pg0.get_capture(1000)
        n_packets_decapped += 1000

        for p in rx:
            self.validate(p[1], p4_reply)

        err = self.statistics.get_err_counter(
            '/err/ipip4-input/packets decapsulated')
        self.assertEqual(err, n_packets_decapped)

        # 89 packets of increasing size, each with its own expected reply
        f = []
        r = []
        for i in range(1, 90):
            frags, p4_reply = self.generate_ip4_frags(i * 100, 1000)
            f.extend(frags)
            # one expected packet per fragmented packet
            r.append(p4_reply)
        self.pg_enable_capture()
        self.pg1.add_stream(f)
        self.pg_start()
        rx = self.pg0.get_capture(89)
        for i, p in enumerate(rx):
            self.validate(p[1], r[i])

        # Now try with re-fragmentation
        #
        # Send fragments to tunnel head-end, for the tunnel head end
        # to reassemble and then refragment
        #
        # NOTE(review): the expected IP ids below (256, 512) are fixed
        # values chosen by this test - presumably they match the ids seen
        # on the refragmented packets; confirm before changing.
        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
        frags, p4_reply = self.generate_ip4_frags(3123, 1200)
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(6)
        reass_pkt = reassemble4(rx)
        p4_reply.id = 256
        self.validate(reass_pkt, p4_reply)

        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1600, 0, 0, 0])
        frags, p4_reply = self.generate_ip4_frags(3123, 1200)
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(2)
        reass_pkt = reassemble4(rx)
        p4_reply.id = 512
        self.validate(reass_pkt, p4_reply)

        # send large packets through the tunnel, expect them to be fragmented
        self.vapi.sw_interface_set_mtu(tun_dscp.sw_if_index, [600, 0, 0, 0])

        p4 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
              IP(src="1.2.3.4", dst="130.67.0.1", tos=42) /
              UDP(sport=1234, dport=1234) / Raw(b'Q' * 1000))
        # 15 packets in, 30 fragments expected out
        rx = self.send_and_expect(self.pg0, p4 * 15, self.pg1, 30)
        # strip the outer (tunnel) IP header from every received fragment
        inners = [p[IP].payload for p in rx]
        reass_pkt = reassemble4(inners)
        for p in reass_pkt:
            self.assert_packet_checksums_valid(p)
            self.assertEqual(p[IP].ttl, 63)

    def test_ipip_create(self):
        """ ipip create / delete interface test """
        # create a tunnel, then delete it again using the index from the reply
        reply = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5')
        self.vapi.ipip_del_tunnel(reply.sw_if_index)

    def test_ipip_vrf_create(self):
        """ ipip create / delete interface VRF test """

        # add IP table 20, then create the tunnel with table_id=20
        vrf_table = VppIpTable(self, 20)
        vrf_table.add_vpp_config()
        reply = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5', table_id=20)
        # delete the tunnel again using the index from the reply
        self.vapi.ipip_del_tunnel(reply.sw_if_index)

    def payload(self, len):
        return 'x' * len

    def test_mipip4(self):
        """ p2mp IPv4 tunnel Tests """

        # For every pg interface: build a multipoint tunnel on it, then for
        # each of three peers add a route and a TEIB entry, check traffic in
        # both directions, and check that removing / re-adding the TEIB
        # entry stops / restores that traffic.

        for itf in self.pg_interfaces:
            #
            # one underlay nh for each overlay/tunnel peer
            #
            itf.generate_remote_hosts(4)
            itf.configure_ipv4_neighbors()

            #
            # Create a p2mp IPIP tunnel.
            #  - set it admin up
            #  - assign an IP Address
            #  - Add a route via the tunnel
            #
            ipip_if = VppIpIpTunInterface(self, itf,
                                          itf.local_ip4,
                                          "0.0.0.0",
                                          mode=(VppEnum.vl_api_tunnel_mode_t.
                                                TUNNEL_API_MODE_MP))
            ipip_if.add_vpp_config()
            ipip_if.admin_up()
            ipip_if.config_ip4()
            ipip_if.generate_remote_hosts(4)

            self.logger.info(self.vapi.cli("sh adj"))
            self.logger.info(self.vapi.cli("sh ip fib"))

            #
            # ensure we don't match to the tunnel if the source address
            # is all zeros
            #
            # tx = self.create_tunnel_stream_4o4(self.pg0,
            #                                    "0.0.0.0",
            #                                    itf.local_ip4,
            #                                    self.pg0.local_ip4,
            #                                    self.pg0.remote_ip4)
            # self.send_and_assert_no_replies(self.pg0, tx)

            #
            # for-each peer
            #
            for ii in range(1, 4):
                route_addr = "4.4.4.%d" % ii

                #
                # route traffic via the peer
                #
                route_via_tun = VppIpRoute(
                    self, route_addr, 32,
                    [VppRoutePath(ipip_if._remote_hosts[ii].ip4,
                                  ipip_if.sw_if_index)])
                route_via_tun.add_vpp_config()

                #
                # Add a TEIB entry that resolves the peer
                #
                teib = VppTeib(self, ipip_if,
                               ipip_if._remote_hosts[ii].ip4,
                               itf._remote_hosts[ii].ip4)
                teib.add_vpp_config()
                # NOTE(review): the interface name is hard-coded as ipip0
                # here although a tunnel is created per pg interface -
                # confirm this is intended (it only affects logging).
                self.logger.info(self.vapi.cli("sh adj nbr ipip0 %s" %
                                               ipip_if._remote_hosts[ii].ip4))

                #
                # Send a packet stream that is routed into the tunnel
                #  - packets are IPIP encapped
                #
                inner = (IP(dst=route_addr, src="5.5.5.5") /
                         UDP(sport=1234, dport=1234) /
                         Raw(b'0x44' * 100))
                tx_e = [(Ether(dst=self.pg0.local_mac,
                               src=self.pg0.remote_mac) /
                         inner) for _ in range(63)]

                rxs = self.send_and_expect(self.pg0, tx_e, itf)

                for rx in rxs:
                    self.assertEqual(rx[IP].src, itf.local_ip4)
                    self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)

                #
                # Send a stream of already encapsulated packets from the
                # peer; only their arrival on pg0 is checked
                #
                tx_i = [(Ether(dst=self.pg0.local_mac,
                               src=self.pg0.remote_mac) /
                         IP(src=itf._remote_hosts[ii].ip4,
                            dst=itf.local_ip4) /
                         IP(src=self.pg0.local_ip4, dst=self.pg0.remote_ip4) /
                         UDP(sport=1234, dport=1234) /
                         Raw(b'0x44' * 100)) for _ in range(63)]

                self.logger.info(self.vapi.cli("sh ipip tunnel-hash"))
                self.send_and_expect(self.pg0, tx_i, self.pg0)

                #
                # delete and re-add the TEIB
                #
                teib.remove_vpp_config()
                self.send_and_assert_no_replies(self.pg0, tx_e)
                self.send_and_assert_no_replies(self.pg0, tx_i)

                teib.add_vpp_config()
                # Check the packets captured after the re-add. Previously the
                # new capture was dropped and the loop re-checked the packets
                # from the first send.
                rxs = self.send_and_expect(self.pg0, tx_e, itf)
                for rx in rxs:
                    self.assertEqual(rx[IP].src, itf.local_ip4)
                    self.assertEqual(rx[IP].dst, itf._remote_hosts[ii].ip4)
                self.send_and_expect(self.pg0, tx_i, self.pg0)

            ipip_if.admin_down()
            ipip_if.unconfig_ip4()


class TestIPIP6(VppTestCase):
    """ IPIP6 Test Case """

    @classmethod
    def setUpClass(cls):
        """Run the base class setup, then create two pg interfaces."""
        super().setUpClass()
        cls.create_pg_interfaces(range(2))
        # keep a separate list holding the entries of cls.pg_interfaces
        cls.interfaces = [*cls.pg_interfaces]

    @classmethod
    def tearDownClass(cls):
        """Hand class-level teardown over to the base class."""
        super().tearDownClass()

    def setUp(self):
        super(TestIPIP6, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.