diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_cnat.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_cnat.py')
-rw-r--r-- | test/test_cnat.py | 382 |
1 files changed, 227 insertions, 155 deletions
diff --git a/test/test_cnat.py b/test/test_cnat.py index 25c2a6454cb..e2e7c6bab8c 100644 --- a/test/test_cnat.py +++ b/test/test_cnat.py @@ -15,8 +15,14 @@ from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply import struct -from ipaddress import ip_address, ip_network, \ - IPv4Address, IPv6Address, IPv4Network, IPv6Network +from ipaddress import ( + ip_address, + ip_network, + IPv4Address, + IPv6Address, + IPv4Network, + IPv6Network, +) from vpp_object import VppObject from vpp_papi import VppEnum @@ -29,18 +35,27 @@ DST = 1 class CnatCommonTestCase(VppTestCase): - """ CNat common test class """ + """CNat common test class""" # # turn the scanner off whilst testing otherwise sessions # will time out # - extra_vpp_punt_config = ["cnat", "{", - "session-db-buckets", "64", - "session-cleanup-timeout", "0.1", - "session-max-age", "1", - "tcp-max-age", "1", - "scanner", "off", "}"] + extra_vpp_punt_config = [ + "cnat", + "{", + "session-db-buckets", + "64", + "session-cleanup-timeout", + "0.1", + "session-max-age", + "1", + "tcp-max-age", + "1", + "scanner", + "off", + "}", + ] @classmethod def setUpClass(cls): @@ -52,7 +67,7 @@ class CnatCommonTestCase(VppTestCase): class Endpoint(object): - """ CNat endpoint """ + """CNat endpoint""" def __init__(self, pg=None, pgi=None, port=0, is_v6=False, ip=None): self.port = port @@ -83,17 +98,18 @@ class Endpoint(object): return VppEnum.vl_api_address_family_t.ADDRESS_IP4 def encode(self): - return {'addr': self.ip, - 'port': self.port, - 'sw_if_index': self.sw_if_index, - 'if_af': self._vpp_if_af()} + return { + "addr": self.ip, + "port": self.port, + "sw_if_index": self.sw_if_index, + "if_af": self._vpp_if_af(), + } def __str__(self): - return ("%s:%d" % (self.ip, self.port)) + return "%s:%d" % (self.ip, self.port) class Translation(VppObject): - def __init__(self, test, iproto, vip, paths): self._test = test self.vip = vip @@ -102,7 +118,7 @@ class Translation(VppObject): self.id = None def __str__(self): - return ("%s %s %s" % (self.vip, self.iproto, self.paths)) + return "%s %s %s" % (self.vip, self.iproto, self.paths) def _vl4_proto(self): ip_proto = VppEnum.vl_api_ip_proto_t @@ -112,21 +128,26 @@ class Translation(VppObject): }[self.iproto] def _encoded_paths(self): - return [{'src_ep': src.encode(), - 'dst_ep': dst.encode()} for (src, dst) in self.paths] + return [ + {"src_ep": src.encode(), "dst_ep": dst.encode()} + for (src, dst) in self.paths + ] def add_vpp_config(self): r = self._test.vapi.cnat_translation_update( - {'vip': self.vip.encode(), - 'ip_proto': self._vl4_proto(), - 'n_paths': len(self.paths), - 'paths': self._encoded_paths()}) + { + "vip": self.vip.encode(), + "ip_proto": self._vl4_proto(), + "n_paths": len(self.paths), + "paths": self._encoded_paths(), + } + ) self._test.registry.register(self, self._test.logger) self.id = r.id return self def remove_vpp_config(self): - assert(self.id is not None) + assert self.id is not None self._test.vapi.cnat_translation_del(id=self.id) return self @@ -172,8 +193,9 @@ class CnatTestContext(object): def IP46(self): return IPv6 if self.is_v6 else IP - def cnat_send(self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, - no_replies=False): + def cnat_send( + self, src_pg, src_id, src_port, dst_pg, dst_id, dst_port, no_replies=False + ): if isinstance(src_id, int): self.src_addr = self.get_ip46(src_pg.remote_hosts[src_id]) else: @@ -191,11 +213,12 @@ class CnatTestContext(object): l4 = self.L4PROTO(id=self.src_port, type=self.dst_port) elif self.L4PROTO in [ICMP] and self.is_v6: l4 = ICMPv6EchoRequest(id=self.src_port) - p1 = (Ether(src=src_pg.remote_mac, - dst=src_pg.local_mac) / - self.IP46(src=self.src_addr, dst=self.dst_addr) / - l4 / - Raw()) + p1 = ( + Ether(src=src_pg.remote_mac, dst=src_pg.local_mac) + / self.IP46(src=self.src_addr, dst=self.dst_addr) + / l4 + / Raw() + ) if no_replies: self._test.send_and_assert_no_replies(src_pg, p1 * N_PKTS, dst_pg) @@ -230,38 +253,35 @@ class CnatTestContext(object): self._test.assertEqual(rx[self.IP46].dst, self.expect_dst_addr) self._test.assertEqual(rx[self.IP46].src, self.expect_src_addr) if self.L4PROTO in [TCP, UDP]: - self._test.assertEqual( - rx[self.L4PROTO].dport, self.expect_dst_port) - self._test.assertEqual( - rx[self.L4PROTO].sport, self.expect_src_port) + self._test.assertEqual(rx[self.L4PROTO].dport, self.expect_dst_port) + self._test.assertEqual(rx[self.L4PROTO].sport, self.expect_src_port) elif self.L4PROTO in [ICMP] and not self.is_v6: - self._test.assertEqual( - rx[self.L4PROTO].type, self.expect_dst_port) - self._test.assertEqual( - rx[self.L4PROTO].id, self.expect_src_port) + self._test.assertEqual(rx[self.L4PROTO].type, self.expect_dst_port) + self._test.assertEqual(rx[self.L4PROTO].id, self.expect_src_port) elif self.L4PROTO in [ICMP] and self.is_v6: - self._test.assertEqual( - rx[ICMPv6EchoRequest].id, self.expect_src_port) + self._test.assertEqual(rx[ICMPv6EchoRequest].id, self.expect_src_port) return self def cnat_send_return(self): """This sends the return traffic""" if self.L4PROTO in [TCP, UDP]: - l4 = self.L4PROTO(sport=self.expect_dst_port, - dport=self.expect_src_port) + l4 = self.L4PROTO(sport=self.expect_dst_port, dport=self.expect_src_port) elif self.L4PROTO in [ICMP] and not self.is_v6: # icmp type 0 if echo reply l4 = self.L4PROTO(id=self.expect_src_port, type=0) elif self.L4PROTO in [ICMP] and self.is_v6: l4 = ICMPv6EchoReply(id=self.expect_src_port) src_mac = self.expected_dst_pg.remote_mac - p1 = (Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) / - self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) / - l4 / - Raw()) + p1 = ( + Ether(src=src_mac, dst=self.expected_dst_pg.local_mac) + / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) + / l4 + / Raw() + ) self.return_rxs = self._test.send_and_expect( - self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg) + self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg + ) return self def cnat_expect_return(self): @@ -288,12 +308,16 @@ class CnatTestContext(object): ICMPelem = ICMPv6DestUnreach(code=1) if self.is_v6 else ICMP(type=11) InnerIP = self.rxs[0][self.IP46] p1 = ( - Ether(src=self.expected_dst_pg.remote_mac, - dst=self.expected_dst_pg.local_mac) / - self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) / - ICMPelem / InnerIP) + Ether( + src=self.expected_dst_pg.remote_mac, dst=self.expected_dst_pg.local_mac + ) + / self.IP46(src=self.expect_dst_addr, dst=self.expect_src_addr) + / ICMPelem + / InnerIP + ) self.return_rxs = self._test.send_and_expect( - self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg) + self.expected_dst_pg, p1 * N_PKTS, self.expected_src_pg + ) return self def cnat_expect_icmp_error_return(self): @@ -306,12 +330,11 @@ class CnatTestContext(object): self._test.assertEqual(rx[self.IP46].src, self.dst_addr) self._test.assertEqual(rx[ICMP46][IP46err].src, self.src_addr) self._test.assertEqual(rx[ICMP46][IP46err].dst, self.dst_addr) - self._test.assertEqual( - rx[ICMP46][IP46err][L4err].sport, self.src_port) - self._test.assertEqual( - rx[ICMP46][IP46err][L4err].dport, self.dst_port) + self._test.assertEqual(rx[ICMP46][IP46err][L4err].sport, self.src_port) + self._test.assertEqual(rx[ICMP46][IP46err][L4err].dport, self.dst_port) return self + # ------------------------------------------------------------------- # ------------------------------------------------------------------- # ------------------------------------------------------------------- @@ -319,7 +342,7 @@ class CnatTestContext(object): class TestCNatTranslation(CnatCommonTestCase): - """ CNat Translation """ + """CNat Translation""" @classmethod def setUpClass(cls): @@ -359,7 +382,7 @@ class TestCNatTranslation(CnatCommonTestCase): super(TestCNatTranslation, self).tearDown() def cnat_translation(self): - """ CNat Translation """ + """CNat Translation""" self.logger.info(self.vapi.cli("sh cnat client")) self.logger.info(self.vapi.cli("sh cnat translation")) @@ -372,11 +395,9 @@ class TestCNatTranslation(CnatCommonTestCase): ctx = CnatTestContext(self, translation.iproto, vip.is_v6) for src_pgi, sport in product(range(N_REMOTE_HOSTS), [1234, 1233]): # from client to vip - ctx.cnat_send(self.pg0, src_pgi, sport, - self.pg1, vip.ip, vip.port) + ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port) dst_port = translation.paths[0][DST].port - ctx.cnat_expect(self.pg0, src_pgi, sport, - self.pg1, nbr, dst_port) + ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg1, nbr, dst_port) # from vip to client ctx.cnat_send_return().cnat_expect_return() @@ -384,8 +405,9 @@ class TestCNatTranslation(CnatCommonTestCase): # packets to the VIP that do not match a # translation are dropped # - ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, - vip.ip, 6666, no_replies=True) + ctx.cnat_send( + self.pg0, src_pgi, sport, self.pg1, vip.ip, 6666, no_replies=True + ) # # packets from the VIP that do not match a @@ -399,7 +421,8 @@ class TestCNatTranslation(CnatCommonTestCase): # old_dst_port = translation.paths[0][DST].port translation.paths[0][DST].udpate( - pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6) + pg=self.pg2, pgi=0, port=5000, is_v6=vip.is_v6 + ) translation.add_vpp_config() # @@ -408,10 +431,10 @@ class TestCNatTranslation(CnatCommonTestCase): for src_pgi in range(N_REMOTE_HOSTS): for sport in [1234, 1233]: # from client to vip - ctx.cnat_send(self.pg0, src_pgi, sport, - self.pg1, vip.ip, vip.port) - ctx.cnat_expect(self.pg0, src_pgi, sport, - self.pg1, nbr, old_dst_port) + ctx.cnat_send(self.pg0, src_pgi, sport, self.pg1, vip.ip, vip.port) + ctx.cnat_expect( + self.pg0, src_pgi, sport, self.pg1, nbr, old_dst_port + ) # from vip to client ctx.cnat_send_return().cnat_expect_return() @@ -419,8 +442,7 @@ class TestCNatTranslation(CnatCommonTestCase): # new flows go to the new backend # for src_pgi in range(N_REMOTE_HOSTS): - ctx.cnat_send(self.pg0, src_pgi, 9999, - self.pg2, vip.ip, vip.port) + ctx.cnat_send(self.pg0, src_pgi, 9999, self.pg2, vip.ip, vip.port) ctx.cnat_expect(self.pg0, src_pgi, 9999, self.pg2, 0, 5000) self.logger.info(self.vapi.cli("sh cnat session verbose")) @@ -444,10 +466,8 @@ class TestCNatTranslation(CnatCommonTestCase): for src_pgi in range(N_REMOTE_HOSTS): for sport in [1234, 1233]: # from client to vip - ctx.cnat_send(self.pg0, src_pgi, sport, - self.pg2, vip.ip, vip.port) - ctx.cnat_expect(self.pg0, src_pgi, - sport, self.pg2, 0, 5000) + ctx.cnat_send(self.pg0, src_pgi, sport, self.pg2, vip.ip, vip.port) + ctx.cnat_expect(self.pg0, src_pgi, sport, self.pg2, 0, 5000) def _test_icmp(self): @@ -477,51 +497,87 @@ class TestCNatTranslation(CnatCommonTestCase): def _make_translations_v4(self): self.translations = [] - self.translations.append(Translation( - self, TCP, Endpoint(ip="30.0.0.1", port=5555, is_v6=False), - [( - Endpoint(is_v6=False), - Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False), - )] - ).add_vpp_config()) - self.translations.append(Translation( - self, TCP, Endpoint(ip="30.0.0.2", port=5554, is_v6=False), - [( - Endpoint(is_v6=False), - Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False), - )] - ).add_vpp_config()) - self.translations.append(Translation( - self, UDP, Endpoint(ip="30.0.0.2", port=5553, is_v6=False), - [( - Endpoint(is_v6=False), - Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False), - )] - ).add_vpp_config()) + self.translations.append( + Translation( + self, + TCP, + Endpoint(ip="30.0.0.1", port=5555, is_v6=False), + [ + ( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=False), + ) + ], + ).add_vpp_config() + ) + self.translations.append( + Translation( + self, + TCP, + Endpoint(ip="30.0.0.2", port=5554, is_v6=False), + [ + ( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=False), + ) + ], + ).add_vpp_config() + ) + self.translations.append( + Translation( + self, + UDP, + Endpoint(ip="30.0.0.2", port=5553, is_v6=False), + [ + ( + Endpoint(is_v6=False), + Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=False), + ) + ], + ).add_vpp_config() + ) def _make_translations_v6(self): self.translations = [] - self.translations.append(Translation( - self, TCP, Endpoint(ip="30::1", port=5555, is_v6=True), - [( - Endpoint(is_v6=True), - Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True), - )] - ).add_vpp_config()) - self.translations.append(Translation( - self, TCP, Endpoint(ip="30::2", port=5554, is_v6=True), - [( - Endpoint(is_v6=True), - Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True), - )] - ).add_vpp_config()) - self.translations.append(Translation( - self, UDP, Endpoint(ip="30::2", port=5553, is_v6=True), - [( - Endpoint(is_v6=True), - Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True), - )] - ).add_vpp_config()) + self.translations.append( + Translation( + self, + TCP, + Endpoint(ip="30::1", port=5555, is_v6=True), + [ + ( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=0, port=4001, is_v6=True), + ) + ], + ).add_vpp_config() + ) + self.translations.append( + Translation( + self, + TCP, + Endpoint(ip="30::2", port=5554, is_v6=True), + [ + ( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=1, port=4002, is_v6=True), + ) + ], + ).add_vpp_config() + ) + self.translations.append( + Translation( + self, + UDP, + Endpoint(ip="30::2", port=5553, is_v6=True), + [ + ( + Endpoint(is_v6=True), + Endpoint(pg=self.pg1, pgi=2, port=4003, is_v6=True), + ) + ], + ).add_vpp_config() + ) def test_icmp4(self): # """ CNat Translation icmp v4 """ @@ -545,7 +601,7 @@ class TestCNatTranslation(CnatCommonTestCase): class TestCNatSourceNAT(CnatCommonTestCase): - """ CNat Source NAT """ + """CNat Source NAT""" @classmethod def setUpClass(cls): @@ -559,28 +615,36 @@ class TestCNatSourceNAT(CnatCommonTestCase): self.vapi.cnat_set_snat_addresses( snat_ip4=self.pg2.remote_hosts[0].ip4, snat_ip6=self.pg2.remote_hosts[0].ip6, - sw_if_index=INVALID_INDEX) + sw_if_index=INVALID_INDEX, + ) self.vapi.feature_enable_disable( enable=1 if is_enable else 0, arc_name="ip6-unicast", feature_name="cnat-snat-ip6", - sw_if_index=self.pg0.sw_if_index) + sw_if_index=self.pg0.sw_if_index, + ) self.vapi.feature_enable_disable( enable=1 if is_enable else 0, arc_name="ip4-unicast", feature_name="cnat-snat-ip4", - sw_if_index=self.pg0.sw_if_index) + sw_if_index=self.pg0.sw_if_index, + ) policie_tbls = VppEnum.vl_api_cnat_snat_policy_table_t self.vapi.cnat_set_snat_policy( - policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX) + policy=VppEnum.vl_api_cnat_snat_policies_t.CNAT_POLICY_IF_PFX + ) for i in self.pg_interfaces: self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, - table=policie_tbls.CNAT_POLICY_INCLUDE_V6) + sw_if_index=i.sw_if_index, + is_add=1 if is_enable else 0, + table=policie_tbls.CNAT_POLICY_INCLUDE_V6, + ) self.vapi.cnat_snat_policy_add_del_if( - sw_if_index=i.sw_if_index, is_add=1 if is_enable else 0, - table=policie_tbls.CNAT_POLICY_INCLUDE_V4) + sw_if_index=i.sw_if_index, + is_add=1 if is_enable else 0, + table=policie_tbls.CNAT_POLICY_INCLUDE_V4, + ) def setUp(self): super(TestCNatSourceNAT, self).setUp() @@ -624,7 +688,7 @@ class TestCNatSourceNAT(CnatCommonTestCase): def sourcenat_test_icmp_echo_conf(self, is_v6=False): ctx = CnatTestContext(self, ICMP, is_v6=is_v6) # 8 is ICMP type echo (v4 only) - ctx.cnat_send(self.pg0, 0, 0xfeed, self.pg1, 0, 8) + ctx.cnat_send(self.pg0, 0, 0xFEED, self.pg1, 0, 8) ctx.cnat_expect(self.pg2, 0, None, self.pg1, 0, 8) ctx.cnat_send_return().cnat_expect_return() @@ -638,14 +702,15 @@ class TestCNatSourceNAT(CnatCommonTestCase): # exclude dst address of pg1.1 from snat if is_v6: exclude_prefix = ip_network( - "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False) + "%s/100" % self.pg1.remote_hosts[1].ip6, strict=False + ) else: exclude_prefix = ip_network( - "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False) + "%s/16" % self.pg1.remote_hosts[1].ip4, strict=False + ) # add remote host to exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=1) + self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=1) # We should not source NAT the id=1 ctx.cnat_send(self.pg0, 0, 1234, self.pg1, 1, 6661) @@ -658,8 +723,7 @@ class TestCNatSourceNAT(CnatCommonTestCase): ctx.cnat_send_return().cnat_expect_return() # remove remote host from exclude list - self.vapi.cnat_snat_policy_add_del_exclude_pfx( - prefix=exclude_prefix, is_add=0) + self.vapi.cnat_snat_policy_add_del_exclude_pfx(prefix=exclude_prefix, is_add=0) self.vapi.cnat_session_purge() # We should source NAT again @@ -676,7 +740,7 @@ class TestCNatSourceNAT(CnatCommonTestCase): class TestCNatDHCP(CnatCommonTestCase): - """ CNat Translation """ + """CNat Translation""" @classmethod def setUpClass(cls): @@ -703,33 +767,35 @@ class TestCNatDHCP(CnatCommonTestCase): def check_resolved(self, tr, addr_id, is_v6=False): qt = tr.query_vpp_config() - self.assertEqual(str(qt.vip.addr), self.make_addr( - tr.vip.sw_if_index, addr_id, is_v6)) + self.assertEqual( + str(qt.vip.addr), self.make_addr(tr.vip.sw_if_index, addr_id, is_v6) + ) self.assertEqual(len(qt.paths), len(tr.paths)) for path_tr, path_qt in zip(tr.paths, qt.paths): src_qt = path_qt.src_ep dst_qt = path_qt.dst_ep src_tr, dst_tr = path_tr - self.assertEqual(str(src_qt.addr), self.make_addr( - src_tr.sw_if_index, addr_id, is_v6)) - self.assertEqual(str(dst_qt.addr), self.make_addr( - dst_tr.sw_if_index, addr_id, is_v6)) + self.assertEqual( + str(src_qt.addr), self.make_addr(src_tr.sw_if_index, addr_id, is_v6) + ) + self.assertEqual( + str(dst_qt.addr), self.make_addr(dst_tr.sw_if_index, addr_id, is_v6) + ) def add_del_address(self, pg, addr_id, is_add=True, is_v6=False): self.vapi.sw_interface_add_del_address( sw_if_index=pg.sw_if_index, prefix=self.make_prefix(pg.sw_if_index, addr_id, is_v6), - is_add=1 if is_add else 0) + is_add=1 if is_add else 0, + ) def _test_dhcp_v46(self, is_v6): self.create_pg_interfaces(range(4)) for i in self.pg_interfaces: i.admin_up() paths = [ - (Endpoint(pg=self.pg1, is_v6=is_v6), - Endpoint(pg=self.pg2, is_v6=is_v6)), - (Endpoint(pg=self.pg1, is_v6=is_v6), - Endpoint(pg=self.pg3, is_v6=is_v6)) + (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg2, is_v6=is_v6)), + (Endpoint(pg=self.pg1, is_v6=is_v6), Endpoint(pg=self.pg3, is_v6=is_v6)), ] ep = Endpoint(pg=self.pg0, is_v6=is_v6) t = Translation(self, TCP, ep, paths).add_vpp_config() @@ -766,10 +832,13 @@ class TestCNatDHCP(CnatCommonTestCase): self.add_del_address(pg, addr_id=0, is_add=True, is_v6=False) self.add_del_address(pg, addr_id=0, is_add=True, is_v6=True) r = self.vapi.cnat_get_snat_addresses() - self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, addr_id=0, is_v6=False)) - self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, addr_id=0, is_v6=True)) + self.assertEqual( + str(r.snat_ip4), + self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=False), + ) + self.assertEqual( + str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=0, is_v6=True) + ) # Add a new address on every interface, remove the old one # and check it is reflected in the cnat config for pg in self.pg_interfaces: @@ -778,10 +847,13 @@ class TestCNatDHCP(CnatCommonTestCase): self.add_del_address(pg, addr_id=0, is_add=False, is_v6=False) self.add_del_address(pg, addr_id=0, is_add=False, is_v6=True) r = self.vapi.cnat_get_snat_addresses() - self.assertEqual(str(r.snat_ip4), self.make_addr( - self.pg0.sw_if_index, addr_id=1, is_v6=False)) - self.assertEqual(str(r.snat_ip6), self.make_addr( - self.pg0.sw_if_index, addr_id=1, is_v6=True)) + self.assertEqual( + str(r.snat_ip4), + self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=False), + ) + self.assertEqual( + str(r.snat_ip6), self.make_addr(self.pg0.sw_if_index, addr_id=1, is_v6=True) + ) # remove the configuration for pg in self.pg_interfaces: self.add_del_address(pg, addr_id=1, is_add=False, is_v6=False) @@ -789,5 +861,5 @@ class TestCNatDHCP(CnatCommonTestCase): self.vapi.cnat_set_snat_addresses(sw_if_index=INVALID_INDEX) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |