diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_nat44_ed.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_nat44_ed.py')
-rw-r--r-- | test/test_nat44_ed.py | 2902 |
1 files changed, 1657 insertions, 1245 deletions
diff --git a/test/test_nat44_ed.py b/test/test_nat44_ed.py index 21eebb22de4..77459874c09 100644 --- a/test/test_nat44_ed.py +++ b/test/test_nat44_ed.py @@ -21,9 +21,9 @@ from util import StatsDiff class TestNAT44ED(VppTestCase): - """ NAT44ED Test Case """ + """NAT44ED Test Case""" - nat_addr = '10.0.10.3' + nat_addr = "10.0.10.3" tcp_port_in = 6303 tcp_port_out = 6303 @@ -48,8 +48,7 @@ class TestNAT44ED(VppTestCase): self.plugin_disable() def plugin_enable(self): - self.vapi.nat44_ed_plugin_enable_disable( - sessions=self.max_sessions, enable=1) + self.vapi.nat44_ed_plugin_enable_disable(sessions=self.max_sessions, enable=1) def plugin_disable(self): self.vapi.nat44_ed_plugin_enable_disable(enable=0) @@ -87,7 +86,7 @@ class TestNAT44ED(VppTestCase): @classmethod def create_and_add_ip4_table(cls, i, table_id=0): - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': table_id}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": table_id}) i.set_table_ip4(table_id) @classmethod @@ -105,32 +104,41 @@ class TestNAT44ED(VppTestCase): @classmethod def nat_add_interface_address(cls, i): - cls.vapi.nat44_add_del_interface_addr( - sw_if_index=i.sw_if_index, is_add=1) + cls.vapi.nat44_add_del_interface_addr(sw_if_index=i.sw_if_index, is_add=1) def nat_add_inside_interface(self, i): self.vapi.nat44_interface_add_del_feature( - flags=self.config_flags.NAT_IS_INSIDE, - sw_if_index=i.sw_if_index, is_add=1) + flags=self.config_flags.NAT_IS_INSIDE, sw_if_index=i.sw_if_index, is_add=1 + ) def nat_add_outside_interface(self, i): self.vapi.nat44_interface_add_del_feature( - flags=self.config_flags.NAT_IS_OUTSIDE, - sw_if_index=i.sw_if_index, is_add=1) + flags=self.config_flags.NAT_IS_OUTSIDE, sw_if_index=i.sw_if_index, is_add=1 + ) - def nat_add_address(self, address, twice_nat=0, - vrf_id=0xFFFFFFFF, is_add=1): + def nat_add_address(self, address, twice_nat=0, vrf_id=0xFFFFFFFF, is_add=1): flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0 - self.vapi.nat44_add_del_address_range(first_ip_address=address, - last_ip_address=address, - vrf_id=vrf_id, - is_add=is_add, - flags=flags) + self.vapi.nat44_add_del_address_range( + first_ip_address=address, + last_ip_address=address, + vrf_id=vrf_id, + is_add=is_add, + flags=flags, + ) - def nat_add_static_mapping(self, local_ip, external_ip='0.0.0.0', - local_port=0, external_port=0, vrf_id=0, - is_add=1, external_sw_if_index=0xFFFFFFFF, - proto=0, tag="", flags=0): + def nat_add_static_mapping( + self, + local_ip, + external_ip="0.0.0.0", + local_port=0, + external_port=0, + vrf_id=0, + is_add=1, + external_sw_if_index=0xFFFFFFFF, + proto=0, + tag="", + flags=0, + ): if not (local_port and external_port): flags |= self.config_flags.NAT_IS_ADDR_ONLY @@ -142,9 +150,11 @@ class TestNAT44ED(VppTestCase): external_sw_if_index=external_sw_if_index, local_port=local_port, external_port=external_port, - vrf_id=vrf_id, protocol=proto, + vrf_id=vrf_id, + protocol=proto, flags=flags, - tag=tag) + tag=tag, + ) @classmethod def setUpClass(cls): @@ -159,14 +169,14 @@ class TestNAT44ED(VppTestCase): cls.configure_ip4_interface(i, hosts=3) # test specific (test-multiple-vrf) - cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 1}) + cls.vapi.ip_table_add_del(is_add=1, table={"table_id": 1}) # test specific (test-one-armed-nat44-static) cls.pg4.generate_remote_hosts(2) cls.pg4.config_ip4() cls.vapi.sw_interface_add_del_address( - sw_if_index=cls.pg4.sw_if_index, - prefix="10.0.0.1/24") + sw_if_index=cls.pg4.sw_if_index, prefix="10.0.0.1/24" + ) cls.pg4.admin_up() cls.pg4.resolve_arp() cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4 @@ -190,58 +200,95 @@ class TestNAT44ED(VppTestCase): rl = list() - rl.append(VppIpRoute(cls, "0.0.0.0", 0, - [VppRoutePath("0.0.0.0", 0xffffffff, - nh_table_id=0)], - register=False, table_id=1)) - rl.append(VppIpRoute(cls, "0.0.0.0", 0, - [VppRoutePath(cls.pg1.local_ip4, - cls.pg1.sw_if_index)], - register=False)) - rl.append(VppIpRoute(cls, cls.pg5.remote_ip4, 32, - [VppRoutePath("0.0.0.0", - cls.pg5.sw_if_index)], - register=False, table_id=1)) - rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 32, - [VppRoutePath("0.0.0.0", - cls.pg6.sw_if_index)], - register=False, table_id=1)) - rl.append(VppIpRoute(cls, cls.pg6.remote_ip4, 16, - [VppRoutePath("0.0.0.0", 0xffffffff, - nh_table_id=1)], - register=False, table_id=0)) + rl.append( + VppIpRoute( + cls, + "0.0.0.0", + 0, + [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)], + register=False, + table_id=1, + ) + ) + rl.append( + VppIpRoute( + cls, + "0.0.0.0", + 0, + [VppRoutePath(cls.pg1.local_ip4, cls.pg1.sw_if_index)], + register=False, + ) + ) + rl.append( + VppIpRoute( + cls, + cls.pg5.remote_ip4, + 32, + [VppRoutePath("0.0.0.0", cls.pg5.sw_if_index)], + register=False, + table_id=1, + ) + ) + rl.append( + VppIpRoute( + cls, + cls.pg6.remote_ip4, + 32, + [VppRoutePath("0.0.0.0", cls.pg6.sw_if_index)], + register=False, + table_id=1, + ) + ) + rl.append( + VppIpRoute( + cls, + cls.pg6.remote_ip4, + 16, + [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)], + register=False, + table_id=0, + ) + ) for r in rl: r.add_vpp_config() - cls.no_diff = StatsDiff({ - pg.sw_if_index: { - '/nat44-ed/in2out/fastpath/tcp': 0, - '/nat44-ed/in2out/fastpath/udp': 0, - '/nat44-ed/in2out/fastpath/icmp': 0, - '/nat44-ed/in2out/fastpath/drops': 0, - '/nat44-ed/in2out/slowpath/tcp': 0, - '/nat44-ed/in2out/slowpath/udp': 0, - '/nat44-ed/in2out/slowpath/icmp': 0, - '/nat44-ed/in2out/slowpath/drops': 0, - '/nat44-ed/in2out/fastpath/tcp': 0, - '/nat44-ed/in2out/fastpath/udp': 0, - '/nat44-ed/in2out/fastpath/icmp': 0, - '/nat44-ed/in2out/fastpath/drops': 0, - '/nat44-ed/in2out/slowpath/tcp': 0, - '/nat44-ed/in2out/slowpath/udp': 0, - '/nat44-ed/in2out/slowpath/icmp': 0, - '/nat44-ed/in2out/slowpath/drops': 0, + cls.no_diff = StatsDiff( + { + pg.sw_if_index: { + "/nat44-ed/in2out/fastpath/tcp": 0, + "/nat44-ed/in2out/fastpath/udp": 0, + "/nat44-ed/in2out/fastpath/icmp": 0, + "/nat44-ed/in2out/fastpath/drops": 0, + "/nat44-ed/in2out/slowpath/tcp": 0, + "/nat44-ed/in2out/slowpath/udp": 0, + "/nat44-ed/in2out/slowpath/icmp": 0, + "/nat44-ed/in2out/slowpath/drops": 0, + "/nat44-ed/in2out/fastpath/tcp": 0, + "/nat44-ed/in2out/fastpath/udp": 0, + "/nat44-ed/in2out/fastpath/icmp": 0, + "/nat44-ed/in2out/fastpath/drops": 0, + "/nat44-ed/in2out/slowpath/tcp": 0, + "/nat44-ed/in2out/slowpath/udp": 0, + "/nat44-ed/in2out/slowpath/icmp": 0, + "/nat44-ed/in2out/slowpath/drops": 0, + } + for pg in cls.pg_interfaces } - for pg in cls.pg_interfaces - }) + ) def get_err_counter(self, path): return self.statistics.get_err_counter(path) - def reass_hairpinning(self, server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp, - ignore_port=False): + def reass_hairpinning( + self, + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ignore_port=False, + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -250,19 +297,14 @@ class TestNAT44ED(VppTestCase): data = b"A" * 16 + b"B" * 16 + b"C" * 3 # send packet from host to server - pkts = self.create_stream_frag(self.pg0, - self.nat_addr, - host_in_port, - server_out_port, - data, - proto) + pkts = self.create_stream_frag( + self.pg0, self.nat_addr, host_in_port, server_out_port, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.nat_addr, - server_addr) + p = self.reass_frags_and_verify(frags, self.nat_addr, server_addr) if proto != IP_PROTOS.icmp: if not ignore_port: self.assertNotEqual(p[layer].sport, host_in_port) @@ -272,8 +314,9 @@ class TestNAT44ED(VppTestCase): self.assertNotEqual(p[layer].id, host_in_port) self.assertEqual(data, p[Raw].load) - def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_out_of_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -284,21 +327,22 @@ class TestNAT44ED(VppTestCase): for i in range(2): # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.nat_addr, self.pg1.remote_ip4 + ) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -325,17 +369,18 @@ class TestNAT44ED(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, - data, proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.logger.info(self.vapi.cli("show trace")) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg1.remote_ip4, self.pg0.remote_ip4 + ) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -351,21 +396,20 @@ class TestNAT44ED(VppTestCase): self.assert_ip_checksum_valid(p) buffer.seek(p[IP].frag * 8) buffer.write(bytes(p[IP].payload)) - ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, - proto=frags[0][IP].proto) + ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst, proto=frags[0][IP].proto) if ip.proto == IP_PROTOS.tcp: - p = (ip / TCP(buffer.getvalue())) + p = ip / TCP(buffer.getvalue()) self.logger.debug(ppp("Reassembled:", p)) self.assert_tcp_checksum_valid(p) elif ip.proto == IP_PROTOS.udp: - p = (ip / UDP(buffer.getvalue()[:8]) / - Raw(buffer.getvalue()[8:])) + p = ip / UDP(buffer.getvalue()[:8]) / Raw(buffer.getvalue()[8:]) elif ip.proto == IP_PROTOS.icmp: - p = (ip / ICMP(buffer.getvalue())) + p = ip / ICMP(buffer.getvalue()) return p - def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False, - ignore_port=False): + def frag_in_order( + self, proto=IP_PROTOS.tcp, dont_translate=False, ignore_port=False + ): layer = self.proto2layer(proto) if proto == IP_PROTOS.tcp: @@ -375,20 +419,19 @@ class TestNAT44ED(VppTestCase): self.port_in = self.random_port() # in2out - pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4, - self.port_in, 20, data, proto) + pkts = self.create_stream_frag( + self.pg0, self.pg1.remote_ip4, self.port_in, 20, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) if not dont_translate: - p = self.reass_frags_and_verify(frags, - self.nat_addr, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify(frags, self.nat_addr, self.pg1.remote_ip4) else: - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - self.pg1.remote_ip4) + p = self.reass_frags_and_verify( + frags, self.pg0.remote_ip4, self.pg1.remote_ip4 + ) if proto != IP_PROTOS.icmp: if not dont_translate: self.assertEqual(p[layer].dport, 20) @@ -415,15 +458,14 @@ class TestNAT44ED(VppTestCase): else: sport = p[layer].id dport = 0 - pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data, - proto, echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, dst_addr, sport, dport, data, proto, echo_reply=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg1.remote_ip4, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, self.pg1.remote_ip4, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, 20) self.assertEqual(p[layer].dport, self.port_in) @@ -431,8 +473,9 @@ class TestNAT44ED(VppTestCase): self.assertEqual(p[layer].id, self.port_in) self.assertEqual(data, p[Raw].load) - def verify_capture_out(self, capture, nat_ip=None, same_port=False, - dst_ip=None, ignore_port=False): + def verify_capture_out( + self, capture, nat_ip=None, same_port=False, dst_ip=None, ignore_port=False + ): if nat_ip is None: nat_ip = self.nat_addr for packet in capture: @@ -444,35 +487,30 @@ class TestNAT44ED(VppTestCase): if packet.haslayer(TCP): if not ignore_port: if same_port: - self.assertEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertEqual(packet[TCP].sport, self.tcp_port_in) else: - self.assertNotEqual( - packet[TCP].sport, self.tcp_port_in) + self.assertNotEqual(packet[TCP].sport, self.tcp_port_in) self.tcp_port_out = packet[TCP].sport self.assert_packet_checksums_valid(packet) elif packet.haslayer(UDP): if not ignore_port: if same_port: - self.assertEqual( - packet[UDP].sport, self.udp_port_in) + self.assertEqual(packet[UDP].sport, self.udp_port_in) else: - self.assertNotEqual( - packet[UDP].sport, self.udp_port_in) + self.assertNotEqual(packet[UDP].sport, self.udp_port_in) self.udp_port_out = packet[UDP].sport else: if not ignore_port: if same_port: - self.assertEqual( - packet[ICMP].id, self.icmp_id_in) + self.assertEqual(packet[ICMP].id, self.icmp_id_in) else: - self.assertNotEqual( - packet[ICMP].id, self.icmp_id_in) + self.assertNotEqual(packet[ICMP].id, self.icmp_id_in) self.icmp_id_out = packet[ICMP].id self.assert_packet_checksums_valid(packet) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(outside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (outside network):", packet) + ) raise def verify_capture_in(self, capture, in_if): @@ -487,8 +525,9 @@ class TestNAT44ED(VppTestCase): else: self.assertEqual(packet[ICMP].id, self.icmp_id_in) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64): @@ -497,27 +536,32 @@ class TestNAT44ED(VppTestCase): pkts = [] # TCP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(sport=self.tcp_port_in, dport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(sport=self.udp_port_in, dport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=self.icmp_id_in, type='echo-request')) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=self.icmp_id_in, type="echo-request") + ) pkts.append(p) return pkts - def create_stream_out(self, out_if, dst_ip=None, ttl=64, - use_inside_ports=False): + def create_stream_out(self, out_if, dst_ip=None, ttl=64, use_inside_ports=False): if dst_ip is None: dst_ip = self.nat_addr if not use_inside_ports: @@ -530,21 +574,27 @@ class TestNAT44ED(VppTestCase): icmp_id = self.icmp_id_in pkts = [] # TCP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=tcp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / TCP(dport=tcp_port, sport=20) + ) pkts.extend([p, p]) # UDP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=udp_port, sport=20)) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / UDP(dport=udp_port, sport=20) + ) pkts.append(p) # ICMP - p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / - IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - ICMP(id=icmp_id, type='echo-reply')) + p = ( + Ether(dst=out_if.local_mac, src=out_if.remote_mac) + / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) + / ICMP(id=icmp_id, type="echo-reply") + ) pkts.append(p) return pkts @@ -554,19 +604,24 @@ class TestNAT44ED(VppTestCase): port = 6303 for i in range(count): - p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) / - TCP(sport=port + i, dport=20)) + p = ( + Ether(dst=in_if.local_mac, src=in_if.remote_mac) + / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=64) + / TCP(sport=port + i, dport=20) + ) pkts.append(p) return pkts - def create_stream_frag(self, src_if, dst, sport, dport, data, - proto=IP_PROTOS.tcp, echo_reply=False): + def create_stream_frag( + self, src_if, dst, sport, dport, data, proto=IP_PROTOS.tcp, echo_reply=False + ): if proto == IP_PROTOS.tcp: - p = (IP(src=src_if.remote_ip4, dst=dst) / - TCP(sport=sport, dport=dport) / - Raw(data)) + p = ( + IP(src=src_if.remote_ip4, dst=dst) + / TCP(sport=sport, dport=dport) + / Raw(data) + ) p = p.__class__(scapy.compat.raw(p)) chksum = p[TCP].chksum proto_header = TCP(sport=sport, dport=dport, chksum=chksum) @@ -574,9 +629,9 @@ class TestNAT44ED(VppTestCase): proto_header = UDP(sport=sport, dport=dport) elif proto == IP_PROTOS.icmp: if not echo_reply: - proto_header = ICMP(id=sport, type='echo-request') + proto_header = ICMP(id=sport, type="echo-request") else: - proto_header = ICMP(id=sport, type='echo-reply') + proto_header = ICMP(id=sport, type="echo-reply") else: raise Exception("Unsupported protocol") id = self.random_port() @@ -585,33 +640,38 @@ class TestNAT44ED(VppTestCase): raw = Raw(data[0:4]) else: raw = Raw(data[0:16]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) / - proto_header / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) + / proto_header + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[4:20]) else: raw = Raw(data[16:32]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, - proto=proto) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id, proto=proto) + / raw + ) pkts.append(p) if proto == IP_PROTOS.tcp: raw = Raw(data[20:]) else: raw = Raw(data[32:]) - p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) / - IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, - id=id) / - raw) + p = ( + Ether(src=src_if.remote_mac, dst=src_if.local_mac) + / IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto, id=id) + / raw + ) pkts.append(p) return pkts - def frag_in_order_in_plus_out(self, in_addr, out_addr, in_port, out_port, - proto=IP_PROTOS.tcp): + def frag_in_order_in_plus_out( + self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp + ): layer = self.proto2layer(proto) @@ -623,16 +683,14 @@ class TestNAT44ED(VppTestCase): for i in range(2): # out2in - pkts = self.create_stream_frag(self.pg0, out_addr, - port_in, out_port, - data, proto) + pkts = self.create_stream_frag( + self.pg0, out_addr, port_in, out_port, data, proto + ) self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - in_addr) + p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, port_in) self.assertEqual(p[layer].dport, in_port) @@ -642,20 +700,24 @@ class TestNAT44ED(VppTestCase): # in2out if proto != IP_PROTOS.icmp: - pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, - in_port, - p[layer].sport, data, proto) + pkts = self.create_stream_frag( + self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto + ) else: - pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, - p[layer].id, 0, data, proto, - echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, + self.pg0.remote_ip4, + p[layer].id, + 0, + data, + proto, + echo_reply=True, + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - out_addr, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) @@ -663,8 +725,9 @@ class TestNAT44ED(VppTestCase): self.assertEqual(p[layer].id, port_in) self.assertEqual(data, p[Raw].load) - def frag_out_of_order_in_plus_out(self, in_addr, out_addr, in_port, - out_port, proto=IP_PROTOS.tcp): + def frag_out_of_order_in_plus_out( + self, in_addr, out_addr, in_port, out_port, proto=IP_PROTOS.tcp + ): layer = self.proto2layer(proto) @@ -676,17 +739,15 @@ class TestNAT44ED(VppTestCase): for i in range(2): # out2in - pkts = self.create_stream_frag(self.pg0, out_addr, - port_in, out_port, - data, proto) + pkts = self.create_stream_frag( + self.pg0, out_addr, port_in, out_port, data, proto + ) pkts.reverse() self.pg0.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg1.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - self.pg0.remote_ip4, - in_addr) + p = self.reass_frags_and_verify(frags, self.pg0.remote_ip4, in_addr) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].dport, in_port) self.assertEqual(p[layer].sport, port_in) @@ -697,21 +758,25 @@ class TestNAT44ED(VppTestCase): # in2out if proto != IP_PROTOS.icmp: - pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, - in_port, - p[layer].sport, data, proto) + pkts = self.create_stream_frag( + self.pg1, self.pg0.remote_ip4, in_port, p[layer].sport, data, proto + ) else: - pkts = self.create_stream_frag(self.pg1, self.pg0.remote_ip4, - p[layer].id, 0, data, proto, - echo_reply=True) + pkts = self.create_stream_frag( + self.pg1, + self.pg0.remote_ip4, + p[layer].id, + 0, + data, + proto, + echo_reply=True, + ) pkts.reverse() self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() frags = self.pg0.get_capture(len(pkts)) - p = self.reass_frags_and_verify(frags, - out_addr, - self.pg0.remote_ip4) + p = self.reass_frags_and_verify(frags, out_addr, self.pg0.remote_ip4) if proto != IP_PROTOS.icmp: self.assertEqual(p[layer].sport, out_port) self.assertEqual(p[layer].dport, port_in) @@ -721,9 +786,11 @@ class TestNAT44ED(VppTestCase): def init_tcp_session(self, in_if, out_if, in_port, ext_port): # SYN packet in->out - p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="S")) + p = ( + Ether(src=in_if.remote_mac, dst=in_if.local_mac) + / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="S") + ) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -732,18 +799,22 @@ class TestNAT44ED(VppTestCase): out_port = p[TCP].sport # SYN + ACK packet out->in - p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / - IP(src=out_if.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="SA")) + p = ( + Ether(src=out_if.remote_mac, dst=out_if.local_mac) + / IP(src=out_if.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="SA") + ) out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() in_if.get_capture(1) # ACK packet in->out - p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / - IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="A")) + p = ( + Ether(src=in_if.remote_mac, dst=in_if.local_mac) + / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A") + ) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -751,9 +822,10 @@ class TestNAT44ED(VppTestCase): return out_port - def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False, - client_id=None): - twice_nat_addr = '10.0.1.3' + def twice_nat_common( + self, self_twice_nat=False, same_pg=False, lb=False, client_id=None + ): + twice_nat_addr = "10.0.1.3" port_in = 8080 if lb: @@ -780,8 +852,7 @@ class TestNAT44ED(VppTestCase): else: pg1 = self.pg1 - eh_translate = ((not self_twice_nat) or (not lb and same_pg) or - client_id == 1) + eh_translate = (not self_twice_nat) or (not lb and same_pg) or client_id == 1 self.nat_add_address(self.nat_addr) self.nat_add_address(twice_nat_addr, twice_nat=1) @@ -793,27 +864,30 @@ class TestNAT44ED(VppTestCase): flags |= self.config_flags.NAT_IS_TWICE_NAT if not lb: - self.nat_add_static_mapping(pg0.remote_ip4, self.nat_addr, - port_in, port_out, - proto=IP_PROTOS.tcp, - flags=flags) + self.nat_add_static_mapping( + pg0.remote_ip4, + self.nat_addr, + port_in, + port_out, + proto=IP_PROTOS.tcp, + flags=flags, + ) else: - locals = [{'addr': server1.ip4, - 'port': port_in1, - 'probability': 50, - 'vrf_id': 0}, - {'addr': server2.ip4, - 'port': port_in2, - 'probability': 50, - 'vrf_id': 0}] + locals = [ + {"addr": server1.ip4, "port": port_in1, "probability": 50, "vrf_id": 0}, + {"addr": server2.ip4, "port": port_in2, "probability": 50, "vrf_id": 0}, + ] out_addr = self.nat_addr - self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags, - external_addr=out_addr, - external_port=port_out, - protocol=IP_PROTOS.tcp, - local_num=len(locals), - locals=locals) + self.vapi.nat44_add_del_lb_static_mapping( + is_add=1, + flags=flags, + external_addr=out_addr, + external_port=port_out, + protocol=IP_PROTOS.tcp, + local_num=len(locals), + locals=locals, + ) self.nat_add_inside_interface(pg0) self.nat_add_outside_interface(pg1) @@ -828,9 +902,11 @@ class TestNAT44ED(VppTestCase): client = self.pg0.remote_hosts[1] else: client = pg1.remote_hosts[0] - p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) / - IP(src=client.ip4, dst=self.nat_addr) / - TCP(sport=eh_port_out, dport=port_out)) + p = ( + Ether(src=pg1.remote_mac, dst=pg1.local_mac) + / IP(src=client.ip4, dst=self.nat_addr) + / TCP(sport=eh_port_out, dport=port_out) + ) pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -865,9 +941,11 @@ class TestNAT44ED(VppTestCase): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=server.mac, dst=pg0.local_mac) / - IP(src=server.ip4, dst=eh_addr_in) / - TCP(sport=saved_port_in, dport=eh_port_in)) + p = ( + Ether(src=server.mac, dst=pg0.local_mac) + / IP(src=server.ip4, dst=eh_addr_in) + / TCP(sport=saved_port_in, dport=eh_port_in) + ) pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -888,24 +966,25 @@ class TestNAT44ED(VppTestCase): if eh_translate: sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 1) - self.assertTrue(sessions[0].flags & - self.config_flags.NAT_IS_EXT_HOST_VALID) - self.assertTrue(sessions[0].flags & - self.config_flags.NAT_IS_TWICE_NAT) + self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) + self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_TWICE_NAT) self.logger.info(self.vapi.cli("show nat44 sessions")) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=(self.config_flags.NAT_IS_INSIDE | - self.config_flags.NAT_IS_EXT_HOST_VALID), + flags=( + self.config_flags.NAT_IS_INSIDE + | self.config_flags.NAT_IS_EXT_HOST_VALID + ), ext_host_address=sessions[0].ext_host_nat_address, - ext_host_port=sessions[0].ext_host_nat_port) + ext_host_port=sessions[0].ext_host_nat_port, + ) sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 0) def verify_syslog_sess(self, data, msgid, is_ip6=False): - message = data.decode('utf-8') + message = data.decode("utf-8") try: message = SyslogMessage.parse(message) except ParseError as e: @@ -913,29 +992,28 @@ class TestNAT44ED(VppTestCase): raise else: self.assertEqual(message.severity, SyslogSeverity.info) - self.assertEqual(message.appname, 'NAT') + self.assertEqual(message.appname, "NAT") self.assertEqual(message.msgid, msgid) - sd_params = message.sd.get('nsess') + sd_params = message.sd.get("nsess") self.assertTrue(sd_params is not None) if is_ip6: - self.assertEqual(sd_params.get('IATYP'), 'IPv6') - self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip6) + self.assertEqual(sd_params.get("IATYP"), "IPv6") + self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip6) else: - self.assertEqual(sd_params.get('IATYP'), 'IPv4') - self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4) - self.assertTrue(sd_params.get('SSUBIX') is not None) - self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in) - self.assertEqual(sd_params.get('XATYP'), 'IPv4') - self.assertEqual(sd_params.get('XSADDR'), self.nat_addr) - self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out) - self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp) - self.assertEqual(sd_params.get('SVLAN'), '0') - self.assertEqual(sd_params.get('XDADDR'), self.pg1.remote_ip4) - self.assertEqual(sd_params.get('XDPORT'), - "%d" % self.tcp_external_port) + self.assertEqual(sd_params.get("IATYP"), "IPv4") + self.assertEqual(sd_params.get("ISADDR"), self.pg0.remote_ip4) + self.assertTrue(sd_params.get("SSUBIX") is not None) + self.assertEqual(sd_params.get("ISPORT"), "%d" % self.tcp_port_in) + self.assertEqual(sd_params.get("XATYP"), "IPv4") + self.assertEqual(sd_params.get("XSADDR"), self.nat_addr) + self.assertEqual(sd_params.get("XSPORT"), "%d" % self.tcp_port_out) + self.assertEqual(sd_params.get("PROTO"), "%d" % IP_PROTOS.tcp) + self.assertEqual(sd_params.get("SVLAN"), "0") + self.assertEqual(sd_params.get("XDADDR"), self.pg1.remote_ip4) + self.assertEqual(sd_params.get("XDPORT"), "%d" % self.tcp_external_port) def test_icmp_error(self): - """ NAT44ED test ICMP error message with inner header""" + """NAT44ED test ICMP error message with inner header""" payload = "H" * 10 @@ -944,25 +1022,31 @@ class TestNAT44ED(VppTestCase): self.nat_add_outside_interface(self.pg1) # in2out (initiate connection) - p1 = [Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=21, dport=20) / payload, - Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=21, dport=20, flags="S") / payload, - Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type='echo-request', id=7777) / payload, - ] + p1 = [ + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=21, dport=20) + / payload, + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=21, dport=20, flags="S") + / payload, + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type="echo-request", id=7777) + / payload, + ] capture = self.send_and_expect(self.pg0, p1, self.pg1) # out2in (send error message) - p2 = [Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - ICMP(type='dest-unreach', code='port-unreachable') / - c[IP:] - for c in capture] + p2 = [ + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / ICMP(type="dest-unreach", code="port-unreachable") + / c[IP:] + for c in capture + ] capture = self.send_and_expect(self.pg1, p2, self.pg0) @@ -971,20 +1055,21 @@ class TestNAT44ED(VppTestCase): assert c[IP].dst == self.pg0.remote_ip4 assert c[IPerror].src == self.pg0.remote_ip4 except AssertionError as a: - raise AssertionError( - f"Packet {pr(c)} not translated properly") from a + raise AssertionError(f"Packet {pr(c)} not translated properly") from a def test_icmp_echo_reply_trailer(self): - """ ICMP echo reply with ethernet trailer""" + """ICMP echo reply with ethernet trailer""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out - p1 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type=8, id=0xabcd, seq=0)) + p1 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type=8, id=0xABCD, seq=0) + ) self.pg0.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) @@ -994,9 +1079,11 @@ class TestNAT44ED(VppTestCase): self.logger.debug(self.vapi.cli("show trace")) # out2in - p2 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xee59) / - ICMP(type=0, id=c[ICMP].id, seq=0)) + p2 = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr, id=0xEE59) + / ICMP(type=0, id=c[ICMP].id, seq=0) + ) # force checksum calculation p2 = p2.__class__(bytes(p2)) @@ -1019,7 +1106,7 @@ class TestNAT44ED(VppTestCase): self.pg0.get_capture(1) def test_users_dump(self): - """ NAT44ED API test - nat44_user_dump """ + """NAT44ED API test - nat44_user_dump""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) @@ -1061,9 +1148,9 @@ class TestNAT44ED(VppTestCase): host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: - pkts = self.create_stream_out(self.pg1, - dst_ip=self.pg0.remote_ip4, - use_inside_ports=True) + pkts = self.create_stream_out( + self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1075,8 +1162,7 @@ class TestNAT44ED(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, - same_port=True) + self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True) finally: self.pg0.remote_hosts[0] = host0 @@ -1107,14 +1193,14 @@ class TestNAT44ED(VppTestCase): self.assertEqual(non_static_user.nsessions, 3) def test_frag_out_of_order_do_not_translate(self): - """ NAT44ED don't translate fragments arriving out of order """ + """NAT44ED don't translate fragments arriving out of order""" self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat44_forwarding_enable_disable(enable=True) self.frag_out_of_order(proto=IP_PROTOS.tcp, dont_translate=True) def test_forwarding(self): - """ NAT44ED forwarding test """ + """NAT44ED forwarding test""" self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) @@ -1123,11 +1209,13 @@ class TestNAT44ED(VppTestCase): real_ip = self.pg0.remote_ip4 alias_ip = self.nat_addr flags = self.config_flags.NAT_IS_ADDR_ONLY - self.vapi.nat44_add_del_static_mapping(is_add=1, - local_ip_address=real_ip, - external_ip_address=alias_ip, - external_sw_if_index=0xFFFFFFFF, - flags=flags) + self.vapi.nat44_add_del_static_mapping( + is_add=1, + local_ip_address=real_ip, + external_ip_address=alias_ip, + external_sw_if_index=0xFFFFFFFF, + flags=flags, + ) try: # in2out - static mapping match @@ -1151,9 +1239,9 @@ class TestNAT44ED(VppTestCase): host0 = self.pg0.remote_hosts[0] self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1] try: - pkts = self.create_stream_out(self.pg1, - dst_ip=self.pg0.remote_ip4, - use_inside_ports=True) + pkts = self.create_stream_out( + self.pg1, dst_ip=self.pg0.remote_ip4, use_inside_ports=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1165,24 +1253,27 @@ class TestNAT44ED(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, - same_port=True) + self.verify_capture_out( + capture, nat_ip=self.pg0.remote_ip4, same_port=True + ) finally: self.pg0.remote_hosts[0] = host0 user = self.pg0.remote_hosts[1] sessions = self.vapi.nat44_user_session_dump(user.ip4, 0) self.assertEqual(len(sessions), 3) - self.assertTrue(sessions[0].flags & - self.config_flags.NAT_IS_EXT_HOST_VALID) + self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=(self.config_flags.NAT_IS_INSIDE | - self.config_flags.NAT_IS_EXT_HOST_VALID), + flags=( + self.config_flags.NAT_IS_INSIDE + | self.config_flags.NAT_IS_EXT_HOST_VALID + ), ext_host_address=sessions[0].ext_host_address, - ext_host_port=sessions[0].ext_host_port) + ext_host_port=sessions[0].ext_host_port, + ) sessions = self.vapi.nat44_user_session_dump(user.ip4, 0) self.assertEqual(len(sessions), 2) @@ -1194,16 +1285,17 @@ class TestNAT44ED(VppTestCase): local_ip_address=real_ip, external_ip_address=alias_ip, external_sw_if_index=0xFFFFFFFF, - flags=flags) + flags=flags, + ) def test_output_feature_and_service2(self): - """ NAT44ED interface output feature and service host direct access """ + """NAT44ED interface output feature and service host direct access""" self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(self.nat_addr) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # session initiated from service host - translate pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1230,9 +1322,9 @@ class TestNAT44ED(VppTestCase): self.icmp_id_in = 60305 try: - pkts = self.create_stream_out(self.pg1, - self.pg0.remote_ip4, - use_inside_ports=True) + pkts = self.create_stream_out( + self.pg1, self.pg0.remote_ip4, use_inside_ports=True + ) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1244,40 +1336,37 @@ class TestNAT44ED(VppTestCase): self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, - same_port=True) + self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4, same_port=True) finally: self.tcp_port_in = tcp_port_in self.udp_port_in = udp_port_in self.icmp_id_in = icmp_id_in def test_twice_nat(self): - """ NAT44ED Twice NAT """ + """NAT44ED Twice NAT""" self.twice_nat_common() def test_self_twice_nat_positive(self): - """ NAT44ED Self Twice NAT (positive test) """ + """NAT44ED Self Twice NAT (positive test)""" self.twice_nat_common(self_twice_nat=True, same_pg=True) def test_self_twice_nat_lb_positive(self): - """ NAT44ED Self Twice NAT local service load balancing (positive test) - """ - self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, - client_id=1) + """NAT44ED Self Twice NAT local service load balancing (positive test)""" + self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=1) def test_twice_nat_lb(self): - """ NAT44ED Twice NAT local service load balancing """ + """NAT44ED Twice NAT local service load balancing""" self.twice_nat_common(lb=True) def test_output_feature(self): - """ NAT44ED interface output feature (in2out postrouting) """ + """NAT44ED interface output feature (in2out postrouting)""" self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(self.nat_addr) self.nat_add_outside_interface(self.pg0) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # in2out pkts = self.create_stream_in(self.pg0, self.pg1) @@ -1323,24 +1412,31 @@ class TestNAT44ED(VppTestCase): self.assertEqual(p[ICMP].type, 11) # 11 == time-exceeded def test_static_with_port_out2(self): - """ NAT44ED 1:1 NAPT asymmetrical rule """ + """NAT44ED 1:1 NAPT asymmetrical rule""" external_port = 80 local_port = 8080 self.vapi.nat44_forwarding_enable_disable(enable=1) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - local_port, external_port, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + local_port, + external_port, + proto=IP_PROTOS.tcp, + flags=flags, + ) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # from client to service - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1357,9 +1453,12 @@ class TestNAT44ED(VppTestCase): raise # ICMP error - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type=11) / capture[0][IP]) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type=11) + / capture[0][IP] + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1375,9 +1474,11 @@ class TestNAT44ED(VppTestCase): raise # from service back to client - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1394,9 +1495,12 @@ class TestNAT44ED(VppTestCase): raise # ICMP error - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - ICMP(type=11) / capture[0][IP]) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / ICMP(type=11) + / capture[0][IP] + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1412,9 +1516,11 @@ class TestNAT44ED(VppTestCase): raise # from client to server (no translation) - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=12346, dport=local_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=12346, dport=local_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1431,9 +1537,11 @@ class TestNAT44ED(VppTestCase): raise # from service back to client (no translation) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12346)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12346) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1450,21 +1558,17 @@ class TestNAT44ED(VppTestCase): raise def test_static_lb(self): - """ NAT44ED local service load balancing """ + """NAT44ED local service load balancing""" external_addr_n = self.nat_addr external_port = 80 local_port = 8080 server1 = self.pg0.remote_hosts[0] server2 = self.pg0.remote_hosts[1] - locals = [{'addr': server1.ip4, - 'port': local_port, - 'probability': 70, - 'vrf_id': 0}, - {'addr': server2.ip4, - 'port': local_port, - 'probability': 30, - 'vrf_id': 0}] + locals = [ + {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0}, + {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0}, + ] self.nat_add_address(self.nat_addr) self.vapi.nat44_add_del_lb_static_mapping( @@ -1473,19 +1577,22 @@ class TestNAT44ED(VppTestCase): external_port=external_port, protocol=IP_PROTOS.tcp, local_num=len(locals), - locals=locals) + locals=locals, + ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # from client to service - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1507,9 +1614,11 @@ class TestNAT44ED(VppTestCase): raise # from service back to client - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1527,56 +1636,59 @@ class TestNAT44ED(VppTestCase): sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 1) - self.assertTrue(sessions[0].flags & - self.config_flags.NAT_IS_EXT_HOST_VALID) + self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=(self.config_flags.NAT_IS_INSIDE | - self.config_flags.NAT_IS_EXT_HOST_VALID), + flags=( + self.config_flags.NAT_IS_INSIDE + | self.config_flags.NAT_IS_EXT_HOST_VALID + ), ext_host_address=sessions[0].ext_host_address, - ext_host_port=sessions[0].ext_host_port) + ext_host_port=sessions[0].ext_host_port, + ) sessions = self.vapi.nat44_user_session_dump(server.ip4, 0) self.assertEqual(len(sessions), 0) def test_static_lb_2(self): - """ NAT44ED local service load balancing (asymmetrical rule) """ + """NAT44ED local service load balancing (asymmetrical rule)""" external_addr = self.nat_addr external_port = 80 local_port = 8080 server1 = self.pg0.remote_hosts[0] server2 = self.pg0.remote_hosts[1] - locals = [{'addr': server1.ip4, - 'port': local_port, - 'probability': 70, - 'vrf_id': 0}, - {'addr': server2.ip4, - 'port': local_port, - 'probability': 30, - 'vrf_id': 0}] + locals = [ + {"addr": server1.ip4, "port": local_port, "probability": 70, "vrf_id": 0}, + {"addr": server2.ip4, "port": local_port, "probability": 30, "vrf_id": 0}, + ] self.vapi.nat44_forwarding_enable_disable(enable=1) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.vapi.nat44_add_del_lb_static_mapping(is_add=1, flags=flags, - external_addr=external_addr, - external_port=external_port, - protocol=IP_PROTOS.tcp, - local_num=len(locals), - locals=locals) + self.vapi.nat44_add_del_lb_static_mapping( + is_add=1, + flags=flags, + external_addr=external_addr, + external_port=external_port, + protocol=IP_PROTOS.tcp, + local_num=len(locals), + locals=locals, + ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # from client to service - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1598,9 +1710,11 @@ class TestNAT44ED(VppTestCase): raise # from service back to client - p = (Ether(src=server.mac, dst=self.pg0.local_mac) / - IP(src=server.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=server.mac, dst=self.pg0.local_mac) + / IP(src=server.ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1617,9 +1731,11 @@ class TestNAT44ED(VppTestCase): raise # from client to server (no translation) - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=server1.ip4) / - TCP(sport=12346, dport=local_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=server1.ip4) + / TCP(sport=12346, dport=local_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1637,9 +1753,11 @@ class TestNAT44ED(VppTestCase): raise # from service back to client (no translation) - p = (Ether(src=server1.mac, dst=self.pg0.local_mac) / - IP(src=server1.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12346)) + p = ( + Ether(src=server1.mac, dst=self.pg0.local_mac) + / IP(src=server1.ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12346) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1656,41 +1774,41 @@ class TestNAT44ED(VppTestCase): raise def test_lb_affinity(self): - """ NAT44ED local service load balancing affinity """ + """NAT44ED local service load balancing affinity""" external_addr = self.nat_addr external_port = 80 local_port = 8080 server1 = self.pg0.remote_hosts[0] server2 = self.pg0.remote_hosts[1] - locals = [{'addr': server1.ip4, - 'port': local_port, - 'probability': 50, - 'vrf_id': 0}, - {'addr': server2.ip4, - 'port': local_port, - 'probability': 50, - 'vrf_id': 0}] + locals = [ + {"addr": server1.ip4, "port": local_port, "probability": 50, "vrf_id": 0}, + {"addr": server2.ip4, "port": local_port, "probability": 50, "vrf_id": 0}, + ] self.nat_add_address(self.nat_addr) - self.vapi.nat44_add_del_lb_static_mapping(is_add=1, - external_addr=external_addr, - external_port=external_port, - protocol=IP_PROTOS.tcp, - affinity=10800, - local_num=len(locals), - locals=locals) + self.vapi.nat44_add_del_lb_static_mapping( + is_add=1, + external_addr=external_addr, + external_port=external_port, + protocol=IP_PROTOS.tcp, + affinity=10800, + local_num=len(locals), + locals=locals, + ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=1025, dport=external_port)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=1025, dport=external_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1699,22 +1817,26 @@ class TestNAT44ED(VppTestCase): sessions = self.vapi.nat44_user_session_dump(backend, 0) self.assertEqual(len(sessions), 1) - self.assertTrue(sessions[0].flags & - self.config_flags.NAT_IS_EXT_HOST_VALID) + self.assertTrue(sessions[0].flags & self.config_flags.NAT_IS_EXT_HOST_VALID) self.vapi.nat44_del_session( address=sessions[0].inside_ip_address, port=sessions[0].inside_port, protocol=sessions[0].protocol, - flags=(self.config_flags.NAT_IS_INSIDE | - self.config_flags.NAT_IS_EXT_HOST_VALID), + flags=( + self.config_flags.NAT_IS_INSIDE + | self.config_flags.NAT_IS_EXT_HOST_VALID + ), ext_host_address=sessions[0].ext_host_address, - ext_host_port=sessions[0].ext_host_port) + ext_host_port=sessions[0].ext_host_port, + ) pkts = [] for port in range(1030, 1100): - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=port, dport=external_port)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=port, dport=external_port) + ) pkts.append(p) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -1724,31 +1846,39 @@ class TestNAT44ED(VppTestCase): self.assertEqual(p[IP].dst, backend) def test_multiple_vrf_1(self): - """ Multiple VRF - both client & service in VRF1 """ + """Multiple VRF - both client & service in VRF1""" - external_addr = '1.2.3.4' + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 port = 0 flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1) + sw_if_index=self.pg5.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - is_add=1) + sw_if_index=self.pg6.sw_if_index, is_add=1 + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr, - local_port, external_port, vrf_id=1, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg5.remote_ip4, + external_addr, + local_port, + external_port, + vrf_id=1, + proto=IP_PROTOS.tcp, + flags=flags, + ) - p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / - IP(src=self.pg6.remote_ip4, dst=external_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) + / IP(src=self.pg6.remote_ip4, dst=external_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg6.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1764,9 +1894,11 @@ class TestNAT44ED(VppTestCase): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / - IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) + / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg5.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1783,9 +1915,9 @@ class TestNAT44ED(VppTestCase): raise def test_multiple_vrf_2(self): - """ Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature) """ + """Multiple VRF - dynamic NAT from VRF1 to VRF0 (output-feature)""" - external_addr = '1.2.3.4' + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 port = 0 @@ -1793,22 +1925,30 @@ class TestNAT44ED(VppTestCase): self.nat_add_address(self.nat_addr) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1) + sw_if_index=self.pg5.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr, - local_port, external_port, vrf_id=1, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg5.remote_ip4, + external_addr, + local_port, + external_port, + vrf_id=1, + proto=IP_PROTOS.tcp, + flags=flags, + ) - p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / - IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=2345, dport=22)) + p = ( + Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) + / IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=2345, dport=22) + ) self.pg5.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1824,9 +1964,11 @@ class TestNAT44ED(VppTestCase): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=22, dport=port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=22, dport=port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1843,23 +1985,23 @@ class TestNAT44ED(VppTestCase): raise def test_multiple_vrf_3(self): - """ Multiple VRF - client in VRF1, service in VRF0 """ + """Multiple VRF - client in VRF1, service in VRF0""" - external_addr = '1.2.3.4' + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 port = 0 flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - is_add=1) + sw_if_index=self.pg6.sw_if_index, is_add=1 + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY self.nat_add_static_mapping( self.pg0.remote_ip4, @@ -1868,13 +2010,15 @@ class TestNAT44ED(VppTestCase): vrf_id=0, external_port=external_port, proto=IP_PROTOS.tcp, - flags=flags + flags=flags, ) # from client VRF1 to service VRF0 - p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / - IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) / - TCP(sport=12346, dport=external_port)) + p = ( + Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) + / IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) + / TCP(sport=12346, dport=external_port) + ) self.pg6.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1891,9 +2035,11 @@ class TestNAT44ED(VppTestCase): raise # from service VRF0 back to client VRF1 - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / - TCP(sport=local_port, dport=12346)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) + / TCP(sport=local_port, dport=12346) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1910,35 +2056,43 @@ class TestNAT44ED(VppTestCase): raise def test_multiple_vrf_4(self): - """ Multiple VRF - client in VRF0, service in VRF1 """ + """Multiple VRF - client in VRF0, service in VRF1""" - external_addr = '1.2.3.4' + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 port = 0 flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1) + sw_if_index=self.pg5.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr, - local_port, external_port, vrf_id=1, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg5.remote_ip4, + external_addr, + local_port, + external_port, + vrf_id=1, + proto=IP_PROTOS.tcp, + flags=flags, + ) # from client VRF0 to service VRF1 - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=external_addr) / - TCP(sport=12347, dport=external_port)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=external_addr) + / TCP(sport=12347, dport=external_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1955,9 +2109,11 @@ class TestNAT44ED(VppTestCase): raise # from service VRF1 back to client VRF0 - p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / - IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=local_port, dport=12347)) + p = ( + Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) + / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=local_port, dport=12347) + ) self.pg5.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1974,9 +2130,9 @@ class TestNAT44ED(VppTestCase): raise def test_multiple_vrf_5(self): - """ Multiple VRF - forwarding - no translation """ + """Multiple VRF - forwarding - no translation""" - external_addr = '1.2.3.4' + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 port = 0 @@ -1984,24 +2140,30 @@ class TestNAT44ED(VppTestCase): self.vapi.nat44_forwarding_enable_disable(enable=1) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg0.sw_if_index, is_add=1, flags=flags + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1) + sw_if_index=self.pg5.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg5.sw_if_index, - is_add=1, flags=flags) + sw_if_index=self.pg5.sw_if_index, is_add=1, flags=flags + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - is_add=1) + sw_if_index=self.pg6.sw_if_index, is_add=1 + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg5.remote_ip4, external_addr, - local_port, external_port, vrf_id=1, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg5.remote_ip4, + external_addr, + local_port, + external_port, + vrf_id=1, + proto=IP_PROTOS.tcp, + flags=flags, + ) self.nat_add_static_mapping( self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index, @@ -2009,13 +2171,15 @@ class TestNAT44ED(VppTestCase): vrf_id=0, external_port=external_port, proto=IP_PROTOS.tcp, - flags=flags + flags=flags, ) # from client to server (both VRF1, no translation) - p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / - IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) / - TCP(sport=12348, dport=local_port)) + p = ( + Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) + / IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) + / TCP(sport=12348, dport=local_port) + ) self.pg6.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2032,9 +2196,11 @@ class TestNAT44ED(VppTestCase): raise # from server back to client (both VRF1, no translation) - p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / - IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) / - TCP(sport=local_port, dport=12348)) + p = ( + Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) + / IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) + / TCP(sport=local_port, dport=12348) + ) self.pg5.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2051,9 +2217,11 @@ class TestNAT44ED(VppTestCase): raise # from client VRF1 to server VRF0 (no translation) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / - TCP(sport=local_port, dport=12349)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) + / TCP(sport=local_port, dport=12349) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2070,9 +2238,11 @@ class TestNAT44ED(VppTestCase): raise # from server VRF0 back to client VRF1 (no translation) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) / - TCP(sport=local_port, dport=12349)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) + / TCP(sport=local_port, dport=12349) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2089,9 +2259,11 @@ class TestNAT44ED(VppTestCase): raise # from client VRF0 to server VRF1 (no translation) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) / - TCP(sport=12344, dport=local_port)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) + / TCP(sport=12344, dport=local_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2108,9 +2280,11 @@ class TestNAT44ED(VppTestCase): raise # from server VRF1 back to client VRF0 (no translation) - p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) / - IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=local_port, dport=12344)) + p = ( + Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) + / IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=local_port, dport=12344) + ) self.pg5.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2127,7 +2301,7 @@ class TestNAT44ED(VppTestCase): raise def test_outside_address_distribution(self): - """ Outside address distribution based on source address """ + """Outside address distribution based on source address""" x = 100 nat_addresses = [] @@ -2142,7 +2316,10 @@ class TestNAT44ED(VppTestCase): self.vapi.nat44_add_del_address_range( first_ip_address=nat_addresses[0], last_ip_address=nat_addresses[-1], - vrf_id=0xFFFFFFFF, is_add=1, flags=0) + vrf_id=0xFFFFFFFF, + is_add=1, + flags=0, + ) self.pg0.generate_remote_hosts(x) @@ -2150,11 +2327,12 @@ class TestNAT44ED(VppTestCase): for i in range(x): info = self.create_packet_info(self.pg0, self.pg1) payload = self.info_to_payload(info) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_hosts[i].ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=7000+i, dport=8000+i) / - Raw(payload)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_hosts[i].ip4, dst=self.pg1.remote_ip4) + / UDP(sport=7000 + i, dport=8000 + i) + / Raw(payload) + ) info.data = p pkts.append(p) @@ -2172,20 +2350,23 @@ class TestNAT44ED(VppTestCase): packed = socket.inet_aton(p_sent[IP].src) numeric = struct.unpack("!L", packed)[0] numeric = socket.htonl(numeric) - a = nat_addresses[(numeric-1) % len(nat_addresses)] + a = nat_addresses[(numeric - 1) % len(nat_addresses)] self.assertEqual( - a, p_recvd[IP].src, + a, + p_recvd[IP].src, "Invalid packet (src IP %s translated to %s, but expected %s)" - % (p_sent[IP].src, p_recvd[IP].src, a)) + % (p_sent[IP].src, p_recvd[IP].src, a), + ) class TestNAT44EDMW(TestNAT44ED): - """ NAT44ED MW Test Case """ + """NAT44ED MW Test Case""" + vpp_worker_count = 4 max_sessions = 5000 def test_dynamic(self): - """ NAT44ED dynamic translation test """ + """NAT44ED dynamic translation test""" pkt_count = 1500 tcp_port_offset = 20 udp_port_offset = 20 @@ -2196,27 +2377,33 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_outside_interface(self.pg1) # in2out - tc1 = self.statistics['/nat44-ed/in2out/slowpath/tcp'] - uc1 = self.statistics['/nat44-ed/in2out/slowpath/udp'] - ic1 = self.statistics['/nat44-ed/in2out/slowpath/icmp'] - dc1 = self.statistics['/nat44-ed/in2out/slowpath/drops'] + tc1 = self.statistics["/nat44-ed/in2out/slowpath/tcp"] + uc1 = self.statistics["/nat44-ed/in2out/slowpath/udp"] + ic1 = self.statistics["/nat44-ed/in2out/slowpath/icmp"] + dc1 = self.statistics["/nat44-ed/in2out/slowpath/drops"] i2o_pkts = [[] for x in range(0, self.vpp_worker_count)] for i in range(pkt_count): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=tcp_port_offset + i, dport=20)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=tcp_port_offset + i, dport=20) + ) i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=udp_port_offset + i, dport=20)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=udp_port_offset + i, dport=20) + ) i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(id=icmp_id_offset + i, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(id=icmp_id_offset + i, type="echo-request") + ) i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p) for i in range(0, self.vpp_worker_count): @@ -2228,26 +2415,23 @@ class TestNAT44EDMW(TestNAT44ED): capture = self.pg1.get_capture(pkt_count * 3, timeout=5) if_idx = self.pg0.sw_if_index - tc2 = self.statistics['/nat44-ed/in2out/slowpath/tcp'] - uc2 = self.statistics['/nat44-ed/in2out/slowpath/udp'] - ic2 = self.statistics['/nat44-ed/in2out/slowpath/icmp'] - dc2 = self.statistics['/nat44-ed/in2out/slowpath/drops'] - - self.assertEqual( - tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) - self.assertEqual( - uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) - self.assertEqual( - ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) + tc2 = self.statistics["/nat44-ed/in2out/slowpath/tcp"] + uc2 = self.statistics["/nat44-ed/in2out/slowpath/udp"] + ic2 = self.statistics["/nat44-ed/in2out/slowpath/icmp"] + dc2 = self.statistics["/nat44-ed/in2out/slowpath/drops"] + + self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) + self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) + self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0) self.logger.info(self.vapi.cli("show trace")) # out2in - tc1 = self.statistics['/nat44-ed/out2in/fastpath/tcp'] - uc1 = self.statistics['/nat44-ed/out2in/fastpath/udp'] - ic1 = self.statistics['/nat44-ed/out2in/fastpath/icmp'] - dc1 = self.statistics['/nat44-ed/out2in/fastpath/drops'] + tc1 = self.statistics["/nat44-ed/out2in/fastpath/tcp"] + uc1 = self.statistics["/nat44-ed/out2in/fastpath/udp"] + ic1 = self.statistics["/nat44-ed/out2in/fastpath/icmp"] + dc1 = self.statistics["/nat44-ed/out2in/fastpath/drops"] recvd_tcp_ports = set() recvd_udp_ports = set() @@ -2267,19 +2451,25 @@ class TestNAT44EDMW(TestNAT44ED): o2i_pkts = [[] for x in range(0, self.vpp_worker_count)] for i in range(pkt_count): - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(dport=choice(recvd_tcp_ports), sport=20)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(dport=choice(recvd_tcp_ports), sport=20) + ) o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - UDP(dport=choice(recvd_udp_ports), sport=20)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / UDP(dport=choice(recvd_udp_ports), sport=20) + ) o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p) - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - ICMP(id=choice(recvd_icmp_ids), type='echo-reply')) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / ICMP(id=choice(recvd_icmp_ids), type="echo-reply") + ) o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p) for i in range(0, self.vpp_worker_count): @@ -2295,41 +2485,50 @@ class TestNAT44EDMW(TestNAT44ED): self.assertEqual(packet[IP].dst, self.pg0.remote_ip4) if packet.haslayer(TCP): self.assert_in_range( - packet[TCP].dport, tcp_port_offset, - tcp_port_offset + pkt_count, "dst TCP port") + packet[TCP].dport, + tcp_port_offset, + tcp_port_offset + pkt_count, + "dst TCP port", + ) elif packet.haslayer(UDP): self.assert_in_range( - packet[UDP].dport, udp_port_offset, - udp_port_offset + pkt_count, "dst UDP port") + packet[UDP].dport, + udp_port_offset, + udp_port_offset + pkt_count, + "dst UDP port", + ) else: self.assert_in_range( - packet[ICMP].id, icmp_id_offset, - icmp_id_offset + pkt_count, "ICMP id") + packet[ICMP].id, + icmp_id_offset, + icmp_id_offset + pkt_count, + "ICMP id", + ) except: - self.logger.error(ppp("Unexpected or invalid packet " - "(inside network):", packet)) + self.logger.error( + ppp("Unexpected or invalid packet (inside network):", packet) + ) raise if_idx = self.pg1.sw_if_index - tc2 = self.statistics['/nat44-ed/out2in/fastpath/tcp'] - uc2 = self.statistics['/nat44-ed/out2in/fastpath/udp'] - ic2 = self.statistics['/nat44-ed/out2in/fastpath/icmp'] - dc2 = self.statistics['/nat44-ed/out2in/fastpath/drops'] - - self.assertEqual( - tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) - self.assertEqual( - uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) - self.assertEqual( - ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) + tc2 = self.statistics["/nat44-ed/out2in/fastpath/tcp"] + uc2 = self.statistics["/nat44-ed/out2in/fastpath/udp"] + ic2 = self.statistics["/nat44-ed/out2in/fastpath/icmp"] + dc2 = self.statistics["/nat44-ed/out2in/fastpath/drops"] + + self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) + self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) + self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0) - sc = self.statistics['/nat44-ed/total-sessions'] - self.assertEqual(sc[:, 0].sum(), len(recvd_tcp_ports) + - len(recvd_udp_ports) + len(recvd_icmp_ids)) + sc = self.statistics["/nat44-ed/total-sessions"] + self.assertEqual( + sc[:, 0].sum(), + len(recvd_tcp_ports) + len(recvd_udp_ports) + len(recvd_icmp_ids), + ) def test_frag_in_order(self): - """ NAT44ED translate fragments arriving in order """ + """NAT44ED translate fragments arriving in order""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) @@ -2340,7 +2539,7 @@ class TestNAT44EDMW(TestNAT44ED): self.frag_in_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_in_order_do_not_translate(self): - """ NAT44ED don't translate fragments arriving in order """ + """NAT44ED don't translate fragments arriving in order""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) @@ -2350,7 +2549,7 @@ class TestNAT44EDMW(TestNAT44ED): self.frag_in_order(proto=IP_PROTOS.tcp, dont_translate=True) def test_frag_out_of_order(self): - """ NAT44ED translate fragments arriving out of order """ + """NAT44ED translate fragments arriving out of order""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) @@ -2361,7 +2560,7 @@ class TestNAT44EDMW(TestNAT44ED): self.frag_out_of_order(proto=IP_PROTOS.icmp, ignore_port=True) def test_frag_in_order_in_plus_out(self): - """ NAT44ED in+out interface fragments in order """ + """NAT44ED in+out interface fragments in order""" in_port = self.random_port() out_port = self.random_port() @@ -2373,39 +2572,29 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_outside_interface(self.pg1) # add static mappings for server - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - in_port, - out_port, - proto=IP_PROTOS.tcp) - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - in_port, - out_port, - proto=IP_PROTOS.udp) - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - proto=IP_PROTOS.icmp) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp + ) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp + ) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp + ) # run tests for each protocol - self.frag_in_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.tcp) - self.frag_in_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.udp) - self.frag_in_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.icmp) + self.frag_in_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp + ) + self.frag_in_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp + ) + self.frag_in_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp + ) def test_frag_out_of_order_in_plus_out(self): - """ NAT44ED in+out interface fragments out of order """ + """NAT44ED in+out interface fragments out of order""" in_port = self.random_port() out_port = self.random_port() @@ -2417,39 +2606,29 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_outside_interface(self.pg1) # add static mappings for server - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - in_port, - out_port, - proto=IP_PROTOS.tcp) - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - in_port, - out_port, - proto=IP_PROTOS.udp) - self.nat_add_static_mapping(self.server_addr, - self.nat_addr, - proto=IP_PROTOS.icmp) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.tcp + ) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, in_port, out_port, proto=IP_PROTOS.udp + ) + self.nat_add_static_mapping( + self.server_addr, self.nat_addr, proto=IP_PROTOS.icmp + ) # run tests for each protocol - self.frag_out_of_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.tcp) - self.frag_out_of_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.udp) - self.frag_out_of_order_in_plus_out(self.server_addr, - self.nat_addr, - in_port, - out_port, - IP_PROTOS.icmp) + self.frag_out_of_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.tcp + ) + self.frag_out_of_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.udp + ) + self.frag_out_of_order_in_plus_out( + self.server_addr, self.nat_addr, in_port, out_port, IP_PROTOS.icmp + ) def test_reass_hairpinning(self): - """ NAT44ED fragments hairpinning """ + """NAT44ED fragments hairpinning""" server_addr = self.pg0.remote_hosts[1].ip4 @@ -2462,26 +2641,49 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_outside_interface(self.pg1) # add static mapping for server - self.nat_add_static_mapping(server_addr, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.tcp) - self.nat_add_static_mapping(server_addr, self.nat_addr, - server_in_port, server_out_port, - proto=IP_PROTOS.udp) + self.nat_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.tcp, + ) + self.nat_add_static_mapping( + server_addr, + self.nat_addr, + server_in_port, + server_out_port, + proto=IP_PROTOS.udp, + ) self.nat_add_static_mapping(server_addr, self.nat_addr) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.tcp, - ignore_port=True) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.udp, - ignore_port=True) - self.reass_hairpinning(server_addr, server_in_port, server_out_port, - host_in_port, proto=IP_PROTOS.icmp, - ignore_port=True) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.tcp, + ignore_port=True, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.udp, + ignore_port=True, + ) + self.reass_hairpinning( + server_addr, + server_in_port, + server_out_port, + host_in_port, + proto=IP_PROTOS.icmp, + ignore_port=True, + ) def test_session_limit_per_vrf(self): - """ NAT44ED per vrf session limit """ + """NAT44ED per vrf session limit""" inside = self.pg0 inside_vrf10 = self.pg2 @@ -2520,27 +2722,29 @@ class TestNAT44EDMW(TestNAT44ED): capture = outside.get_capture(len(stream)) def test_show_max_translations(self): - """ NAT44ED API test - max translations per thread """ + """NAT44ED API test - max translations per thread""" config = self.vapi.nat44_show_running_config() - self.assertEqual(self.max_sessions, - config.sessions) + self.assertEqual(self.max_sessions, config.sessions) def test_lru_cleanup(self): - """ NAT44ED LRU cleanup algorithm """ + """NAT44ED LRU cleanup algorithm""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) self.vapi.nat_set_timeouts( - udp=1, tcp_established=7440, tcp_transitory=30, icmp=1) + udp=1, tcp_established=7440, tcp_transitory=30, icmp=1 + ) tcp_port_out = self.init_tcp_session(self.pg0, self.pg1, 2000, 80) pkts = [] for i in range(0, self.max_sessions - 1): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / - UDP(sport=7000+i, dport=80)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) + / UDP(sport=7000 + i, dport=80) + ) pkts.append(p) self.pg0.add_stream(pkts) @@ -2551,9 +2755,11 @@ class TestNAT44EDMW(TestNAT44ED): pkts = [] for i in range(0, self.max_sessions - 1): - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / - ICMP(id=8000+i, type='echo-request')) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) + / ICMP(id=8000 + i, type="echo-request") + ) pkts.append(p) self.pg0.add_stream(pkts) @@ -2562,79 +2768,89 @@ class TestNAT44EDMW(TestNAT44ED): self.pg1.get_capture(len(pkts)) def test_session_rst_timeout(self): - """ NAT44ED session RST timeouts """ + """NAT44ED session RST timeouts""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=5, icmp=60) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=5, icmp=60 + ) - self.init_tcp_session(self.pg0, self.pg1, self.tcp_port_in, - self.tcp_external_port) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="R")) + self.init_tcp_session( + self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port + ) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R") + ) self.send_and_expect(self.pg0, p, self.pg1) self.virtual_sleep(6) # The session is already closed - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="P")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P") + ) self.send_and_assert_no_replies(self.pg0, p, self.pg1) # The session can be re-opened - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="S")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S") + ) self.send_and_expect(self.pg0, p, self.pg1) def test_session_rst_established_timeout(self): - """ NAT44ED session RST timeouts """ + """NAT44ED session RST timeouts""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=5, icmp=60) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=5, icmp=60 + ) - self.init_tcp_session(self.pg0, self.pg1, self.tcp_port_in, - self.tcp_external_port) + self.init_tcp_session( + self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port + ) # Wait at least the transitory time, the session is in established # state anyway. RST followed by a data packet should move it to # transitory state. self.virtual_sleep(6) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="R")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="R") + ) self.send_and_expect(self.pg0, p, self.pg1) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="P")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P") + ) self.send_and_expect(self.pg0, p, self.pg1) # State is transitory, session should be closed after 6 seconds self.virtual_sleep(6) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="P")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="P") + ) self.send_and_assert_no_replies(self.pg0, p, self.pg1) def test_dynamic_out_of_ports(self): - """ NAT44ED dynamic translation test: out of ports """ + """NAT44ED dynamic translation test: out of ports""" self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) @@ -2643,60 +2859,71 @@ class TestNAT44EDMW(TestNAT44ED): pkts = self.create_stream_in(self.pg0, self.pg1) self.send_and_assert_no_replies( - self.pg0, pkts, msg="i2o pkts", - stats_diff=self.no_diff | { + self.pg0, + pkts, + msg="i2o pkts", + stats_diff=self.no_diff + | { "err": { - '/err/nat44-ed-in2out-slowpath/out of ports': len(pkts), + "/err/nat44-ed-in2out-slowpath/out of ports": len(pkts), }, self.pg0.sw_if_index: { - '/nat44-ed/in2out/slowpath/drops': len(pkts), + "/nat44-ed/in2out/slowpath/drops": len(pkts), }, - } + }, ) # in2out after NAT addresses added self.nat_add_address(self.nat_addr) - tcpn, udpn, icmpn = (sum(x) for x in - zip(*((TCP in p, UDP in p, ICMP in p) - for p in pkts))) + tcpn, udpn, icmpn = ( + sum(x) for x in zip(*((TCP in p, UDP in p, ICMP in p) for p in pkts)) + ) self.send_and_expect( - self.pg0, pkts, self.pg1, msg="i2o pkts", - stats_diff=self.no_diff | { + self.pg0, + pkts, + self.pg1, + msg="i2o pkts", + stats_diff=self.no_diff + | { "err": { - '/err/nat44-ed-in2out-slowpath/out of ports': 0, + "/err/nat44-ed-in2out-slowpath/out of ports": 0, }, self.pg0.sw_if_index: { - '/nat44-ed/in2out/slowpath/drops': 0, - '/nat44-ed/in2out/slowpath/tcp': tcpn, - '/nat44-ed/in2out/slowpath/udp': udpn, - '/nat44-ed/in2out/slowpath/icmp': icmpn, + "/nat44-ed/in2out/slowpath/drops": 0, + "/nat44-ed/in2out/slowpath/tcp": tcpn, + "/nat44-ed/in2out/slowpath/udp": udpn, + "/nat44-ed/in2out/slowpath/icmp": icmpn, }, - } + }, ) def test_unknown_proto(self): - """ NAT44ED translate packet with unknown protocol """ + """NAT44ED translate packet with unknown protocol""" self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) # in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=20)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=20) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() p = self.pg1.get_capture(1) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2712,11 +2939,13 @@ class TestNAT44EDMW(TestNAT44ED): raise # out2in - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2732,7 +2961,7 @@ class TestNAT44EDMW(TestNAT44ED): raise def test_hairpinning_unknown_proto(self): - """ NAT44ED translate packet with unknown protocol - hairpinning """ + """NAT44ED translate packet with unknown protocol - hairpinning""" host = self.pg0.remote_hosts[0] server = self.pg0.remote_hosts[1] host_in_port = 1234 @@ -2747,19 +2976,23 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_static_mapping(server.ip4, server_nat_ip) # host to server - p = (Ether(src=host.mac, dst=self.pg0.local_mac) / - IP(src=host.ip4, dst=server_nat_ip) / - TCP(sport=host_in_port, dport=server_out_port)) + p = ( + Ether(src=host.mac, dst=self.pg0.local_mac) + / IP(src=host.ip4, dst=server_nat_ip) + / TCP(sport=host_in_port, dport=server_out_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) - p = (Ether(dst=self.pg0.local_mac, src=host.mac) / - IP(src=host.ip4, dst=server_nat_ip) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=host.mac) + / IP(src=host.ip4, dst=server_nat_ip) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2775,11 +3008,13 @@ class TestNAT44EDMW(TestNAT44ED): raise # server to host - p = (Ether(dst=self.pg0.local_mac, src=server.mac) / - IP(src=server.ip4, dst=self.nat_addr) / - GRE() / - IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) / - TCP(sport=1234, dport=1234)) + p = ( + Ether(dst=self.pg0.local_mac, src=server.mac) + / IP(src=server.ip4, dst=self.nat_addr) + / GRE() + / IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) + / TCP(sport=1234, dport=1234) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2795,8 +3030,8 @@ class TestNAT44EDMW(TestNAT44ED): raise def test_output_feature_and_service(self): - """ NAT44ED interface output feature and services """ - external_addr = '1.2.3.4' + """NAT44ED interface output feature and services""" + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 @@ -2804,22 +3039,33 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_address(self.nat_addr) flags = self.config_flags.NAT_IS_ADDR_ONLY self.vapi.nat44_add_del_identity_mapping( - ip_address=self.pg1.remote_ip4, sw_if_index=0xFFFFFFFF, - flags=flags, is_add=1) + ip_address=self.pg1.remote_ip4, + sw_if_index=0xFFFFFFFF, + flags=flags, + is_add=1, + ) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg0.remote_ip4, external_addr, - local_port, external_port, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + external_addr, + local_port, + external_port, + proto=IP_PROTOS.tcp, + flags=flags, + ) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg0) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # from client to service - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=external_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=external_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2836,9 +3082,11 @@ class TestNAT44EDMW(TestNAT44ED): raise # from service back to client - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2877,26 +3125,34 @@ class TestNAT44EDMW(TestNAT44ED): self.verify_capture_in(capture, self.pg0) def test_output_feature_and_service3(self): - """ NAT44ED interface output feature and DST NAT """ - external_addr = '1.2.3.4' + """NAT44ED interface output feature and DST NAT""" + external_addr = "1.2.3.4" external_port = 80 local_port = 8080 self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(self.nat_addr) flags = self.config_flags.NAT_IS_OUT2IN_ONLY - self.nat_add_static_mapping(self.pg1.remote_ip4, external_addr, - local_port, external_port, - proto=IP_PROTOS.tcp, flags=flags) + self.nat_add_static_mapping( + self.pg1.remote_ip4, + external_addr, + local_port, + external_port, + proto=IP_PROTOS.tcp, + flags=flags, + ) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg0) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=external_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=external_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2914,9 +3170,11 @@ class TestNAT44EDMW(TestNAT44ED): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / - TCP(sport=local_port, dport=12345)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / TCP(sport=local_port, dport=12345) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -2935,17 +3193,15 @@ class TestNAT44EDMW(TestNAT44ED): raise def test_self_twice_nat_lb_negative(self): - """ NAT44ED Self Twice NAT local service load balancing (negative test) - """ - self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, - client_id=2) + """NAT44ED Self Twice NAT local service load balancing (negative test)""" + self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True, client_id=2) def test_self_twice_nat_negative(self): - """ NAT44ED Self Twice NAT (negative test) """ + """NAT44ED Self Twice NAT (negative test)""" self.twice_nat_common(self_twice_nat=True) def test_static_lb_multi_clients(self): - """ NAT44ED local service load balancing - multiple clients""" + """NAT44ED local service load balancing - multiple clients""" external_addr = self.nat_addr external_port = 80 @@ -2954,39 +3210,39 @@ class TestNAT44EDMW(TestNAT44ED): server2 = self.pg0.remote_hosts[1] server3 = self.pg0.remote_hosts[2] - locals = [{'addr': server1.ip4, - 'port': local_port, - 'probability': 90, - 'vrf_id': 0}, - {'addr': server2.ip4, - 'port': local_port, - 'probability': 10, - 'vrf_id': 0}] + locals = [ + {"addr": server1.ip4, "port": local_port, "probability": 90, "vrf_id": 0}, + {"addr": server2.ip4, "port": local_port, "probability": 10, "vrf_id": 0}, + ] flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg1.sw_if_index, - is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) self.nat_add_address(self.nat_addr) - self.vapi.nat44_add_del_lb_static_mapping(is_add=1, - external_addr=external_addr, - external_port=external_port, - protocol=IP_PROTOS.tcp, - local_num=len(locals), - locals=locals) + self.vapi.nat44_add_del_lb_static_mapping( + is_add=1, + external_addr=external_addr, + external_port=external_port, + protocol=IP_PROTOS.tcp, + local_num=len(locals), + locals=locals, + ) server1_n = 0 server2_n = 0 clients = ip4_range(self.pg1.remote_ip4, 10, 50) pkts = [] for client in clients: - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=client, dst=self.nat_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=client, dst=self.nat_addr) + / TCP(sport=12345, dport=external_port) + ) pkts.append(p) self.pg1.add_stream(pkts) self.pg_enable_capture(self.pg_interfaces) @@ -3000,10 +3256,10 @@ class TestNAT44EDMW(TestNAT44ED): self.assertGreaterEqual(server1_n, server2_n) local = { - 'addr': server3.ip4, - 'port': local_port, - 'probability': 20, - 'vrf_id': 0 + "addr": server3.ip4, + "port": local_port, + "probability": 20, + "vrf_id": 0, } # add new back-end @@ -3012,16 +3268,19 @@ class TestNAT44EDMW(TestNAT44ED): external_addr=external_addr, external_port=external_port, local=local, - protocol=IP_PROTOS.tcp) + protocol=IP_PROTOS.tcp, + ) server1_n = 0 server2_n = 0 server3_n = 0 clients = ip4_range(self.pg1.remote_ip4, 60, 110) pkts = [] for client in clients: - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=client, dst=self.nat_addr) / - TCP(sport=12346, dport=external_port)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=client, dst=self.nat_addr) + / TCP(sport=12346, dport=external_port) + ) pkts.append(p) self.assertGreater(len(pkts), 0) self.pg1.add_stream(pkts) @@ -3040,10 +3299,10 @@ class TestNAT44EDMW(TestNAT44ED): self.assertGreater(server3_n, 0) local = { - 'addr': server2.ip4, - 'port': local_port, - 'probability': 10, - 'vrf_id': 0 + "addr": server2.ip4, + "port": local_port, + "probability": 10, + "vrf_id": 0, } # remove one back-end @@ -3052,7 +3311,8 @@ class TestNAT44EDMW(TestNAT44ED): external_addr=external_addr, external_port=external_port, local=local, - protocol=IP_PROTOS.tcp) + protocol=IP_PROTOS.tcp, + ) server1_n = 0 server2_n = 0 server3_n = 0 @@ -3075,39 +3335,39 @@ class TestNAT44EDMW(TestNAT44ED): # setting syslog sender cannot be undone and if it is set, it messes # with self.send_and_assert_no_replies functionality def test_zzz_syslog_sess(self): - """ NAT44ED Test syslog session creation and deletion """ - self.vapi.syslog_set_filter( - self.syslog_severity.SYSLOG_API_SEVERITY_INFO) + """NAT44ED Test syslog session creation and deletion""" + self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO) self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4) self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.get_capture(1) self.tcp_port_out = capture[0][TCP].sport capture = self.pg3.get_capture(1) - self.verify_syslog_sess(capture[0][Raw].load, 'SADD') + self.verify_syslog_sess(capture[0][Raw].load, "SADD") self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.nat_add_address(self.nat_addr, is_add=0) capture = self.pg3.get_capture(1) - self.verify_syslog_sess(capture[0][Raw].load, 'SDEL') + self.verify_syslog_sess(capture[0][Raw].load, "SDEL") # put zzz in front of syslog test name so that it runs as a last test # setting syslog sender cannot be undone and if it is set, it messes # with self.send_and_assert_no_replies functionality def test_zzz_syslog_sess_reopen(self): - """ Syslog events for session reopen """ - self.vapi.syslog_set_filter( - self.syslog_severity.SYSLOG_API_SEVERITY_INFO) + """Syslog events for session reopen""" + self.vapi.syslog_set_filter(self.syslog_severity.SYSLOG_API_SEVERITY_INFO) self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4) self.nat_add_address(self.nat_addr) @@ -3115,55 +3375,62 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_outside_interface(self.pg1) # SYN in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port) + ) capture = self.send_and_expect(self.pg0, p, self.pg1)[0] self.tcp_port_out = capture[0][TCP].sport capture = self.pg3.get_capture(1) - self.verify_syslog_sess(capture[0][Raw].load, 'SADD') + self.verify_syslog_sess(capture[0][Raw].load, "SADD") # SYN out2in - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags='SA')) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA") + ) self.send_and_expect(self.pg1, p, self.pg0) - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="A")) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A") + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN in2out - p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, - flags="F")) + p = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F") + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN out2in - p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, - flags="F")) + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F") + ) self.send_and_expect(self.pg1, p, self.pg0) - self.init_tcp_session(self.pg0, self.pg1, self.tcp_port_in, - self.tcp_external_port) + self.init_tcp_session( + self.pg0, self.pg1, self.tcp_port_in, self.tcp_external_port + ) # 2 records should be produced - first one del & add capture = self.pg3.get_capture(2) - self.verify_syslog_sess(capture[0][Raw].load, 'SDEL') - self.verify_syslog_sess(capture[1][Raw].load, 'SADD') + self.verify_syslog_sess(capture[0][Raw].load, "SDEL") + self.verify_syslog_sess(capture[1][Raw].load, "SADD") def test_twice_nat_interface_addr(self): - """ NAT44ED Acquire twice NAT addresses from interface """ + """NAT44ED Acquire twice NAT addresses from interface""" flags = self.config_flags.NAT_IS_TWICE_NAT self.vapi.nat44_add_del_interface_addr( - sw_if_index=self.pg11.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg11.sw_if_index, flags=flags, is_add=1 + ) # no address in NAT pool adresses = self.vapi.nat44_address_dump() @@ -3173,8 +3440,7 @@ class TestNAT44EDMW(TestNAT44ED): self.pg11.config_ip4() adresses = self.vapi.nat44_address_dump() self.assertEqual(1, len(adresses)) - self.assertEqual(str(adresses[0].ip_address), - self.pg11.local_ip4) + self.assertEqual(str(adresses[0].ip_address), self.pg11.local_ip4) self.assertEqual(adresses[0].flags, flags) # remove interface address and check NAT address pool @@ -3183,11 +3449,12 @@ class TestNAT44EDMW(TestNAT44ED): self.assertEqual(0, len(adresses)) def test_output_feature_stateful_acl(self): - """ NAT44ED output feature works with stateful ACL """ + """NAT44ED output feature works with stateful ACL""" self.nat_add_address(self.nat_addr) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg1.sw_if_index, is_add=1) + sw_if_index=self.pg1.sw_if_index, is_add=1 + ) # First ensure that the NAT is working sans ACL @@ -3197,14 +3464,16 @@ class TestNAT44EDMW(TestNAT44ED): # send packets into inside intf, ensure received via outside intf pkts_in2out = self.create_stream_in(self.pg0, self.pg1) - capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, - len(pkts_in2out)) + capture = self.send_and_expect( + self.pg0, pkts_in2out, self.pg1, len(pkts_in2out) + ) self.verify_capture_out(capture, ignore_port=True) # send out2in again, with sessions created it should work now pkts_out2in = self.create_stream_out(self.pg1) - capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0, - len(pkts_out2in)) + capture = self.send_and_expect( + self.pg1, pkts_out2in, self.pg0, len(pkts_out2in) + ) self.verify_capture_in(capture, self.pg0) # Create an ACL blocking everything @@ -3218,8 +3487,9 @@ class TestNAT44EDMW(TestNAT44ED): in2out_acl.add_vpp_config() # apply as input acl on interface and confirm it blocks everything - acl_if = VppAclInterface(self, sw_if_index=self.pg1.sw_if_index, - n_input=1, acls=[out2in_acl]) + acl_if = VppAclInterface( + self, sw_if_index=self.pg1.sw_if_index, n_input=1, acls=[out2in_acl] + ) acl_if.add_vpp_config() self.send_and_assert_no_replies(self.pg1, pkts_out2in) @@ -3227,8 +3497,9 @@ class TestNAT44EDMW(TestNAT44ED): acl_if.acls = [out2in_acl, in2out_acl] acl_if.add_vpp_config() # send in2out to generate ACL state (NAT state was created earlier) - capture = self.send_and_expect(self.pg0, pkts_in2out, self.pg1, - len(pkts_in2out)) + capture = self.send_and_expect( + self.pg0, pkts_in2out, self.pg1, len(pkts_in2out) + ) self.verify_capture_out(capture, ignore_port=True) # send out2in again. ACL state exists so it should work now. @@ -3236,13 +3507,14 @@ class TestNAT44EDMW(TestNAT44ED): for p in pkts_out2in: if p.haslayer(TCP) and p[TCP].flags & 0x02: p[TCP].flags |= 0x10 - capture = self.send_and_expect(self.pg1, pkts_out2in, self.pg0, - len(pkts_out2in)) + capture = self.send_and_expect( + self.pg1, pkts_out2in, self.pg0, len(pkts_out2in) + ) self.verify_capture_in(capture, self.pg0) self.logger.info(self.vapi.cli("show trace")) def test_tcp_close(self): - """ NAT44ED Close TCP session from inside network - output feature """ + """NAT44ED Close TCP session from inside network - output feature""" config = self.vapi.nat44_show_running_config() old_timeouts = config.timeouts new_transitory = 2 @@ -3250,77 +3522,92 @@ class TestNAT44EDMW(TestNAT44ED): udp=old_timeouts.udp, tcp_established=old_timeouts.tcp_established, icmp=old_timeouts.icmp, - tcp_transitory=new_transitory) + tcp_transitory=new_transitory, + ) self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(self.pg1.local_ip4) - twice_nat_addr = '10.0.1.3' - service_ip = '192.168.16.150' + twice_nat_addr = "10.0.1.3" + service_ip = "192.168.16.150" self.nat_add_address(twice_nat_addr, twice_nat=1) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - is_add=1) + sw_if_index=self.pg0.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg0.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg0.sw_if_index, flags=flags, is_add=1 + ) self.vapi.nat44_ed_add_del_output_interface( - is_add=1, - sw_if_index=self.pg1.sw_if_index) - - flags = (self.config_flags.NAT_IS_OUT2IN_ONLY | - self.config_flags.NAT_IS_TWICE_NAT) - self.nat_add_static_mapping(self.pg0.remote_ip4, - service_ip, 80, 80, - proto=IP_PROTOS.tcp, - flags=flags) + is_add=1, sw_if_index=self.pg1.sw_if_index + ) + + flags = ( + self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT + ) + self.nat_add_static_mapping( + self.pg0.remote_ip4, service_ip, 80, 80, proto=IP_PROTOS.tcp, flags=flags + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) start_sessnum = len(sessions) # SYN packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=service_ip) / - TCP(sport=33898, dport=80, flags="S")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=service_ip) + / TCP(sport=33898, dport=80, flags="S") + ) capture = self.send_and_expect(self.pg1, p, self.pg0, n_rx=1) p = capture[0] tcp_port = p[TCP].sport # SYN + ACK packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) / - TCP(sport=80, dport=tcp_port, flags="SA")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) + / TCP(sport=80, dport=tcp_port, flags="SA") + ) self.send_and_expect(self.pg0, p, self.pg1, n_rx=1) # ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=service_ip) / - TCP(sport=33898, dport=80, flags="A")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=service_ip) + / TCP(sport=33898, dport=80, flags="A") + ) self.send_and_expect(self.pg1, p, self.pg0, n_rx=1) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) / - TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) + / TCP(sport=80, dport=tcp_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1, n_rx=1) # FIN+ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=service_ip) / - TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=service_ip) + / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0, n_rx=1) # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) / - TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) + / TCP(sport=80, dport=tcp_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1, n_rx=1) # session now in transitory timeout, but traffic still flows # try FIN packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=service_ip) / - TCP(sport=33898, dport=80, flags="F")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=service_ip) + / TCP(sport=33898, dport=80, flags="F") + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3334,15 +3621,17 @@ class TestNAT44EDMW(TestNAT44ED): # send FIN+ACK packet out -> in - will cause session to be wiped # but won't create a new session - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=service_ip) / - TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=service_ip) + / TCP(sport=33898, dport=80, flags="FA", seq=300, ack=101) + ) self.send_and_assert_no_replies(self.pg1, p) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - start_sessnum, 0) def test_tcp_session_close_in(self): - """ NAT44ED Close TCP session from inside network """ + """NAT44ED Close TCP session from inside network""" in_port = self.tcp_port_in out_port = 10505 @@ -3351,79 +3640,92 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, out_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + out_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1) pkts = [] # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="A", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101) + ) pkts.append(p) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101) + ) pkts.append(p) self.send_and_expect(self.pg1, pkts, self.pg0) # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) # retransmit FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) # retransmit ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) self.virtual_sleep(3) # retransmit ACK packet in -> out - this will cause session to be wiped - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_assert_no_replies(self.pg0, p) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 0) def test_tcp_session_close_out(self): - """ NAT44ED Close TCP session from outside network """ + """NAT44ED Close TCP session from outside network""" in_port = self.tcp_port_in out_port = 10505 @@ -3432,33 +3734,41 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, out_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + out_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) _ = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=100, ack=300) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() self.pg0.get_capture(1) # FIN+ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=300, ack=101) + ) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -3466,10 +3776,11 @@ class TestNAT44EDMW(TestNAT44ED): self.pg1.get_capture(1) # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="A", seq=101, ack=301) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3479,31 +3790,34 @@ class TestNAT44EDMW(TestNAT44ED): self.assertEqual(len(sessions) - session_n, 1) # retransmit FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) # retransmit ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) self.virtual_sleep(3) # retransmit ACK packet in -> out - this will cause session to be wiped - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_assert_no_replies(self.pg0, p) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 0) def test_tcp_session_close_simultaneous(self): - """ Simultaneous TCP close from both sides """ + """Simultaneous TCP close from both sides""" in_port = self.tcp_port_in ext_port = 10505 @@ -3511,252 +3825,297 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, ext_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + ext_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=100)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100) + ) self.send_and_expect(self.pg1, p, self.pg0) # ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) # ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="A", seq=301, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="A", seq=301, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) # retransmit FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) # retransmit ACK packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) self.virtual_sleep(3) # retransmit ACK packet in -> out - this will cause session to be wiped - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.pg_send(self.pg0, p) self.send_and_assert_no_replies(self.pg0, p) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 0) def test_tcp_session_half_reopen_inside(self): - """ TCP session in FIN/FIN state not reopened by in2out SYN only """ + """TCP session in FIN/FIN state not reopened by in2out SYN only""" in_port = self.tcp_port_in ext_port = 10505 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, ext_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + ext_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=100)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100) + ) self.send_and_expect(self.pg1, p, self.pg0) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) # send SYN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="S", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="S", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) self.virtual_sleep(3) # send ACK packet in -> out - session should be wiped - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_assert_no_replies(self.pg0, p, self.pg1) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 0) def test_tcp_session_half_reopen_outside(self): - """ TCP session in FIN/FIN state not reopened by out2in SYN only """ + """TCP session in FIN/FIN state not reopened by out2in SYN only""" in_port = self.tcp_port_in ext_port = 10505 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, ext_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + ext_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=100)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100) + ) self.send_and_expect(self.pg1, p, self.pg0) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) # send SYN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="S", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) self.virtual_sleep(3) # send ACK packet in -> out - session should be wiped - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_assert_no_replies(self.pg0, p, self.pg1) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 0) def test_tcp_session_reopen(self): - """ TCP session in FIN/FIN state reopened by SYN from both sides """ + """TCP session in FIN/FIN state reopened by SYN from both sides""" in_port = self.tcp_port_in ext_port = 10505 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.nat_add_static_mapping(self.pg0.remote_ip4, self.nat_addr, - in_port, ext_port, proto=IP_PROTOS.tcp, - flags=self.config_flags.NAT_IS_TWICE_NAT) + self.nat_add_static_mapping( + self.pg0.remote_ip4, + self.nat_addr, + in_port, + ext_port, + proto=IP_PROTOS.tcp, + flags=self.config_flags.NAT_IS_TWICE_NAT, + ) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) session_n = len(sessions) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=2, icmp=5) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=2, icmp=5 + ) out_port = self.init_tcp_session(self.pg0, self.pg1, in_port, ext_port) # FIN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="FA", seq=100, ack=300)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="FA", seq=100, ack=300) + ) self.send_and_expect(self.pg0, p, self.pg1) # FIN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="FA", seq=300, ack=100)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="FA", seq=300, ack=100) + ) self.send_and_expect(self.pg1, p, self.pg0) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) # send SYN packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="S", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="S", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) # send SYN packet in -> out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="SA", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="SA", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) # send ACK packet out -> in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, - flags="A", seq=300, ack=101)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="A", seq=300, ack=101) + ) self.send_and_expect(self.pg1, p, self.pg0) self.virtual_sleep(3) # send ACK packet in -> out - should be forwarded and session alive - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, - flags="A", seq=101, ack=301)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A", seq=101, ack=301) + ) self.send_and_expect(self.pg0, p, self.pg1) sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0) self.assertEqual(len(sessions) - session_n, 1) def test_dynamic_vrf(self): - """ NAT44ED dynamic translation test: different VRF""" + """NAT44ED dynamic translation test: different VRF""" vrf_id_in = 33 vrf_id_out = 34 @@ -3789,30 +4148,28 @@ class TestNAT44EDMW(TestNAT44ED): self.pg7.unconfig() self.pg8.unconfig() - self.vapi.ip_table_add_del(is_add=0, - table={'table_id': vrf_id_in}) - self.vapi.ip_table_add_del(is_add=0, - table={'table_id': vrf_id_out}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_in}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id_out}) def test_dynamic_output_feature_vrf(self): - """ NAT44ED dynamic translation test: output-feature, VRF""" + """NAT44ED dynamic translation test: output-feature, VRF""" # other then default (0) new_vrf_id = 22 self.nat_add_address(self.nat_addr) self.vapi.nat44_ed_add_del_output_interface( - sw_if_index=self.pg8.sw_if_index, - is_add=1) + sw_if_index=self.pg8.sw_if_index, is_add=1 + ) try: self.configure_ip4_interface(self.pg7, table_id=new_vrf_id) self.configure_ip4_interface(self.pg8, table_id=new_vrf_id) # in2out - tcpn = self.statistics['/nat44-ed/in2out/slowpath/tcp'] - udpn = self.statistics['/nat44-ed/in2out/slowpath/udp'] - icmpn = self.statistics['/nat44-ed/in2out/slowpath/icmp'] - drops = self.statistics['/nat44-ed/in2out/slowpath/drops'] + tcpn = self.statistics["/nat44-ed/in2out/slowpath/tcp"] + udpn = self.statistics["/nat44-ed/in2out/slowpath/udp"] + icmpn = self.statistics["/nat44-ed/in2out/slowpath/icmp"] + drops = self.statistics["/nat44-ed/in2out/slowpath/drops"] pkts = self.create_stream_in(self.pg7, self.pg8) self.pg7.add_stream(pkts) @@ -3822,20 +4179,20 @@ class TestNAT44EDMW(TestNAT44ED): self.verify_capture_out(capture, ignore_port=True) if_idx = self.pg8.sw_if_index - cnt = self.statistics['/nat44-ed/in2out/slowpath/tcp'] + cnt = self.statistics["/nat44-ed/in2out/slowpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ed/in2out/slowpath/udp'] + cnt = self.statistics["/nat44-ed/in2out/slowpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ed/in2out/slowpath/icmp'] + cnt = self.statistics["/nat44-ed/in2out/slowpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ed/in2out/slowpath/drops'] + cnt = self.statistics["/nat44-ed/in2out/slowpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) # out2in - tcpn = self.statistics['/nat44-ed/out2in/fastpath/tcp'] - udpn = self.statistics['/nat44-ed/out2in/fastpath/udp'] - icmpn = self.statistics['/nat44-ed/out2in/fastpath/icmp'] - drops = self.statistics['/nat44-ed/out2in/fastpath/drops'] + tcpn = self.statistics["/nat44-ed/out2in/fastpath/tcp"] + udpn = self.statistics["/nat44-ed/out2in/fastpath/udp"] + icmpn = self.statistics["/nat44-ed/out2in/fastpath/icmp"] + drops = self.statistics["/nat44-ed/out2in/fastpath/drops"] pkts = self.create_stream_out(self.pg8) self.pg8.add_stream(pkts) @@ -3845,48 +4202,56 @@ class TestNAT44EDMW(TestNAT44ED): self.verify_capture_in(capture, self.pg7) if_idx = self.pg8.sw_if_index - cnt = self.statistics['/nat44-ed/out2in/fastpath/tcp'] + cnt = self.statistics["/nat44-ed/out2in/fastpath/tcp"] self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2) - cnt = self.statistics['/nat44-ed/out2in/fastpath/udp'] + cnt = self.statistics["/nat44-ed/out2in/fastpath/udp"] self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ed/out2in/fastpath/icmp'] + cnt = self.statistics["/nat44-ed/out2in/fastpath/icmp"] self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1) - cnt = self.statistics['/nat44-ed/out2in/fastpath/drops'] + cnt = self.statistics["/nat44-ed/out2in/fastpath/drops"] self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0) - sessions = self.statistics['/nat44-ed/total-sessions'] + sessions = self.statistics["/nat44-ed/total-sessions"] self.assertEqual(sessions[:, 0].sum(), 3) finally: self.pg7.unconfig() self.pg8.unconfig() - self.vapi.ip_table_add_del(is_add=0, - table={'table_id': new_vrf_id}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": new_vrf_id}) def test_next_src_nat(self): - """ NAT44ED On way back forward packet to nat44-in2out node. """ + """NAT44ED On way back forward packet to nat44-in2out node.""" - twice_nat_addr = '10.0.1.3' + twice_nat_addr = "10.0.1.3" external_port = 80 local_port = 8080 post_twice_nat_port = 0 self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(twice_nat_addr, twice_nat=1) - flags = (self.config_flags.NAT_IS_OUT2IN_ONLY | - self.config_flags.NAT_IS_SELF_TWICE_NAT) - self.nat_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4, - local_port, external_port, - proto=IP_PROTOS.tcp, vrf_id=1, - flags=flags) + flags = ( + self.config_flags.NAT_IS_OUT2IN_ONLY + | self.config_flags.NAT_IS_SELF_TWICE_NAT + ) + self.nat_add_static_mapping( + self.pg6.remote_ip4, + self.pg1.remote_ip4, + local_port, + external_port, + proto=IP_PROTOS.tcp, + vrf_id=1, + flags=flags, + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg6.sw_if_index, - is_add=1) + sw_if_index=self.pg6.sw_if_index, is_add=1 + ) - p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / - IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) + / IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=12345, dport=external_port) + ) self.pg6.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3905,9 +4270,11 @@ class TestNAT44EDMW(TestNAT44ED): self.logger.error(ppp("Unexpected or invalid packet:", p)) raise - p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) / - IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) / - TCP(sport=local_port, dport=post_twice_nat_port)) + p = ( + Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) + / IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) + / TCP(sport=local_port, dport=post_twice_nat_port) + ) self.pg6.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3926,7 +4293,7 @@ class TestNAT44EDMW(TestNAT44ED): raise def test_one_armed_nat44_static(self): - """ NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule """ + """NAT44ED One armed NAT and 1:1 NAPT asymmetrical rule""" remote_host = self.pg4.remote_hosts[0] local_host = self.pg4.remote_hosts[1] @@ -3936,23 +4303,31 @@ class TestNAT44EDMW(TestNAT44ED): self.vapi.nat44_forwarding_enable_disable(enable=1) self.nat_add_address(self.nat_addr, twice_nat=1) - flags = (self.config_flags.NAT_IS_OUT2IN_ONLY | - self.config_flags.NAT_IS_TWICE_NAT) - self.nat_add_static_mapping(local_host.ip4, self.nat_addr, - local_port, external_port, - proto=IP_PROTOS.tcp, flags=flags) + flags = ( + self.config_flags.NAT_IS_OUT2IN_ONLY | self.config_flags.NAT_IS_TWICE_NAT + ) + self.nat_add_static_mapping( + local_host.ip4, + self.nat_addr, + local_port, + external_port, + proto=IP_PROTOS.tcp, + flags=flags, + ) flags = self.config_flags.NAT_IS_INSIDE self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - is_add=1) + sw_if_index=self.pg4.sw_if_index, is_add=1 + ) self.vapi.nat44_interface_add_del_feature( - sw_if_index=self.pg4.sw_if_index, - flags=flags, is_add=1) + sw_if_index=self.pg4.sw_if_index, flags=flags, is_add=1 + ) # from client to service - p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) / - IP(src=remote_host.ip4, dst=self.nat_addr) / - TCP(sport=12345, dport=external_port)) + p = ( + Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) + / IP(src=remote_host.ip4, dst=self.nat_addr) + / TCP(sport=12345, dport=external_port) + ) self.pg4.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3972,9 +4347,11 @@ class TestNAT44EDMW(TestNAT44ED): raise # from service back to client - p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) / - IP(src=local_host.ip4, dst=self.nat_addr) / - TCP(sport=local_port, dport=eh_port_in)) + p = ( + Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) + / IP(src=local_host.ip4, dst=self.nat_addr) + / TCP(sport=local_port, dport=eh_port_in) + ) self.pg4.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -3993,7 +4370,7 @@ class TestNAT44EDMW(TestNAT44ED): raise def test_icmp_error_fwd_outbound(self): - """ NAT44ED ICMP error outbound with forwarding enabled """ + """NAT44ED ICMP error outbound with forwarding enabled""" # Ensure that an outbound ICMP error message is properly associated # with the inbound forward bypass session it is related to. @@ -4005,9 +4382,12 @@ class TestNAT44EDMW(TestNAT44ED): # enable forwarding and initiate connection out2in self.vapi.nat44_forwarding_enable_disable(enable=1) - p1 = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) / - UDP(sport=21, dport=20) / payload) + p1 = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / UDP(sport=21, dport=20) + / payload + ) self.pg1.add_stream(p1) self.pg_enable_capture(self.pg_interfaces) @@ -4021,10 +4401,12 @@ class TestNAT44EDMW(TestNAT44ED): # session dumps for a user will only look on the worker that the # user is supposed to be mapped to in2out. The forward bypass session # is not necessarily created on that worker. - p2 = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - ICMP(type='dest-unreach', code='port-unreachable') / - capture[IP:]) + p2 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / ICMP(type="dest-unreach", code="port-unreachable") + / capture[IP:] + ) self.pg0.add_stream(p2) self.pg_enable_capture(self.pg_interfaces) @@ -4038,11 +4420,11 @@ class TestNAT44EDMW(TestNAT44ED): self.logger.info(ppp("capture packet:", capture)) def test_tcp_session_open_retransmit1(self): - """ NAT44ED Open TCP session with SYN,ACK retransmit 1 + """NAT44ED Open TCP session with SYN,ACK retransmit 1 - The client does not receive the [SYN,ACK] or the - ACK from the client is lost. Therefore, the [SYN, ACK] - is retransmitted by the server. + The client does not receive the [SYN,ACK] or the + ACK from the client is lost. Therefore, the [SYN, ACK] + is retransmitted by the server. """ in_port = self.tcp_port_in @@ -4053,50 +4435,61 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=5, icmp=60) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=5, icmp=60 + ) # SYN packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="S")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="S") + ) p = self.send_and_expect(self.pg0, p, self.pg1)[0] out_port = p[TCP].sport # SYN + ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="SA")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="SA") + ) self.send_and_expect(self.pg1, p, self.pg0) # ACK in->out does not arrive # resent SYN + ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="SA")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="SA") + ) self.send_and_expect(self.pg1, p, self.pg0) # ACK packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="A")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A") + ) self.send_and_expect(self.pg0, p, self.pg1) # Verify that the data can be transmitted after the transitory time self.virtual_sleep(6) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="PA") / - Raw(payload)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="PA") + / Raw(payload) + ) self.send_and_expect(self.pg0, p, self.pg1) def test_tcp_session_open_retransmit2(self): - """ NAT44ED Open TCP session with SYN,ACK retransmit 2 + """NAT44ED Open TCP session with SYN,ACK retransmit 2 - The ACK is lost to the server after the TCP session is opened. - Data is sent by the client, then the [SYN,ACK] is - retransmitted by the server. + The ACK is lost to the server after the TCP session is opened. + Data is sent by the client, then the [SYN,ACK] is + retransmitted by the server. """ in_port = self.tcp_port_in @@ -4107,68 +4500,87 @@ class TestNAT44EDMW(TestNAT44ED): self.nat_add_inside_interface(self.pg0) self.nat_add_outside_interface(self.pg1) - self.vapi.nat_set_timeouts(udp=300, tcp_established=7440, - tcp_transitory=5, icmp=60) + self.vapi.nat_set_timeouts( + udp=300, tcp_established=7440, tcp_transitory=5, icmp=60 + ) # SYN packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="S")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="S") + ) p = self.send_and_expect(self.pg0, p, self.pg1)[0] out_port = p[TCP].sport # SYN + ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="SA")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="SA") + ) self.send_and_expect(self.pg1, p, self.pg0) # ACK packet in->out -- not received by the server - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="A")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A") + ) self.send_and_expect(self.pg0, p, self.pg1) # PUSH + ACK packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="PA") / - Raw(payload)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="PA") + / Raw(payload) + ) self.send_and_expect(self.pg0, p, self.pg1) # resent SYN + ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="SA")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="SA") + ) self.send_and_expect(self.pg1, p, self.pg0) # resent ACK packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="A")) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="A") + ) self.send_and_expect(self.pg0, p, self.pg1) # resent PUSH + ACK packet in->out - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="PA") / - Raw(payload)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="PA") + / Raw(payload) + ) self.send_and_expect(self.pg0, p, self.pg1) # ACK packet out->in - p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / - IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / - TCP(sport=ext_port, dport=out_port, flags="A")) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src=self.pg1.remote_ip4, dst=self.nat_addr) + / TCP(sport=ext_port, dport=out_port, flags="A") + ) self.send_and_expect(self.pg1, p, self.pg0) # Verify that the data can be transmitted after the transitory time self.virtual_sleep(6) - p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=in_port, dport=ext_port, flags="PA") / - Raw(payload)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / TCP(sport=in_port, dport=ext_port, flags="PA") + / Raw(payload) + ) self.send_and_expect(self.pg0, p, self.pg1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |