diff options
Diffstat (limited to 'test/test_nat44_ei.py')
-rw-r--r-- | test/test_nat44_ei.py | 2521 |
1 files changed, 1419 insertions, 1102 deletions
diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py index aafd345f43f..9eb127aaf0b 100644 --- a/test/test_nat44_ei.py +++ b/test/test_nat44_ei.py @@ -10,9 +10,19 @@ from io import BytesIO import scapy.compat from framework import VppTestCase, VppTestRunner from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder -from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ - IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ - PacketListField +from scapy.all import ( + bind_layers, + Packet, + ByteEnumField, + ShortField, + IPField, + IntField, + LongField, + XByteField, + FlagsField, + FieldLenField, + PacketListField, +) from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror @@ -30,22 +40,22 @@ from vpp_papi import VppEnum # NAT HA protocol event data class Event(Packet): name = "Event" - fields_desc = [ByteEnumField("event_type", None, - {1: "add", 2: "del", 3: "refresh"}), - ByteEnumField("protocol", None, - {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), - ShortField("flags", 0), - IPField("in_addr", None), - IPField("out_addr", None), - ShortField("in_port", None), - ShortField("out_port", None), - IPField("eh_addr", None), - IPField("ehn_addr", None), - ShortField("eh_port", None), - ShortField("ehn_port", None), - IntField("fib_index", None), - IntField("total_pkts", 0), - LongField("total_bytes", 0)] + fields_desc = [ + ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), + ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), + ShortField("flags", 0), + IPField("in_addr", None), + IPField("out_addr", None), + ShortField("in_port", None), + ShortField("out_port", None), + IPField("eh_addr", None), + IPField("ehn_addr", None), + ShortField("eh_port", None), + ShortField("ehn_port", None), + IntField("fib_index", None), + IntField("total_pkts", 0), + LongField("total_bytes", 0), + ] def extract_padding(self, s): return "", s @@ -54,17 +64,18 @@ class Event(Packet): # NAT HA protocol header class HANATStateSync(Packet): name = "HA NAT state sync" - fields_desc = [XByteField("version", 1), - FlagsField("flags", 0, 8, ['ACK']), - FieldLenField("count", None, count_of="events"), - IntField("sequence_number", 1), - IntField("thread_index", 0), - PacketListField("events", [], Event, - count_from=lambda pkt: pkt.count)] + fields_desc = [ + XByteField("version", 1), + FlagsField("flags", 0, 8, ["ACK"]), + FieldLenField("count", None, count_of="events"), + IntField("sequence_number", 1), + IntField("thread_index", 0), + PacketListField("events", [], Event, count_from=lambda pkt: pkt.count), + ] class MethodHolder(VppTestCase): - """ NAT create capture and verify method holder """ + """NAT create capture and verify method holder""" @property def config_flags(self): @@ -74,10 +85,19 @@ class MethodHolder(VppTestCase): def SYSLOG_SEVERITY(self): return VppEnum.vl_api_syslog_severity_t - def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', - local_port=0, external_port=0, vrf_id=0, - is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0, tag="", flags=0): + def nat44_add_static_mapping( + self, + local_ip, + external_ip="0.0.0.0", + local_port=0, + external_port=0, + vrf_id=0, + is_add=1, + external_sw_if_index=0xFFFFFFFF, + proto=0, + tag="", + flags=0, + ): """ Add/delete NAT44EI static mapping @@ -103,9 +123,11 @@ class MethodHolder(VppTestCase): external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, - vrf_id=vrf_id, protocol=proto, + vrf_id=vrf_id, + protocol=proto, flags=flags, - tag=tag) + tag=tag, + ) def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF): """ @@ -114,31 +136,40 @@ class MethodHolder(VppTestCase): :param ip: IP address :param is_add: 1 if add, 0 if delete (Default add) """ - self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip, - last_ip_address=ip, - vrf_id=vrf_id, - is_add=is_add) + self.vapi.nat44_ei_add_del_address_range( + first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add + ) def create_routes_and_neigbors(self): - r1 = VppIpRoute(self, self.pg7.remote_ip4, 32, - [VppRoutePath(self.pg7.remote_ip4, - self.pg7.sw_if_index)]) - r2 = VppIpRoute(self, self.pg8.remote_ip4, 32, - [VppRoutePath(self.pg8.remote_ip4, - self.pg8.sw_if_index)]) + r1 = VppIpRoute( + self, + self.pg7.remote_ip4, + 32, + [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)], + ) + r2 = VppIpRoute( + self, + self.pg8.remote_ip4, + 32, + [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)], + ) r1.add_vpp_config() r2.add_vpp_config() - n1 = VppNeighbor(self, - self.pg7.sw_if_index, - self.pg7.remote_mac, - self.pg7.remote_ip4, - is_static=1) - n2 = VppNeighbor(self, - self.pg8.sw_if_index, - self.pg8.remote_mac, - self.pg8.remote_ip4, - is_static=1) + n1 = VppNeighbor( + self, + self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4, + is_static=1, + ) + n2 = VppNeighbor( + self, + self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4, + is_static=1, + ) n1.add_vpp_config() n2.add_vpp_config() @@ -156,21 +187,27 @@ class MethodHolder(VppTestCase): pkts = [] # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(sport=self.tcp_port_in, dport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(sport=self.udp_port_in, dport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) return pkts @@ -216,11 +253,10 @@ class MethodHolder(VppTestCase): pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] - packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n]) + packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n]) return socket.inet_ntop(socket.AF_INET6, packed_pref_n) - def create_stream_out(self, out_if, dst_ip=None, ttl=64, - use_inside_ports=False): + def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): """ Create packet stream for outside network @@ -242,21 +278,27 @@ class MethodHolder(VppTestCase): icmp_id = self.icmp_id_in pkts = [] # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=tcp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(dport=tcp_port, sport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=udp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(dport=udp_port, sport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=icmp_id, type='echo-reply')) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=icmp_id, type="echo-reply") + ) pkts.append(p) return pkts @@ -271,27 +313,40 @@ class MethodHolder(VppTestCase): """ pkts = [] # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - TCP(dport=self.tcp_port_out, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / TCP(dport=self.tcp_port_out, sport=20) + ) pkts.append(p) # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - UDP(dport=self.udp_port_out, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / UDP(dport=self.udp_port_out, sport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - ICMPv6EchoReply(id=self.icmp_id_out)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / ICMPv6EchoReply(id=self.icmp_id_out) + ) pkts.append(p) return pkts - def verify_capture_out(self, capture, nat_ip=None, same_port=False, - dst_ip=None, is_ip6=False, ignore_port=False): + def verify_capture_out( + self, + capture, + nat_ip=None, + same_port=False, + dst_ip=None, + is_ip6=False, + ignore_port=False, + ): """ Verify captured packets on outside network @@ -319,39 +374,33 @@ class MethodHolder(VppTestCase): if packet.haslayer(TCP): if not ignore_port: if same_port: - self.assertEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertEqual(packet[TCP].sport, self.tcp_port_in) else: - self.assertNotEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: - self.assertEqual( - packet[UDP].sport, self.udp_port_in) + self.assertEqual(packet[UDP].sport, self.udp_port_in) else: - self.assertNotEqual( - packet[UDP].sport, self.udp_port_in) + self.assertNotEqual(packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: - self.assertEqual( - packet[ICMP46].id, self.icmp_id_in) + self.assertEqual(packet[ICMP46].id, self.icmp_id_in) else: - self.assertNotEqual( - packet[ICMP46].id, self.icmp_id_in) + self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise - def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, - dst_ip=None): + def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None): """ Verify captured packets on outside network @@ -360,8 +409,7 @@ class MethodHolder(VppTestCase): :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) """ - return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, - True) + return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True) def verify_capture_in(self, capture, in_if): """ @@ -381,8 +429,9 @@ class MethodHolder(VppTestCase): else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise def verify_capture_no_translation(self, capture, ingress_if, egress_if): @@ -404,12 +453,12 @@ class MethodHolder(VppTestCase): else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise - def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, - icmp_type=11): + def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11): """ Verify captured packets with ICMP errors on outside network @@ -430,16 +479,15 @@ class MethodHolder(VppTestCase): self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): - self.assertEqual(inner_ip[TCPerror].dport, - self.tcp_port_out) + self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out) elif inner_ip.haslayer(UDPerror): - self.assertEqual(inner_ip[UDPerror].dport, - self.udp_port_out) + self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11): @@ -460,20 +508,20 @@ class MethodHolder(VppTestCase): self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): - self.assertEqual(inner_ip[TCPerror].sport, - self.tcp_port_in) + self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in) elif inner_ip.haslayer(UDPerror): - self.assertEqual(inner_ip[UDPerror].sport, - self.udp_port_in) + self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise - def create_stream_frag(self, src_if, dst, sport, dport, data, - proto=IP_PROTOS.tcp, echo_reply=False): + def create_stream_frag( + self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False + ): """ Create fragmented packet stream @@ -487,9 +535,11 @@ class MethodHolder(VppTestCase): :returns: Fragments """ if proto == IP_PROTOS.tcp: - p = (IP(src=src_if.remote_ip4, dst=dst) / - TCP(sport=sport, dport=dport) / - Raw(data)) + p = ( + IP(src=src_if.remote_ip4, dst=dst) + / TCP(sport=sport, dport=dport) + / Raw(data) + ) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) @@ -497,9 +547,9 @@ class MethodHolder(VppTestCase): proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: - proto_header = ICMP(id=sport, type='echo-request') + proto_header = ICMP(id=sport, type="echo-request") else: - proto_header = ICMP(id=sport, type='echo-reply') + proto_header = ICMP(id=sport, type="echo-reply") else: raise Exception("Unsupported protocol") id = random.randint(0, 65535) @@ -508,28 +558,32 @@ class MethodHolder(VppTestCase): raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / - proto_header / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) + / proto_header + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, - proto=proto) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, - id=id) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) + / raw + ) pkts.append(p) return pkts @@ -550,17 +604,15 @@ class MethodHolder(VppTestCase): self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) - ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, - proto=frags[0][IP].proto) + ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: - p = (ip / TCP(buffer.getvalue())) + p = ip / TCP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: - p = (ip / UDP(buffer.getvalue()[:8]) / - Raw(buffer.getvalue()[8:])) + p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:]) elif ip.proto == IP_PROTOS.icmp: - p = (ip / ICMP(buffer.getvalue())) + p = ip / ICMP(buffer.getvalue()) return p def verify_ipfix_nat44_ses(self, data): @@ -580,29 +632,24 @@ class MethodHolder(VppTestCase): else: nat44_ses_delete_num += 1 # sourceIPv4Address - self.assertEqual(self.pg0.remote_ip4, - str(ipaddress.IPv4Address(record[8]))) + self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8]))) # postNATSourceIPv4Address - self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr), - record[225]) + self.assertEqual( + socket.inet_pton(socket.AF_INET, self.nat_addr), record[225] + ) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # protocolIdentifier/sourceTransportPort # /postNAPTSourceTransportPort if IP_PROTOS.icmp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7]) - self.assertEqual(struct.pack("!H", self.icmp_id_out), - record[227]) + self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227]) elif IP_PROTOS.tcp == scapy.compat.orb(record[4]): - self.assertEqual(struct.pack("!H", self.tcp_port_in), - record[7]) - self.assertEqual(struct.pack("!H", self.tcp_port_out), - record[227]) + self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) + self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) elif IP_PROTOS.udp == scapy.compat.orb(record[4]): - self.assertEqual(struct.pack("!H", self.udp_port_in), - record[7]) - self.assertEqual(struct.pack("!H", self.udp_port_out), - record[227]) + self.assertEqual(struct.pack("!H", self.udp_port_in), record[7]) + self.assertEqual(struct.pack("!H", self.udp_port_out), record[227]) else: self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}") self.assertEqual(3, nat44_ses_create_num) @@ -627,16 +674,16 @@ class MethodHolder(VppTestCase): self.assertEqual(struct.pack("!I", limit), record[471]) def verify_no_nat44_user(self): - """ Verify that there is no NAT44EI user """ + """Verify that there is no NAT44EI user""" users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 0) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[0][0], 0) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[0][0], 0) def verify_syslog_apmap(self, data, is_add=True): - message = data.decode('utf-8') + message = data.decode("utf-8") try: message = SyslogMessage.parse(message) except ParseError as e: @@ -644,26 +691,26 @@ class MethodHolder(VppTestCase): raise else: self.assertEqual(message.severity, SyslogSeverity.info) - self.assertEqual(message.appname, 'NAT') - self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL') - sd_params = message.sd.get('napmap') + self.assertEqual(message.appname, "NAT") + self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL") + sd_params = message.sd.get("napmap") self.assertTrue(sd_params is not None) - self.assertEqual(sd_params.get('IATYP'), 'IPv4') - self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) - self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) - self.assertEqual(sd_params.get('XATYP'), 'IPv4') - self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) - self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) - self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) - self.assertTrue(sd_params.get('SSUBIX') is not None) - self.assertEqual(sd_params.get('SVLAN'), '0') + self.assertEqual(sd_params.get("IATYP"), "IPv4") + self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4) + self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in) + self.assertEqual(sd_params.get("XATYP"), "IPv4") + self.assertEqual(sd_params.get("XSADDR"), self.nat_addr) + self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out) + self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp) + self.assertTrue(sd_params.get("SSUBIX") is not None) + self.assertEqual(sd_params.get("SVLAN"), "0") def verify_mss_value(self, pkt, mss): if not pkt.haslayer(IP) or not pkt.haslayer(TCP): raise TypeError("Not a TCP/IP packet") for option in pkt[TCP].options: - if option[0] == 'MSS': + if option[0] == "MSS": self.assertEqual(option[1], mss) self.assert_tcp_checksum_valid(pkt) @@ -678,8 +725,9 @@ class MethodHolder(VppTestCase): else: raise Exception("Unsupported protocol") - def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_in_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -689,20 +737,19 @@ class MethodHolder(VppTestCase): self.port_in = random.randint(1025, 65535) # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -729,15 +776,14 @@ class MethodHolder(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, - proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -745,9 +791,15 @@ class MethodHolder(VppTestCase): self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) - def reass_hairpinning(self, server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp, - ignore_port=False): + def reass_hairpinning( + self, + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ignore_port=False, + ): layer = self.proto2layer(proto) @@ -757,19 +809,14 @@ class MethodHolder(VppTestCase): data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server - pkts = self.create_stream_frag(self.pg0, - self.nat_addr, - host_in_port, - server_out_port, - data, - proto) + pkts = self.create_stream_frag( + self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.nat_addr, - server_addr) + p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) @@ -779,8 +826,9 @@ class MethodHolder(VppTestCase): self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) - def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_out_of_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -791,21 +839,22 @@ class MethodHolder(VppTestCase): for i in range(2): # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.nat_addr, self.pg1.remote_ip4 + ) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -832,16 +881,17 @@ class MethodHolder(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, - data, proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg1.remote_ip4, self.pg0.remote_ip4 + ) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -861,7 +911,7 @@ def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count): class TestNAT44EI(MethodHolder): - """ NAT44EI Test Cases """ + """NAT44EI Test Cases""" max_translations = 10240 max_users = 10240 @@ -877,7 +927,7 @@ class TestNAT44EI(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' + cls.nat_addr = "10.0.0.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.tcp_external_port = 80 @@ -898,8 +948,8 @@ class TestNAT44EI(MethodHolder): cls.pg1.configure_ipv4_neighbors() cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10}) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20}) cls.pg4._local_ip4 = "172.16.255.1" cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2" @@ -921,8 +971,8 @@ class TestNAT44EI(MethodHolder): cls.pg9.generate_remote_hosts(2) cls.pg9.config_ip4() cls.vapi.sw_interface_add_del_address( - sw_if_index=cls.pg9.sw_if_index, - prefix="10.0.0.1/24") + sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24" + ) cls.pg9.admin_up() cls.pg9.resolve_arp() @@ -932,8 +982,8 @@ class TestNAT44EI(MethodHolder): def plugin_enable(self): self.vapi.nat44_ei_plugin_enable_disable( - sessions=self.max_translations, - users=self.max_users, enable=1) + sessions=self.max_translations, users=self.max_users, enable=1 + ) def setUp(self): super(TestNAT44EI, self).setUp() @@ -943,8 +993,8 @@ class TestNAT44EI(MethodHolder): super(TestNAT44EI, self).tearDown() if not self.vpp_dead: self.vapi.nat44_ei_ipfix_enable_disable( - domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, - enable=0) + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0 + ) self.ipfix_src_port = 4739 self.ipfix_domain_id = 1 @@ -952,16 +1002,16 @@ class TestNAT44EI(MethodHolder): self.vapi.cli("clear logging") def test_clear_sessions(self): - """ NAT44EI session clearing test """ + """NAT44EI session clearing test""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -970,32 +1020,32 @@ class TestNAT44EI(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid") self.logger.info("sessions before clearing: %s" % sessions[0][0]) self.vapi.cli("clear nat44 ei sessions") - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid") self.logger.info("sessions after clearing: %s" % sessions[0][0]) def test_dynamic(self): - """ NAT44EI dynamic translation test """ + """NAT44EI dynamic translation test""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out - tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp'] - udpn = self.statistics['/nat44-ei/in2out/slowpath/udp'] - icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp'] - drops = self.statistics['/nat44-ei/in2out/slowpath/drops'] + tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"] + udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"] + icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"] + drops = self.statistics["/nat44-ei/in2out/slowpath/drops"] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -1005,20 +1055,20 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) if_idx = self.pg0.sw_if_index - cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ei/in2out/slowpath/udp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/in2out/slowpath/drops'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) # out2in - tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp'] - udpn = self.statistics['/nat44-ei/out2in/slowpath/udp'] - icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp'] - drops = self.statistics['/nat44-ei/out2in/slowpath/drops'] + tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"] + udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"] + icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"] + drops = self.statistics["/nat44-ei/out2in/slowpath/drops"] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -1028,31 +1078,31 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) if_idx = self.pg1.sw_if_index - cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ei/out2in/slowpath/udp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/out2in/slowpath/drops'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[:, 0].sum(), 1) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 3) def test_dynamic_icmp_errors_in2out_ttl_1(self): - """ NAT44EI handling of client packets with TTL=1 """ + """NAT44EI handling of client packets with TTL=1""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - generate traffic pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1) @@ -1062,16 +1112,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in_with_icmp_errors(capture, self.pg0) def test_dynamic_icmp_errors_out2in_ttl_1(self): - """ NAT44EI handling of server packets with TTL=1 """ + """NAT44EI handling of server packets with TTL=1""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - create sessions pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1086,21 +1136,19 @@ class TestNAT44EI(MethodHolder): capture = self.send_and_expect_some(self.pg1, pkts, self.pg1) # Server side - verify ICMP type 11 packets - self.verify_capture_out_with_icmp_errors(capture, - src_ip=self.pg1.local_ip4) + self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4) def test_dynamic_icmp_errors_in2out_ttl_2(self): - """ NAT44EI handling of error responses to client packets with TTL=2 - """ + """NAT44EI handling of error responses to client packets with TTL=2""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - generate traffic pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2) @@ -1110,9 +1158,13 @@ class TestNAT44EI(MethodHolder): # Server side - simulate ICMP type 11 response capture = self.pg1.get_capture(len(pkts)) - pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - ICMP(type=11) / packet[IP] for packet in capture] + pkts = [ + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / ICMP(type=11) + / packet[IP] + for packet in capture + ] self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1122,17 +1174,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in_with_icmp_errors(capture, self.pg0) def test_dynamic_icmp_errors_out2in_ttl_2(self): - """ NAT44EI handling of error responses to server packets with TTL=2 - """ + """NAT44EI handling of error responses to server packets with TTL=2""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - create sessions pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1150,9 +1201,13 @@ class TestNAT44EI(MethodHolder): # Client side - simulate ICMP type 11 response capture = self.pg0.get_capture(len(pkts)) - pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type=11) / packet[IP] for packet in capture] + pkts = [ + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type=11) + / packet[IP] + for packet in capture + ] self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1162,20 +1217,22 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out_with_icmp_errors(capture) def test_ping_out_interface_from_outside(self): - """ NAT44EI ping out interface from outside network """ + """NAT44EI ping out interface from outside network""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) / - ICMP(id=self.icmp_id_out, type='echo-request')) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) + / ICMP(id=self.icmp_id_out, type="echo-request") + ) pkts = [p] self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1188,26 +1245,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(packet[ICMP].id, self.icmp_id_in) self.assertEqual(packet[ICMP].type, 0) # echo reply except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise def test_ping_internal_host_from_outside(self): - """ NAT44EI ping internal host from outside network """ + """NAT44EI ping internal host from outside network""" self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # out2in - pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) / - ICMP(id=self.icmp_id_out, type='echo-request')) + pkt = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) + / ICMP(id=self.icmp_id_out, type="echo-request") + ) self.pg1.add_stream(pkt) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1216,9 +1276,11 @@ class TestNAT44EI(MethodHolder): self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) # in2out - pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / - ICMP(id=self.icmp_id_in, type='echo-reply')) + pkt = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) + / ICMP(id=self.icmp_id_in, type="echo-reply") + ) self.pg0.add_stream(pkt) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1227,25 +1289,27 @@ class TestNAT44EI(MethodHolder): self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) def test_forwarding(self): - """ NAT44EI forwarding test """ + """NAT44EI forwarding test""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_forwarding_enable_disable(enable=1) real_ip = self.pg0.remote_ip4 alias_ip = self.nat_addr flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_static_mapping( - is_add=1, local_ip_address=real_ip, + is_add=1, + local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, - flags=flags) + flags=flags, + ) try: # static mapping match @@ -1269,9 +1333,9 @@ class TestNAT44EI(MethodHolder): host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: - pkts = self.create_stream_out(self.pg1, - dst_ip=self.pg0.remote_ip4, - use_inside_ports=True) + pkts = self.create_stream_out( + self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1283,8 +1347,9 @@ class TestNAT44EI(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, - same_port=True) + self.verify_capture_out( + capture, nat_ip=self.pg0.remote_ip4, same_port=True + ) finally: self.pg0.remote_hosts[0] = host0 @@ -1296,10 +1361,11 @@ class TestNAT44EI(MethodHolder): local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, - flags=flags) + flags=flags, + ) def test_static_in(self): - """ NAT44EI 1:1 NAT initialized from inside network """ + """NAT44EI 1:1 NAT initialized from inside network""" nat_ip = "10.0.0.10" self.tcp_port_out = 6303 @@ -1309,14 +1375,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) sm = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(len(sm), 1) - self.assertEqual(sm[0].tag, '') + self.assertEqual(sm[0].tag, "") self.assertEqual(sm[0].protocol, 0) self.assertEqual(sm[0].local_port, 0) self.assertEqual(sm[0].external_port, 0) @@ -1338,7 +1404,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) def test_static_out(self): - """ NAT44EI 1:1 NAT initialized from outside network """ + """NAT44EI 1:1 NAT initialized from outside network""" nat_ip = "10.0.0.20" self.tcp_port_out = 6303 @@ -1349,11 +1415,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) sm = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(len(sm), 1) self.assertEqual(sm[0].tag, tag) @@ -1375,29 +1441,41 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip, True) def test_static_with_port_in(self): - """ NAT44EI 1:1 NAPT initialized from inside network """ + """NAT44EI 1:1 NAPT initialized from inside network""" self.tcp_port_out = 3606 self.udp_port_out = 3607 self.icmp_id_out = 3608 self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1416,29 +1494,41 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) def test_static_with_port_out(self): - """ NAT44EI 1:1 NAPT initialized from outside network """ + """NAT44EI 1:1 NAPT initialized from outside network""" self.tcp_port_out = 30606 self.udp_port_out = 30607 self.icmp_id_out = 30608 self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg1) @@ -1457,7 +1547,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) def test_static_vrf_aware(self): - """ NAT44EI 1:1 NAT VRF awareness """ + """NAT44EI 1:1 NAT VRF awareness""" nat_ip1 = "10.0.0.30" nat_ip2 = "10.0.0.40" @@ -1465,20 +1555,18 @@ class TestNAT44EI(MethodHolder): self.udp_port_out = 6304 self.icmp_id_out = 6305 - self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, - vrf_id=10) - self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, - vrf_id=10) + self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10) + self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1 + ) # inside interface VRF match NAT44EI static mapping VRF pkts = self.create_stream_in(self.pg4, self.pg3) @@ -1497,7 +1585,7 @@ class TestNAT44EI(MethodHolder): self.pg3.assert_nothing_captured() def test_dynamic_to_static(self): - """ NAT44EI Switch from dynamic translation to 1:1NAT """ + """NAT44EI Switch from dynamic translation to 1:1NAT""" nat_ip = "10.0.0.10" self.tcp_port_out = 6303 self.udp_port_out = 6304 @@ -1506,11 +1594,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # dynamic pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1532,22 +1620,27 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip, True) def test_identity_nat(self): - """ NAT44EI Identity NAT """ + """NAT44EI Identity NAT""" flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF, - flags=flags, is_add=1) + ip_address=self.pg0.remote_ip4, + sw_if_index=0xFFFFFFFF, + flags=flags, + is_add=1, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=12345, dport=56789)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=12345, dport=56789) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1569,26 +1662,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(len(sessions), 0) flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF, - flags=flags, vrf_id=1, is_add=1) + ip_address=self.pg0.remote_ip4, + sw_if_index=0xFFFFFFFF, + flags=flags, + vrf_id=1, + is_add=1, + ) identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(len(identity_mappings), 2) def test_multiple_inside_interfaces(self): - """ NAT44EI multiple non-overlapping address space inside interfaces - """ + """NAT44EI multiple non-overlapping address space inside interfaces""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # between two NAT44EI inside interfaces (no translation) pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1639,26 +1735,24 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg1) def test_inside_overlapping_interfaces(self): - """ NAT44EI multiple inside interfaces with overlapping address space - """ + """NAT44EI multiple inside interfaces with overlapping address space""" static_nat_ip = "10.0.0.10" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - flags=flags, is_add=1) - self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, - vrf_id=20) + sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1 + ) + self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20) # between NAT44EI inside interfaces with same VRF (no translation) pkts = self.create_stream_in(self.pg4, self.pg5) @@ -1669,9 +1763,11 @@ class TestNAT44EI(MethodHolder): self.verify_capture_no_translation(capture, self.pg4, self.pg5) # between NAT44EI inside interfaces with different VRF (hairpinning) - p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) / - IP(src=self.pg4.remote_ip4, dst=static_nat_ip) / - TCP(sport=1234, dport=5678)) + p = ( + Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) + / IP(src=self.pg4.remote_ip4, dst=static_nat_ip) + / TCP(sport=1234, dport=5678) + ) self.pg4.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1723,16 +1819,12 @@ class TestNAT44EI(MethodHolder): # pg5 session dump addresses = self.vapi.nat44_ei_address_dump() self.assertEqual(len(addresses), 1) - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg5.remote_ip4, 10) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10) self.assertEqual(len(sessions), 3) for session in sessions: - self.assertFalse(session.flags & - self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg5.remote_ip4) - self.assertEqual(session.outside_ip_address, - addresses[0].ip_address) + self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4) + self.assertEqual(session.outside_ip_address, addresses[0].ip_address) self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp) self.assertEqual(sessions[1].protocol, IP_PROTOS.udp) self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp) @@ -1765,44 +1857,38 @@ class TestNAT44EI(MethodHolder): addresses = self.vapi.nat44_ei_address_dump() self.assertEqual(len(addresses), 1) for user in users: - sessions = self.vapi.nat44_ei_user_session_dump(user.ip_address, - user.vrf_id) + sessions = self.vapi.nat44_ei_user_session_dump( + user.ip_address, user.vrf_id + ) for session in sessions: self.assertEqual(user.ip_address, session.inside_ip_address) self.assertTrue(session.total_bytes > session.total_pkts > 0) - self.assertTrue(session.protocol in - [IP_PROTOS.tcp, IP_PROTOS.udp, - IP_PROTOS.icmp]) + self.assertTrue( + session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp] + ) # pg4 session dump - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg4.remote_ip4, 10) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10) self.assertGreaterEqual(len(sessions), 4) for session in sessions: - self.assertFalse( - session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg4.remote_ip4) - self.assertEqual(session.outside_ip_address, - addresses[0].ip_address) + self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4) + self.assertEqual(session.outside_ip_address, addresses[0].ip_address) # pg6 session dump - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg6.remote_ip4, 20) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20) self.assertGreaterEqual(len(sessions), 3) for session in sessions: + self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4) + self.assertEqual(str(session.outside_ip_address), static_nat_ip) self.assertTrue( - session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg6.remote_ip4) - self.assertEqual(str(session.outside_ip_address), - static_nat_ip) - self.assertTrue(session.inside_port in - [self.tcp_port_in, self.udp_port_in, - self.icmp_id_in]) + session.inside_port + in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in] + ) def test_hairpinning(self): - """ NAT44EI hairpinning - 1:1 NAPT """ + """NAT44EI hairpinning - 1:1 NAPT""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -1814,22 +1900,28 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) - - cnt = self.statistics['/nat44-ei/hairpinning'] + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1848,14 +1940,16 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1873,13 +1967,15 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index - self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), - 2+(1 if self.vpp_worker_count > 0 else 0)) + self.assertEqual( + after[:, if_idx].sum() - cnt[:, if_idx].sum(), + 2 + (1 if self.vpp_worker_count > 0 else 0), + ) def test_hairpinning2(self): - """ NAT44EI hairpinning - 1:1 NAT""" + """NAT44EI hairpinning - 1:1 NAT""" server1_nat_ip = "10.0.0.10" server2_nat_ip = "10.0.0.11" @@ -1892,11 +1988,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for servers self.nat44_add_static_mapping(server1.ip4, server1_nat_ip) @@ -1904,17 +2000,23 @@ class TestNAT44EI(MethodHolder): # host to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1942,17 +2044,23 @@ class TestNAT44EI(MethodHolder): # server1 to host pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1977,17 +2085,23 @@ class TestNAT44EI(MethodHolder): # server2 to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -2015,17 +2129,23 @@ class TestNAT44EI(MethodHolder): # server1 to server2 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -2049,7 +2169,7 @@ class TestNAT44EI(MethodHolder): raise def test_hairpinning_avoid_inf_loop(self): - """ NAT44EI hairpinning - 1:1 NAPT avoid infinite loop """ + """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -2061,34 +2181,42 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) # add another static mapping that maps pg0.local_ip4 address to itself self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4) # send packet from host to VPP (the packet should get dropped) - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.pg0.local_ip4) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.pg0.local_ip4) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Here VPP used to crash due to an infinite loop - cnt = self.statistics['/nat44-ei/hairpinning'] + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2107,14 +2235,16 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2132,16 +2262,18 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index - self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), - 2+(1 if self.vpp_worker_count > 0 else 0)) + self.assertEqual( + after[:, if_idx].sum() - cnt[:, if_idx].sum(), + 2 + (1 if self.vpp_worker_count > 0 else 0), + ) def test_interface_addr(self): - """ NAT44EI acquire addresses from interface """ + """NAT44EI acquire addresses from interface""" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) # no address in NAT pool addresses = self.vapi.nat44_ei_address_dump() @@ -2159,22 +2291,20 @@ class TestNAT44EI(MethodHolder): self.assertEqual(0, len(addresses)) def test_interface_addr_static_mapping(self): - """ NAT44EI Static mapping with addresses from interface """ + """NAT44EI Static mapping with addresses from interface""" tag = "testTAG" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) self.nat44_add_static_mapping( - '1.2.3.4', - external_sw_if_index=self.pg7.sw_if_index, - tag=tag) + "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag + ) # static mappings with external interface static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(1, len(static_mappings)) - self.assertEqual(self.pg7.sw_if_index, - static_mappings[0].external_sw_if_index) + self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) self.assertEqual(static_mappings[0].tag, tag) # configure interface address and check static mappings @@ -2184,8 +2314,7 @@ class TestNAT44EI(MethodHolder): resolved = False for sm in static_mappings: if sm.external_sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(sm.external_ip_address), - self.pg7.local_ip4) + self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4) self.assertEqual(sm.tag, tag) resolved = True self.assertTrue(resolved) @@ -2194,8 +2323,7 @@ class TestNAT44EI(MethodHolder): self.pg7.unconfig_ip4() static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(1, len(static_mappings)) - self.assertEqual(self.pg7.sw_if_index, - static_mappings[0].external_sw_if_index) + self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) self.assertEqual(static_mappings[0].tag, tag) # configure interface address again and check static mappings @@ -2205,40 +2333,37 @@ class TestNAT44EI(MethodHolder): resolved = False for sm in static_mappings: if sm.external_sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(sm.external_ip_address), - self.pg7.local_ip4) + self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4) self.assertEqual(sm.tag, tag) resolved = True self.assertTrue(resolved) # remove static mapping self.nat44_add_static_mapping( - '1.2.3.4', - external_sw_if_index=self.pg7.sw_if_index, - tag=tag, - is_add=0) + "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0 + ) static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(0, len(static_mappings)) def test_interface_addr_identity_nat(self): - """ NAT44EI Identity NAT with addresses from interface """ + """NAT44EI Identity NAT with addresses from interface""" port = 53053 self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=b'0', + ip_address=b"0", sw_if_index=self.pg7.sw_if_index, port=port, protocol=IP_PROTOS.tcp, - is_add=1) + is_add=1, + ) # identity mappings with external interface identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(1, len(identity_mappings)) - self.assertEqual(self.pg7.sw_if_index, - identity_mappings[0].sw_if_index) + self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index) # configure interface address and check identity mappings self.pg7.config_ip4() @@ -2247,8 +2372,9 @@ class TestNAT44EI(MethodHolder): self.assertEqual(2, len(identity_mappings)) for sm in identity_mappings: if sm.sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(identity_mappings[0].ip_address), - self.pg7.local_ip4) + self.assertEqual( + str(identity_mappings[0].ip_address), self.pg7.local_ip4 + ) self.assertEqual(port, identity_mappings[0].port) self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol) resolved = True @@ -2258,11 +2384,10 @@ class TestNAT44EI(MethodHolder): self.pg7.unconfig_ip4() identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(1, len(identity_mappings)) - self.assertEqual(self.pg7.sw_if_index, - identity_mappings[0].sw_if_index) + self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index) def test_ipfix_nat44_sess(self): - """ NAT44EI IPFIX logging NAT44EI session created/deleted """ + """NAT44EI IPFIX logging NAT44EI session created/deleted""" self.ipfix_domain_id = 10 self.ipfix_src_port = 20202 collector_port = 30303 @@ -2270,19 +2395,21 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10, - collector_port=collector_port) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + collector_port=collector_port, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -2301,8 +2428,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, collector_port) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2312,25 +2438,29 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_nat44_ses(data) def test_ipfix_addr_exhausted(self): - """ NAT44EI IPFIX logging NAT addresses exhausted """ + """NAT44EI IPFIX logging NAT addresses exhausted""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=3025)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=3025) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2345,8 +2475,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2356,15 +2485,15 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_addr_exhausted(data) def test_ipfix_max_sessions(self): - """ NAT44EI IPFIX logging maximum session entries exceeded """ + """NAT44EI IPFIX logging maximum session entries exceeded""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) max_sessions_per_thread = self.max_translations max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread @@ -2372,26 +2501,32 @@ class TestNAT44EI(MethodHolder): pkts = [] for i in range(0, max_sessions): src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=src, dst=self.pg1.remote_ip4) / - TCP(sport=1025)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=src, dst=self.pg1.remote_ip4) + / TCP(sport=1025) + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(max_sessions) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) - - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=1025)) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) + + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=1025) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2406,8 +2541,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2417,22 +2551,23 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_max_sessions(data, max_sessions_per_thread) def test_syslog_apmap(self): - """ NAT44EI syslog address and port mapping creation and deletion """ - self.vapi.syslog_set_filter( - self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO) + """NAT44EI syslog address and port mapping creation and deletion""" + self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO) self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4) self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=20)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=20) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2448,22 +2583,25 @@ class TestNAT44EI(MethodHolder): self.verify_syslog_apmap(capture[0][Raw].load, False) def test_pool_addr_fib(self): - """ NAT44EI add pool addresses to FIB """ - static_addr = '10.0.0.10' + """NAT44EI add pool addresses to FIB""" + static_addr = "10.0.0.10" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr) # NAT44EI address - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2472,9 +2610,12 @@ class TestNAT44EI(MethodHolder): self.assertTrue(capture[0][ARP].op, ARP.is_at) # 1:1 NAT address - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=static_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=static_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2483,9 +2624,12 @@ class TestNAT44EI(MethodHolder): self.assertTrue(capture[0][ARP].op, ARP.is_at) # send ARP to non-NAT44EI interface - p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac)) + p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg2.remote_ip4, + hwsrc=self.pg2.remote_mac, + ) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2493,27 +2637,32 @@ class TestNAT44EI(MethodHolder): # remove addresses and verify self.nat44_add_address(self.nat_addr, is_add=0) - self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, - is_add=0) - - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0) + + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=static_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=static_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() def test_vrf_mode(self): - """ NAT44EI tenant VRF aware address pool mode """ + """NAT44EI tenant VRF aware address pool mode""" vrf_id1 = 1 vrf_id2 = 2 @@ -2522,8 +2671,8 @@ class TestNAT44EI(MethodHolder): self.pg0.unconfig_ip4() self.pg1.unconfig_ip4() - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2}) self.pg0.set_table_ip4(vrf_id1) self.pg1.set_table_ip4(vrf_id2) self.pg0.config_ip4() @@ -2535,14 +2684,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(nat_ip2, vrf_id=vrf_id2) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) try: # first VRF @@ -2570,11 +2719,11 @@ class TestNAT44EI(MethodHolder): self.pg1.config_ip4() self.pg0.resolve_arp() self.pg1.resolve_arp() - self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2}) def test_vrf_feature_independent(self): - """ NAT44EI tenant VRF independent address pool mode """ + """NAT44EI tenant VRF independent address pool mode""" nat_ip1 = "10.0.0.10" nat_ip2 = "10.0.0.11" @@ -2583,14 +2732,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(nat_ip2, vrf_id=99) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) # first VRF pkts = self.create_stream_in(self.pg0, self.pg2) @@ -2609,16 +2758,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip1) def test_dynamic_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address """ + """NAT44EI interfaces without configured IP address""" self.create_routes_and_neigbors() self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg7, self.pg8) @@ -2637,17 +2786,17 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg7) def test_static_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address - 1:1 NAT """ + """NAT44EI interfaces without configured IP address - 1:1 NAT""" self.create_routes_and_neigbors() self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg8) @@ -2666,7 +2815,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, self.nat_addr, True) def test_static_with_port_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address - 1:1 NAPT """ + """NAT44EI interfaces without configured IP address - 1:1 NAPT""" self.tcp_port_out = 30606 self.udp_port_out = 30607 @@ -2674,22 +2823,34 @@ class TestNAT44EI(MethodHolder): self.create_routes_and_neigbors() self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg8) @@ -2708,23 +2869,25 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) def test_static_unknown_proto(self): - """ NAT44EI 1:1 translate packet with unknown protocol """ + """NAT44EI 1:1 translate packet with unknown protocol""" nat_ip = "10.0.0.10" self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2740,11 +2903,13 @@ class TestNAT44EI(MethodHolder): raise # out2in - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - GRE() / - IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=nat_ip) + / GRE() + / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2760,8 +2925,7 @@ class TestNAT44EI(MethodHolder): raise def test_hairpinning_static_unknown_proto(self): - """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning - """ + """NAT44EI 1:1 translate packet with unknown protocol - hairpinning""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -2773,18 +2937,20 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(server.ip4, server_nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # host to server - p = (Ether(dst=self.pg0.local_mac, src=host.mac) / - IP(src=host.ip4, dst=server_nat_ip) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=host.mac) + / IP(src=host.ip4, dst=server_nat_ip) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2800,11 +2966,13 @@ class TestNAT44EI(MethodHolder): raise # server to host - p = (Ether(dst=self.pg0.local_mac, src=server.mac) / - IP(src=server.ip4, dst=host_nat_ip) / - GRE() / - IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=server.mac) + / IP(src=server.ip4, dst=host_nat_ip) + / GRE() + / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2820,10 +2988,11 @@ class TestNAT44EI(MethodHolder): raise def test_output_feature(self): - """ NAT44EI output feature (in2out postrouting) """ + """NAT44EI output feature (in2out postrouting)""" self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg3.sw_if_index, is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg0, self.pg3) @@ -2850,25 +3019,32 @@ class TestNAT44EI(MethodHolder): self.verify_capture_no_translation(capture, self.pg2, self.pg0) def test_output_feature_vrf_aware(self): - """ NAT44EI output feature VRF aware (in2out postrouting) """ + """NAT44EI output feature VRF aware (in2out postrouting)""" nat_ip_vrf10 = "10.0.0.10" nat_ip_vrf20 = "10.0.0.20" - r1 = VppIpRoute(self, self.pg3.remote_ip4, 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)], - table_id=10) - r2 = VppIpRoute(self, self.pg3.remote_ip4, 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)], - table_id=20) + r1 = VppIpRoute( + self, + self.pg3.remote_ip4, + 32, + [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], + table_id=10, + ) + r2 = VppIpRoute( + self, + self.pg3.remote_ip4, + 32, + [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], + table_id=20, + ) r1.add_vpp_config() r2.add_vpp_config() self.nat44_add_address(nat_ip_vrf10, vrf_id=10) self.nat44_add_address(nat_ip_vrf20, vrf_id=20) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg3.sw_if_index, is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # in2out VRF 10 pkts = self.create_stream_in(self.pg4, self.pg3) @@ -2903,7 +3079,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg6) def test_output_feature_hairpinning(self): - """ NAT44EI output feature hairpinning (in2out postrouting) """ + """NAT44EI output feature hairpinning (in2out postrouting)""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] host_in_port = 1234 @@ -2913,19 +3089,27 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg0.sw_if_index, is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2945,9 +3129,11 @@ class TestNAT44EI(MethodHolder): raise # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2966,7 +3152,7 @@ class TestNAT44EI(MethodHolder): raise def test_one_armed_nat44(self): - """ NAT44EI One armed NAT """ + """NAT44EI One armed NAT""" remote_host = self.pg9.remote_hosts[0] local_host = self.pg9.remote_hosts[1] external_port = 0 @@ -2974,16 +3160,18 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg9.sw_if_index, - is_add=1) + sw_if_index=self.pg9.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg9.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1 + ) # in2out - p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / - IP(src=local_host.ip4, dst=remote_host.ip4) / - TCP(sport=12345, dport=80)) + p = ( + Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) + / IP(src=local_host.ip4, dst=remote_host.ip4) + / TCP(sport=12345, dport=80) + ) self.pg9.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3003,9 +3191,11 @@ class TestNAT44EI(MethodHolder): raise # out2in - p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / - IP(src=remote_host.ip4, dst=self.nat_addr) / - TCP(sport=80, dport=external_port)) + p = ( + Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) + / IP(src=remote_host.ip4, dst=self.nat_addr) + / TCP(sport=80, dport=external_port) + ) self.pg9.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3028,21 +3218,21 @@ class TestNAT44EI(MethodHolder): else: node = "nat44-ei-classify" - err = self.statistics.get_err_counter('/err/%s/next in2out' % node) + err = self.statistics.get_err_counter("/err/%s/next in2out" % node) self.assertEqual(err, 1) - err = self.statistics.get_err_counter('/err/%s/next out2in' % node) + err = self.statistics.get_err_counter("/err/%s/next out2in" % node) self.assertEqual(err, 1) def test_del_session(self): - """ NAT44EI delete session """ + """NAT44EI delete session""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -3057,12 +3247,14 @@ class TestNAT44EI(MethodHolder): address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=self.config_flags.NAT44_EI_IF_INSIDE) + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.vapi.nat44_ei_del_session( address=sessions[1].outside_ip_address, port=sessions[1].outside_port, - protocol=sessions[1].protocol) + protocol=sessions[1].protocol, + ) sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(nsessions - len(sessions), 2) @@ -3071,60 +3263,56 @@ class TestNAT44EI(MethodHolder): address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=self.config_flags.NAT44_EI_IF_INSIDE) + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.verify_no_nat44_user() def test_frag_in_order(self): - """ NAT44EI translate fragments arriving in order """ + """NAT44EI translate fragments arriving in order""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.frag_in_order(proto=IP_PROTOS.tcp) self.frag_in_order(proto=IP_PROTOS.udp) self.frag_in_order(proto=IP_PROTOS.icmp) def test_frag_forwarding(self): - """ NAT44EI forwarding fragment test """ + """NAT44EI forwarding fragment test""" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg1.sw_if_index) + is_add=1, sw_if_index=self.pg1.sw_if_index + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_forwarding_enable_disable(enable=1) data = b"A" * 16 + b"B" * 16 + b"C" * 3 - pkts = self.create_stream_frag(self.pg1, - self.pg0.remote_ip4, - 4789, - 4789, - data, - proto=IP_PROTOS.udp) + pkts = self.create_stream_frag( + self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) self.assertEqual(p[UDP].sport, 4789) self.assertEqual(p[UDP].dport, 4789) self.assertEqual(data, p[Raw].load) def test_reass_hairpinning(self): - """ NAT44EI fragments hairpinning """ + """NAT44EI fragments hairpinning""" server_addr = self.pg0.remote_hosts[1].ip4 host_in_port = random.randint(1025, 65535) @@ -3134,63 +3322,85 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server_addr, self.nat_addr, - server_in_port, - server_out_port, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(server_addr, self.nat_addr, - server_in_port, - server_out_port, - proto=IP_PROTOS.udp) + self.nat44_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.udp, + ) self.nat44_add_static_mapping(server_addr, self.nat_addr) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.udp) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.icmp) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.udp, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.icmp, + ) def test_frag_out_of_order(self): - """ NAT44EI translate fragments arriving out of order """ + """NAT44EI translate fragments arriving out of order""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.frag_out_of_order(proto=IP_PROTOS.tcp) self.frag_out_of_order(proto=IP_PROTOS.udp) self.frag_out_of_order(proto=IP_PROTOS.icmp) def test_port_restricted(self): - """ NAT44EI Port restricted NAT44EI (MAP-E CE) """ + """NAT44EI Port restricted NAT44EI (MAP-E CE)""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=1, - psid_offset=6, - psid_length=6, - psid=10) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=4567, dport=22)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_set_addr_and_port_alloc_alg( + alg=1, psid_offset=6, psid_length=6, psid=10 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=4567, dport=22) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3210,24 +3420,26 @@ class TestNAT44EI(MethodHolder): raise def test_port_range(self): - """ NAT44EI External address port range """ + """NAT44EI External address port range""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=2, - start_port=1025, - end_port=1027) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_set_addr_and_port_alloc_alg( + alg=2, start_port=1025, end_port=1027 + ) pkts = [] for port in range(0, 5): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=1125 + port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=1125 + port) + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -3239,14 +3451,14 @@ class TestNAT44EI(MethodHolder): self.assertLessEqual(tcp.sport, 1027) def test_multiple_outside_vrf(self): - """ NAT44EI Multiple outside VRF """ + """NAT44EI Multiple outside VRF""" vrf_id1 = 1 vrf_id2 = 2 self.pg1.unconfig_ip4() self.pg2.unconfig_ip4() - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2}) self.pg1.set_table_ip4(vrf_id1) self.pg2.set_table_ip4(vrf_id2) self.pg1.config_ip4() @@ -3257,14 +3469,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) try: # first VRF @@ -3313,20 +3525,26 @@ class TestNAT44EI(MethodHolder): self.pg2.resolve_arp() def test_mss_clamping(self): - """ NAT44EI TCP MSS clamping """ + """NAT44EI TCP MSS clamping""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="S", options=[('MSS', 1400)])) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP( + sport=self.tcp_port_in, + dport=self.tcp_external_port, + flags="S", + options=[("MSS", 1400)], + ) + ) self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000) self.pg0.add_stream(p) @@ -3353,21 +3571,22 @@ class TestNAT44EI(MethodHolder): self.verify_mss_value(capture[0], 1400) def test_ha_send(self): - """ NAT44EI Send HA session synchronization events (active) """ + """NAT44EI Send HA session synchronization events (active)""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_ha_set_listener( - ip_address=self.pg3.local_ip4, port=12345, path_mtu=512) + ip_address=self.pg3.local_ip4, port=12345, path_mtu=512 + ) self.vapi.nat44_ei_ha_set_failover( - ip_address=self.pg3.remote_ip4, port=12346, - session_refresh_interval=10) + ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10 + ) bind_layers(UDP, HANATStateSync, sport=12345) # create sessions @@ -3379,7 +3598,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) # active send HA events self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/add-event-send'] + stats = self.statistics["/nat44-ei/ha/add-event-send"] self.assertEqual(stats[:, 0].sum(), 3) capture = self.pg3.get_capture(1) p = capture[0] @@ -3407,23 +3626,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(event.fib_index, 0) # ACK received events - ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=seq, flags='ACK', - thread_index=hanat.thread_index)) + ack = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=seq, flags="ACK", thread_index=hanat.thread_index + ) + ) self.pg3.add_stream(ack) self.pg_start() - stats = self.statistics['/nat44-ei/ha/ack-recv'] + stats = self.statistics["/nat44-ei/ha/ack-recv"] self.assertEqual(stats[:, 0].sum(), 1) # delete one session self.pg_enable_capture(self.pg_interfaces) self.vapi.nat44_ei_del_session( - address=self.pg0.remote_ip4, port=self.tcp_port_in, - protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE) + address=self.pg0.remote_ip4, + port=self.tcp_port_in, + protocol=IP_PROTOS.tcp, + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/del-event-send'] + stats = self.statistics["/nat44-ei/ha/del-event-send"] self.assertEqual(stats[:, 0].sum(), 1) capture = self.pg3.get_capture(1) p = capture[0] @@ -3438,9 +3663,9 @@ class TestNAT44EI(MethodHolder): # do not send ACK, active retry send HA event again self.pg_enable_capture(self.pg_interfaces) self.virtual_sleep(12) - stats = self.statistics['/nat44-ei/ha/retry-count'] + stats = self.statistics["/nat44-ei/ha/retry-count"] self.assertEqual(stats[:, 0].sum(), 3) - stats = self.statistics['/nat44-ei/ha/missed-count'] + stats = self.statistics["/nat44-ei/ha/missed-count"] self.assertEqual(stats[:, 0].sum(), 1) capture = self.pg3.get_capture(3) for packet in capture: @@ -3453,7 +3678,7 @@ class TestNAT44EI(MethodHolder): self.pg_start() self.pg0.get_capture(2) self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/refresh-event-send'] + stats = self.statistics["/nat44-ei/ha/refresh-event-send"] self.assertEqual(stats[:, 0].sum(), 2) capture = self.pg3.get_capture(1) p = capture[0] @@ -3480,29 +3705,33 @@ class TestNAT44EI(MethodHolder): self.assertEqual(event.total_pkts, 2) self.assertGreater(event.total_bytes, 0) - stats = self.statistics['/nat44-ei/ha/ack-recv'] - ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=seq, flags='ACK', - thread_index=hanat.thread_index)) + stats = self.statistics["/nat44-ei/ha/ack-recv"] + ack = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=seq, flags="ACK", thread_index=hanat.thread_index + ) + ) self.pg3.add_stream(ack) self.pg_start() - stats = self.statistics['/nat44-ei/ha/ack-recv'] + stats = self.statistics["/nat44-ei/ha/ack-recv"] self.assertEqual(stats[:, 0].sum(), 2) def test_ha_recv(self): - """ NAT44EI Receive HA session synchronization events (passive) """ + """NAT44EI Receive HA session synchronization events (passive)""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_ha_set_listener(ip_address=self.pg3.local_ip4, - port=12345, path_mtu=512) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_ha_set_listener( + ip_address=self.pg3.local_ip4, port=12345, path_mtu=512 + ) bind_layers(UDP, HANATStateSync, sport=12345) # this is a bit tricky - HA dictates thread index due to how it's @@ -3512,11 +3741,12 @@ class TestNAT44EI(MethodHolder): # IP address) and out2in (based on outside port) # first choose a thread index which is correct for IP - thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4, - self.vpp_worker_count) + thread_index = get_nat44_ei_in2out_worker_index( + self.pg0.remote_ip4, self.vpp_worker_count + ) # now pick a port which is correct for given thread - port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count)) + port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count)) self.tcp_port_out = 1024 + random.randint(1, port_per_thread) self.udp_port_out = 1024 + random.randint(1, port_per_thread) if self.vpp_worker_count > 0: @@ -3524,25 +3754,43 @@ class TestNAT44EI(MethodHolder): self.udp_port_out += port_per_thread * (thread_index - 1) # send HA session add events to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=1, events=[ - Event(event_type='add', protocol='tcp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.tcp_port_in, out_port=self.tcp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.tcp_external_port, - ehn_port=self.tcp_external_port, fib_index=0), - Event(event_type='add', protocol='udp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.udp_port_in, out_port=self.udp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.udp_external_port, - ehn_port=self.udp_external_port, fib_index=0)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=1, + events=[ + Event( + event_type="add", + protocol="tcp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.tcp_port_in, + out_port=self.tcp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.tcp_external_port, + ehn_port=self.tcp_external_port, + fib_index=0, + ), + Event( + event_type="add", + protocol="udp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.udp_port_in, + out_port=self.udp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.udp_external_port, + ehn_port=self.udp_external_port, + fib_index=0, + ), + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -3557,49 +3805,57 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 1) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) self.assertEqual(hanat.thread_index, thread_index) - stats = self.statistics['/nat44-ei/ha/ack-send'] + stats = self.statistics["/nat44-ei/ha/ack-send"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics['/nat44-ei/ha/add-event-recv'] + stats = self.statistics["/nat44-ei/ha/add-event-recv"] self.assertEqual(stats[:, 0].sum(), 2) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[:, 0].sum(), 1) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 2) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) # there should be 2 sessions created by HA sessions = self.vapi.nat44_ei_user_session_dump( - users[0].ip_address, users[0].vrf_id) + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 2) for session in sessions: - self.assertEqual(str(session.inside_ip_address), - self.pg0.remote_ip4) - self.assertEqual(str(session.outside_ip_address), - self.nat_addr) - self.assertIn(session.inside_port, - [self.tcp_port_in, self.udp_port_in]) - self.assertIn(session.outside_port, - [self.tcp_port_out, self.udp_port_out]) + self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4) + self.assertEqual(str(session.outside_ip_address), self.nat_addr) + self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in]) + self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out]) self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp]) # send HA session delete event to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=2, events=[ - Event(event_type='del', protocol='udp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.udp_port_in, out_port=self.udp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.udp_external_port, - ehn_port=self.udp_external_port, fib_index=0)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=2, + events=[ + Event( + event_type="del", + protocol="udp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.udp_port_in, + out_port=self.udp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.udp_external_port, + ehn_port=self.udp_external_port, + fib_index=0, + ) + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -3614,37 +3870,49 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 2) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) # now we should have only 1 session, 1 deleted by HA - sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address, - users[0].vrf_id) + sessions = self.vapi.nat44_ei_user_session_dump( + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 1) - stats = self.statistics['/nat44-ei/ha/del-event-recv'] + stats = self.statistics["/nat44-ei/ha/del-event-recv"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics.get_err_counter( - '/err/nat44-ei-ha/pkts-processed') + stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed") self.assertEqual(stats, 2) # send HA session refresh event to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=3, events=[ - Event(event_type='refresh', protocol='tcp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.tcp_port_in, out_port=self.tcp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.tcp_external_port, - ehn_port=self.tcp_external_port, fib_index=0, - total_bytes=1024, total_pkts=2)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=3, + events=[ + Event( + event_type="refresh", + protocol="tcp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.tcp_port_in, + out_port=self.tcp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.tcp_external_port, + ehn_port=self.tcp_external_port, + fib_index=0, + total_bytes=1024, + total_pkts=2, + ) + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3658,29 +3926,30 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 3) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) sessions = self.vapi.nat44_ei_user_session_dump( - users[0].ip_address, users[0].vrf_id) + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 1) session = sessions[0] self.assertEqual(session.total_bytes, 1024) self.assertEqual(session.total_pkts, 2) - stats = self.statistics['/nat44-ei/ha/refresh-event-recv'] + stats = self.statistics["/nat44-ei/ha/refresh-event-recv"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics.get_err_counter( - '/err/nat44-ei-ha/pkts-processed') + stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed") self.assertEqual(stats, 3) # send packet to test session created by HA - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3706,7 +3975,7 @@ class TestNAT44EI(MethodHolder): return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts def test_set_frame_queue_nelts(self): - """ NAT44EI API test - worker handoff frame queue elements """ + """NAT44EI API test - worker handoff frame queue elements""" self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512) def show_commands_at_teardown(self): @@ -3718,11 +3987,10 @@ class TestNAT44EI(MethodHolder): self.logger.info(self.vapi.cli("show nat44 ei sessions detail")) self.logger.info(self.vapi.cli("show nat44 ei hash tables detail")) self.logger.info(self.vapi.cli("show nat44 ei ha")) - self.logger.info( - self.vapi.cli("show nat44 ei addr-port-assignment-alg")) + self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg")) def test_outside_address_distribution(self): - """ Outside address distribution based on source address """ + """Outside address distribution based on source address""" x = 100 nat_addresses = [] @@ -3733,16 +4001,18 @@ class TestNAT44EI(MethodHolder): flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=nat_addresses[0], last_ip_address=nat_addresses[-1], - vrf_id=0xFFFFFFFF, is_add=1) + vrf_id=0xFFFFFFFF, + is_add=1, + ) self.pg0.generate_remote_hosts(x) @@ -3750,11 +4020,12 @@ class TestNAT44EI(MethodHolder): for i in range(x): info = self.create_packet_info(self.pg0, self.pg1) payload = self.info_to_payload(info) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_hosts[i].ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=7000+i, dport=8000+i) / - Raw(payload)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4) + / UDP(sport=7000 + i, dport=8000 + i) + / Raw(payload) + ) info.data = p pkts.append(p) @@ -3772,21 +4043,23 @@ class TestNAT44EI(MethodHolder): packed = socket.inet_aton(p_sent[IP].src) numeric = struct.unpack("!L", packed)[0] numeric = socket.htonl(numeric) - a = nat_addresses[(numeric-1) % len(nat_addresses)] + a = nat_addresses[(numeric - 1) % len(nat_addresses)] self.assertEqual( - a, p_recvd[IP].src, + a, + p_recvd[IP].src, "Invalid packet (src IP %s translated to %s, but expected %s)" - % (p_sent[IP].src, p_recvd[IP].src, a)) + % (p_sent[IP].src, p_recvd[IP].src, a), + ) def test_default_user_sessions(self): - """ NAT44EI default per-user session limit is used and reported """ + """NAT44EI default per-user session limit is used and reported""" nat44_ei_config = self.vapi.nat44_ei_show_running_config() # a nonzero default should be reported for user_sessions self.assertNotEqual(nat44_ei_config.user_sessions, 0) class TestNAT44Out2InDPO(MethodHolder): - """ NAT44EI Test Cases using out2in DPO """ + """NAT44EI Test Cases using out2in DPO""" @classmethod def setUpClass(cls): @@ -3799,8 +4072,8 @@ class TestNAT44Out2InDPO(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' - cls.dst_ip4 = '192.168.70.1' + cls.nat_addr = "10.0.0.3" + cls.dst_ip4 = "192.168.70.1" cls.create_pg_interfaces(range(2)) @@ -3812,10 +4085,13 @@ class TestNAT44Out2InDPO(MethodHolder): cls.pg1.config_ip6() cls.pg1.resolve_ndp() - r1 = VppIpRoute(cls, "::", 0, - [VppRoutePath(cls.pg1.remote_ip6, - cls.pg1.sw_if_index)], - register=False) + r1 = VppIpRoute( + cls, + "::", + 0, + [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)], + register=False, + ) r1.add_vpp_config() def setUp(self): @@ -3830,37 +4106,44 @@ class TestNAT44Out2InDPO(MethodHolder): self.vapi.cli("clear logging") def configure_xlat(self): - self.dst_ip6_pfx = '1:2:3::' - self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.dst_ip6_pfx) + self.dst_ip6_pfx = "1:2:3::" + self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx) self.dst_ip6_pfx_len = 96 - self.src_ip6_pfx = '4:5:6::' - self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.src_ip6_pfx) + self.src_ip6_pfx = "4:5:6::" + self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx) self.src_ip6_pfx_len = 96 - self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len, - self.src_ip6_pfx_n, self.src_ip6_pfx_len, - '\x00\x00\x00\x00', 0) - - @unittest.skip('Temporary disabled') + self.vapi.map_add_domain( + self.dst_ip6_pfx_n, + self.dst_ip6_pfx_len, + self.src_ip6_pfx_n, + self.src_ip6_pfx_len, + "\x00\x00\x00\x00", + 0, + ) + + @unittest.skip("Temporary disabled") def test_464xlat_ce(self): - """ Test 464XLAT CE with NAT44EI """ + """Test 464XLAT CE with NAT44EI""" self.configure_xlat() flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=self.nat_addr_n, last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF, is_add=1) + vrf_id=0xFFFFFFFF, + is_add=1, + ) - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx, - self.src_ip6_pfx_len) + out_src_ip6 = self.compose_ip6( + self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len + ) + out_dst_ip6 = self.compose_ip6( + self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len + ) try: pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) @@ -3868,11 +4151,9 @@ class TestNAT44Out2InDPO(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, - dst_ip=out_src_ip6) + self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6) - pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, - out_dst_ip6) + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3880,31 +4161,35 @@ class TestNAT44Out2InDPO(MethodHolder): self.verify_capture_in(capture, self.pg0) finally: self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags) + sw_if_index=self.pg0.sw_if_index, flags=flags + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=self.nat_addr_n, last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF) + vrf_id=0xFFFFFFFF, + ) - @unittest.skip('Temporary disabled') + @unittest.skip("Temporary disabled") def test_464xlat_ce_no_nat(self): - """ Test 464XLAT CE without NAT44EI """ + """Test 464XLAT CE without NAT44EI""" self.configure_xlat() - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx, - self.src_ip6_pfx_len) + out_src_ip6 = self.compose_ip6( + self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len + ) + out_dst_ip6 = self.compose_ip6( + self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len + ) pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6, - nat_ip=out_dst_ip6, same_port=True) + self.verify_capture_out_ip6( + capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True + ) pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) self.pg1.add_stream(pkts) @@ -3915,7 +4200,8 @@ class TestNAT44Out2InDPO(MethodHolder): class TestNAT44EIMW(MethodHolder): - """ NAT44EI Test Cases (multiple workers) """ + """NAT44EI Test Cases (multiple workers)""" + vpp_worker_count = 2 max_translations = 10240 max_users = 10240 @@ -3931,7 +4217,7 @@ class TestNAT44EIMW(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' + cls.nat_addr = "10.0.0.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.tcp_external_port = 80 @@ -3952,8 +4238,8 @@ class TestNAT44EIMW(MethodHolder): cls.pg1.configure_ipv4_neighbors() cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10}) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20}) cls.pg4._local_ip4 = "172.16.255.1" cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2" @@ -3975,8 +4261,8 @@ class TestNAT44EIMW(MethodHolder): cls.pg9.generate_remote_hosts(2) cls.pg9.config_ip4() cls.vapi.sw_interface_add_del_address( - sw_if_index=cls.pg9.sw_if_index, - prefix="10.0.0.1/24") + sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24" + ) cls.pg9.admin_up() cls.pg9.resolve_arp() @@ -3987,16 +4273,15 @@ class TestNAT44EIMW(MethodHolder): def setUp(self): super(TestNAT44EIMW, self).setUp() self.vapi.nat44_ei_plugin_enable_disable( - sessions=self.max_translations, - users=self.max_users, enable=1) + sessions=self.max_translations, users=self.max_users, enable=1 + ) def tearDown(self): super(TestNAT44EIMW, self).tearDown() if not self.vpp_dead: self.vapi.nat44_ei_ipfix_enable_disable( - domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=0) + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0 + ) self.ipfix_src_port = 4739 self.ipfix_domain_id = 1 @@ -4004,7 +4289,7 @@ class TestNAT44EIMW(MethodHolder): self.vapi.cli("clear logging") def test_hairpinning(self): - """ NAT44EI hairpinning - 1:1 NAPT """ + """NAT44EI hairpinning - 1:1 NAPT""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -4018,22 +4303,28 @@ class TestNAT44EIMW(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) - - cnt = self.statistics['/nat44-ei/hairpinning'] + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -4052,15 +4343,17 @@ class TestNAT44EIMW(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -4078,13 +4371,13 @@ class TestNAT44EIMW(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1) self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2) def test_hairpinning2(self): - """ NAT44EI hairpinning - 1:1 NAT""" + """NAT44EI hairpinning - 1:1 NAT""" server1_nat_ip = "10.0.0.10" server2_nat_ip = "10.0.0.11" @@ -4097,11 +4390,11 @@ class TestNAT44EIMW(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for servers self.nat44_add_static_mapping(server1.ip4, server1_nat_ip) @@ -4109,17 +4402,23 @@ class TestNAT44EIMW(MethodHolder): # host to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4147,17 +4446,23 @@ class TestNAT44EIMW(MethodHolder): # server1 to host pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4182,17 +4487,23 @@ class TestNAT44EIMW(MethodHolder): # server2 to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4220,17 +4531,23 @@ class TestNAT44EIMW(MethodHolder): # server1 to server2 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4254,5 +4571,5 @@ class TestNAT44EIMW(MethodHolder): raise -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |