From d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Tue, 26 Apr 2022 19:02:15 +0200 Subject: tests: replace pycodestyle with black Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera Signed-off-by: Dave Wallace --- test/test_nat44_ei.py | 2521 ++++++++++++++++++++++++++++--------------------- 1 file changed, 1419 insertions(+), 1102 deletions(-) (limited to 'test/test_nat44_ei.py') diff --git a/test/test_nat44_ei.py b/test/test_nat44_ei.py index aafd345f43f..9eb127aaf0b 100644 --- a/test/test_nat44_ei.py +++ b/test/test_nat44_ei.py @@ -10,9 +10,19 @@ from io import BytesIO import scapy.compat from framework import VppTestCase, VppTestRunner from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder -from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \ - IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \ - PacketListField +from scapy.all import ( + bind_layers, + Packet, + ByteEnumField, + ShortField, + IPField, + IntField, + LongField, + XByteField, + FlagsField, + FieldLenField, + PacketListField, +) from scapy.data import IP_PROTOS from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror @@ -30,22 +40,22 @@ from vpp_papi import VppEnum # NAT HA protocol event data class Event(Packet): name = "Event" - fields_desc = [ByteEnumField("event_type", None, - {1: "add", 2: "del", 3: "refresh"}), - ByteEnumField("protocol", None, - {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), - ShortField("flags", 0), - IPField("in_addr", None), - IPField("out_addr", None), - ShortField("in_port", None), - ShortField("out_port", None), - IPField("eh_addr", None), - IPField("ehn_addr", None), - ShortField("eh_port", None), - ShortField("ehn_port", None), - IntField("fib_index", None), - IntField("total_pkts", 0), - LongField("total_bytes", 0)] + fields_desc = [ + ByteEnumField("event_type", None, {1: "add", 2: "del", 3: "refresh"}), + ByteEnumField("protocol", None, {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}), + ShortField("flags", 0), + IPField("in_addr", None), + IPField("out_addr", None), + ShortField("in_port", None), + ShortField("out_port", None), + IPField("eh_addr", None), + IPField("ehn_addr", None), + ShortField("eh_port", None), + ShortField("ehn_port", None), + IntField("fib_index", None), + IntField("total_pkts", 0), + LongField("total_bytes", 0), + ] def extract_padding(self, s): return "", s @@ -54,17 +64,18 @@ class Event(Packet): # NAT HA protocol header class HANATStateSync(Packet): name = "HA NAT state sync" - fields_desc = [XByteField("version", 1), - FlagsField("flags", 0, 8, ['ACK']), - FieldLenField("count", None, count_of="events"), - IntField("sequence_number", 1), - IntField("thread_index", 0), - PacketListField("events", [], Event, - count_from=lambda pkt: pkt.count)] + fields_desc = [ + XByteField("version", 1), + FlagsField("flags", 0, 8, ["ACK"]), + FieldLenField("count", None, count_of="events"), + IntField("sequence_number", 1), + IntField("thread_index", 0), + PacketListField("events", [], Event, count_from=lambda pkt: pkt.count), + ] class MethodHolder(VppTestCase): - """ NAT create capture and verify method holder """ + """NAT create capture and verify method holder""" @property def config_flags(self): @@ -74,10 +85,19 @@ class MethodHolder(VppTestCase): def SYSLOG_SEVERITY(self): return VppEnum.vl_api_syslog_severity_t - def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0', - local_port=0, external_port=0, vrf_id=0, - is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0, tag="", flags=0): + def nat44_add_static_mapping( + self, + local_ip, + external_ip="0.0.0.0", + local_port=0, + external_port=0, + vrf_id=0, + is_add=1, + external_sw_if_index=0xFFFFFFFF, + proto=0, + tag="", + flags=0, + ): """ Add/delete NAT44EI static mapping @@ -103,9 +123,11 @@ class MethodHolder(VppTestCase): external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, - vrf_id=vrf_id, protocol=proto, + vrf_id=vrf_id, + protocol=proto, flags=flags, - tag=tag) + tag=tag, + ) def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF): """ @@ -114,31 +136,40 @@ class MethodHolder(VppTestCase): :param ip: IP address :param is_add: 1 if add, 0 if delete (Default add) """ - self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip, - last_ip_address=ip, - vrf_id=vrf_id, - is_add=is_add) + self.vapi.nat44_ei_add_del_address_range( + first_ip_address=ip, last_ip_address=ip, vrf_id=vrf_id, is_add=is_add + ) def create_routes_and_neigbors(self): - r1 = VppIpRoute(self, self.pg7.remote_ip4, 32, - [VppRoutePath(self.pg7.remote_ip4, - self.pg7.sw_if_index)]) - r2 = VppIpRoute(self, self.pg8.remote_ip4, 32, - [VppRoutePath(self.pg8.remote_ip4, - self.pg8.sw_if_index)]) + r1 = VppIpRoute( + self, + self.pg7.remote_ip4, + 32, + [VppRoutePath(self.pg7.remote_ip4, self.pg7.sw_if_index)], + ) + r2 = VppIpRoute( + self, + self.pg8.remote_ip4, + 32, + [VppRoutePath(self.pg8.remote_ip4, self.pg8.sw_if_index)], + ) r1.add_vpp_config() r2.add_vpp_config() - n1 = VppNeighbor(self, - self.pg7.sw_if_index, - self.pg7.remote_mac, - self.pg7.remote_ip4, - is_static=1) - n2 = VppNeighbor(self, - self.pg8.sw_if_index, - self.pg8.remote_mac, - self.pg8.remote_ip4, - is_static=1) + n1 = VppNeighbor( + self, + self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4, + is_static=1, + ) + n2 = VppNeighbor( + self, + self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4, + is_static=1, + ) n1.add_vpp_config() n2.add_vpp_config() @@ -156,21 +187,27 @@ class MethodHolder(VppTestCase): pkts = [] # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(sport=self.tcp_port_in, dport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(sport=self.udp_port_in, dport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) return pkts @@ -216,11 +253,10 @@ class MethodHolder(VppTestCase): pref_n[13] = ip4_n[1] pref_n[14] = ip4_n[2] pref_n[15] = ip4_n[3] - packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n]) + packed_pref_n = b"".join([scapy.compat.chb(x) for x in pref_n]) return socket.inet_ntop(socket.AF_INET6, packed_pref_n) - def create_stream_out(self, out_if, dst_ip=None, ttl=64, - use_inside_ports=False): + def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): """ Create packet stream for outside network @@ -242,21 +278,27 @@ class MethodHolder(VppTestCase): icmp_id = self.icmp_id_in pkts = [] # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=tcp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(dport=tcp_port, sport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=udp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(dport=udp_port, sport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=icmp_id, type='echo-reply')) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=icmp_id, type="echo-reply") + ) pkts.append(p) return pkts @@ -271,27 +313,40 @@ class MethodHolder(VppTestCase): """ pkts = [] # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - TCP(dport=self.tcp_port_out, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / TCP(dport=self.tcp_port_out, sport=20) + ) pkts.append(p) # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - UDP(dport=self.udp_port_out, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / UDP(dport=self.udp_port_out, sport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IPv6(src=src_ip, dst=dst_ip, hlim=hl) / - ICMPv6EchoReply(id=self.icmp_id_out)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IPv6(src=src_ip, dst=dst_ip, hlim=hl) + / ICMPv6EchoReply(id=self.icmp_id_out) + ) pkts.append(p) return pkts - def verify_capture_out(self, capture, nat_ip=None, same_port=False, - dst_ip=None, is_ip6=False, ignore_port=False): + def verify_capture_out( + self, + capture, + nat_ip=None, + same_port=False, + dst_ip=None, + is_ip6=False, + ignore_port=False, + ): """ Verify captured packets on outside network @@ -319,39 +374,33 @@ class MethodHolder(VppTestCase): if packet.haslayer(TCP): if not ignore_port: if same_port: - self.assertEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertEqual(packet[TCP].sport, self.tcp_port_in) else: - self.assertNotEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: - self.assertEqual( - packet[UDP].sport, self.udp_port_in) + self.assertEqual(packet[UDP].sport, self.udp_port_in) else: - self.assertNotEqual( - packet[UDP].sport, self.udp_port_in) + self.assertNotEqual(packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: - self.assertEqual( - packet[ICMP46].id, self.icmp_id_in) + self.assertEqual(packet[ICMP46].id, self.icmp_id_in) else: - self.assertNotEqual( - packet[ICMP46].id, self.icmp_id_in) + self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP46].id self.assert_packet_checksums_valid(packet) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise - def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, - dst_ip=None): + def verify_capture_out_ip6(self, capture, nat_ip, same_port=False, dst_ip=None): """ Verify captured packets on outside network @@ -360,8 +409,7 @@ class MethodHolder(VppTestCase): :param same_port: Source port number is not translated (Default False) :param dst_ip: Destination IP address (Default do not verify) """ - return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, - True) + return self.verify_capture_out(capture, nat_ip, same_port, dst_ip, True) def verify_capture_in(self, capture, in_if): """ @@ -381,8 +429,9 @@ class MethodHolder(VppTestCase): else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise def verify_capture_no_translation(self, capture, ingress_if, egress_if): @@ -404,12 +453,12 @@ class MethodHolder(VppTestCase): else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise - def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, - icmp_type=11): + def verify_capture_out_with_icmp_errors(self, capture, src_ip=None, icmp_type=11): """ Verify captured packets with ICMP errors on outside network @@ -430,16 +479,15 @@ class MethodHolder(VppTestCase): self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): - self.assertEqual(inner_ip[TCPerror].dport, - self.tcp_port_out) + self.assertEqual(inner_ip[TCPerror].dport, self.tcp_port_out) elif inner_ip.haslayer(UDPerror): - self.assertEqual(inner_ip[UDPerror].dport, - self.udp_port_out) + self.assertEqual(inner_ip[UDPerror].dport, self.udp_port_out) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11): @@ -460,20 +508,20 @@ class MethodHolder(VppTestCase): self.assertTrue(icmp.haslayer(IPerror)) inner_ip = icmp[IPerror] if inner_ip.haslayer(TCPerror): - self.assertEqual(inner_ip[TCPerror].sport, - self.tcp_port_in) + self.assertEqual(inner_ip[TCPerror].sport, self.tcp_port_in) elif inner_ip.haslayer(UDPerror): - self.assertEqual(inner_ip[UDPerror].sport, - self.udp_port_in) + self.assertEqual(inner_ip[UDPerror].sport, self.udp_port_in) else: self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise - def create_stream_frag(self, src_if, dst, sport, dport, data, - proto=IP_PROTOS.tcp, echo_reply=False): + def create_stream_frag( + self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False + ): """ Create fragmented packet stream @@ -487,9 +535,11 @@ class MethodHolder(VppTestCase): :returns: Fragments """ if proto == IP_PROTOS.tcp: - p = (IP(src=src_if.remote_ip4, dst=dst) / - TCP(sport=sport, dport=dport) / - Raw(data)) + p = ( + IP(src=src_if.remote_ip4, dst=dst) + / TCP(sport=sport, dport=dport) + / Raw(data) + ) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) @@ -497,9 +547,9 @@ class MethodHolder(VppTestCase): proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: - proto_header = ICMP(id=sport, type='echo-request') + proto_header = ICMP(id=sport, type="echo-request") else: - proto_header = ICMP(id=sport, type='echo-reply') + proto_header = ICMP(id=sport, type="echo-reply") else: raise Exception("Unsupported protocol") id = random.randint(0, 65535) @@ -508,28 +558,32 @@ class MethodHolder(VppTestCase): raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / - proto_header / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) + / proto_header + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, - proto=proto) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, - id=id) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) + / raw + ) pkts.append(p) return pkts @@ -550,17 +604,15 @@ class MethodHolder(VppTestCase): self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) - ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, - proto=frags[0][IP].proto) + ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: - p = (ip / TCP(buffer.getvalue())) + p = ip / TCP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: - p = (ip / UDP(buffer.getvalue()[:8]) / - Raw(buffer.getvalue()[8:])) + p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:]) elif ip.proto == IP_PROTOS.icmp: - p = (ip / ICMP(buffer.getvalue())) + p = ip / ICMP(buffer.getvalue()) return p def verify_ipfix_nat44_ses(self, data): @@ -580,29 +632,24 @@ class MethodHolder(VppTestCase): else: nat44_ses_delete_num += 1 # sourceIPv4Address - self.assertEqual(self.pg0.remote_ip4, - str(ipaddress.IPv4Address(record[8]))) + self.assertEqual(self.pg0.remote_ip4, str(ipaddress.IPv4Address(record[8]))) # postNATSourceIPv4Address - self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr), - record[225]) + self.assertEqual( + socket.inet_pton(socket.AF_INET, self.nat_addr), record[225] + ) # ingressVRFID self.assertEqual(struct.pack("!I", 0), record[234]) # protocolIdentifier/sourceTransportPort # /postNAPTSourceTransportPort if IP_PROTOS.icmp == scapy.compat.orb(record[4]): self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7]) - self.assertEqual(struct.pack("!H", self.icmp_id_out), - record[227]) + self.assertEqual(struct.pack("!H", self.icmp_id_out), record[227]) elif IP_PROTOS.tcp == scapy.compat.orb(record[4]): - self.assertEqual(struct.pack("!H", self.tcp_port_in), - record[7]) - self.assertEqual(struct.pack("!H", self.tcp_port_out), - record[227]) + self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7]) + self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227]) elif IP_PROTOS.udp == scapy.compat.orb(record[4]): - self.assertEqual(struct.pack("!H", self.udp_port_in), - record[7]) - self.assertEqual(struct.pack("!H", self.udp_port_out), - record[227]) + self.assertEqual(struct.pack("!H", self.udp_port_in), record[7]) + self.assertEqual(struct.pack("!H", self.udp_port_out), record[227]) else: self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}") self.assertEqual(3, nat44_ses_create_num) @@ -627,16 +674,16 @@ class MethodHolder(VppTestCase): self.assertEqual(struct.pack("!I", limit), record[471]) def verify_no_nat44_user(self): - """ Verify that there is no NAT44EI user """ + """Verify that there is no NAT44EI user""" users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 0) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[0][0], 0) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[0][0], 0) def verify_syslog_apmap(self, data, is_add=True): - message = data.decode('utf-8') + message = data.decode("utf-8") try: message = SyslogMessage.parse(message) except ParseError as e: @@ -644,26 +691,26 @@ class MethodHolder(VppTestCase): raise else: self.assertEqual(message.severity, SyslogSeverity.info) - self.assertEqual(message.appname, 'NAT') - self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL') - sd_params = message.sd.get('napmap') + self.assertEqual(message.appname, "NAT") + self.assertEqual(message.msgid, "APMADD" if is_add else "APMDEL") + sd_params = message.sd.get("napmap") self.assertTrue(sd_params is not None) - self.assertEqual(sd_params.get('IATYP'), 'IPv4') - self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) - self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) - self.assertEqual(sd_params.get('XATYP'), 'IPv4') - self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) - self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) - self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) - self.assertTrue(sd_params.get('SSUBIX') is not None) - self.assertEqual(sd_params.get('SVLAN'), '0') + self.assertEqual(sd_params.get("IATYP"), "IPv4") + self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4) + self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in) + self.assertEqual(sd_params.get("XATYP"), "IPv4") + self.assertEqual(sd_params.get("XSADDR"), self.nat_addr) + self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out) + self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp) + self.assertTrue(sd_params.get("SSUBIX") is not None) + self.assertEqual(sd_params.get("SVLAN"), "0") def verify_mss_value(self, pkt, mss): if not pkt.haslayer(IP) or not pkt.haslayer(TCP): raise TypeError("Not a TCP/IP packet") for option in pkt[TCP].options: - if option[0] == 'MSS': + if option[0] == "MSS": self.assertEqual(option[1], mss) self.assert_tcp_checksum_valid(pkt) @@ -678,8 +725,9 @@ class MethodHolder(VppTestCase): else: raise Exception("Unsupported protocol") - def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_in_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -689,20 +737,19 @@ class MethodHolder(VppTestCase): self.port_in = random.randint(1025, 65535) # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -729,15 +776,14 @@ class MethodHolder(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, - proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -745,9 +791,15 @@ class MethodHolder(VppTestCase): self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) - def reass_hairpinning(self, server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp, - ignore_port=False): + def reass_hairpinning( + self, + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ignore_port=False, + ): layer = self.proto2layer(proto) @@ -757,19 +809,14 @@ class MethodHolder(VppTestCase): data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server - pkts = self.create_stream_frag(self.pg0, - self.nat_addr, - host_in_port, - server_out_port, - data, - proto) + pkts = self.create_stream_frag( + self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.nat_addr, - server_addr) + p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) @@ -779,8 +826,9 @@ class MethodHolder(VppTestCase): self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) - def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_out_of_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -791,21 +839,22 @@ class MethodHolder(VppTestCase): for i in range(2): # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.nat_addr, self.pg1.remote_ip4 + ) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -832,16 +881,17 @@ class MethodHolder(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, - data, proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg1.remote_ip4, self.pg0.remote_ip4 + ) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -861,7 +911,7 @@ def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count): class TestNAT44EI(MethodHolder): - """ NAT44EI Test Cases """ + """NAT44EI Test Cases""" max_translations = 10240 max_users = 10240 @@ -877,7 +927,7 @@ class TestNAT44EI(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' + cls.nat_addr = "10.0.0.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.tcp_external_port = 80 @@ -898,8 +948,8 @@ class TestNAT44EI(MethodHolder): cls.pg1.configure_ipv4_neighbors() cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10}) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20}) cls.pg4._local_ip4 = "172.16.255.1" cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2" @@ -921,8 +971,8 @@ class TestNAT44EI(MethodHolder): cls.pg9.generate_remote_hosts(2) cls.pg9.config_ip4() cls.vapi.sw_interface_add_del_address( - sw_if_index=cls.pg9.sw_if_index, - prefix="10.0.0.1/24") + sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24" + ) cls.pg9.admin_up() cls.pg9.resolve_arp() @@ -932,8 +982,8 @@ class TestNAT44EI(MethodHolder): def plugin_enable(self): self.vapi.nat44_ei_plugin_enable_disable( - sessions=self.max_translations, - users=self.max_users, enable=1) + sessions=self.max_translations, users=self.max_users, enable=1 + ) def setUp(self): super(TestNAT44EI, self).setUp() @@ -943,8 +993,8 @@ class TestNAT44EI(MethodHolder): super(TestNAT44EI, self).tearDown() if not self.vpp_dead: self.vapi.nat44_ei_ipfix_enable_disable( - domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, - enable=0) + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0 + ) self.ipfix_src_port = 4739 self.ipfix_domain_id = 1 @@ -952,16 +1002,16 @@ class TestNAT44EI(MethodHolder): self.vapi.cli("clear logging") def test_clear_sessions(self): - """ NAT44EI session clearing test """ + """NAT44EI session clearing test""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -970,32 +1020,32 @@ class TestNAT44EI(MethodHolder): capture = self.pg1.get_capture(len(pkts)) self.verify_capture_out(capture) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid") self.logger.info("sessions before clearing: %s" % sessions[0][0]) self.vapi.cli("clear nat44 ei sessions") - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid") self.logger.info("sessions after clearing: %s" % sessions[0][0]) def test_dynamic(self): - """ NAT44EI dynamic translation test """ + """NAT44EI dynamic translation test""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out - tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp'] - udpn = self.statistics['/nat44-ei/in2out/slowpath/udp'] - icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp'] - drops = self.statistics['/nat44-ei/in2out/slowpath/drops'] + tcpn = self.statistics["/nat44-ei/in2out/slowpath/tcp"] + udpn = self.statistics["/nat44-ei/in2out/slowpath/udp"] + icmpn = self.statistics["/nat44-ei/in2out/slowpath/icmp"] + drops = self.statistics["/nat44-ei/in2out/slowpath/drops"] pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -1005,20 +1055,20 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) if_idx = self.pg0.sw_if_index - cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ei/in2out/slowpath/udp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/in2out/slowpath/drops'] + cnt = self.statistics["/nat44-ei/in2out/slowpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) # out2in - tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp'] - udpn = self.statistics['/nat44-ei/out2in/slowpath/udp'] - icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp'] - drops = self.statistics['/nat44-ei/out2in/slowpath/drops'] + tcpn = self.statistics["/nat44-ei/out2in/slowpath/tcp"] + udpn = self.statistics["/nat44-ei/out2in/slowpath/udp"] + icmpn = self.statistics["/nat44-ei/out2in/slowpath/icmp"] + drops = self.statistics["/nat44-ei/out2in/slowpath/drops"] pkts = self.create_stream_out(self.pg1) self.pg1.add_stream(pkts) @@ -1028,31 +1078,31 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) if_idx = self.pg1.sw_if_index - cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ei/out2in/slowpath/udp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ei/out2in/slowpath/drops'] + cnt = self.statistics["/nat44-ei/out2in/slowpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[:, 0].sum(), 1) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 3) def test_dynamic_icmp_errors_in2out_ttl_1(self): - """ NAT44EI handling of client packets with TTL=1 """ + """NAT44EI handling of client packets with TTL=1""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - generate traffic pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1) @@ -1062,16 +1112,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in_with_icmp_errors(capture, self.pg0) def test_dynamic_icmp_errors_out2in_ttl_1(self): - """ NAT44EI handling of server packets with TTL=1 """ + """NAT44EI handling of server packets with TTL=1""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - create sessions pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1086,21 +1136,19 @@ class TestNAT44EI(MethodHolder): capture = self.send_and_expect_some(self.pg1, pkts, self.pg1) # Server side - verify ICMP type 11 packets - self.verify_capture_out_with_icmp_errors(capture, - src_ip=self.pg1.local_ip4) + self.verify_capture_out_with_icmp_errors(capture, src_ip=self.pg1.local_ip4) def test_dynamic_icmp_errors_in2out_ttl_2(self): - """ NAT44EI handling of error responses to client packets with TTL=2 - """ + """NAT44EI handling of error responses to client packets with TTL=2""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - generate traffic pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2) @@ -1110,9 +1158,13 @@ class TestNAT44EI(MethodHolder): # Server side - simulate ICMP type 11 response capture = self.pg1.get_capture(len(pkts)) - pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - ICMP(type=11) / packet[IP] for packet in capture] + pkts = [ + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / ICMP(type=11) + / packet[IP] + for packet in capture + ] self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1122,17 +1174,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in_with_icmp_errors(capture, self.pg0) def test_dynamic_icmp_errors_out2in_ttl_2(self): - """ NAT44EI handling of error responses to server packets with TTL=2 - """ + """NAT44EI handling of error responses to server packets with TTL=2""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # Client side - create sessions pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1150,9 +1201,13 @@ class TestNAT44EI(MethodHolder): # Client side - simulate ICMP type 11 response capture = self.pg0.get_capture(len(pkts)) - pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type=11) / packet[IP] for packet in capture] + pkts = [ + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type=11) + / packet[IP] + for packet in capture + ] self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1162,20 +1217,22 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out_with_icmp_errors(capture) def test_ping_out_interface_from_outside(self): - """ NAT44EI ping out interface from outside network """ + """NAT44EI ping out interface from outside network""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) / - ICMP(id=self.icmp_id_out, type='echo-request')) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) + / ICMP(id=self.icmp_id_out, type="echo-request") + ) pkts = [p] self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1188,26 +1245,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(packet[ICMP].id, self.icmp_id_in) self.assertEqual(packet[ICMP].type, 0) # echo reply except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise def test_ping_internal_host_from_outside(self): - """ NAT44EI ping internal host from outside network """ + """NAT44EI ping internal host from outside network""" self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # out2in - pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) / - ICMP(id=self.icmp_id_out, type='echo-request')) + pkt = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) + / ICMP(id=self.icmp_id_out, type="echo-request") + ) self.pg1.add_stream(pkt) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1216,9 +1276,11 @@ class TestNAT44EI(MethodHolder): self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) # in2out - pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / - ICMP(id=self.icmp_id_in, type='echo-reply')) + pkt = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) + / ICMP(id=self.icmp_id_in, type="echo-reply") + ) self.pg0.add_stream(pkt) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1227,25 +1289,27 @@ class TestNAT44EI(MethodHolder): self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) def test_forwarding(self): - """ NAT44EI forwarding test """ + """NAT44EI forwarding test""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_forwarding_enable_disable(enable=1) real_ip = self.pg0.remote_ip4 alias_ip = self.nat_addr flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_static_mapping( - is_add=1, local_ip_address=real_ip, + is_add=1, + local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, - flags=flags) + flags=flags, + ) try: # static mapping match @@ -1269,9 +1333,9 @@ class TestNAT44EI(MethodHolder): host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: - pkts = self.create_stream_out(self.pg1, - dst_ip=self.pg0.remote_ip4, - use_inside_ports=True) + pkts = self.create_stream_out( + self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1283,8 +1347,9 @@ class TestNAT44EI(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, - same_port=True) + self.verify_capture_out( + capture, nat_ip=self.pg0.remote_ip4, same_port=True + ) finally: self.pg0.remote_hosts[0] = host0 @@ -1296,10 +1361,11 @@ class TestNAT44EI(MethodHolder): local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, - flags=flags) + flags=flags, + ) def test_static_in(self): - """ NAT44EI 1:1 NAT initialized from inside network """ + """NAT44EI 1:1 NAT initialized from inside network""" nat_ip = "10.0.0.10" self.tcp_port_out = 6303 @@ -1309,14 +1375,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) sm = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(len(sm), 1) - self.assertEqual(sm[0].tag, '') + self.assertEqual(sm[0].tag, "") self.assertEqual(sm[0].protocol, 0) self.assertEqual(sm[0].local_port, 0) self.assertEqual(sm[0].external_port, 0) @@ -1338,7 +1404,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) def test_static_out(self): - """ NAT44EI 1:1 NAT initialized from outside network """ + """NAT44EI 1:1 NAT initialized from outside network""" nat_ip = "10.0.0.20" self.tcp_port_out = 6303 @@ -1349,11 +1415,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) sm = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(len(sm), 1) self.assertEqual(sm[0].tag, tag) @@ -1375,29 +1441,41 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip, True) def test_static_with_port_in(self): - """ NAT44EI 1:1 NAPT initialized from inside network """ + """NAT44EI 1:1 NAPT initialized from inside network""" self.tcp_port_out = 3606 self.udp_port_out = 3607 self.icmp_id_out = 3608 self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1416,29 +1494,41 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg0) def test_static_with_port_out(self): - """ NAT44EI 1:1 NAPT initialized from outside network """ + """NAT44EI 1:1 NAPT initialized from outside network""" self.tcp_port_out = 30606 self.udp_port_out = 30607 self.icmp_id_out = 30608 self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg1) @@ -1457,7 +1547,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) def test_static_vrf_aware(self): - """ NAT44EI 1:1 NAT VRF awareness """ + """NAT44EI 1:1 NAT VRF awareness""" nat_ip1 = "10.0.0.30" nat_ip2 = "10.0.0.40" @@ -1465,20 +1555,18 @@ class TestNAT44EI(MethodHolder): self.udp_port_out = 6304 self.icmp_id_out = 6305 - self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, - vrf_id=10) - self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, - vrf_id=10) + self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1, vrf_id=10) + self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2, vrf_id=10) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1 + ) # inside interface VRF match NAT44EI static mapping VRF pkts = self.create_stream_in(self.pg4, self.pg3) @@ -1497,7 +1585,7 @@ class TestNAT44EI(MethodHolder): self.pg3.assert_nothing_captured() def test_dynamic_to_static(self): - """ NAT44EI Switch from dynamic translation to 1:1NAT """ + """NAT44EI Switch from dynamic translation to 1:1NAT""" nat_ip = "10.0.0.10" self.tcp_port_out = 6303 self.udp_port_out = 6304 @@ -1506,11 +1594,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # dynamic pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1532,22 +1620,27 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip, True) def test_identity_nat(self): - """ NAT44EI Identity NAT """ + """NAT44EI Identity NAT""" flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF, - flags=flags, is_add=1) + ip_address=self.pg0.remote_ip4, + sw_if_index=0xFFFFFFFF, + flags=flags, + is_add=1, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=12345, dport=56789)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=12345, dport=56789) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1569,26 +1662,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(len(sessions), 0) flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF, - flags=flags, vrf_id=1, is_add=1) + ip_address=self.pg0.remote_ip4, + sw_if_index=0xFFFFFFFF, + flags=flags, + vrf_id=1, + is_add=1, + ) identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(len(identity_mappings), 2) def test_multiple_inside_interfaces(self): - """ NAT44EI multiple non-overlapping address space inside interfaces - """ + """NAT44EI multiple non-overlapping address space inside interfaces""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # between two NAT44EI inside interfaces (no translation) pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1639,26 +1735,24 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg1) def test_inside_overlapping_interfaces(self): - """ NAT44EI multiple inside interfaces with overlapping address space - """ + """NAT44EI multiple inside interfaces with overlapping address space""" static_nat_ip = "10.0.0.10" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg3.sw_if_index, - is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg5.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - flags=flags, is_add=1) - self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, - vrf_id=20) + sw_if_index=self.pg6.sw_if_index, flags=flags, is_add=1 + ) + self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip, vrf_id=20) # between NAT44EI inside interfaces with same VRF (no translation) pkts = self.create_stream_in(self.pg4, self.pg5) @@ -1669,9 +1763,11 @@ class TestNAT44EI(MethodHolder): self.verify_capture_no_translation(capture, self.pg4, self.pg5) # between NAT44EI inside interfaces with different VRF (hairpinning) - p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) / - IP(src=self.pg4.remote_ip4, dst=static_nat_ip) / - TCP(sport=1234, dport=5678)) + p = ( + Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) + / IP(src=self.pg4.remote_ip4, dst=static_nat_ip) + / TCP(sport=1234, dport=5678) + ) self.pg4.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1723,16 +1819,12 @@ class TestNAT44EI(MethodHolder): # pg5 session dump addresses = self.vapi.nat44_ei_address_dump() self.assertEqual(len(addresses), 1) - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg5.remote_ip4, 10) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg5.remote_ip4, 10) self.assertEqual(len(sessions), 3) for session in sessions: - self.assertFalse(session.flags & - self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg5.remote_ip4) - self.assertEqual(session.outside_ip_address, - addresses[0].ip_address) + self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg5.remote_ip4) + self.assertEqual(session.outside_ip_address, addresses[0].ip_address) self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp) self.assertEqual(sessions[1].protocol, IP_PROTOS.udp) self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp) @@ -1765,44 +1857,38 @@ class TestNAT44EI(MethodHolder): addresses = self.vapi.nat44_ei_address_dump() self.assertEqual(len(addresses), 1) for user in users: - sessions = self.vapi.nat44_ei_user_session_dump(user.ip_address, - user.vrf_id) + sessions = self.vapi.nat44_ei_user_session_dump( + user.ip_address, user.vrf_id + ) for session in sessions: self.assertEqual(user.ip_address, session.inside_ip_address) self.assertTrue(session.total_bytes > session.total_pkts > 0) - self.assertTrue(session.protocol in - [IP_PROTOS.tcp, IP_PROTOS.udp, - IP_PROTOS.icmp]) + self.assertTrue( + session.protocol in [IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp] + ) # pg4 session dump - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg4.remote_ip4, 10) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg4.remote_ip4, 10) self.assertGreaterEqual(len(sessions), 4) for session in sessions: - self.assertFalse( - session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg4.remote_ip4) - self.assertEqual(session.outside_ip_address, - addresses[0].ip_address) + self.assertFalse(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg4.remote_ip4) + self.assertEqual(session.outside_ip_address, addresses[0].ip_address) # pg6 session dump - sessions = self.vapi.nat44_ei_user_session_dump( - self.pg6.remote_ip4, 20) + sessions = self.vapi.nat44_ei_user_session_dump(self.pg6.remote_ip4, 20) self.assertGreaterEqual(len(sessions), 3) for session in sessions: + self.assertTrue(session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) + self.assertEqual(str(session.inside_ip_address), self.pg6.remote_ip4) + self.assertEqual(str(session.outside_ip_address), static_nat_ip) self.assertTrue( - session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING) - self.assertEqual(str(session.inside_ip_address), - self.pg6.remote_ip4) - self.assertEqual(str(session.outside_ip_address), - static_nat_ip) - self.assertTrue(session.inside_port in - [self.tcp_port_in, self.udp_port_in, - self.icmp_id_in]) + session.inside_port + in [self.tcp_port_in, self.udp_port_in, self.icmp_id_in] + ) def test_hairpinning(self): - """ NAT44EI hairpinning - 1:1 NAPT """ + """NAT44EI hairpinning - 1:1 NAPT""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -1814,22 +1900,28 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) - - cnt = self.statistics['/nat44-ei/hairpinning'] + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1848,14 +1940,16 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1873,13 +1967,15 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index - self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), - 2+(1 if self.vpp_worker_count > 0 else 0)) + self.assertEqual( + after[:, if_idx].sum() - cnt[:, if_idx].sum(), + 2 + (1 if self.vpp_worker_count > 0 else 0), + ) def test_hairpinning2(self): - """ NAT44EI hairpinning - 1:1 NAT""" + """NAT44EI hairpinning - 1:1 NAT""" server1_nat_ip = "10.0.0.10" server2_nat_ip = "10.0.0.11" @@ -1892,11 +1988,11 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for servers self.nat44_add_static_mapping(server1.ip4, server1_nat_ip) @@ -1904,17 +2000,23 @@ class TestNAT44EI(MethodHolder): # host to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1942,17 +2044,23 @@ class TestNAT44EI(MethodHolder): # server1 to host pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1977,17 +2085,23 @@ class TestNAT44EI(MethodHolder): # server2 to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -2015,17 +2129,23 @@ class TestNAT44EI(MethodHolder): # server1 to server2 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -2049,7 +2169,7 @@ class TestNAT44EI(MethodHolder): raise def test_hairpinning_avoid_inf_loop(self): - """ NAT44EI hairpinning - 1:1 NAPT avoid infinite loop """ + """NAT44EI hairpinning - 1:1 NAPT avoid infinite loop""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -2061,34 +2181,42 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) # add another static mapping that maps pg0.local_ip4 address to itself self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4) # send packet from host to VPP (the packet should get dropped) - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.pg0.local_ip4) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.pg0.local_ip4) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() # Here VPP used to crash due to an infinite loop - cnt = self.statistics['/nat44-ei/hairpinning'] + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2107,14 +2235,16 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2132,16 +2262,18 @@ class TestNAT44EI(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index - self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), - 2+(1 if self.vpp_worker_count > 0 else 0)) + self.assertEqual( + after[:, if_idx].sum() - cnt[:, if_idx].sum(), + 2 + (1 if self.vpp_worker_count > 0 else 0), + ) def test_interface_addr(self): - """ NAT44EI acquire addresses from interface """ + """NAT44EI acquire addresses from interface""" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) # no address in NAT pool addresses = self.vapi.nat44_ei_address_dump() @@ -2159,22 +2291,20 @@ class TestNAT44EI(MethodHolder): self.assertEqual(0, len(addresses)) def test_interface_addr_static_mapping(self): - """ NAT44EI Static mapping with addresses from interface """ + """NAT44EI Static mapping with addresses from interface""" tag = "testTAG" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) self.nat44_add_static_mapping( - '1.2.3.4', - external_sw_if_index=self.pg7.sw_if_index, - tag=tag) + "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag + ) # static mappings with external interface static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(1, len(static_mappings)) - self.assertEqual(self.pg7.sw_if_index, - static_mappings[0].external_sw_if_index) + self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) self.assertEqual(static_mappings[0].tag, tag) # configure interface address and check static mappings @@ -2184,8 +2314,7 @@ class TestNAT44EI(MethodHolder): resolved = False for sm in static_mappings: if sm.external_sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(sm.external_ip_address), - self.pg7.local_ip4) + self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4) self.assertEqual(sm.tag, tag) resolved = True self.assertTrue(resolved) @@ -2194,8 +2323,7 @@ class TestNAT44EI(MethodHolder): self.pg7.unconfig_ip4() static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(1, len(static_mappings)) - self.assertEqual(self.pg7.sw_if_index, - static_mappings[0].external_sw_if_index) + self.assertEqual(self.pg7.sw_if_index, static_mappings[0].external_sw_if_index) self.assertEqual(static_mappings[0].tag, tag) # configure interface address again and check static mappings @@ -2205,40 +2333,37 @@ class TestNAT44EI(MethodHolder): resolved = False for sm in static_mappings: if sm.external_sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(sm.external_ip_address), - self.pg7.local_ip4) + self.assertEqual(str(sm.external_ip_address), self.pg7.local_ip4) self.assertEqual(sm.tag, tag) resolved = True self.assertTrue(resolved) # remove static mapping self.nat44_add_static_mapping( - '1.2.3.4', - external_sw_if_index=self.pg7.sw_if_index, - tag=tag, - is_add=0) + "1.2.3.4", external_sw_if_index=self.pg7.sw_if_index, tag=tag, is_add=0 + ) static_mappings = self.vapi.nat44_ei_static_mapping_dump() self.assertEqual(0, len(static_mappings)) def test_interface_addr_identity_nat(self): - """ NAT44EI Identity NAT with addresses from interface """ + """NAT44EI Identity NAT with addresses from interface""" port = 53053 self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg7.sw_if_index) + is_add=1, sw_if_index=self.pg7.sw_if_index + ) self.vapi.nat44_ei_add_del_identity_mapping( - ip_address=b'0', + ip_address=b"0", sw_if_index=self.pg7.sw_if_index, port=port, protocol=IP_PROTOS.tcp, - is_add=1) + is_add=1, + ) # identity mappings with external interface identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(1, len(identity_mappings)) - self.assertEqual(self.pg7.sw_if_index, - identity_mappings[0].sw_if_index) + self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index) # configure interface address and check identity mappings self.pg7.config_ip4() @@ -2247,8 +2372,9 @@ class TestNAT44EI(MethodHolder): self.assertEqual(2, len(identity_mappings)) for sm in identity_mappings: if sm.sw_if_index == 0xFFFFFFFF: - self.assertEqual(str(identity_mappings[0].ip_address), - self.pg7.local_ip4) + self.assertEqual( + str(identity_mappings[0].ip_address), self.pg7.local_ip4 + ) self.assertEqual(port, identity_mappings[0].port) self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol) resolved = True @@ -2258,11 +2384,10 @@ class TestNAT44EI(MethodHolder): self.pg7.unconfig_ip4() identity_mappings = self.vapi.nat44_ei_identity_mapping_dump() self.assertEqual(1, len(identity_mappings)) - self.assertEqual(self.pg7.sw_if_index, - identity_mappings[0].sw_if_index) + self.assertEqual(self.pg7.sw_if_index, identity_mappings[0].sw_if_index) def test_ipfix_nat44_sess(self): - """ NAT44EI IPFIX logging NAT44EI session created/deleted """ + """NAT44EI IPFIX logging NAT44EI session created/deleted""" self.ipfix_domain_id = 10 self.ipfix_src_port = 20202 collector_port = 30303 @@ -2270,19 +2395,21 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10, - collector_port=collector_port) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + collector_port=collector_port, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -2301,8 +2428,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, collector_port) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2312,25 +2438,29 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_nat44_ses(data) def test_ipfix_addr_exhausted(self): - """ NAT44EI IPFIX logging NAT addresses exhausted """ + """NAT44EI IPFIX logging NAT addresses exhausted""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=3025)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=3025) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2345,8 +2475,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2356,15 +2485,15 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_addr_exhausted(data) def test_ipfix_max_sessions(self): - """ NAT44EI IPFIX logging maximum session entries exceeded """ + """NAT44EI IPFIX logging maximum session entries exceeded""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) max_sessions_per_thread = self.max_translations max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread @@ -2372,26 +2501,32 @@ class TestNAT44EI(MethodHolder): pkts = [] for i in range(0, max_sessions): src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=src, dst=self.pg1.remote_ip4) / - TCP(sport=1025)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=src, dst=self.pg1.remote_ip4) + / TCP(sport=1025) + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.get_capture(max_sessions) - self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4, - src_address=self.pg3.local_ip4, - path_mtu=512, - template_interval=10) - self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=1) - - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=1025)) + self.vapi.set_ipfix_exporter( + collector_address=self.pg3.remote_ip4, + src_address=self.pg3.local_ip4, + path_mtu=512, + template_interval=10, + ) + self.vapi.nat44_ei_ipfix_enable_disable( + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=1 + ) + + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=1025) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2406,8 +2541,7 @@ class TestNAT44EI(MethodHolder): self.assertEqual(p[IP].dst, self.pg3.remote_ip4) self.assertEqual(p[UDP].sport, self.ipfix_src_port) self.assertEqual(p[UDP].dport, 4739) - self.assertEqual(p[IPFIX].observationDomainID, - self.ipfix_domain_id) + self.assertEqual(p[IPFIX].observationDomainID, self.ipfix_domain_id) if p.haslayer(Template): ipfix.add_template(p.getlayer(Template)) # verify events in data set @@ -2417,22 +2551,23 @@ class TestNAT44EI(MethodHolder): self.verify_ipfix_max_sessions(data, max_sessions_per_thread) def test_syslog_apmap(self): - """ NAT44EI syslog address and port mapping creation and deletion """ - self.vapi.syslog_set_filter( - self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO) + """NAT44EI syslog address and port mapping creation and deletion""" + self.vapi.syslog_set_filter(self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO) self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4) self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=20)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=20) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2448,22 +2583,25 @@ class TestNAT44EI(MethodHolder): self.verify_syslog_apmap(capture[0][Raw].load, False) def test_pool_addr_fib(self): - """ NAT44EI add pool addresses to FIB """ - static_addr = '10.0.0.10' + """NAT44EI add pool addresses to FIB""" + static_addr = "10.0.0.10" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr) # NAT44EI address - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2472,9 +2610,12 @@ class TestNAT44EI(MethodHolder): self.assertTrue(capture[0][ARP].op, ARP.is_at) # 1:1 NAT address - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=static_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=static_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2483,9 +2624,12 @@ class TestNAT44EI(MethodHolder): self.assertTrue(capture[0][ARP].op, ARP.is_at) # send ARP to non-NAT44EI interface - p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac)) + p = Ether(src=self.pg2.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg2.remote_ip4, + hwsrc=self.pg2.remote_mac, + ) self.pg2.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2493,27 +2637,32 @@ class TestNAT44EI(MethodHolder): # remove addresses and verify self.nat44_add_address(self.nat_addr, is_add=0) - self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, - is_add=0) - - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=self.nat_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr, is_add=0) + + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=self.nat_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() - p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') / - ARP(op=ARP.who_has, pdst=static_addr, - psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac)) + p = Ether(src=self.pg1.remote_mac, dst="ff:ff:ff:ff:ff:ff") / ARP( + op=ARP.who_has, + pdst=static_addr, + psrc=self.pg1.remote_ip4, + hwsrc=self.pg1.remote_mac, + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg1.assert_nothing_captured() def test_vrf_mode(self): - """ NAT44EI tenant VRF aware address pool mode """ + """NAT44EI tenant VRF aware address pool mode""" vrf_id1 = 1 vrf_id2 = 2 @@ -2522,8 +2671,8 @@ class TestNAT44EI(MethodHolder): self.pg0.unconfig_ip4() self.pg1.unconfig_ip4() - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2}) self.pg0.set_table_ip4(vrf_id1) self.pg1.set_table_ip4(vrf_id2) self.pg0.config_ip4() @@ -2535,14 +2684,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(nat_ip2, vrf_id=vrf_id2) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) try: # first VRF @@ -2570,11 +2719,11 @@ class TestNAT44EI(MethodHolder): self.pg1.config_ip4() self.pg0.resolve_arp() self.pg1.resolve_arp() - self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id2}) def test_vrf_feature_independent(self): - """ NAT44EI tenant VRF independent address pool mode """ + """NAT44EI tenant VRF independent address pool mode""" nat_ip1 = "10.0.0.10" nat_ip2 = "10.0.0.11" @@ -2583,14 +2732,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(nat_ip2, vrf_id=99) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg1.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) # first VRF pkts = self.create_stream_in(self.pg0, self.pg2) @@ -2609,16 +2758,16 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, nat_ip1) def test_dynamic_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address """ + """NAT44EI interfaces without configured IP address""" self.create_routes_and_neigbors() self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg7, self.pg8) @@ -2637,17 +2786,17 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg7) def test_static_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address - 1:1 NAT """ + """NAT44EI interfaces without configured IP address - 1:1 NAT""" self.create_routes_and_neigbors() self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg8) @@ -2666,7 +2815,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture, self.nat_addr, True) def test_static_with_port_ipless_interfaces(self): - """ NAT44EI interfaces without configured IP address - 1:1 NAPT """ + """NAT44EI interfaces without configured IP address - 1:1 NAPT""" self.tcp_port_out = 30606 self.udp_port_out = 30607 @@ -2674,22 +2823,34 @@ class TestNAT44EI(MethodHolder): self.create_routes_and_neigbors() self.nat44_add_address(self.nat_addr) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.tcp_port_in, self.tcp_port_out, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.udp_port_in, self.udp_port_out, - proto=IP_PROTOS.udp) - self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr, - self.icmp_id_in, self.icmp_id_out, - proto=IP_PROTOS.icmp) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.tcp_port_in, + self.tcp_port_out, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.udp_port_in, + self.udp_port_out, + proto=IP_PROTOS.udp, + ) + self.nat44_add_static_mapping( + self.pg7.remote_ip4, + self.nat_addr, + self.icmp_id_in, + self.icmp_id_out, + proto=IP_PROTOS.icmp, + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg7.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg7.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) # out2in pkts = self.create_stream_out(self.pg8) @@ -2708,23 +2869,25 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) def test_static_unknown_proto(self): - """ NAT44EI 1:1 translate packet with unknown protocol """ + """NAT44EI 1:1 translate packet with unknown protocol""" nat_ip = "10.0.0.10" self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2740,11 +2903,13 @@ class TestNAT44EI(MethodHolder): raise # out2in - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=nat_ip) / - GRE() / - IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=nat_ip) + / GRE() + / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2760,8 +2925,7 @@ class TestNAT44EI(MethodHolder): raise def test_hairpinning_static_unknown_proto(self): - """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning - """ + """NAT44EI 1:1 translate packet with unknown protocol - hairpinning""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -2773,18 +2937,20 @@ class TestNAT44EI(MethodHolder): self.nat44_add_static_mapping(server.ip4, server_nat_ip) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # host to server - p = (Ether(dst=self.pg0.local_mac, src=host.mac) / - IP(src=host.ip4, dst=server_nat_ip) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=host.mac) + / IP(src=host.ip4, dst=server_nat_ip) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2800,11 +2966,13 @@ class TestNAT44EI(MethodHolder): raise # server to host - p = (Ether(dst=self.pg0.local_mac, src=server.mac) / - IP(src=server.ip4, dst=host_nat_ip) / - GRE() / - IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=server.mac) + / IP(src=server.ip4, dst=host_nat_ip) + / GRE() + / IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2820,10 +2988,11 @@ class TestNAT44EI(MethodHolder): raise def test_output_feature(self): - """ NAT44EI output feature (in2out postrouting) """ + """NAT44EI output feature (in2out postrouting)""" self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg3.sw_if_index, is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg0, self.pg3) @@ -2850,25 +3019,32 @@ class TestNAT44EI(MethodHolder): self.verify_capture_no_translation(capture, self.pg2, self.pg0) def test_output_feature_vrf_aware(self): - """ NAT44EI output feature VRF aware (in2out postrouting) """ + """NAT44EI output feature VRF aware (in2out postrouting)""" nat_ip_vrf10 = "10.0.0.10" nat_ip_vrf20 = "10.0.0.20" - r1 = VppIpRoute(self, self.pg3.remote_ip4, 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)], - table_id=10) - r2 = VppIpRoute(self, self.pg3.remote_ip4, 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)], - table_id=20) + r1 = VppIpRoute( + self, + self.pg3.remote_ip4, + 32, + [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], + table_id=10, + ) + r2 = VppIpRoute( + self, + self.pg3.remote_ip4, + 32, + [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], + table_id=20, + ) r1.add_vpp_config() r2.add_vpp_config() self.nat44_add_address(nat_ip_vrf10, vrf_id=10) self.nat44_add_address(nat_ip_vrf20, vrf_id=20) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg3.sw_if_index, is_add=1) + sw_if_index=self.pg3.sw_if_index, is_add=1 + ) # in2out VRF 10 pkts = self.create_stream_in(self.pg4, self.pg3) @@ -2903,7 +3079,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_in(capture, self.pg6) def test_output_feature_hairpinning(self): - """ NAT44EI output feature hairpinning (in2out postrouting) """ + """NAT44EI output feature hairpinning (in2out postrouting)""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] host_in_port = 1234 @@ -2913,19 +3089,27 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg0.sw_if_index, is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2945,9 +3129,11 @@ class TestNAT44EI(MethodHolder): raise # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2966,7 +3152,7 @@ class TestNAT44EI(MethodHolder): raise def test_one_armed_nat44(self): - """ NAT44EI One armed NAT """ + """NAT44EI One armed NAT""" remote_host = self.pg9.remote_hosts[0] local_host = self.pg9.remote_hosts[1] external_port = 0 @@ -2974,16 +3160,18 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg9.sw_if_index, - is_add=1) + sw_if_index=self.pg9.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg9.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg9.sw_if_index, flags=flags, is_add=1 + ) # in2out - p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / - IP(src=local_host.ip4, dst=remote_host.ip4) / - TCP(sport=12345, dport=80)) + p = ( + Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) + / IP(src=local_host.ip4, dst=remote_host.ip4) + / TCP(sport=12345, dport=80) + ) self.pg9.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3003,9 +3191,11 @@ class TestNAT44EI(MethodHolder): raise # out2in - p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) / - IP(src=remote_host.ip4, dst=self.nat_addr) / - TCP(sport=80, dport=external_port)) + p = ( + Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) + / IP(src=remote_host.ip4, dst=self.nat_addr) + / TCP(sport=80, dport=external_port) + ) self.pg9.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3028,21 +3218,21 @@ class TestNAT44EI(MethodHolder): else: node = "nat44-ei-classify" - err = self.statistics.get_err_counter('/err/%s/next in2out' % node) + err = self.statistics.get_err_counter("/err/%s/next in2out" % node) self.assertEqual(err, 1) - err = self.statistics.get_err_counter('/err/%s/next out2in' % node) + err = self.statistics.get_err_counter("/err/%s/next out2in" % node) self.assertEqual(err, 1) def test_del_session(self): - """ NAT44EI delete session """ + """NAT44EI delete session""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) pkts = self.create_stream_in(self.pg0, self.pg1) self.pg0.add_stream(pkts) @@ -3057,12 +3247,14 @@ class TestNAT44EI(MethodHolder): address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=self.config_flags.NAT44_EI_IF_INSIDE) + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.vapi.nat44_ei_del_session( address=sessions[1].outside_ip_address, port=sessions[1].outside_port, - protocol=sessions[1].protocol) + protocol=sessions[1].protocol, + ) sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(nsessions - len(sessions), 2) @@ -3071,60 +3263,56 @@ class TestNAT44EI(MethodHolder): address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=self.config_flags.NAT44_EI_IF_INSIDE) + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.verify_no_nat44_user() def test_frag_in_order(self): - """ NAT44EI translate fragments arriving in order """ + """NAT44EI translate fragments arriving in order""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.frag_in_order(proto=IP_PROTOS.tcp) self.frag_in_order(proto=IP_PROTOS.udp) self.frag_in_order(proto=IP_PROTOS.icmp) def test_frag_forwarding(self): - """ NAT44EI forwarding fragment test """ + """NAT44EI forwarding fragment test""" self.vapi.nat44_ei_add_del_interface_addr( - is_add=1, - sw_if_index=self.pg1.sw_if_index) + is_add=1, sw_if_index=self.pg1.sw_if_index + ) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_forwarding_enable_disable(enable=1) data = b"A" * 16 + b"B" * 16 + b"C" * 3 - pkts = self.create_stream_frag(self.pg1, - self.pg0.remote_ip4, - 4789, - 4789, - data, - proto=IP_PROTOS.udp) + pkts = self.create_stream_frag( + self.pg1, self.pg0.remote_ip4, 4789, 4789, data, proto=IP_PROTOS.udp + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) self.assertEqual(p[UDP].sport, 4789) self.assertEqual(p[UDP].dport, 4789) self.assertEqual(data, p[Raw].load) def test_reass_hairpinning(self): - """ NAT44EI fragments hairpinning """ + """NAT44EI fragments hairpinning""" server_addr = self.pg0.remote_hosts[1].ip4 host_in_port = random.randint(1025, 65535) @@ -3134,63 +3322,85 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server_addr, self.nat_addr, - server_in_port, - server_out_port, - proto=IP_PROTOS.tcp) - self.nat44_add_static_mapping(server_addr, self.nat_addr, - server_in_port, - server_out_port, - proto=IP_PROTOS.udp) + self.nat44_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + self.nat44_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.udp, + ) self.nat44_add_static_mapping(server_addr, self.nat_addr) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.udp) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.icmp) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.udp, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.icmp, + ) def test_frag_out_of_order(self): - """ NAT44EI translate fragments arriving out of order """ + """NAT44EI translate fragments arriving out of order""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.frag_out_of_order(proto=IP_PROTOS.tcp) self.frag_out_of_order(proto=IP_PROTOS.udp) self.frag_out_of_order(proto=IP_PROTOS.icmp) def test_port_restricted(self): - """ NAT44EI Port restricted NAT44EI (MAP-E CE) """ + """NAT44EI Port restricted NAT44EI (MAP-E CE)""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=1, - psid_offset=6, - psid_length=6, - psid=10) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=4567, dport=22)) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_set_addr_and_port_alloc_alg( + alg=1, psid_offset=6, psid_length=6, psid=10 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=4567, dport=22) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3210,24 +3420,26 @@ class TestNAT44EI(MethodHolder): raise def test_port_range(self): - """ NAT44EI External address port range """ + """NAT44EI External address port range""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=2, - start_port=1025, - end_port=1027) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_set_addr_and_port_alloc_alg( + alg=2, start_port=1025, end_port=1027 + ) pkts = [] for port in range(0, 5): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=1125 + port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=1125 + port) + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -3239,14 +3451,14 @@ class TestNAT44EI(MethodHolder): self.assertLessEqual(tcp.sport, 1027) def test_multiple_outside_vrf(self): - """ NAT44EI Multiple outside VRF """ + """NAT44EI Multiple outside VRF""" vrf_id1 = 1 vrf_id2 = 2 self.pg1.unconfig_ip4() self.pg2.unconfig_ip4() - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1}) - self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id1}) + self.vapi.ip_table_add_del(is_add=1, table={"table_id": vrf_id2}) self.pg1.set_table_ip4(vrf_id1) self.pg2.set_table_ip4(vrf_id2) self.pg1.config_ip4() @@ -3257,14 +3469,14 @@ class TestNAT44EI(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg2.sw_if_index, - is_add=1) + sw_if_index=self.pg2.sw_if_index, is_add=1 + ) try: # first VRF @@ -3313,20 +3525,26 @@ class TestNAT44EI(MethodHolder): self.pg2.resolve_arp() def test_mss_clamping(self): - """ NAT44EI TCP MSS clamping """ + """NAT44EI TCP MSS clamping""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="S", options=[('MSS', 1400)])) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP( + sport=self.tcp_port_in, + dport=self.tcp_external_port, + flags="S", + options=[("MSS", 1400)], + ) + ) self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000) self.pg0.add_stream(p) @@ -3353,21 +3571,22 @@ class TestNAT44EI(MethodHolder): self.verify_mss_value(capture[0], 1400) def test_ha_send(self): - """ NAT44EI Send HA session synchronization events (active) """ + """NAT44EI Send HA session synchronization events (active)""" flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.nat44_add_address(self.nat_addr) self.vapi.nat44_ei_ha_set_listener( - ip_address=self.pg3.local_ip4, port=12345, path_mtu=512) + ip_address=self.pg3.local_ip4, port=12345, path_mtu=512 + ) self.vapi.nat44_ei_ha_set_failover( - ip_address=self.pg3.remote_ip4, port=12346, - session_refresh_interval=10) + ip_address=self.pg3.remote_ip4, port=12346, session_refresh_interval=10 + ) bind_layers(UDP, HANATStateSync, sport=12345) # create sessions @@ -3379,7 +3598,7 @@ class TestNAT44EI(MethodHolder): self.verify_capture_out(capture) # active send HA events self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/add-event-send'] + stats = self.statistics["/nat44-ei/ha/add-event-send"] self.assertEqual(stats[:, 0].sum(), 3) capture = self.pg3.get_capture(1) p = capture[0] @@ -3407,23 +3626,29 @@ class TestNAT44EI(MethodHolder): self.assertEqual(event.fib_index, 0) # ACK received events - ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=seq, flags='ACK', - thread_index=hanat.thread_index)) + ack = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=seq, flags="ACK", thread_index=hanat.thread_index + ) + ) self.pg3.add_stream(ack) self.pg_start() - stats = self.statistics['/nat44-ei/ha/ack-recv'] + stats = self.statistics["/nat44-ei/ha/ack-recv"] self.assertEqual(stats[:, 0].sum(), 1) # delete one session self.pg_enable_capture(self.pg_interfaces) self.vapi.nat44_ei_del_session( - address=self.pg0.remote_ip4, port=self.tcp_port_in, - protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE) + address=self.pg0.remote_ip4, + port=self.tcp_port_in, + protocol=IP_PROTOS.tcp, + flags=self.config_flags.NAT44_EI_IF_INSIDE, + ) self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/del-event-send'] + stats = self.statistics["/nat44-ei/ha/del-event-send"] self.assertEqual(stats[:, 0].sum(), 1) capture = self.pg3.get_capture(1) p = capture[0] @@ -3438,9 +3663,9 @@ class TestNAT44EI(MethodHolder): # do not send ACK, active retry send HA event again self.pg_enable_capture(self.pg_interfaces) self.virtual_sleep(12) - stats = self.statistics['/nat44-ei/ha/retry-count'] + stats = self.statistics["/nat44-ei/ha/retry-count"] self.assertEqual(stats[:, 0].sum(), 3) - stats = self.statistics['/nat44-ei/ha/missed-count'] + stats = self.statistics["/nat44-ei/ha/missed-count"] self.assertEqual(stats[:, 0].sum(), 1) capture = self.pg3.get_capture(3) for packet in capture: @@ -3453,7 +3678,7 @@ class TestNAT44EI(MethodHolder): self.pg_start() self.pg0.get_capture(2) self.vapi.nat44_ei_ha_flush() - stats = self.statistics['/nat44-ei/ha/refresh-event-send'] + stats = self.statistics["/nat44-ei/ha/refresh-event-send"] self.assertEqual(stats[:, 0].sum(), 2) capture = self.pg3.get_capture(1) p = capture[0] @@ -3480,29 +3705,33 @@ class TestNAT44EI(MethodHolder): self.assertEqual(event.total_pkts, 2) self.assertGreater(event.total_bytes, 0) - stats = self.statistics['/nat44-ei/ha/ack-recv'] - ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=seq, flags='ACK', - thread_index=hanat.thread_index)) + stats = self.statistics["/nat44-ei/ha/ack-recv"] + ack = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=seq, flags="ACK", thread_index=hanat.thread_index + ) + ) self.pg3.add_stream(ack) self.pg_start() - stats = self.statistics['/nat44-ei/ha/ack-recv'] + stats = self.statistics["/nat44-ei/ha/ack-recv"] self.assertEqual(stats[:, 0].sum(), 2) def test_ha_recv(self): - """ NAT44EI Receive HA session synchronization events (passive) """ + """NAT44EI Receive HA session synchronization events (passive)""" self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) - self.vapi.nat44_ei_ha_set_listener(ip_address=self.pg3.local_ip4, - port=12345, path_mtu=512) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) + self.vapi.nat44_ei_ha_set_listener( + ip_address=self.pg3.local_ip4, port=12345, path_mtu=512 + ) bind_layers(UDP, HANATStateSync, sport=12345) # this is a bit tricky - HA dictates thread index due to how it's @@ -3512,11 +3741,12 @@ class TestNAT44EI(MethodHolder): # IP address) and out2in (based on outside port) # first choose a thread index which is correct for IP - thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4, - self.vpp_worker_count) + thread_index = get_nat44_ei_in2out_worker_index( + self.pg0.remote_ip4, self.vpp_worker_count + ) # now pick a port which is correct for given thread - port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count)) + port_per_thread = int((0xFFFF - 1024) / max(1, self.vpp_worker_count)) self.tcp_port_out = 1024 + random.randint(1, port_per_thread) self.udp_port_out = 1024 + random.randint(1, port_per_thread) if self.vpp_worker_count > 0: @@ -3524,25 +3754,43 @@ class TestNAT44EI(MethodHolder): self.udp_port_out += port_per_thread * (thread_index - 1) # send HA session add events to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=1, events=[ - Event(event_type='add', protocol='tcp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.tcp_port_in, out_port=self.tcp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.tcp_external_port, - ehn_port=self.tcp_external_port, fib_index=0), - Event(event_type='add', protocol='udp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.udp_port_in, out_port=self.udp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.udp_external_port, - ehn_port=self.udp_external_port, fib_index=0)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=1, + events=[ + Event( + event_type="add", + protocol="tcp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.tcp_port_in, + out_port=self.tcp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.tcp_external_port, + ehn_port=self.tcp_external_port, + fib_index=0, + ), + Event( + event_type="add", + protocol="udp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.udp_port_in, + out_port=self.udp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.udp_external_port, + ehn_port=self.udp_external_port, + fib_index=0, + ), + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -3557,49 +3805,57 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 1) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) self.assertEqual(hanat.thread_index, thread_index) - stats = self.statistics['/nat44-ei/ha/ack-send'] + stats = self.statistics["/nat44-ei/ha/ack-send"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics['/nat44-ei/ha/add-event-recv'] + stats = self.statistics["/nat44-ei/ha/add-event-recv"] self.assertEqual(stats[:, 0].sum(), 2) - users = self.statistics['/nat44-ei/total-users'] + users = self.statistics["/nat44-ei/total-users"] self.assertEqual(users[:, 0].sum(), 1) - sessions = self.statistics['/nat44-ei/total-sessions'] + sessions = self.statistics["/nat44-ei/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 2) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) # there should be 2 sessions created by HA sessions = self.vapi.nat44_ei_user_session_dump( - users[0].ip_address, users[0].vrf_id) + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 2) for session in sessions: - self.assertEqual(str(session.inside_ip_address), - self.pg0.remote_ip4) - self.assertEqual(str(session.outside_ip_address), - self.nat_addr) - self.assertIn(session.inside_port, - [self.tcp_port_in, self.udp_port_in]) - self.assertIn(session.outside_port, - [self.tcp_port_out, self.udp_port_out]) + self.assertEqual(str(session.inside_ip_address), self.pg0.remote_ip4) + self.assertEqual(str(session.outside_ip_address), self.nat_addr) + self.assertIn(session.inside_port, [self.tcp_port_in, self.udp_port_in]) + self.assertIn(session.outside_port, [self.tcp_port_out, self.udp_port_out]) self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp]) # send HA session delete event to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=2, events=[ - Event(event_type='del', protocol='udp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.udp_port_in, out_port=self.udp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.udp_external_port, - ehn_port=self.udp_external_port, fib_index=0)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=2, + events=[ + Event( + event_type="del", + protocol="udp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.udp_port_in, + out_port=self.udp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.udp_external_port, + ehn_port=self.udp_external_port, + fib_index=0, + ) + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -3614,37 +3870,49 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 2) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) # now we should have only 1 session, 1 deleted by HA - sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address, - users[0].vrf_id) + sessions = self.vapi.nat44_ei_user_session_dump( + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 1) - stats = self.statistics['/nat44-ei/ha/del-event-recv'] + stats = self.statistics["/nat44-ei/ha/del-event-recv"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics.get_err_counter( - '/err/nat44-ei-ha/pkts-processed') + stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed") self.assertEqual(stats, 2) # send HA session refresh event to failover/passive - p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) / - IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) / - UDP(sport=12346, dport=12345) / - HANATStateSync(sequence_number=3, events=[ - Event(event_type='refresh', protocol='tcp', - in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr, - in_port=self.tcp_port_in, out_port=self.tcp_port_out, - eh_addr=self.pg1.remote_ip4, - ehn_addr=self.pg1.remote_ip4, - eh_port=self.tcp_external_port, - ehn_port=self.tcp_external_port, fib_index=0, - total_bytes=1024, total_pkts=2)], - thread_index=thread_index)) + p = ( + Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) + / IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) + / UDP(sport=12346, dport=12345) + / HANATStateSync( + sequence_number=3, + events=[ + Event( + event_type="refresh", + protocol="tcp", + in_addr=self.pg0.remote_ip4, + out_addr=self.nat_addr, + in_port=self.tcp_port_in, + out_port=self.tcp_port_out, + eh_addr=self.pg1.remote_ip4, + ehn_addr=self.pg1.remote_ip4, + eh_port=self.tcp_external_port, + ehn_port=self.tcp_external_port, + fib_index=0, + total_bytes=1024, + total_pkts=2, + ) + ], + thread_index=thread_index, + ) + ) self.pg3.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3658,29 +3926,30 @@ class TestNAT44EI(MethodHolder): raise else: self.assertEqual(hanat.sequence_number, 3) - self.assertEqual(hanat.flags, 'ACK') + self.assertEqual(hanat.flags, "ACK") self.assertEqual(hanat.version, 1) users = self.vapi.nat44_ei_user_dump() self.assertEqual(len(users), 1) - self.assertEqual(str(users[0].ip_address), - self.pg0.remote_ip4) + self.assertEqual(str(users[0].ip_address), self.pg0.remote_ip4) sessions = self.vapi.nat44_ei_user_session_dump( - users[0].ip_address, users[0].vrf_id) + users[0].ip_address, users[0].vrf_id + ) self.assertEqual(len(sessions), 1) session = sessions[0] self.assertEqual(session.total_bytes, 1024) self.assertEqual(session.total_pkts, 2) - stats = self.statistics['/nat44-ei/ha/refresh-event-recv'] + stats = self.statistics["/nat44-ei/ha/refresh-event-recv"] self.assertEqual(stats[:, 0].sum(), 1) - stats = self.statistics.get_err_counter( - '/err/nat44-ei-ha/pkts-processed') + stats = self.statistics.get_err_counter("/err/nat44-ei-ha/pkts-processed") self.assertEqual(stats, 3) # send packet to test session created by HA - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3706,7 +3975,7 @@ class TestNAT44EI(MethodHolder): return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts def test_set_frame_queue_nelts(self): - """ NAT44EI API test - worker handoff frame queue elements """ + """NAT44EI API test - worker handoff frame queue elements""" self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512) def show_commands_at_teardown(self): @@ -3718,11 +3987,10 @@ class TestNAT44EI(MethodHolder): self.logger.info(self.vapi.cli("show nat44 ei sessions detail")) self.logger.info(self.vapi.cli("show nat44 ei hash tables detail")) self.logger.info(self.vapi.cli("show nat44 ei ha")) - self.logger.info( - self.vapi.cli("show nat44 ei addr-port-assignment-alg")) + self.logger.info(self.vapi.cli("show nat44 ei addr-port-assignment-alg")) def test_outside_address_distribution(self): - """ Outside address distribution based on source address """ + """Outside address distribution based on source address""" x = 100 nat_addresses = [] @@ -3733,16 +4001,18 @@ class TestNAT44EI(MethodHolder): flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=nat_addresses[0], last_ip_address=nat_addresses[-1], - vrf_id=0xFFFFFFFF, is_add=1) + vrf_id=0xFFFFFFFF, + is_add=1, + ) self.pg0.generate_remote_hosts(x) @@ -3750,11 +4020,12 @@ class TestNAT44EI(MethodHolder): for i in range(x): info = self.create_packet_info(self.pg0, self.pg1) payload = self.info_to_payload(info) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_hosts[i].ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=7000+i, dport=8000+i) / - Raw(payload)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4) + / UDP(sport=7000 + i, dport=8000 + i) + / Raw(payload) + ) info.data = p pkts.append(p) @@ -3772,21 +4043,23 @@ class TestNAT44EI(MethodHolder): packed = socket.inet_aton(p_sent[IP].src) numeric = struct.unpack("!L", packed)[0] numeric = socket.htonl(numeric) - a = nat_addresses[(numeric-1) % len(nat_addresses)] + a = nat_addresses[(numeric - 1) % len(nat_addresses)] self.assertEqual( - a, p_recvd[IP].src, + a, + p_recvd[IP].src, "Invalid packet (src IP %s translated to %s, but expected %s)" - % (p_sent[IP].src, p_recvd[IP].src, a)) + % (p_sent[IP].src, p_recvd[IP].src, a), + ) def test_default_user_sessions(self): - """ NAT44EI default per-user session limit is used and reported """ + """NAT44EI default per-user session limit is used and reported""" nat44_ei_config = self.vapi.nat44_ei_show_running_config() # a nonzero default should be reported for user_sessions self.assertNotEqual(nat44_ei_config.user_sessions, 0) class TestNAT44Out2InDPO(MethodHolder): - """ NAT44EI Test Cases using out2in DPO """ + """NAT44EI Test Cases using out2in DPO""" @classmethod def setUpClass(cls): @@ -3799,8 +4072,8 @@ class TestNAT44Out2InDPO(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' - cls.dst_ip4 = '192.168.70.1' + cls.nat_addr = "10.0.0.3" + cls.dst_ip4 = "192.168.70.1" cls.create_pg_interfaces(range(2)) @@ -3812,10 +4085,13 @@ class TestNAT44Out2InDPO(MethodHolder): cls.pg1.config_ip6() cls.pg1.resolve_ndp() - r1 = VppIpRoute(cls, "::", 0, - [VppRoutePath(cls.pg1.remote_ip6, - cls.pg1.sw_if_index)], - register=False) + r1 = VppIpRoute( + cls, + "::", + 0, + [VppRoutePath(cls.pg1.remote_ip6, cls.pg1.sw_if_index)], + register=False, + ) r1.add_vpp_config() def setUp(self): @@ -3830,37 +4106,44 @@ class TestNAT44Out2InDPO(MethodHolder): self.vapi.cli("clear logging") def configure_xlat(self): - self.dst_ip6_pfx = '1:2:3::' - self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.dst_ip6_pfx) + self.dst_ip6_pfx = "1:2:3::" + self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.dst_ip6_pfx) self.dst_ip6_pfx_len = 96 - self.src_ip6_pfx = '4:5:6::' - self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, - self.src_ip6_pfx) + self.src_ip6_pfx = "4:5:6::" + self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6, self.src_ip6_pfx) self.src_ip6_pfx_len = 96 - self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len, - self.src_ip6_pfx_n, self.src_ip6_pfx_len, - '\x00\x00\x00\x00', 0) - - @unittest.skip('Temporary disabled') + self.vapi.map_add_domain( + self.dst_ip6_pfx_n, + self.dst_ip6_pfx_len, + self.src_ip6_pfx_n, + self.src_ip6_pfx_len, + "\x00\x00\x00\x00", + 0, + ) + + @unittest.skip("Temporary disabled") def test_464xlat_ce(self): - """ Test 464XLAT CE with NAT44EI """ + """Test 464XLAT CE with NAT44EI""" self.configure_xlat() flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=self.nat_addr_n, last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF, is_add=1) + vrf_id=0xFFFFFFFF, + is_add=1, + ) - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx, - self.src_ip6_pfx_len) + out_src_ip6 = self.compose_ip6( + self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len + ) + out_dst_ip6 = self.compose_ip6( + self.nat_addr, self.src_ip6_pfx, self.src_ip6_pfx_len + ) try: pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) @@ -3868,11 +4151,9 @@ class TestNAT44Out2InDPO(MethodHolder): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, - dst_ip=out_src_ip6) + self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6, dst_ip=out_src_ip6) - pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, - out_dst_ip6) + pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3880,31 +4161,35 @@ class TestNAT44Out2InDPO(MethodHolder): self.verify_capture_in(capture, self.pg0) finally: self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags) + sw_if_index=self.pg0.sw_if_index, flags=flags + ) self.vapi.nat44_ei_add_del_address_range( first_ip_address=self.nat_addr_n, last_ip_address=self.nat_addr_n, - vrf_id=0xFFFFFFFF) + vrf_id=0xFFFFFFFF, + ) - @unittest.skip('Temporary disabled') + @unittest.skip("Temporary disabled") def test_464xlat_ce_no_nat(self): - """ Test 464XLAT CE without NAT44EI """ + """Test 464XLAT CE without NAT44EI""" self.configure_xlat() - out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx, - self.dst_ip6_pfx_len) - out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx, - self.src_ip6_pfx_len) + out_src_ip6 = self.compose_ip6( + self.dst_ip4, self.dst_ip6_pfx, self.dst_ip6_pfx_len + ) + out_dst_ip6 = self.compose_ip6( + self.pg0.remote_ip4, self.src_ip6_pfx, self.src_ip6_pfx_len + ) pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6, - nat_ip=out_dst_ip6, same_port=True) + self.verify_capture_out_ip6( + capture, dst_ip=out_src_ip6, nat_ip=out_dst_ip6, same_port=True + ) pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6) self.pg1.add_stream(pkts) @@ -3915,7 +4200,8 @@ class TestNAT44Out2InDPO(MethodHolder): class TestNAT44EIMW(MethodHolder): - """ NAT44EI Test Cases (multiple workers) """ + """NAT44EI Test Cases (multiple workers)""" + vpp_worker_count = 2 max_translations = 10240 max_users = 10240 @@ -3931,7 +4217,7 @@ class TestNAT44EIMW(MethodHolder): cls.udp_port_out = 6304 cls.icmp_id_in = 6305 cls.icmp_id_out = 6305 - cls.nat_addr = '10.0.0.3' + cls.nat_addr = "10.0.0.3" cls.ipfix_src_port = 4739 cls.ipfix_domain_id = 1 cls.tcp_external_port = 80 @@ -3952,8 +4238,8 @@ class TestNAT44EIMW(MethodHolder): cls.pg1.configure_ipv4_neighbors() cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7])) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10}) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 10}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 20}) cls.pg4._local_ip4 = "172.16.255.1" cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2" @@ -3975,8 +4261,8 @@ class TestNAT44EIMW(MethodHolder): cls.pg9.generate_remote_hosts(2) cls.pg9.config_ip4() cls.vapi.sw_interface_add_del_address( - sw_if_index=cls.pg9.sw_if_index, - prefix="10.0.0.1/24") + sw_if_index=cls.pg9.sw_if_index, prefix="10.0.0.1/24" + ) cls.pg9.admin_up() cls.pg9.resolve_arp() @@ -3987,16 +4273,15 @@ class TestNAT44EIMW(MethodHolder): def setUp(self): super(TestNAT44EIMW, self).setUp() self.vapi.nat44_ei_plugin_enable_disable( - sessions=self.max_translations, - users=self.max_users, enable=1) + sessions=self.max_translations, users=self.max_users, enable=1 + ) def tearDown(self): super(TestNAT44EIMW, self).tearDown() if not self.vpp_dead: self.vapi.nat44_ei_ipfix_enable_disable( - domain_id=self.ipfix_domain_id, - src_port=self.ipfix_src_port, - enable=0) + domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port, enable=0 + ) self.ipfix_src_port = 4739 self.ipfix_domain_id = 1 @@ -4004,7 +4289,7 @@ class TestNAT44EIMW(MethodHolder): self.vapi.cli("clear logging") def test_hairpinning(self): - """ NAT44EI hairpinning - 1:1 NAPT """ + """NAT44EI hairpinning - 1:1 NAPT""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] @@ -4018,22 +4303,28 @@ class TestNAT44EIMW(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for server - self.nat44_add_static_mapping(server.ip4, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) - - cnt = self.statistics['/nat44-ei/hairpinning'] + self.nat44_add_static_mapping( + server.ip4, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + + cnt = self.statistics["/nat44-ei/hairpinning"] # send packet from host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=self.nat_addr) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=self.nat_addr) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -4052,15 +4343,17 @@ class TestNAT44EIMW(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1) # send reply from server to host - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.nat_addr) / - TCP(sport=server_in_port, dport=host_out_port)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.nat_addr) + / TCP(sport=server_in_port, dport=host_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -4078,13 +4371,13 @@ class TestNAT44EIMW(MethodHolder): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - after = self.statistics['/nat44-ei/hairpinning'] + after = self.statistics["/nat44-ei/hairpinning"] if_idx = self.pg0.sw_if_index self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1) self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2) def test_hairpinning2(self): - """ NAT44EI hairpinning - 1:1 NAT""" + """NAT44EI hairpinning - 1:1 NAT""" server1_nat_ip = "10.0.0.10" server2_nat_ip = "10.0.0.11" @@ -4097,11 +4390,11 @@ class TestNAT44EIMW(MethodHolder): self.nat44_add_address(self.nat_addr) flags = self.config_flags.NAT44_EI_IF_INSIDE self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ei_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # add static mapping for servers self.nat44_add_static_mapping(server1.ip4, server1_nat_ip) @@ -4109,17 +4402,23 @@ class TestNAT44EIMW(MethodHolder): # host to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=host.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=host.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4147,17 +4446,23 @@ class TestNAT44EIMW(MethodHolder): # server1 to host pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=self.nat_addr) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=self.nat_addr) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4182,17 +4487,23 @@ class TestNAT44EIMW(MethodHolder): # server2 to server1 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - TCP(sport=self.tcp_port_in, dport=server_tcp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / TCP(sport=self.tcp_port_in, dport=server_tcp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - UDP(sport=self.udp_port_in, dport=server_udp_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / UDP(sport=self.udp_port_in, dport=server_udp_port) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server2.ip4, dst=server1_nat_ip) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server2.ip4, dst=server1_nat_ip) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4220,17 +4531,23 @@ class TestNAT44EIMW(MethodHolder): # server1 to server2 pkts = [] - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - TCP(sport=server_tcp_port, dport=self.tcp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / TCP(sport=server_tcp_port, dport=self.tcp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - UDP(sport=server_udp_port, dport=self.udp_port_out)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / UDP(sport=server_udp_port, dport=self.udp_port_out) + ) pkts.append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=server1.ip4, dst=server2_nat_ip) / - ICMP(id=self.icmp_id_out, type='echo-reply')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=server1.ip4, dst=server2_nat_ip) + / ICMP(id=self.icmp_id_out, type="echo-reply") + ) pkts.append(p) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -4254,5 +4571,5 @@ class TestNAT44EIMW(MethodHolder): raise -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) -- cgit 1.2.3-korg