#!/usr/bin/env python3 import ipaddress import random import socket import struct import unittest from io import BytesIO import scapy.compat from framework import VppTestCase, VppTestRunner from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ PacketListField from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply from scapy.layers.l2 import Ether, ARP, GRE from scapy.packet import Raw from syslog_rfc5424_parser import SyslogMessage, ParseError from syslog_rfc5424_parser.constants import SyslogSeverity from util import ppp from vpp_ip_route import VppIpRoute, VppRoutePath from vpp_neighbor import VppNeighbor from vpp_papi import VppEnum # NAT HA protocol event data class Event(Packet): name = "Event" fields_desc = [ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), ShortField("flags", 0), IPField("in_addr", None), IPField("out_addr", None), ShortField("in_port", None), ShortField("out_port", None), IPField("eh_addr", None), IPField("ehn_addr", None), ShortField("eh_port", None), ShortField("ehn_port", None), IntField("fib_index", None), IntField("total_pkts", 0), LongField("total_bytes", 0)] def extract_padding(self, s): return "", s # NAT HA protocol header class HANATStateSync(Packet): name = "HA NAT state sync" fields_desc = [XByteField("version", 1), FlagsField("flags", 0, 8, ['ACK']), FieldLenField("count", None, count_of="events"), IntField("sequence_number", 1), IntField("thread_index", 0), PacketListField("events", [], Event, count_from=lambda pkt: pkt.count)] class MethodHolder(VppTestCase): """ NAT create capture and verify method holder """ @property def config_flags(self): return VppEnum.vl_api_nat44_ei_config_flags_t @property def SYSLOG_SEVERITY(self): return VppEnum.vl_api_syslog_severity_t def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', local_port=0, external_port=0, vrf_id=0, is_add=1, external_sw_if_index=0xFFFFFFFF, proto=0, tag="", flags=0): """ Add/delete NAT44EI static mapping :param local_ip: Local IP address :param external_ip: External IP address :param local_port: Local port number (Optional) :param external_port: External port number (Optional) :param vrf_id: VRF ID (Default 0) :param is_add: 1 if add, 0 if delete (Default add) :param external_sw_if_index: External interface instead of IP address :param proto: IP protocol (Mandatory if port specified) :param tag: Opaque string tag :param flags: NAT configuration flags """ if not (local_port and external_port): flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_static_mapping( is_add=is_add, local_ip_address=local_ip, external_ip_address=external_ip, external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, vrf_id=vrf_id, protocol=proto, flags=flags, tag=tag) def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF): """ Add/delete NAT44EI address :param ip: IP address :param is_add: 1 if add, 0 if delete (Default add) """ self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add) def create_routes_and_neigbors(self): r1 = VppIpRoute(self, self.pg7.remote_ip4, 32, [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)]) r2 = VppIpRoute(self, self.pg8.remote_ip4, 32, [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)]) r1.add_vpp_config() r2.add_vpp_config() n1 = VppNeighbor(self, self.pg7.sw_if_index, self.pg7.remote_mac, self.pg7.remote_ip4, is_static=1) n2 = VppNeighbor(self, self.pg8.sw_if_index, self.pg8.remote_mac, self.pg8.remote_ip4, is_static=1) n1.add_vpp_config() n2.add_vpp_config() def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64): """ Create packet stream for inside network :param in_if: Inside interface :param out_if: Outside interface :param dst_ip: Destination address :param ttl: TTL of generated packets """ if dst_ip is None: dst_ip = out_if.remote_ip4 pkts = [] # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(sport=self.tcp_port_in, dport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(sport=self.udp_port_in, dport=20)) pkts.append(p) # ICMP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=self.icmp_id_in, type='echo-request')) pkts.append(p) return pkts def compose_ip6(self, ip4, pref, plen): """ Compose IPv4-embedded IPv6 addresses :param ip4: IPv4 address :param pref: IPv6 prefix :param plen: IPv6 prefix length :returns: IPv4-embedded IPv6 addresses """ pref_n = list(socket.inet_pton(socket.AF_INET6, pref)) ip4_n = list(socket.inet_pton(socket.AF_INET, ip4)) if plen == 32: pref_n[4] = ip4_n[0] pref_n[5] = ip4_n[1] pref_n[6] = ip4_n[2] pref_n[7] = ip4_n[3] elif plen == 40: pref_n[5] = ip4_n[0] pref_n[6] = ip4_n[1] pref_n[7] = ip4_n[2] pref_n[9] = ip4_n[3] elif plen == 48: pref_n[6] = ip4_n[0] pref_n[7] = ip4_n[1] pref_n[9] = ip4_n[2] pref_n[10] = ip4_n[3] elif plen == 56: pref_n[7] = ip4_n[0] pref_n[9] = ip4_n[1] pref_n[10] = ip4_n[2] pref_n[11] = ip4_n[3] elif plen == 64: pref_n[9] = ip4_n[0] pref_n[10] = ip4_n[1] pref_n[11] = ip4_n[2] pref_n[12] = ip4_n[3] elif plen == 96: pref_n[12] = ip4_n[0] pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n]) return socket.inet_ntop(socket.AF_INET6, packed_pref_n) def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param ttl: TTL of generated packets :param use_inside_ports: Use inside NAT ports as destination ports instead of outside ports """ if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: tcp_port = self.tcp_port_out udp_port = self.udp_port_out icmp_id = self.icmp_id_out else: tcp_port = self.tcp_port_in udp_port = self.udp_port_in icmp_id = self.icmp_id_in pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / TCP(dport=tcp_port, sport=20)) pkts.extend([p, p]) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / UDP(dport=udp_port, sport=20)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / ICMP(id=icmp_id, type='echo-reply')) pkts.append(p) return pkts def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64): """ Create packet stream for outside network :param out_if: Outside interface :param dst_ip: Destination IP address (Default use global NAT address) :param hl: HL of generated packets """ pkts = [] # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IPv6(src=src_ip, dst=dst_ip, hlim=hl) / TCP(dport=self.tcp_port_out, sport=20)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IPv6(src=src_ip, dst=dst_ip, hlim=hl) / UDP(dport=self.udp_port_out, sport=20)) pkts.append(p) # ICMP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IPv6(src=src_ip, dst=dst_ip, hlim=hl) / ICMPv6EchoReply(id=self.icmp_id_out)) pkts.append(p) return pkts def verify_capture_out(self, capture, nat_ip=None, same_port=False, dst_ip=None, is_ip6=False, ignore_port=False): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address (Default use global NAT address) :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) :param is_ip6: If L3 protocol is IPv6 (Default False) """ if is_ip6: IP46 = IPv6 ICMP46 = ICMPv6EchoRequest else: IP46 = IP ICMP46 = ICMP if nat_ip is None: nat_ip = self.nat_addr for packet in capture: try: if not is_ip6: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP46].src, nat_ip) if dst_ip is not None: self.assertEqual(packet[IP46].dst, dst_ip) if packet.haslayer(TCP): if not ignore_port: if same_port: self.assertEqual( packet[TCP].sport, self.tcp_port_in) else: self.assertNotEqual( packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: self.assertEqual( packet[UDP].sport, self.udp_port_in) else: self.assertNotEqual( packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: self.assertEqual( packet[ICMP46].id, self.icmp_id_in) else: self.assertNotEqual( packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None): """ Verify captured packets on outside network :param capture: Captured packets :param nat_ip: Translated IP address :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) """ return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True) def verify_capture_in(self, capture, in_if): """ Verify captured packets on inside network :param capture: Captured packets :param in_if: Inside interface """ for packet in capture: try: self.assert_packet_checksums_valid(packet) self.assertEqual(packet[IP].dst, in_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].dport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].dport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) raise def verify_capture_no_translation(self, capture, ingress_if, egress_if): """ Verify captured packet that don't have to be translated :param capture: Captured packets :param ingress_if: Ingress interface :param egress_if: Egress interface """ for packet in capture: try: self.assertEqual(packet[IP].src, ingress_if.remote_ip4) self.assertEqual(packet[IP].dst, egress_if.remote_ip4) if packet.haslayer(TCP): self.assertEqual(packet[TCP].sport, self.tcp_port_in) elif packet.haslayer(UDP): self.assertEqual(packet[UDP].sport, self.udp_port_in) else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) raise def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11): """ Verify captured packets with ICMP errors on outside network :param capture: Captured packets :param src_ip: Translated IP address or IP address of VPP (Default use global NAT address) :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ if src_ip is None: src_ip = self.nat_addr for packet in capture: try: self.assertEqual(packet[IP].src, src_ip) self.assertEqual(packet.haslayer(ICMP), 1) icmp = packet[ICMP] self.assertEqual(icmp.type, icmp_type) self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out) elif inner_ip.haslayer(UDPerror): self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out) except: self.logger.error(ppp("Unexpected or invalid packet " "(outside network):", packet)) raise def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11): """ Verify captured packets with ICMP errors on inside network :param capture: Captured packets :param in_if: Inside interface :param icmp_type: Type of error ICMP packet we are expecting (Default 11) """ for packet in capture: try: self.assertEqual(packet[IP].dst, in_if.remote_ip4) self.assertEqual(packet.haslayer(ICMP), 1) icmp = packet[ICMP] self.assertEqual(icmp.type, icmp_type) self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in) elif inner_ip.haslayer(UDPerror): self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in) except: self.logger.error(ppp("Unexpected or invalid packet " "(inside network):", packet)) raise def create_stream_frag(self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False): """ Create fragmented packet stream :param src_if: Source interface :param dst: Destination IPv4 address :param sport: Source port :param dport: Destination port :param data: Payload data :param proto: protocol (TCP, UDP, ICMP) :param echo_reply: use echo_reply if protocol is ICMP :returns: Fragments """ if proto == IP_PROTOS.tcp: p = (IP(src=src_if.remote_ip4, dst=dst) / TCP(sport=sport, dport=dport) / Raw(data)) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) elif proto == IP_PROTOS.udp: proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: proto_header = ICMP(id=sport, type='echo-request') else: proto_header = ICMP(id=sport, type='echo-reply') else: raise Exception("Unsupported protocol") id = random.randint(0, 65535) pkts = [] if proto == IP_PROTOS.tcp: raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / proto_header / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) / raw) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) / raw) pkts.append(p) return pkts def reass_frags_and_verify(self, frags, src, dst): """ Reassemble and verify fragmented packet :param frags: Captured fragments :param src: Source IPv4 address to verify :param dst: Destination IPv4 address to verify :returns: Reassembled IPv4 packet """ buffer = BytesIO() for p in frags: self.assertEqual(p[IP].src, src) self.assertEqual(p[IP].dst, dst) self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: p = (ip / TCP(buffer.getvalue())) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: p = (ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:])) elif ip.proto == IP_PROTOS.icmp: p = (ip / ICMP(buffer.getvalue())) return p def verify_ipfix_nat44_ses(self, data): """ Verify IPFIX NAT44EI session create/delete event :param data: Decoded IPFIX data records """ nat44_ses_create_num = 0 nat44_ses_delete_num = 0 self.assertEqual(6, len(data)) for record in data: # natEvent self.assertIn(scapy.compat.orb(record[230]), [4, 5]) if scapy.compat.orb(record[230]) == 4: nat44_ses_create_num += 1 else: nat44_ses_delete_num += 1 # sourceIPv4Address self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8]))) # postNATSourceIPv4Address self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr), record[225]) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # protocolIdentifier/sourceTransportPort # /postNAPTSourceTransportPort if IP_PROTOS.icmp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7]) self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227]) elif IP_PROTOS.tcp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) elif IP_PROTOS.udp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.udp_port_in), record[7]) self.assertEqual(struct.pack("!H", self.udp_port_out), record[227]) else: self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}") self.assertEqual(3, nat44_ses_create_num) self.assertEqual(3, nat44_ses_delete_num) def verify_ipfix_addr_exhausted(self, data): self.assertEqual(1, len(data)) record = data[0] # natEvent self.assertEqual(scapy.compat.orb(record[230]), 3) # natPoolID self.assertEqual(struct.pack("!I", 0), record[283]) def verify_ipfix_max_sessions(self, data, limit): self.assertEqual(1, len(data)) record = data[0] # natEvent self.assertEqual(scapy.compat.orb(record[230]), 13) # natQuotaExceededEvent self.assertEqual(struct.pack("!I", 1), record[466]) # maxSessionEntries self.assertEqual(struct.pack("!I", limit), record[471]) def verify_no_nat44_user(self): """ Verify that there is no NAT44EI user """ users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 0) users = self.statistics['/nat44-ei/total-users'] self.assertEqual(users[0][0], 0) sessions = self.statistics['/nat44-ei/total-sessions'] self.assertEqual(sessions[0][0], 0) def verify_syslog_apmap(self, data, is_add=True): message = data.decode('utf-8') try: message = SyslogMessage.parse(message) except ParseError as e: self.logger.error(e) raise else: self.assertEqual(message.severity, SyslogSeverity.info) self.assertEqual(message.appname, 'NAT') self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL') sd_params = message.sd.get('napmap') self.assertTrue(sd_params is not None) self.assertEqual(sd_params.get('IATYP'), 'IPv4') self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) self.assertEqual(sd_params.get('XATYP'), 'IPv4') self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) self.assertTrue(sd_params.get('SSUBIX') is not None) self.assertEqual(sd_params.get('SVLAN'), '0') def verify_mss_value(self, pkt, mss): if not pkt.haslayer(IP) or not pkt.haslayer(TCP): raise TypeError("Not a TCP/IP packet") for option in pkt[TCP].options: if option[0] == 'MSS': self.assertEqual(option[1], mss) self.assert_tcp_checksum_valid(pkt) @staticmethod def proto2layer(proto): if proto == IP_PROTOS.tcp: return TCP elif proto == IP_PROTOS.udp: return UDP elif proto == IP_PROTOS.icmp: return ICMP else: raise Exception("Unsupported protocol") def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 self.port_in = random.randint(1025, 65535) # in2out pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, self.pg1.remote_ip4) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) if not ignore_port: self.assertNotEqual(p[layer].sport, self.port_in) else: self.assertEqual(p[layer].sport, self.port_in) else: if not ignore_port: if not dont_translate: self.assertNotEqual(p[layer].id, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) # out2in if not dont_translate: dst_addr = self.nat_addr else: dst_addr = self.pg0.remote_ip4 if proto != IP_PROTOS.icmp: sport = 20 dport = p[layer].sport else: sport = p[layer].id dport = 0 pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) else: self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) def reass_hairpinning(self, server_addr, server_in_port, server_out_port, host_in_port, proto=IP_PROTOS.tcp, ignore_port=False): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: data = b"A" * 4 + b"B" * 16 + b"C" * 3 else: data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server pkts = self.create_stream_frag(self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) se