diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ip6_vrf_multi_instance.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ip6_vrf_multi_instance.py')
-rw-r--r-- | test/test_ip6_vrf_multi_instance.py | 184 |
1 files changed, 98 insertions, 86 deletions
diff --git a/test/test_ip6_vrf_multi_instance.py b/test/test_ip6_vrf_multi_instance.py index d95e7927f98..73df30d77f2 100644 --- a/test/test_ip6_vrf_multi_instance.py +++ b/test/test_ip6_vrf_multi_instance.py @@ -69,8 +69,14 @@ import socket from scapy.packet import Raw from scapy.layers.l2 import Ether -from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \ - RouterAlert, IPv6ExtHdrHopByHop +from scapy.layers.inet6 import ( + UDP, + IPv6, + ICMPv6ND_NS, + ICMPv6ND_RA, + RouterAlert, + IPv6ExtHdrHopByHop, +) from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType from scapy.pton_ntop import inet_ntop @@ -80,8 +86,8 @@ from vrf import VRFState def is_ipv6_misc_ext(p): - """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out - ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """ + """Is packet one of uninteresting IPv6 broadcasts (extended to filter out + ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)?""" if p.haslayer(ICMPv6ND_RA): if in6_ismaddr(p[IPv6].dst): return True @@ -96,7 +102,7 @@ def is_ipv6_misc_ext(p): class TestIP6VrfMultiInst(VppTestCase): - """ IP6 VRF Multi-instance Test Case """ + """IP6 VRF Multi-instance Test Case""" @classmethod def setUpClass(cls): @@ -114,8 +120,7 @@ class TestIP6VrfMultiInst(VppTestCase): try: # Create pg interfaces - cls.create_pg_interfaces( - range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf)) + cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf)) # Packet flows mapping pg0 -> pg1, pg2 etc. cls.flows = dict() @@ -124,7 +129,8 @@ class TestIP6VrfMultiInst(VppTestCase): pg_list = [ cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j] for j in range(cls.pg_ifs_per_vrf) - if (multiplicand * cls.pg_ifs_per_vrf + j) != i] + if (multiplicand * cls.pg_ifs_per_vrf + j) != i + ] cls.flows[cls.pg_interfaces[i]] = pg_list # Packet sizes - jumbo packet (9018 bytes) skipped @@ -153,7 +159,8 @@ class TestIP6VrfMultiInst(VppTestCase): set_id = i + 1 pg_list = [ cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j] - for j in range(cls.pg_ifs_per_vrf)] + for j in range(cls.pg_ifs_per_vrf) + ] cls.pg_if_sets[set_id] = pg_list except Exception: @@ -185,8 +192,9 @@ class TestIP6VrfMultiInst(VppTestCase): for i in range(self.pg_ifs_per_vrf): pg_if = self.pg_if_sets[if_set_id][i] pg_if.set_table_ip6(vrf_id) - self.logger.info("pg-interface %s added to IPv6 VRF ID %d" - % (pg_if.name, vrf_id)) + self.logger.info( + "pg-interface %s added to IPv6 VRF ID %d" % (pg_if.name, vrf_id) + ) if pg_if not in self.pg_in_vrf: self.pg_in_vrf.append(pg_if) if pg_if in self.pg_not_in_vrf: @@ -206,8 +214,9 @@ class TestIP6VrfMultiInst(VppTestCase): """ for i in range(count): vrf_id = i + start - self.vapi.ip_table_add_del(is_add=1, - table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_add_del( + is_add=1, table={"table_id": vrf_id, "is_ip6": 1} + ) self.logger.info("IPv6 VRF ID %d created" % vrf_id) if vrf_id not in self.vrf_list: self.vrf_list.append(vrf_id) @@ -217,8 +226,7 @@ class TestIP6VrfMultiInst(VppTestCase): self.logger.debug(self.vapi.ppcli("show ip6 fib")) self.logger.debug(self.vapi.ppcli("show ip6 neighbors")) - def create_vrf_by_id_and_assign_interfaces(self, set_id, - vrf_id=0xffffffff): + def create_vrf_by_id_and_assign_interfaces(self, set_id, vrf_id=0xFFFFFFFF): """ Create a FIB table / VRF by vrf_id, put 3 pg-ip6 interfaces to FIB table / VRF. @@ -226,8 +234,7 @@ class TestIP6VrfMultiInst(VppTestCase): :param int vrf_id: Required table ID / VRF ID. \ (Default value = 0xffffffff, ID will be selected automatically) """ - ret = self.vapi.ip_table_allocate(table={'table_id': vrf_id, - 'is_ip6': 1}) + ret = self.vapi.ip_table_allocate(table={"table_id": vrf_id, "is_ip6": 1}) vrf_id = ret.table.table_id self.logger.info("IPv6 VRF ID %d created" % vrf_id) if vrf_id not in self.vrf_list: @@ -248,7 +255,7 @@ class TestIP6VrfMultiInst(VppTestCase): """ if if_set_id is None: if_set_id = vrf_id - self.vapi.ip_table_flush(table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_flush(table={"table_id": vrf_id, "is_ip6": 1}) if vrf_id in self.vrf_list: self.vrf_list.remove(vrf_id) if vrf_id not in self.vrf_reset_list: @@ -270,8 +277,7 @@ class TestIP6VrfMultiInst(VppTestCase): self.vrf_list.remove(vrf_id) if vrf_id in self.vrf_reset_list: self.vrf_reset_list.remove(vrf_id) - self.vapi.ip_table_add_del(is_add=0, - table={'table_id': vrf_id, 'is_ip6': 1}) + self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id, "is_ip6": 1}) def create_stream(self, src_if, packet_sizes): """ @@ -288,16 +294,20 @@ class TestIP6VrfMultiInst(VppTestCase): src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) - p = (Ether(dst=src_if.local_mac, src=src_host.mac) / - IPv6(src=src_host.ip6, dst=dst_host.ip6) / - UDP(sport=1234, dport=1234) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_host.mac) + / IPv6(src=src_host.ip6, dst=dst_host.ip6) + / UDP(sport=1234, dport=1234) + / Raw(payload) + ) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) - self.logger.debug("Input stream created for port %s. Length: %u pkt(s)" - % (src_if.name, len(pkts))) + self.logger.debug( + "Input stream created for port %s. Length: %u pkt(s)" + % (src_if.name, len(pkts)) + ) return pkts def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes): @@ -320,16 +330,20 @@ class TestIP6VrfMultiInst(VppTestCase): src_host = random.choice(src_hosts) pkt_info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(pkt_info) - p = (Ether(dst=src_if.local_mac, src=src_host.mac) / - IPv6(src=src_host.ip6, dst=dst_host.ip6) / - UDP(sport=1234, dport=1234) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_host.mac) + / IPv6(src=src_host.ip6, dst=dst_host.ip6) + / UDP(sport=1234, dport=1234) + / Raw(payload) + ) pkt_info.data = p.copy() size = random.choice(packet_sizes) self.extend_packet(p, size) pkts.append(p) - self.logger.debug("Input stream created for port %s. Length: %u pkt(s)" - % (src_if.name, len(pkts))) + self.logger.debug( + "Input stream created for port %s. Length: %u pkt(s)" + % (src_if.name, len(pkts)) + ) return pkts def verify_capture(self, pg_if, capture): @@ -350,11 +364,13 @@ class TestIP6VrfMultiInst(VppTestCase): payload_info = self.payload_to_info(packet[Raw]) packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) - self.logger.debug("Got packet on port %s: src=%u (id=%u)" % - (pg_if.name, payload_info.src, packet_index)) + self.logger.debug( + "Got packet on port %s: src=%u (id=%u)" + % (pg_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertIsNotNone(next_info) self.assertEqual(packet_index, next_info.index) @@ -369,11 +385,13 @@ class TestIP6VrfMultiInst(VppTestCase): raise for i in self.pg_interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i, dst_sw_if_index, last_info[i.sw_if_index]) + i, dst_sw_if_index, last_info[i.sw_if_index] + ) self.assertIsNone( remaining_packet, - "Port %u: Packet expected from source %u didn't arrive" % - (dst_sw_if_index, i.sw_if_index)) + "Port %u: Packet expected from source %u didn't arrive" + % (dst_sw_if_index, i.sw_if_index), + ) def verify_vrf(self, vrf_id, if_set_id=None): """ @@ -437,8 +455,9 @@ class TestIP6VrfMultiInst(VppTestCase): capture = pg_if.get_capture(remark="interface is in VRF") self.verify_capture(pg_if, capture) elif pg_if in self.pg_not_in_vrf: - pg_if.assert_nothing_captured(remark="interface is not in VRF", - filter_out_fn=is_ipv6_misc_ext) + pg_if.assert_nothing_captured( + remark="interface is not in VRF", filter_out_fn=is_ipv6_misc_ext + ) self.logger.debug("No capture for interface %s" % pg_if.name) else: raise Exception("Unknown interface: %s" % pg_if.name) @@ -458,7 +477,8 @@ class TestIP6VrfMultiInst(VppTestCase): for vrf_id in self.vrf_list: for pg_if in self.pg_if_sets[vrf_id]: pkts = self.create_stream_crosswise_vrf( - pg_if, vrf_id, self.pg_if_packet_sizes) + pg_if, vrf_id, self.pg_if_packet_sizes + ) pg_if.add_stream(pkts) # Enable packet capture and start packet sending @@ -468,29 +488,27 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify # Verify outgoing packet streams per packet-generator interface for pg_if in self.pg_interfaces: - pg_if.assert_nothing_captured(remark="interface is in other VRF", - filter_out_fn=is_ipv6_misc_ext) + pg_if.assert_nothing_captured( + remark="interface is in other VRF", filter_out_fn=is_ipv6_misc_ext + ) self.logger.debug("No capture for interface %s" % pg_if.name) def test_ip6_vrf_01(self): - """ IP6 VRF Multi-instance test 1 - create 4 VRFs - """ + """IP6 VRF Multi-instance test 1 - create 4 VRFs""" # Config 1 # Create 4 VRFs self.create_vrf_and_assign_interfaces(4) # Verify 1 for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 1 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip6_vrf_02(self): - """ IP6 VRF Multi-instance test 2 - reset 2 VRFs - """ + """IP6 VRF Multi-instance test 2 - reset 2 VRFs""" # Config 2 # Delete 2 VRFs self.reset_vrf_and_remove_from_vrf_list(1) @@ -498,11 +516,9 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 2 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 2 self.run_verify_test() @@ -513,8 +529,7 @@ class TestIP6VrfMultiInst(VppTestCase): # self.reset_vrf_and_remove_from_vrf_list(vrf_id) def test_ip6_vrf_03(self): - """ IP6 VRF Multi-instance 3 - add 2 VRFs - """ + """IP6 VRF Multi-instance 3 - add 2 VRFs""" # Config 3 # Add 1 of reset VRFs and 1 new VRF self.create_vrf_and_assign_interfaces(1) @@ -522,11 +537,9 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 3 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test 3 self.run_verify_test() @@ -537,8 +550,7 @@ class TestIP6VrfMultiInst(VppTestCase): # self.reset_vrf_and_remove_from_vrf_list(vrf_id) def test_ip6_vrf_04(self): - """ IP6 VRF Multi-instance test 4 - reset 4 VRFs - """ + """IP6 VRF Multi-instance test 4 - reset 4 VRFs""" # Config 4 # Reset all VRFs (i.e. no VRF except VRF=0 configured) for i in range(len(self.vrf_list)): @@ -547,20 +559,20 @@ class TestIP6VrfMultiInst(VppTestCase): # Verify 4 for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Test 4 self.run_verify_test() self.run_crosswise_vrf_test() def test_ip6_vrf_05(self): - """ IP6 VRF Multi-instance test 5 - auto allocate vrf id - """ + """IP6 VRF Multi-instance test 5 - auto allocate vrf id""" # Config 5 # Create several VRFs # Set vrf_id manually first @@ -571,11 +583,11 @@ class TestIP6VrfMultiInst(VppTestCase): ] # Verify 5 - self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, - VRFState) + self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, VRFState) for i, vrf in enumerate(auto_vrf_id): - self.assert_equal(self.verify_vrf(vrf, i+2), - VRFState.configured, VRFState) + self.assert_equal( + self.verify_vrf(vrf, i + 2), VRFState.configured, VRFState + ) # Test 5 self.run_verify_test() @@ -584,18 +596,19 @@ class TestIP6VrfMultiInst(VppTestCase): # Reset VRFs self.reset_vrf_and_remove_from_vrf_list(10, 1) for i, vrf in enumerate(auto_vrf_id): - self.reset_vrf_and_remove_from_vrf_list(vrf, i+2) + self.reset_vrf_and_remove_from_vrf_list(vrf, i + 2) # Verify 5.1 self.assert_equal(self.verify_vrf(10, 1), VRFState.reset, VRFState) for i, vrf in enumerate(auto_vrf_id): - self.assert_equal(self.verify_vrf(vrf, i+2), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf, i + 2), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Cleanup our extra created VRFs for vrf in auto_vrf_id: @@ -604,14 +617,12 @@ class TestIP6VrfMultiInst(VppTestCase): self.delete_vrf(10) def test_ip6_vrf_06(self): - """ IP6 VRF Multi-instance test 6 - recreate 4 VRFs - """ + """IP6 VRF Multi-instance test 6 - recreate 4 VRFs""" # Reconfigure all the VRFs self.create_vrf_and_assign_interfaces(4) # Verify for vrf_id in self.vrf_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.configured, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState) # Test self.run_verify_test() self.run_crosswise_vrf_test() @@ -620,16 +631,17 @@ class TestIP6VrfMultiInst(VppTestCase): self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0]) # Verify for vrf_id in self.vrf_reset_list: - self.assert_equal(self.verify_vrf(vrf_id), - VRFState.reset, VRFState) + self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState) vrf_list_length = len(self.vrf_list) self.assertEqual( - vrf_list_length, 0, - "List of configured VRFs is not empty: %s != 0" % vrf_list_length) + vrf_list_length, + 0, + "List of configured VRFs is not empty: %s != 0" % vrf_list_length, + ) # Test self.run_verify_test() self.run_crosswise_vrf_test() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |