diff options
author | Klement Sekera <klement.sekera@gmail.com> | 2022-04-26 19:02:15 +0200 |
---|---|---|
committer | Ole Tr�an <otroan@employees.org> | 2022-05-10 18:52:08 +0000 |
commit | d9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch) | |
tree | 4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ip4.py | |
parent | f90348bcb4afd0af2611cefc43b17ef3042b511c (diff) |
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is
much faster, stable PEP8 compliant code style checker offering also
automatic formatting. It aims to be very stable and produce smallest
diffs. It's used by many small and big projects.
Running checkstyle with black takes a few seconds with a terse output.
Thus, test-checkstyle-diff is no longer necessary.
Expand scope of checkstyle to all python files in the repo, replacing
test-checkstyle with checkstyle-python.
Also, fixstyle-python is now available for automatic style formatting.
Note: python virtualenv has been consolidated in test/Makefile,
test/requirements*.txt which will eventually be moved to a central
location. This is required to simply the automated generation of
docker executor images in the CI.
Type: improvement
Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ip4.py')
-rw-r--r-- | test/test_ip4.py | 1788 |
1 files changed, 998 insertions, 790 deletions
diff --git a/test/test_ip4.py b/test/test_ip4.py index 3a48274691a..9079e54366a 100644 --- a/test/test_ip4.py +++ b/test/test_ip4.py @@ -15,10 +15,20 @@ from six import moves from framework import tag_fixme_vpp_workers from framework import VppTestCase, VppTestRunner from util import ppp -from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpMRoute, \ - VppMRoutePath, VppMplsIpBind, \ - VppMplsTable, VppIpTable, FibPathType, find_route, \ - VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump +from vpp_ip_route import ( + VppIpRoute, + VppRoutePath, + VppIpMRoute, + VppMRoutePath, + VppMplsIpBind, + VppMplsTable, + VppIpTable, + FibPathType, + find_route, + VppIpInterfaceAddress, + find_route_in_dump, + find_mroute_in_dump, +) from vpp_ip import VppIpPuntPolicer, VppIpPuntRedirect, VppIpPathMtu from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint from vpp_papi import vpp_papi, VppEnum @@ -30,7 +40,7 @@ NUM_PKTS = 67 class TestIPv4(VppTestCase): - """ IPv4 Test Case """ + """IPv4 Test Case""" @classmethod def setUpClass(cls): @@ -66,7 +76,8 @@ class TestIPv4(VppTestCase): # create 2 subinterfaces for pg1 and pg2 self.sub_interfaces = [ VppDot1QSubint(self, self.pg1, 100), - VppDot1ADSubint(self, self.pg2, 200, 300, 400)] + VppDot1ADSubint(self, self.pg2, 200, 300, 400), + ] # packet flows mapping pg0 -> pg1.sub, pg2.sub, etc. self.flows = dict() @@ -108,7 +119,7 @@ class TestIPv4(VppTestCase): dst_if = self.flows[src_if][dst_if_idx] info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) - p = pkt/Raw(payload) + p = pkt / Raw(payload) p[IP].dst = dst_if.remote_ip4 info.data = p.copy() if isinstance(src_if, VppSubInterface): @@ -123,17 +134,26 @@ class TestIPv4(VppTestCase): :param VppInterface src_if: Interface to create packet stream for. """ hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0 - pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IP(src=src_if.remote_ip4) / - UDP(sport=1234, dport=1234)) - - pkts = [self.modify_packet(src_if, i, pkt_tmpl) - for i in moves.range(self.pg_if_packet_sizes[0], - self.pg_if_packet_sizes[1], 10)] - pkts_b = [self.modify_packet(src_if, i, pkt_tmpl) - for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext, - self.pg_if_packet_sizes[2] + hdr_ext, - 50)] + pkt_tmpl = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4) + / UDP(sport=1234, dport=1234) + ) + + pkts = [ + self.modify_packet(src_if, i, pkt_tmpl) + for i in moves.range( + self.pg_if_packet_sizes[0], self.pg_if_packet_sizes[1], 10 + ) + ] + pkts_b = [ + self.modify_packet(src_if, i, pkt_tmpl) + for i in moves.range( + self.pg_if_packet_sizes[1] + hdr_ext, + self.pg_if_packet_sizes[2] + hdr_ext, + 50, + ) + ] pkts.extend(pkts_b) return pkts @@ -151,7 +171,7 @@ class TestIPv4(VppTestCase): last_info[i.sw_if_index] = None is_sub_if = False dst_sw_if_index = dst_if.sw_if_index - if hasattr(dst_if, 'parent'): + if hasattr(dst_if, "parent"): is_sub_if = True for packet in capture: if is_sub_if: @@ -165,11 +185,12 @@ class TestIPv4(VppTestCase): packet_index = payload_info.index self.assertEqual(payload_info.dst, dst_sw_if_index) self.logger.debug( - "Got packet on port %s: src=%u (id=%u)" % - (dst_if.name, payload_info.src, packet_index)) + "Got packet on port %s: src=%u (id=%u)" + % (dst_if.name, payload_info.src, packet_index) + ) next_info = self.get_next_packet_info_for_interface2( - payload_info.src, dst_sw_if_index, - last_info[payload_info.src]) + payload_info.src, dst_sw_if_index, last_info[payload_info.src] + ) last_info[payload_info.src] = next_info self.assertTrue(next_info is not None) self.assertEqual(packet_index, next_info.index) @@ -184,13 +205,16 @@ class TestIPv4(VppTestCase): raise for i in self.interfaces: remaining_packet = self.get_next_packet_info_for_interface2( - i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]) - self.assertTrue(remaining_packet is None, - "Interface %s: Packet expected from interface %s " - "didn't arrive" % (dst_if.name, i.name)) + i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index] + ) + self.assertTrue( + remaining_packet is None, + "Interface %s: Packet expected from interface %s " + "didn't arrive" % (dst_if.name, i.name), + ) def test_fib(self): - """ IPv4 FIB test + """IPv4 FIB test Test scenario: @@ -218,16 +242,19 @@ class TestIPv4(VppTestCase): class TestIPv4RouteLookup(VppTestCase): - """ IPv4 Route Lookup Test Case """ + """IPv4 Route Lookup Test Case""" + routes = [] def route_lookup(self, prefix, exact): - return self.vapi.api(self.vapi.papi.ip_route_lookup, - { - 'table_id': 0, - 'exact': exact, - 'prefix': prefix, - }) + return self.vapi.api( + self.vapi.papi.ip_route_lookup, + { + "table_id": 0, + "exact": exact, + "prefix": prefix, + }, + ) @classmethod def setUpClass(cls): @@ -240,8 +267,9 @@ class TestIPv4RouteLookup(VppTestCase): def setUp(self): super(TestIPv4RouteLookup, self).setUp() - drop_nh = VppRoutePath("127.0.0.1", 0xffffffff, - type=FibPathType.FIB_PATH_TYPE_DROP) + drop_nh = VppRoutePath( + "127.0.0.1", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP + ) # Add 3 routes r = VppIpRoute(self, "1.1.0.0", 16, [drop_nh]) @@ -267,12 +295,12 @@ class TestIPv4RouteLookup(VppTestCase): # Verify we find the host route prefix = "1.1.1.1/32" result = self.route_lookup(prefix, True) - assert (prefix == str(result.route.prefix)) + assert prefix == str(result.route.prefix) # Verify we find a middle prefix route prefix = "1.1.1.0/24" result = self.route_lookup(prefix, True) - assert (prefix == str(result.route.prefix)) + assert prefix == str(result.route.prefix) # Verify we do not find an available LPM. with self.vapi.assert_negative_api_retval(): @@ -282,17 +310,17 @@ class TestIPv4RouteLookup(VppTestCase): # verify we find lpm lpm_prefix = "1.1.1.0/24" result = self.route_lookup("1.1.1.2/32", False) - assert (lpm_prefix == str(result.route.prefix)) + assert lpm_prefix == str(result.route.prefix) # Verify we find the exact when not requested result = self.route_lookup(lpm_prefix, False) - assert (lpm_prefix == str(result.route.prefix)) + assert lpm_prefix == str(result.route.prefix) # Can't seem to delete the default route so no negative LPM test. class TestIPv4IfAddrRoute(VppTestCase): - """ IPv4 Interface Addr Route Test Case """ + """IPv4 Interface Addr Route Test Case""" @classmethod def setUpClass(cls): @@ -320,7 +348,7 @@ class TestIPv4IfAddrRoute(VppTestCase): i.admin_down() def test_ipv4_ifaddrs_same_prefix(self): - """ IPv4 Interface Addresses Same Prefix test + """IPv4 Interface Addresses Same Prefix test Test scenario: @@ -370,7 +398,7 @@ class TestIPv4IfAddrRoute(VppTestCase): self.assertFalse(find_route(self, "10.10.10.0", 32)) def test_ipv4_ifaddr_route(self): - """ IPv4 Interface Address Route test + """IPv4 Interface Address Route test Test scenario: @@ -415,7 +443,7 @@ class TestIPv4IfAddrRoute(VppTestCase): self.assertTrue(lo_if.is_ip4_entry_in_fib_dump(fib4_dump)) def test_ipv4_ifaddr_del(self): - """ Delete an interface address that does not exist """ + """Delete an interface address that does not exist""" loopbacks = self.create_loopback_interfaces(1) lo = self.lo_interfaces[0] @@ -428,13 +456,12 @@ class TestIPv4IfAddrRoute(VppTestCase): # with self.vapi.assert_negative_api_retval(): self.vapi.sw_interface_add_del_address( - sw_if_index=lo.sw_if_index, - prefix=self.pg0.local_ip4_prefix, - is_add=0) + sw_if_index=lo.sw_if_index, prefix=self.pg0.local_ip4_prefix, is_add=0 + ) class TestICMPEcho(VppTestCase): - """ ICMP Echo Test Case """ + """ICMP Echo Test Case""" @classmethod def setUpClass(cls): @@ -462,7 +489,7 @@ class TestICMPEcho(VppTestCase): i.admin_down() def test_icmp_echo(self): - """ VPP replies to ICMP Echo Request + """VPP replies to ICMP Echo Request Test scenario: @@ -470,14 +497,15 @@ class TestICMPEcho(VppTestCase): - Check outgoing ICMP Echo Reply message on pg0 interface. """ - icmp_id = 0xb + icmp_id = 0xB icmp_seq = 5 - icmp_load = b'\x0a' * 18 - p_echo_request = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / - ICMP(id=icmp_id, seq=icmp_seq) / - Raw(load=icmp_load)) + icmp_load = b"\x0a" * 18 + p_echo_request = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) + / ICMP(id=icmp_id, seq=icmp_seq) + / Raw(load=icmp_load) + ) self.pg0.add_stream(p_echo_request) self.pg_enable_capture(self.pg_interfaces) @@ -502,7 +530,7 @@ class TestICMPEcho(VppTestCase): class TestIPv4FibCrud(VppTestCase): - """ FIB - add/update/delete - ip4 routes + """FIB - add/update/delete - ip4 routes Test scenario: - add 1k, @@ -513,8 +541,7 @@ class TestIPv4FibCrud(VppTestCase): ..note:: Python API is too slow to add many routes, needs replacement. """ - def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, - count, start=0): + def config_fib_many_to_one(self, start_dest_addr, next_hop_addr, count, start=0): """ :param start_dest_addr: @@ -524,19 +551,26 @@ class TestIPv4FibCrud(VppTestCase): """ routes = [] for i in range(count): - r = VppIpRoute(self, start_dest_addr % (i + start), 32, - [VppRoutePath(next_hop_addr, 0xffffffff)]) + r = VppIpRoute( + self, + start_dest_addr % (i + start), + 32, + [VppRoutePath(next_hop_addr, 0xFFFFFFFF)], + ) r.add_vpp_config() routes.append(r) return routes - def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, - count, start=0): + def unconfig_fib_many_to_one(self, start_dest_addr, next_hop_addr, count, start=0): routes = [] for i in range(count): - r = VppIpRoute(self, start_dest_addr % (i + start), 32, - [VppRoutePath(next_hop_addr, 0xffffffff)]) + r = VppIpRoute( + self, + start_dest_addr % (i + start), + 32, + [VppRoutePath(next_hop_addr, 0xFFFFFFFF)], + ) r.remove_vpp_config() routes.append(r) return routes @@ -548,10 +582,12 @@ class TestIPv4FibCrud(VppTestCase): dst_addr = random.choice(routes).prefix.network_address info = self.create_packet_info(src_if, dst_if) payload = self.info_to_payload(info) - p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) / - IP(src=src_if.remote_ip4, dst=str(dst_addr)) / - UDP(sport=1234, dport=1234) / - Raw(payload)) + p = ( + Ether(dst=src_if.local_mac, src=src_if.remote_mac) + / IP(src=src_if.remote_ip4, dst=str(dst_addr)) + / UDP(sport=1234, dport=1234) + / Raw(payload) + ) info.data = p.copy() self.extend_packet(p, random.choice(self.pg_if_packet_sizes)) pkts.append(p) @@ -560,8 +596,7 @@ class TestIPv4FibCrud(VppTestCase): def _find_ip_match(self, find_in, pkt): for p in find_in: - if self.payload_to_info(p[Raw]) == \ - self.payload_to_info(pkt[Raw]): + if self.payload_to_info(p[Raw]) == self.payload_to_info(pkt[Raw]): if p[IP].src != pkt[IP].src: break if p[IP].dst != pkt[IP].dst: @@ -585,15 +620,15 @@ class TestIPv4FibCrud(VppTestCase): def verify_route_dump(self, routes): for r in routes: - self.assertTrue(find_route(self, - r.prefix.network_address, - r.prefix.prefixlen)) + self.assertTrue( + find_route(self, r.prefix.network_address, r.prefix.prefixlen) + ) def verify_not_in_route_dump(self, routes): for r in routes: - self.assertFalse(find_route(self, - r.prefix.network_address, - r.prefix.prefixlen)) + self.assertFalse( + find_route(self, r.prefix.network_address, r.prefix.prefixlen) + ) @classmethod def setUpClass(cls): @@ -636,18 +671,21 @@ class TestIPv4FibCrud(VppTestCase): self.deleted_routes = [] def test_1_add_routes(self): - """ Add 1k routes """ + """Add 1k routes""" # add 100 routes check with traffic script. - self.configured_routes.extend(self.config_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 100)) + self.configured_routes.extend( + self.config_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 100) + ) self.verify_route_dump(self.configured_routes) self.stream_1 = self.create_stream( - self.pg1, self.pg0, self.configured_routes, 100) + self.pg1, self.pg0, self.configured_routes, 100 + ) self.stream_2 = self.create_stream( - self.pg2, self.pg0, self.configured_routes, 100) + self.pg2, self.pg0, self.configured_routes, 100 + ) self.pg1.add_stream(self.stream_1) self.pg2.add_stream(self.stream_2) @@ -658,28 +696,32 @@ class TestIPv4FibCrud(VppTestCase): self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2) def test_2_del_routes(self): - """ Delete 100 routes + """Delete 100 routes - delete 10 routes check with traffic script. """ # config 1M FIB entries - self.configured_routes.extend(self.config_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 100)) - self.deleted_routes.extend(self.unconfig_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 10, start=10)) + self.configured_routes.extend( + self.config_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 100) + ) + self.deleted_routes.extend( + self.unconfig_fib_many_to_one( + "10.0.0.%d", self.pg0.remote_ip4, 10, start=10 + ) + ) for x in self.deleted_routes: self.configured_routes.remove(x) self.verify_route_dump(self.configured_routes) self.stream_1 = self.create_stream( - self.pg1, self.pg0, self.configured_routes, 100) + self.pg1, self.pg0, self.configured_routes, 100 + ) self.stream_2 = self.create_stream( - self.pg2, self.pg0, self.configured_routes, 100) - self.stream_3 = self.create_stream( - self.pg1, self.pg0, self.deleted_routes, 100) - self.stream_4 = self.create_stream( - self.pg2, self.pg0, self.deleted_routes, 100) + self.pg2, self.pg0, self.configured_routes, 100 + ) + self.stream_3 = self.create_stream(self.pg1, self.pg0, self.deleted_routes, 100) + self.stream_4 = self.create_stream(self.pg2, self.pg0, self.deleted_routes, 100) self.pg1.add_stream(self.stream_1 + self.stream_3) self.pg2.add_stream(self.stream_2 + self.stream_4) self.pg_enable_capture(self.pg_interfaces) @@ -689,38 +731,42 @@ class TestIPv4FibCrud(VppTestCase): self.verify_capture(self.pg0, pkts, self.stream_1 + self.stream_2) def test_3_add_new_routes(self): - """ Add 1k routes + """Add 1k routes - re-add 5 routes check with traffic script. - add 100 routes check with traffic script. """ # config 1M FIB entries - self.configured_routes.extend(self.config_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 100)) - self.deleted_routes.extend(self.unconfig_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 10, start=10)) + self.configured_routes.extend( + self.config_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 100) + ) + self.deleted_routes.extend( + self.unconfig_fib_many_to_one( + "10.0.0.%d", self.pg0.remote_ip4, 10, start=10 + ) + ) for x in self.deleted_routes: self.configured_routes.remove(x) - tmp = self.config_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 5, start=10) + tmp = self.config_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 5, start=10) self.configured_routes.extend(tmp) for x in tmp: self.deleted_routes.remove(x) - self.configured_routes.extend(self.config_fib_many_to_one( - "10.0.1.%d", self.pg0.remote_ip4, 100)) + self.configured_routes.extend( + self.config_fib_many_to_one("10.0.1.%d", self.pg0.remote_ip4, 100) + ) self.verify_route_dump(self.configured_routes) self.stream_1 = self.create_stream( - self.pg1, self.pg0, self.configured_routes, 300) + self.pg1, self.pg0, self.configured_routes, 300 + ) self.stream_2 = self.create_stream( - self.pg2, self.pg0, self.configured_routes, 300) - self.stream_3 = self.create_stream( - self.pg1, self.pg0, self.deleted_routes, 100) - self.stream_4 = self.create_stream( - self.pg2, self.pg0, self.deleted_routes, 100) + self.pg2, self.pg0, self.configured_routes, 300 + ) + self.stream_3 = self.create_stream(self.pg1, self.pg0, self.deleted_routes, 100) + self.stream_4 = self.create_stream(self.pg2, self.pg0, self.deleted_routes, 100) self.pg1.add_stream(self.stream_1 + self.stream_3) self.pg2.add_stream(self.stream_2 + self.stream_4) @@ -732,17 +778,20 @@ class TestIPv4FibCrud(VppTestCase): # delete 5 routes check with traffic script. # add 100 routes check with traffic script. - self.deleted_routes.extend(self.unconfig_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 15)) - self.deleted_routes.extend(self.unconfig_fib_many_to_one( - "10.0.0.%d", self.pg0.remote_ip4, 85)) - self.deleted_routes.extend(self.unconfig_fib_many_to_one( - "10.0.1.%d", self.pg0.remote_ip4, 100)) + self.deleted_routes.extend( + self.unconfig_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 15) + ) + self.deleted_routes.extend( + self.unconfig_fib_many_to_one("10.0.0.%d", self.pg0.remote_ip4, 85) + ) + self.deleted_routes.extend( + self.unconfig_fib_many_to_one("10.0.1.%d", self.pg0.remote_ip4, 100) + ) self.verify_not_in_route_dump(self.deleted_routes) class TestIPNull(VppTestCase): - """ IPv4 routes via NULL """ + """IPv4 routes via NULL""" @classmethod def setUpClass(cls): @@ -770,23 +819,29 @@ class TestIPNull(VppTestCase): i.admin_down() def test_ip_null(self): - """ IP NULL route """ + """IP NULL route""" # # A route via IP NULL that will reply with ICMP unreachables # ip_unreach = VppIpRoute( - self, "10.0.0.1", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH)]) + self, + "10.0.0.1", + 32, + [ + VppRoutePath( + "0.0.0.0", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_UNREACH + ) + ], + ) ip_unreach.add_vpp_config() - p_unreach = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst="10.0.0.1") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_unreach = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst="10.0.0.1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) self.pg0.add_stream(p_unreach) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -809,17 +864,23 @@ class TestIPNull(VppTestCase): # A route via IP NULL that will reply with ICMP prohibited # ip_prohibit = VppIpRoute( - self, "10.0.0.2", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT)]) + self, + "10.0.0.2", + 32, + [ + VppRoutePath( + "0.0.0.0", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_ICMP_PROHIBIT + ) + ], + ) ip_prohibit.add_vpp_config() - p_prohibit = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst="10.0.0.2") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_prohibit = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst="10.0.0.2") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) self.pg0.add_stream(p_prohibit) self.pg_enable_capture(self.pg_interfaces) @@ -836,17 +897,21 @@ class TestIPNull(VppTestCase): self.assertEqual(icmp.dst, "10.0.0.2") def test_ip_drop(self): - """ IP Drop Routes """ + """IP Drop Routes""" - p = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst="1.1.1.1") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst="1.1.1.1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - r1 = VppIpRoute(self, "1.1.1.0", 24, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index)]) + r1 = VppIpRoute( + self, + "1.1.1.0", + 24, + [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)], + ) r1.add_vpp_config() rx = self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1) @@ -854,10 +919,12 @@ class TestIPNull(VppTestCase): # # insert a more specific as a drop # - r2 = VppIpRoute(self, "1.1.1.1", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - type=FibPathType.FIB_PATH_TYPE_DROP)]) + r2 = VppIpRoute( + self, + "1.1.1.1", + 32, + [VppRoutePath("0.0.0.0", 0xFFFFFFFF, type=FibPathType.FIB_PATH_TYPE_DROP)], + ) r2.add_vpp_config() self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS, "Drop Route") @@ -866,7 +933,7 @@ class TestIPNull(VppTestCase): class TestIPDisabled(VppTestCase): - """ IPv4 disabled """ + """IPv4 disabled""" @classmethod def setUpClass(cls): @@ -897,7 +964,7 @@ class TestIPDisabled(VppTestCase): i.admin_down() def test_ip_disabled(self): - """ IP Disabled """ + """IP Disabled""" MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t @@ -909,24 +976,32 @@ class TestIPDisabled(VppTestCase): route_232_1_1_1 = VppIpMRoute( self, "0.0.0.0", - "232.1.1.1", 32, + "232.1.1.1", + 32, MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE, - [VppMRoutePath(self.pg1.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT), - VppMRoutePath(self.pg0.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)]) + [ + VppMRoutePath( + self.pg1.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT + ), + VppMRoutePath( + self.pg0.sw_if_index, MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD + ), + ], + ) route_232_1_1_1.add_vpp_config() - pu = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(src="10.10.10.10", dst=self.pg0.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - pm = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(src="10.10.10.10", dst="232.1.1.1") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + pu = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src="10.10.10.10", dst=self.pg0.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + pm = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src="10.10.10.10", dst="232.1.1.1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) # # PG1 does not forward IP traffic @@ -965,7 +1040,7 @@ class TestIPDisabled(VppTestCase): class TestIPSubNets(VppTestCase): - """ IPv4 Subnets """ + """IPv4 Subnets""" @classmethod def setUpClass(cls): @@ -995,22 +1070,26 @@ class TestIPSubNets(VppTestCase): i.admin_down() def test_ip_sub_nets(self): - """ IP Sub Nets """ + """IP Sub Nets""" # # Configure a covering route to forward so we know # when we are dropping # - cover_route = VppIpRoute(self, "10.0.0.0", 8, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index)]) + cover_route = VppIpRoute( + self, + "10.0.0.0", + 8, + [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)], + ) cover_route.add_vpp_config() - p = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(dst="10.10.10.10", src=self.pg0.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(dst="10.10.10.10", src=self.pg0.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1023,28 +1102,29 @@ class TestIPSubNets(VppTestCase): ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10") self.vapi.sw_interface_add_del_address( - sw_if_index=self.pg0.sw_if_index, - prefix="10.10.10.10/16") - - pn = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(dst="10.10.0.0", src=self.pg0.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - pb = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(dst="10.10.255.255", src=self.pg0.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + sw_if_index=self.pg0.sw_if_index, prefix="10.10.10.10/16" + ) + + pn = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(dst="10.10.0.0", src=self.pg0.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + pb = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(dst="10.10.255.255", src=self.pg0.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) self.send_and_assert_no_replies(self.pg1, pn, "IP Network address") self.send_and_assert_no_replies(self.pg1, pb, "IP Broadcast address") # remove the sub-net and we are forwarding via the cover again self.vapi.sw_interface_add_del_address( - sw_if_index=self.pg0.sw_if_index, - prefix="10.10.10.10/16", - is_add=0) + sw_if_index=self.pg0.sw_if_index, prefix="10.10.10.10/16", is_add=0 + ) self.pg1.add_stream(pn) self.pg_enable_capture(self.pg_interfaces) @@ -1062,14 +1142,15 @@ class TestIPSubNets(VppTestCase): ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10") self.vapi.sw_interface_add_del_address( - sw_if_index=self.pg0.sw_if_index, - prefix="10.10.10.10/31") + sw_if_index=self.pg0.sw_if_index, prefix="10.10.10.10/31" + ) - pn = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(dst="10.10.10.11", src=self.pg0.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + pn = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(dst="10.10.10.11", src=self.pg0.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) self.pg1.add_stream(pn) self.pg_enable_capture(self.pg_interfaces) @@ -1079,8 +1160,8 @@ class TestIPSubNets(VppTestCase): # remove the sub-net and we are forwarding via the cover again self.vapi.sw_interface_add_del_address( - sw_if_index=self.pg0.sw_if_index, - prefix="10.10.10.10/31", is_add=0) + sw_if_index=self.pg0.sw_if_index, prefix="10.10.10.10/31", is_add=0 + ) self.pg1.add_stream(pn) self.pg_enable_capture(self.pg_interfaces) @@ -1089,7 +1170,7 @@ class TestIPSubNets(VppTestCase): class TestIPLoadBalance(VppTestCase): - """ IPv4 Load-Balancing """ + """IPv4 Load-Balancing""" @classmethod def setUpClass(cls): @@ -1126,7 +1207,7 @@ class TestIPLoadBalance(VppTestCase): return n def test_ip_load_balance(self): - """ IP Load-Balancing """ + """IP Load-Balancing""" fhc = VppEnum.vl_api_ip_flow_hash_config_t af = VppEnum.vl_api_address_family_t @@ -1144,33 +1225,47 @@ class TestIPLoadBalance(VppTestCase): src_mpls_pkts = [] for ii in range(NUM_PKTS): - port_ip_hdr = (IP(dst="10.0.0.1", src="20.0.0.1") / - UDP(sport=1234, dport=1234 + ii) / - Raw(b'\xa5' * 100)) - port_ip_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - port_ip_hdr)) - port_mpls_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - MPLS(label=66, ttl=2) / - port_ip_hdr)) - - src_ip_hdr = (IP(dst="10.0.0.1", src="20.0.0.%d" % ii) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - src_ip_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - src_ip_hdr)) - src_mpls_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - MPLS(label=66, ttl=2) / - src_ip_hdr)) - - route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index), - VppRoutePath(self.pg2.remote_ip4, - self.pg2.sw_if_index)]) + port_ip_hdr = ( + IP(dst="10.0.0.1", src="20.0.0.1") + / UDP(sport=1234, dport=1234 + ii) + / Raw(b"\xa5" * 100) + ) + port_ip_pkts.append( + (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / port_ip_hdr) + ) + port_mpls_pkts.append( + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / MPLS(label=66, ttl=2) + / port_ip_hdr + ) + ) + + src_ip_hdr = ( + IP(dst="10.0.0.1", src="20.0.0.%d" % ii) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + src_ip_pkts.append( + (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / src_ip_hdr) + ) + src_mpls_pkts.append( + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / MPLS(label=66, ttl=2) + / src_ip_hdr + ) + ) + + route_10_0_0_1 = VppIpRoute( + self, + "10.0.0.1", + 32, + [ + VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index), + VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index), + ], + ) route_10_0_0_1.add_vpp_config() binding = VppMplsIpBind(self, 66, "10.0.0.1", 32) @@ -1185,15 +1280,17 @@ class TestIPLoadBalance(VppTestCase): # be guaranteed. But with 64 different packets we do expect some # balancing. So instead just ensure there is traffic on each link. # - rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts, - [self.pg1, self.pg2]) + rx = self.send_and_expect_load_balancing( + self.pg0, port_ip_pkts, [self.pg1, self.pg2] + ) n_ip_pg0 = len(rx[0]) - self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, - [self.pg1, self.pg2]) - self.send_and_expect_load_balancing(self.pg0, port_mpls_pkts, - [self.pg1, self.pg2]) - rx = self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts, - [self.pg1, self.pg2]) + self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2]) + self.send_and_expect_load_balancing( + self.pg0, port_mpls_pkts, [self.pg1, self.pg2] + ) + rx = self.send_and_expect_load_balancing( + self.pg0, src_mpls_pkts, [self.pg1, self.pg2] + ) n_mpls_pg0 = len(rx[0]) # @@ -1201,12 +1298,14 @@ class TestIPLoadBalance(VppTestCase): # self.vapi.set_ip_flow_hash_router_id(router_id=0x11111111) - rx = self.send_and_expect_load_balancing(self.pg0, port_ip_pkts, - [self.pg1, self.pg2]) + rx = self.send_and_expect_load_balancing( + self.pg0, port_ip_pkts, [self.pg1, self.pg2] + ) self.assertNotEqual(n_ip_pg0, len(rx[0])) - rx = self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts, - [self.pg1, self.pg2]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_mpls_pkts, [self.pg1, self.pg2] + ) self.assertNotEqual(n_mpls_pg0, len(rx[0])) # @@ -1217,22 +1316,24 @@ class TestIPLoadBalance(VppTestCase): self.vapi.set_ip_flow_hash_v2( af=af.ADDRESS_IP4, table_id=0, - flow_hash_config=(fhc.IP_API_FLOW_HASH_SRC_IP | - fhc.IP_API_FLOW_HASH_DST_IP | - fhc.IP_API_FLOW_HASH_PROTO)) - - self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, - [self.pg1, self.pg2]) - self.send_and_expect_load_balancing(self.pg0, src_mpls_pkts, - [self.pg1, self.pg2]) + flow_hash_config=( + fhc.IP_API_FLOW_HASH_SRC_IP + | fhc.IP_API_FLOW_HASH_DST_IP + | fhc.IP_API_FLOW_HASH_PROTO + ), + ) + + self.send_and_expect_load_balancing(self.pg0, src_ip_pkts, [self.pg1, self.pg2]) + self.send_and_expect_load_balancing( + self.pg0, src_mpls_pkts, [self.pg1, self.pg2] + ) self.send_and_expect_only(self.pg0, port_ip_pkts, self.pg2) # # change the flow hash config back to defaults # - self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, - proto=1, sport=1, dport=1) + self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, proto=1, sport=1, dport=1) # # Recursive prefixes @@ -1243,48 +1344,64 @@ class TestIPLoadBalance(VppTestCase): src_pkts = [] for ii in range(257): - port_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(dst="1.1.1.1", src="20.0.0.1") / - UDP(sport=1234, dport=1234 + ii) / - Raw(b'\xa5' * 100))) - src_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(dst="1.1.1.1", src="20.0.0.%d" % ii) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100))) - - route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index), - VppRoutePath(self.pg4.remote_ip4, - self.pg4.sw_if_index)]) + port_pkts.append( + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(dst="1.1.1.1", src="20.0.0.1") + / UDP(sport=1234, dport=1234 + ii) + / Raw(b"\xa5" * 100) + ) + ) + src_pkts.append( + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(dst="1.1.1.1", src="20.0.0.%d" % ii) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + ) + + route_10_0_0_2 = VppIpRoute( + self, + "10.0.0.2", + 32, + [ + VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index), + VppRoutePath(self.pg4.remote_ip4, self.pg4.sw_if_index), + ], + ) route_10_0_0_2.add_vpp_config() - route_1_1_1_1 = VppIpRoute(self, "1.1.1.1", 32, - [VppRoutePath("10.0.0.2", 0xffffffff), - VppRoutePath("10.0.0.1", 0xffffffff)]) + route_1_1_1_1 = VppIpRoute( + self, + "1.1.1.1", + 32, + [ + VppRoutePath("10.0.0.2", 0xFFFFFFFF), + VppRoutePath("10.0.0.1", 0xFFFFFFFF), + ], + ) route_1_1_1_1.add_vpp_config() # # inject the packet on pg0 - expect load-balancing across all 4 paths # self.vapi.cli("clear trace") - self.send_and_expect_load_balancing(self.pg0, port_pkts, - [self.pg1, self.pg2, - self.pg3, self.pg4]) - self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg1, self.pg2, - self.pg3, self.pg4]) + self.send_and_expect_load_balancing( + self.pg0, port_pkts, [self.pg1, self.pg2, self.pg3, self.pg4] + ) + self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4] + ) # # bring down pg1 expect LB to adjust to use only those that are up # self.pg1.link_down() - rx = self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg2, self.pg3, - self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg2, self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) # @@ -1292,8 +1409,9 @@ class TestIPLoadBalance(VppTestCase): # self.pg2.link_down() - rx = self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg3, self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) # @@ -1302,9 +1420,9 @@ class TestIPLoadBalance(VppTestCase): self.pg1.link_up() self.pg2.link_up() - rx = self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg1, self.pg2, - self.pg3, self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) # @@ -1312,16 +1430,17 @@ class TestIPLoadBalance(VppTestCase): # self.pg1.admin_down() self.pg2.admin_down() - rx = self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg3, self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) self.pg1.admin_up() self.pg2.admin_up() self.pg1.resolve_arp() self.pg2.resolve_arp() - rx = self.send_and_expect_load_balancing(self.pg0, src_pkts, - [self.pg1, self.pg2, - self.pg3, self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, src_pkts, [self.pg1, self.pg2, self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) # @@ -1331,19 +1450,26 @@ class TestIPLoadBalance(VppTestCase): port_pkts = [] for ii in range(257): - port_pkts.append((Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(dst="1.1.1.2", src="20.0.0.2") / - UDP(sport=1234, dport=1234 + ii) / - Raw(b'\xa5' * 100))) - - route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)]) + port_pkts.append( + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(dst="1.1.1.2", src="20.0.0.2") + / UDP(sport=1234, dport=1234 + ii) + / Raw(b"\xa5" * 100) + ) + ) + + route_10_0_0_3 = VppIpRoute( + self, + "10.0.0.3", + 32, + [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)], + ) route_10_0_0_3.add_vpp_config() - route_1_1_1_2 = VppIpRoute(self, "1.1.1.2", 32, - [VppRoutePath("10.0.0.3", 0xffffffff)]) + route_1_1_1_2 = VppIpRoute( + self, "1.1.1.2", 32, [VppRoutePath("10.0.0.3", 0xFFFFFFFF)] + ) route_1_1_1_2.add_vpp_config() # @@ -1358,33 +1484,39 @@ class TestIPLoadBalance(VppTestCase): # self.pg3.link_down() - route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index), - VppRoutePath(self.pg4.remote_ip4, - self.pg4.sw_if_index)]) + route_10_0_0_3 = VppIpRoute( + self, + "10.0.0.3", + 32, + [ + VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index), + VppRoutePath(self.pg4.remote_ip4, self.pg4.sw_if_index), + ], + ) route_10_0_0_3.add_vpp_config() port_pkts = [] for ii in range(257): - port_pkts.append(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(dst="10.0.0.3", src="20.0.0.2") / - UDP(sport=1234, dport=1234 + ii) / - Raw(b'\xa5' * 100)) + port_pkts.append( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(dst="10.0.0.3", src="20.0.0.2") + / UDP(sport=1234, dport=1234 + ii) + / Raw(b"\xa5" * 100) + ) self.send_and_expect_only(self.pg0, port_pkts, self.pg4) # bring the link back up self.pg3.link_up() - rx = self.send_and_expect_load_balancing(self.pg0, port_pkts, - [self.pg3, self.pg4]) + rx = self.send_and_expect_load_balancing( + self.pg0, port_pkts, [self.pg3, self.pg4] + ) self.assertEqual(len(src_pkts), self.total_len(rx)) class TestIPVlan0(VppTestCase): - """ IPv4 VLAN-0 """ + """IPv4 VLAN-0""" @classmethod def setUpClass(cls): @@ -1415,15 +1547,15 @@ class TestIPVlan0(VppTestCase): super(TestIPVlan0, self).tearDown() def test_ip_vlan_0(self): - """ IP VLAN-0 """ + """IP VLAN-0""" - pkts = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=0) / - IP(dst=self.pg1.remote_ip4, - src=self.pg0.remote_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) * NUM_PKTS + pkts = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=0) + / IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) * NUM_PKTS # # Expect that packets sent on VLAN-0 are forwarded on the @@ -1433,7 +1565,7 @@ class TestIPVlan0(VppTestCase): class IPPuntSetup(object): - """ Setup for IPv4 Punt Police/Redirect """ + """Setup for IPv4 Punt Police/Redirect""" def punt_setup(self): self.create_pg_interfaces(range(4)) @@ -1449,37 +1581,38 @@ class IPPuntSetup(object): af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4 udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP punt_udp = { - 'type': pt_l4, - 'punt': { - 'l4': { - 'af': af_ip4, - 'protocol': udp_proto, - 'port': 1234, + "type": pt_l4, + "punt": { + "l4": { + "af": af_ip4, + "protocol": udp_proto, + "port": 1234, } - } + }, } self.vapi.set_punt(is_add=1, punt=punt_udp) af_ip6 = VppEnum.vl_api_address_family_t.ADDRESS_IP6 punt_udp = { - 'type': pt_l4, - 'punt': { - 'l4': { - 'af': af_ip6, - 'protocol': udp_proto, - 'port': 1236, + "type": pt_l4, + "punt": { + "l4": { + "af": af_ip6, + "protocol": udp_proto, + "port": 1236, } - } + }, } self.vapi.set_punt(is_add=1, punt=punt_udp) - self.pkt = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + self.pkt = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) def punt_teardown(self): for i in self.pg_interfaces: @@ -1488,7 +1621,7 @@ class IPPuntSetup(object): class TestIPPunt(IPPuntSetup, VppTestCase): - """ IPv4 Punt Police/Redirect """ + """IPv4 Punt Police/Redirect""" def setUp(self): super().setUp() @@ -1499,26 +1632,30 @@ class TestIPPunt(IPPuntSetup, VppTestCase): super().tearDown() def test_ip_punt_api_validation(self): - """ IP punt API parameter validation """ + """IP punt API parameter validation""" nh_addr = self.pg1.remote_ip4 - punt = {"rx_sw_if_index": self.pg0.sw_if_index, - "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, - "n_paths": 1000000, - "paths": []} + punt = { + "rx_sw_if_index": self.pg0.sw_if_index, + "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, + "n_paths": 1000000, + "paths": [], + } with self.assertRaises(vpp_papi.VPPIOError): self.vapi.add_del_ip_punt_redirect_v2(punt=punt, is_add=True) - punt = {"rx_sw_if_index": self.pg0.sw_if_index, - "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, - "n_paths": 0, - "paths": []} + punt = { + "rx_sw_if_index": self.pg0.sw_if_index, + "af": VppEnum.vl_api_address_family_t.ADDRESS_IP4, + "n_paths": 0, + "paths": [], + } self.vapi.add_del_ip_punt_redirect_v2(punt=punt, is_add=True) def test_ip_punt(self): - """ IP punt police and redirect """ + """IP punt police and redirect""" pkts = self.pkt * 1025 @@ -1526,8 +1663,9 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # Configure a punt redirect via pg1. # nh_addr = self.pg1.remote_ip4 - ip_punt_redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index, - self.pg1.sw_if_index, nh_addr) + ip_punt_redirect = VppIpPuntRedirect( + self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr + ) ip_punt_redirect.add_vpp_config() self.send_and_expect(self.pg0, pkts, self.pg1) @@ -1554,9 +1692,9 @@ class TestIPPunt(IPPuntSetup, VppTestCase): stats = policer.get_stats() # Single rate policer - expect conform, violate but no exceed - self.assertGreater(stats['conform_packets'], 0) - self.assertEqual(stats['exceed_packets'], 0) - self.assertGreater(stats['violate_packets'], 0) + self.assertGreater(stats["conform_packets"], 0) + self.assertEqual(stats["exceed_packets"], 0) + self.assertGreater(stats["violate_packets"], 0) self.assertGreater(len(rx), 0) self.assertLess(len(rx), len(pkts)) @@ -1572,32 +1710,28 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # remove the redirect. expect full drop. # ip_punt_redirect.remove_vpp_config() - self.send_and_assert_no_replies(self.pg0, pkts, - "IP no punt config") + self.send_and_assert_no_replies(self.pg0, pkts, "IP no punt config") # # Add a redirect that is not input port selective # - ip_punt_redirect = VppIpPuntRedirect(self, 0xffffffff, - self.pg1.sw_if_index, nh_addr) + ip_punt_redirect = VppIpPuntRedirect( + self, 0xFFFFFFFF, self.pg1.sw_if_index, nh_addr + ) ip_punt_redirect.add_vpp_config() self.send_and_expect(self.pg0, pkts, self.pg1) ip_punt_redirect.remove_vpp_config() def test_ip_punt_vrf(self): - """ IP punt/local with VRFs """ + """IP punt/local with VRFs""" # use a punt redirect to test if for-us packets are accepted pkts = self.pkt * 1025 - vlans_pg0 = [VppDot1QSubint(self, self.pg0, v) - for v in range(100, 104)] - vlans_pg1 = [VppDot1QSubint(self, self.pg1, v) - for v in range(100, 104)] - tbl4 = [VppIpTable(self, v).add_vpp_config() - for v in range(100, 104)] - tbl6 = [VppIpTable(self, v, True).add_vpp_config() - for v in range(100, 104)] + vlans_pg0 = [VppDot1QSubint(self, self.pg0, v) for v in range(100, 104)] + vlans_pg1 = [VppDot1QSubint(self, self.pg1, v) for v in range(100, 104)] + tbl4 = [VppIpTable(self, v).add_vpp_config() for v in range(100, 104)] + tbl6 = [VppIpTable(self, v, True).add_vpp_config() for v in range(100, 104)] for v in vlans_pg0 + vlans_pg1: v.admin_up() @@ -1608,27 +1742,35 @@ class TestIPPunt(IPPuntSetup, VppTestCase): v.resolve_arp() v.resolve_ndp() - [VppIpPuntRedirect - (self, - vlans_pg0[i].sw_if_index, - vlans_pg1[i].sw_if_index, - vlans_pg1[i].remote_ip4).add_vpp_config() - for i in range(4)] - [VppIpPuntRedirect - (self, - vlans_pg0[i].sw_if_index, - vlans_pg1[i].sw_if_index, - vlans_pg1[i].remote_ip6).add_vpp_config() - for i in range(4)] - - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IP(src=i.remote_ip4, - dst=i.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + [ + VppIpPuntRedirect( + self, + vlans_pg0[i].sw_if_index, + vlans_pg1[i].sw_if_index, + vlans_pg1[i].remote_ip4, + ).add_vpp_config() + for i in range(4) + ] + [ + VppIpPuntRedirect( + self, + vlans_pg0[i].sw_if_index, + vlans_pg1[i].sw_if_index, + vlans_pg1[i].remote_ip6, + ).add_vpp_config() + for i in range(4) + ] + + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IP(src=i.remote_ip4, dst=i.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] self.send_and_expect(self.pg0, pkts, self.pg1) @@ -1637,33 +1779,37 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # # we reject packets for source addresses in the wrong vlan/VRF - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IP(src="1.1.1.1", - dst=i.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IP(src="1.1.1.1", dst=i.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single and dual loop self.send_and_assert_no_replies(self.pg0, [pkts[0]]) self.send_and_assert_no_replies(self.pg0, pkts) self.assert_error_counter_equal( - "/err/ip4-local/ip4 source lookup miss", - len(pkts) + 1) + "/err/ip4-local/ip4 source lookup miss", len(pkts) + 1 + ) # using the same source in different tables, should reject # for the table that the source is not present in # the first packet in the stream is drop - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IP(src=vlans_pg0[0].remote_ip4, - dst=i.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IP(src=vlans_pg0[0].remote_ip4, dst=i.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single loop accept and drop # followed by both in the same frame/loop self.send_and_expect(self.pg0, [pkts[0]], self.pg1) @@ -1673,14 +1819,16 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # using the same source in different tables, should reject # for the table that the source is not present in # the first packet in the stream is accept - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IP(src=vlans_pg0[3].remote_ip4, - dst=i.local_ip4) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IP(src=vlans_pg0[3].remote_ip4, dst=i.local_ip4) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single loop accept and drop # followed by both in the same frame/loop @@ -1693,33 +1841,37 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # # we reject packets for source addresses in the wrong vlan/VRF - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IPv6(src="1::1", - dst=i.local_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IPv6(src="1::1", dst=i.local_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single and dual loop self.send_and_assert_no_replies(self.pg0, [pkts[0]]) self.send_and_assert_no_replies(self.pg0, pkts) self.assert_error_counter_equal( - "/err/ip6-input/ip6 source lookup miss", - len(pkts) + 1) + "/err/ip6-input/ip6 source lookup miss", len(pkts) + 1 + ) # using the same source in different tables, should reject # for the table that the source is not present in # the first packet in the stream is drop - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IPv6(src=vlans_pg0[0].remote_ip6, - dst=i.local_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IPv6(src=vlans_pg0[0].remote_ip6, dst=i.local_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single loop accept and drop # followed by both in the same frame/loop self.send_and_expect(self.pg0, [pkts[0]], self.pg1) @@ -1729,14 +1881,16 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # using the same source in different tables, should reject # for the table that the source is not present in # the first packet in the stream is accept - pkts = [(Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - Dot1Q(vlan=i.vlan) / - IPv6(src=vlans_pg0[3].remote_ip6, - dst=i.local_ip6) / - UDP(sport=1236, dport=1236) / - Raw(b'\xa5' * 100)) - for i in vlans_pg0] + pkts = [ + ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / Dot1Q(vlan=i.vlan) + / IPv6(src=vlans_pg0[3].remote_ip6, dst=i.local_ip6) + / UDP(sport=1236, dport=1236) + / Raw(b"\xa5" * 100) + ) + for i in vlans_pg0 + ] # single loop accept and drop # followed by both in the same frame/loop @@ -1751,18 +1905,21 @@ class TestIPPunt(IPPuntSetup, VppTestCase): v.set_table_ip6(0) def test_ip_punt_dump(self): - """ IP4 punt redirect dump""" + """IP4 punt redirect dump""" # # Configure a punt redirects # nh_address = self.pg3.remote_ip4 - ipr_03 = VppIpPuntRedirect(self, self.pg0.sw_if_index, - self.pg3.sw_if_index, nh_address) - ipr_13 = VppIpPuntRedirect(self, self.pg1.sw_if_index, - self.pg3.sw_if_index, nh_address) - ipr_23 = VppIpPuntRedirect(self, self.pg2.sw_if_index, - self.pg3.sw_if_index, "0.0.0.0") + ipr_03 = VppIpPuntRedirect( + self, self.pg0.sw_if_index, self.pg3.sw_if_index, nh_address + ) + ipr_13 = VppIpPuntRedirect( + self, self.pg1.sw_if_index, self.pg3.sw_if_index, nh_address + ) + ipr_23 = VppIpPuntRedirect( + self, self.pg2.sw_if_index, self.pg3.sw_if_index, "0.0.0.0" + ) ipr_03.add_vpp_config() ipr_13.add_vpp_config() ipr_23.add_vpp_config() @@ -1777,16 +1934,17 @@ class TestIPPunt(IPPuntSetup, VppTestCase): # # Dump punt redirects for all interfaces # - punts = self.vapi.ip_punt_redirect_dump(0xffffffff) + punts = self.vapi.ip_punt_redirect_dump(0xFFFFFFFF) self.assertEqual(len(punts), 3) for p in punts: self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index) self.assertNotEqual(punts[1].punt.nh, self.pg3.remote_ip4) - self.assertEqual(str(punts[2].punt.nh), '0.0.0.0') + self.assertEqual(str(punts[2].punt.nh), "0.0.0.0") class TestIPPuntHandoff(IPPuntSetup, VppTestCase): - """ IPv4 Punt Policer thread handoff """ + """IPv4 Punt Policer thread handoff""" + vpp_worker_count = 2 def setUp(self): @@ -1798,26 +1956,40 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): super(TestIPPuntHandoff, self).tearDown() def test_ip_punt_policer_handoff(self): - """ IP4 punt policer thread handoff """ + """IP4 punt policer thread handoff""" pkts = self.pkt * NUM_PKTS # # Configure a punt redirect via pg1. # nh_addr = self.pg1.remote_ip4 - ip_punt_redirect = VppIpPuntRedirect(self, self.pg0.sw_if_index, - self.pg1.sw_if_index, nh_addr) + ip_punt_redirect = VppIpPuntRedirect( + self, self.pg0.sw_if_index, self.pg1.sw_if_index, nh_addr + ) ip_punt_redirect.add_vpp_config() action_tx = PolicerAction( - VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, - 0) + VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0 + ) # # This policer drops no packets, we are just # testing that they get to the right thread. # - policer = VppPolicer(self, "ip4-punt", 400, 0, 10, 0, 1, - 0, 0, False, action_tx, action_tx, action_tx) + policer = VppPolicer( + self, + "ip4-punt", + 400, + 0, + 10, + 0, + 1, + 0, + 0, + False, + action_tx, + action_tx, + action_tx, + ) policer.add_vpp_config() ip_punt_policer = VppIpPuntPolicer(self, policer.policer_index) ip_punt_policer.add_vpp_config() @@ -1830,9 +2002,9 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): stats = policer.get_stats() # Single rate policer - expect conform, violate but no exceed - self.assertGreater(stats['conform_packets'], 0) - self.assertEqual(stats['exceed_packets'], 0) - self.assertGreater(stats['violate_packets'], 0) + self.assertGreater(stats["conform_packets"], 0) + self.assertEqual(stats["exceed_packets"], 0) + self.assertGreater(stats["violate_packets"], 0) # Worker 0, should have done all the policing stats0 = policer.get_stats(worker=0) @@ -1840,9 +2012,9 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): # Worker 1, should have handed everything off stats1 = policer.get_stats(worker=1) - self.assertEqual(stats1['conform_packets'], 0) - self.assertEqual(stats1['exceed_packets'], 0) - self.assertEqual(stats1['violate_packets'], 0) + self.assertEqual(stats1["conform_packets"], 0) + self.assertEqual(stats1["exceed_packets"], 0) + self.assertEqual(stats1["violate_packets"], 0) # Bind the policer to worker 1 and repeat policer.bind_vpp_config(1, True) @@ -1855,19 +2027,23 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): stats0 = policer.get_stats(worker=0) stats1 = policer.get_stats(worker=1) - self.assertGreater(stats0['conform_packets'], 0) - self.assertEqual(stats0['exceed_packets'], 0) - self.assertGreater(stats0['violate_packets'], 0) + self.assertGreater(stats0["conform_packets"], 0) + self.assertEqual(stats0["exceed_packets"], 0) + self.assertGreater(stats0["violate_packets"], 0) - self.assertGreater(stats1['conform_packets'], 0) - self.assertEqual(stats1['exceed_packets'], 0) - self.assertGreater(stats1['violate_packets'], 0) + self.assertGreater(stats1["conform_packets"], 0) + self.assertEqual(stats1["exceed_packets"], 0) + self.assertGreater(stats1["violate_packets"], 0) - self.assertEqual(stats0['conform_packets'] + stats1['conform_packets'], - stats['conform_packets']) + self.assertEqual( + stats0["conform_packets"] + stats1["conform_packets"], + stats["conform_packets"], + ) - self.assertEqual(stats0['violate_packets'] + stats1['violate_packets'], - stats['violate_packets']) + self.assertEqual( + stats0["violate_packets"] + stats1["violate_packets"], + stats["violate_packets"], + ) # Unbind the policer and repeat policer.bind_vpp_config(1, False) @@ -1880,11 +2056,9 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): stats0new = policer.get_stats(worker=0) stats1new = policer.get_stats(worker=1) - self.assertGreater(stats0new['conform_packets'], - stats0['conform_packets']) - self.assertEqual(stats0new['exceed_packets'], 0) - self.assertGreater(stats0new['violate_packets'], - stats0['violate_packets']) + self.assertGreater(stats0new["conform_packets"], stats0["conform_packets"]) + self.assertEqual(stats0new["exceed_packets"], 0) + self.assertGreater(stats0new["violate_packets"], stats0["violate_packets"]) self.assertEqual(stats1, stats1new) @@ -1897,7 +2071,7 @@ class TestIPPuntHandoff(IPPuntSetup, VppTestCase): class TestIPDeag(VppTestCase): - """ IPv4 Deaggregate Routes """ + """IPv4 Deaggregate Routes""" @classmethod def setUpClass(cls): @@ -1924,7 +2098,7 @@ class TestIPDeag(VppTestCase): i.admin_down() def test_ip_deag(self): - """ IP Deag Routes """ + """IP Deag Routes""" # # Create a table to be used for: @@ -1940,16 +2114,22 @@ class TestIPDeag(VppTestCase): # Add a route in the default table to point to a deag/ # second lookup in each of these tables # - route_to_dst = VppIpRoute(self, "1.1.1.1", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - nh_table_id=1)]) + route_to_dst = VppIpRoute( + self, "1.1.1.1", 32, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=1)] + ) route_to_src = VppIpRoute( - self, "1.1.1.2", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - nh_table_id=2, - type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP)]) + self, + "1.1.1.2", + 32, + [ + VppRoutePath( + "0.0.0.0", + 0xFFFFFFFF, + nh_table_id=2, + type=FibPathType.FIB_PATH_TYPE_SOURCE_LOOKUP, + ) + ], + ) route_to_dst.add_vpp_config() route_to_src.add_vpp_config() @@ -1957,31 +2137,34 @@ class TestIPDeag(VppTestCase): # packets to these destination are dropped, since they'll # hit the respective default routes in the second table # - p_dst = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="5.5.5.5", dst="1.1.1.1") / - TCP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) - p_src = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="2.2.2.2", dst="1.1.1.2") / - TCP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_dst = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="5.5.5.5", dst="1.1.1.1") + / TCP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + p_src = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="2.2.2.2", dst="1.1.1.2") + / TCP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) pkts_dst = p_dst * 257 pkts_src = p_src * 257 - self.send_and_assert_no_replies(self.pg0, pkts_dst, - "IP in dst table") - self.send_and_assert_no_replies(self.pg0, pkts_src, - "IP in src table") + self.send_and_assert_no_replies(self.pg0, pkts_dst, "IP in dst table") + self.send_and_assert_no_replies(self.pg0, pkts_src, "IP in src table") # # add a route in the dst table to forward via pg1 # - route_in_dst = VppIpRoute(self, "1.1.1.1", 32, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index)], - table_id=1) + route_in_dst = VppIpRoute( + self, + "1.1.1.1", + 32, + [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)], + table_id=1, + ) route_in_dst.add_vpp_config() self.send_and_expect(self.pg0, pkts_dst, self.pg1) @@ -1989,34 +2172,36 @@ class TestIPDeag(VppTestCase): # # add a route in the src table to forward via pg2 # - route_in_src = VppIpRoute(self, "2.2.2.2", 32, - [VppRoutePath(self.pg2.remote_ip4, - self.pg2.sw_if_index)], - table_id=2) + route_in_src = VppIpRoute( + self, + "2.2.2.2", + 32, + [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)], + table_id=2, + ) route_in_src.add_vpp_config() self.send_and_expect(self.pg0, pkts_src, self.pg2) # # loop in the lookup DP # - route_loop = VppIpRoute(self, "2.2.2.3", 32, - [VppRoutePath("0.0.0.0", - 0xffffffff, - nh_table_id=0)]) + route_loop = VppIpRoute( + self, "2.2.2.3", 32, [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_table_id=0)] + ) route_loop.add_vpp_config() - p_l = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="2.2.2.4", dst="2.2.2.3") / - TCP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_l = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="2.2.2.4", dst="2.2.2.3") + / TCP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - self.send_and_assert_no_replies(self.pg0, p_l * 257, - "IP lookup loop") + self.send_and_assert_no_replies(self.pg0, p_l * 257, "IP lookup loop") class TestIPInput(VppTestCase): - """ IPv4 Input Exceptions """ + """IPv4 Input Exceptions""" @classmethod def setUpClass(cls): @@ -2043,7 +2228,7 @@ class TestIPInput(VppTestCase): i.admin_down() def test_ip_input(self): - """ IP Input Exceptions """ + """IP Input Exceptions""" # i can't find a way in scapy to construct an IP packet # with a length less than the IP header length @@ -2051,103 +2236,95 @@ class TestIPInput(VppTestCase): # # Packet too short - this is forwarded # - p_short = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - len=40) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_short = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, len=40) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) rx = self.send_and_expect(self.pg0, p_short * NUM_PKTS, self.pg1) # # Packet too long - this is dropped # - p_long = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - len=400) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_long = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, len=400) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - rx = self.send_and_assert_no_replies(self.pg0, p_long * NUM_PKTS, - "too long") + rx = self.send_and_assert_no_replies(self.pg0, p_long * NUM_PKTS, "too long") # # bad chksum - this is dropped # - p_chksum = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - chksum=400) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_chksum = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, chksum=400) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - rx = self.send_and_assert_no_replies(self.pg0, p_chksum * NUM_PKTS, - "bad checksum") + rx = self.send_and_assert_no_replies( + self.pg0, p_chksum * NUM_PKTS, "bad checksum" + ) # # bad version - this is dropped # - p_ver = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - version=3) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_ver = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, version=3) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - rx = self.send_and_assert_no_replies(self.pg0, p_ver * NUM_PKTS, - "funky version") + rx = self.send_and_assert_no_replies( + self.pg0, p_ver * NUM_PKTS, "funky version" + ) # # fragment offset 1 - this is dropped # - p_frag = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - frag=1) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_frag = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, frag=1) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) - rx = self.send_and_assert_no_replies(self.pg0, p_frag * NUM_PKTS, - "frag offset") + rx = self.send_and_assert_no_replies(self.pg0, p_frag * NUM_PKTS, "frag offset") # # TTL expired packet # - p_ttl = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - ttl=1) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 100)) + p_ttl = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=1) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) rxs = self.send_and_expect_some(self.pg0, p_ttl * NUM_PKTS, self.pg0) for rx in rxs: icmp = rx[ICMP] self.assertEqual(icmptypes[icmp.type], "time-exceeded") - self.assertEqual(icmpcodes[icmp.type][icmp.code], - "ttl-zero-during-transit") + self.assertEqual(icmpcodes[icmp.type][icmp.code], "ttl-zero-during-transit") self.assertEqual(icmp.src, self.pg0.remote_ip4) self.assertEqual(icmp.dst, self.pg1.remote_ip4) # # MTU exceeded # - p_mtu = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4, - ttl=10, flags='DF') / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 2000)) + p_mtu = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=10, flags="DF") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 2000) + ) self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1500, 0, 0, 0]) @@ -2156,8 +2333,7 @@ class TestIPInput(VppTestCase): for rx in rxs: icmp = rx[ICMP] self.assertEqual(icmptypes[icmp.type], "dest-unreach") - self.assertEqual(icmpcodes[icmp.type][icmp.code], - "fragmentation-needed") + self.assertEqual(icmpcodes[icmp.type][icmp.code], "fragmentation-needed") self.assertEqual(icmp.nexthopmtu, 1500) self.assertEqual(icmp.src, self.pg0.remote_ip4) self.assertEqual(icmp.dst, self.pg1.remote_ip4) @@ -2171,25 +2347,25 @@ class TestIPInput(VppTestCase): # # source address 0.0.0.0 and 25.255.255.255 and for-us # - p_s0 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="0.0.0.0", - dst=self.pg0.local_ip4) / - ICMP(id=4, seq=4) / - Raw(load=b'\x0a' * 18)) + p_s0 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="0.0.0.0", dst=self.pg0.local_ip4) + / ICMP(id=4, seq=4) + / Raw(load=b"\x0a" * 18) + ) rx = self.send_and_assert_no_replies(self.pg0, p_s0 * 17) - p_s0 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="255.255.255.255", - dst=self.pg0.local_ip4) / - ICMP(id=4, seq=4) / - Raw(load=b'\x0a' * 18)) + p_s0 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="255.255.255.255", dst=self.pg0.local_ip4) + / ICMP(id=4, seq=4) + / Raw(load=b"\x0a" * 18) + ) rx = self.send_and_assert_no_replies(self.pg0, p_s0 * 17) class TestIPDirectedBroadcast(VppTestCase): - """ IPv4 Directed Broadcast """ + """IPv4 Directed Broadcast""" @classmethod def setUpClass(cls): @@ -2213,26 +2389,25 @@ class TestIPDirectedBroadcast(VppTestCase): i.admin_down() def test_ip_input(self): - """ IP Directed Broadcast """ + """IP Directed Broadcast""" # # set the directed broadcast on pg0 first, then config IP4 addresses # for pg1 directed broadcast is always disabled - self.vapi.sw_interface_set_ip_directed_broadcast( - self.pg0.sw_if_index, 1) - - p0 = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(src="1.1.1.1", - dst=self.pg0._local_ip4_bcast) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 2000)) - p1 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="1.1.1.1", - dst=self.pg1._local_ip4_bcast) / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 2000)) + self.vapi.sw_interface_set_ip_directed_broadcast(self.pg0.sw_if_index, 1) + + p0 = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src="1.1.1.1", dst=self.pg0._local_ip4_bcast) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 2000) + ) + p1 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="1.1.1.1", dst=self.pg1._local_ip4_bcast) + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 2000) + ) self.pg0.config_ip4() self.pg0.resolve_arp() @@ -2245,19 +2420,19 @@ class TestIPDirectedBroadcast(VppTestCase): rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0) self.assertTrue(rx[0][Ether].dst, "ff:ff:ff:ff:ff:ff") - self.send_and_assert_no_replies(self.pg0, p1 * NUM_PKTS, - "directed broadcast disabled") + self.send_and_assert_no_replies( + self.pg0, p1 * NUM_PKTS, "directed broadcast disabled" + ) # # toggle directed broadcast on pg0 # - self.vapi.sw_interface_set_ip_directed_broadcast( - self.pg0.sw_if_index, 0) - self.send_and_assert_no_replies(self.pg1, p0 * NUM_PKTS, - "directed broadcast disabled") + self.vapi.sw_interface_set_ip_directed_broadcast(self.pg0.sw_if_index, 0) + self.send_and_assert_no_replies( + self.pg1, p0 * NUM_PKTS, "directed broadcast disabled" + ) - self.vapi.sw_interface_set_ip_directed_broadcast( - self.pg0.sw_if_index, 1) + self.vapi.sw_interface_set_ip_directed_broadcast(self.pg0.sw_if_index, 1) rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0) self.pg0.unconfig_ip4() @@ -2265,7 +2440,7 @@ class TestIPDirectedBroadcast(VppTestCase): class TestIPLPM(VppTestCase): - """ IPv4 longest Prefix Match """ + """IPv4 longest Prefix Match""" @classmethod def setUpClass(cls): @@ -2292,29 +2467,35 @@ class TestIPLPM(VppTestCase): i.unconfig_ip4() def test_ip_lpm(self): - """ IP longest Prefix Match """ + """IP longest Prefix Match""" - s_24 = VppIpRoute(self, "10.1.2.0", 24, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index)]) + s_24 = VppIpRoute( + self, + "10.1.2.0", + 24, + [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)], + ) s_24.add_vpp_config() - s_8 = VppIpRoute(self, "10.0.0.0", 8, - [VppRoutePath(self.pg2.remote_ip4, - self.pg2.sw_if_index)]) + s_8 = VppIpRoute( + self, + "10.0.0.0", + 8, + [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)], + ) s_8.add_vpp_config() - p_8 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="1.1.1.1", - dst="10.1.1.1") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 2000)) - p_24 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="1.1.1.1", - dst="10.1.2.1") / - UDP(sport=1234, dport=1234) / - Raw(b'\xa5' * 2000)) + p_8 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="1.1.1.1", dst="10.1.1.1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 2000) + ) + p_24 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="1.1.1.1", dst="10.1.2.1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 2000) + ) self.logger.info(self.vapi.cli("sh ip fib mtrie")) rx = self.send_and_expect(self.pg0, p_8 * NUM_PKTS, self.pg2) @@ -2323,7 +2504,7 @@ class TestIPLPM(VppTestCase): @tag_fixme_vpp_workers class TestIPv4Frag(VppTestCase): - """ IPv4 fragmentation """ + """IPv4 fragmentation""" @classmethod def setUpClass(cls): @@ -2344,25 +2525,29 @@ class TestIPv4Frag(VppTestCase): super(TestIPv4Frag, cls).tearDownClass() def test_frag_large_packets(self): - """ Fragmentation of large packets """ + """Fragmentation of large packets""" self.vapi.cli("adjacency counters enable") - p = (Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) / - IP(src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) / - UDP(sport=1234, dport=5678) / Raw()) + p = ( + Ether(dst=self.src_if.local_mac, src=self.src_if.remote_mac) + / IP(src=self.src_if.remote_ip4, dst=self.dst_if.remote_ip4) + / UDP(sport=1234, dport=5678) + / Raw() + ) self.extend_packet(p, 6000, "abcde") saved_payload = p[Raw].load - nbr = VppNeighbor(self, - self.dst_if.sw_if_index, - self.dst_if.remote_mac, - self.dst_if.remote_ip4).add_vpp_config() + nbr = VppNeighbor( + self, + self.dst_if.sw_if_index, + self.dst_if.remote_mac, + self.dst_if.remote_ip4, + ).add_vpp_config() # Force fragmentation by setting MTU of output interface # lower than packet size - self.vapi.sw_interface_set_mtu(self.dst_if.sw_if_index, - [5000, 0, 0, 0]) + self.vapi.sw_interface_set_mtu(self.dst_if.sw_if_index, [5000, 0, 0, 0]) self.pg_enable_capture() self.src_if.add_stream(p) @@ -2373,10 +2558,10 @@ class TestIPv4Frag(VppTestCase): packets = self.dst_if.get_capture(3) # we should show 3 packets thru the neighbor - self.assertEqual(3, nbr.get_stats()['packets']) + self.assertEqual(3, nbr.get_stats()["packets"]) # Assume VPP sends the fragments in order - payload = b'' + payload = b"" for p in packets: payload_offset = p.frag * 8 if payload_offset > 0: @@ -2387,7 +2572,7 @@ class TestIPv4Frag(VppTestCase): class TestIPReplace(VppTestCase): - """ IPv4 Table Replace """ + """IPv4 Table Replace""" @classmethod def setUpClass(cls): @@ -2420,7 +2605,7 @@ class TestIPReplace(VppTestCase): i.unconfig_ip4() def test_replace(self): - """ IP Table Replace """ + """IP Table Replace""" MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t @@ -2432,27 +2617,46 @@ class TestIPReplace(VppTestCase): for ii, t in enumerate(self.tables): for jj in range(N_ROUTES): uni = VppIpRoute( - self, "10.0.0.%d" % jj, 32, - [VppRoutePath(links[ii].remote_hosts[0].ip4, - links[ii].sw_if_index), - VppRoutePath(links[ii].remote_hosts[1].ip4, - links[ii].sw_if_index)], - table_id=t.table_id).add_vpp_config() + self, + "10.0.0.%d" % jj, + 32, + [ + VppRoutePath( + links[ii].remote_hosts[0].ip4, links[ii].sw_if_index + ), + VppRoutePath( + links[ii].remote_hosts[1].ip4, links[ii].sw_if_index + ), + ], + table_id=t.table_id, + ).add_vpp_config() multi = VppIpMRoute( - self, "0.0.0.0", - "239.0.0.%d" % jj, 32, + self, + "0.0.0.0", + "239.0.0.%d" % jj, + 32, MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE, - [VppMRoutePath(self.pg0.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT), - VppMRoutePath(self.pg1.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD), - VppMRoutePath(self.pg2.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD), - VppMRoutePath(self.pg3.sw_if_index, - MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)], - table_id=t.table_id).add_vpp_config() - routes[ii].append({'uni': uni, - 'multi': multi}) + [ + VppMRoutePath( + self.pg0.sw_if_index, + MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT, + ), + VppMRoutePath( + self.pg1.sw_if_index, + MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD, + ), + VppMRoutePath( + self.pg2.sw_if_index, + MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD, + ), + VppMRoutePath( + self.pg3.sw_if_index, + MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD, + ), + ], + table_id=t.table_id, + ).add_vpp_config() + routes[ii].append({"uni": uni, "multi": multi}) # # replace the tables a few times @@ -2467,14 +2671,14 @@ class TestIPReplace(VppTestCase): dump = t.dump() mdump = t.mdump() for r in routes[ii]: - self.assertTrue(find_route_in_dump(dump, r['uni'], t)) - self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t)) + self.assertTrue(find_route_in_dump(dump, r["uni"], t)) + self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t)) # redownload the even numbered routes for ii, t in enumerate(self.tables): for jj in range(0, N_ROUTES, 2): - routes[ii][jj]['uni'].add_vpp_config() - routes[ii][jj]['multi'].add_vpp_config() + routes[ii][jj]["uni"].add_vpp_config() + routes[ii][jj]["multi"].add_vpp_config() # signal each table replace_end for t in self.tables: @@ -2485,29 +2689,29 @@ class TestIPReplace(VppTestCase): dump = t.dump() mdump = t.mdump() for jj in range(0, N_ROUTES, 2): - self.assertTrue(find_route_in_dump( - dump, routes[ii][jj]['uni'], t)) - self.assertTrue(find_mroute_in_dump( - mdump, routes[ii][jj]['multi'], t)) + self.assertTrue(find_route_in_dump(dump, routes[ii][jj]["uni"], t)) + self.assertTrue( + find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t) + ) for jj in range(1, N_ROUTES - 1, 2): - self.assertFalse(find_route_in_dump( - dump, routes[ii][jj]['uni'], t)) - self.assertFalse(find_mroute_in_dump( - mdump, routes[ii][jj]['multi'], t)) + self.assertFalse(find_route_in_dump(dump, routes[ii][jj]["uni"], t)) + self.assertFalse( + find_mroute_in_dump(mdump, routes[ii][jj]["multi"], t) + ) # reload all the routes for ii, t in enumerate(self.tables): for r in routes[ii]: - r['uni'].add_vpp_config() - r['multi'].add_vpp_config() + r["uni"].add_vpp_config() + r["multi"].add_vpp_config() # all the routes are still there for ii, t in enumerate(self.tables): dump = t.dump() mdump = t.mdump() for r in routes[ii]: - self.assertTrue(find_route_in_dump(dump, r['uni'], t)) - self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t)) + self.assertTrue(find_route_in_dump(dump, r["uni"], t)) + self.assertTrue(find_mroute_in_dump(mdump, r["multi"], t)) # # finally flush the tables for good measure @@ -2519,7 +2723,7 @@ class TestIPReplace(VppTestCase): class TestIPCover(VppTestCase): - """ IPv4 Table Cover """ + """IPv4 Table Cover""" @classmethod def setUpClass(cls): @@ -2552,7 +2756,7 @@ class TestIPCover(VppTestCase): i.unconfig_ip4() def test_cover(self): - """ IP Table Cover """ + """IP Table Cover""" # add a loop back with a /32 prefix lo = VppLoInterface(self) @@ -2560,21 +2764,23 @@ class TestIPCover(VppTestCase): a = VppIpInterfaceAddress(self, lo, "127.0.0.1", 32).add_vpp_config() # add a neighbour that matches the loopback's /32 - nbr = VppNeighbor(self, - lo.sw_if_index, - lo.remote_mac, - "127.0.0.1").add_vpp_config() + nbr = VppNeighbor( + self, lo.sw_if_index, lo.remote_mac, "127.0.0.1" + ).add_vpp_config() # add the default route which will be the cover for /32 - r = VppIpRoute(self, "0.0.0.0", 0, - [VppRoutePath("127.0.0.1", - lo.sw_if_index)], - register=False).add_vpp_config() + r = VppIpRoute( + self, + "0.0.0.0", + 0, + [VppRoutePath("127.0.0.1", lo.sw_if_index)], + register=False, + ).add_vpp_config() # add/remove/add a longer mask cover - r8 = VppIpRoute(self, "127.0.0.0", 8, - [VppRoutePath("127.0.0.1", - lo.sw_if_index)]).add_vpp_config() + r8 = VppIpRoute( + self, "127.0.0.0", 8, [VppRoutePath("127.0.0.1", lo.sw_if_index)] + ).add_vpp_config() r8.remove_vpp_config() r8.add_vpp_config() r8.remove_vpp_config() @@ -2587,7 +2793,7 @@ class TestIPCover(VppTestCase): class TestIP4Replace(VppTestCase): - """ IPv4 Interface Address Replace """ + """IPv4 Interface Address Replace""" @classmethod def setUpClass(cls): @@ -2614,7 +2820,7 @@ class TestIP4Replace(VppTestCase): return len(self.vapi.ip_address_dump(intf.sw_if_index)) def test_replace(self): - """ IP interface address replace """ + """IP interface address replace""" intf_pfxs = [[], [], [], []] @@ -2698,8 +2904,7 @@ class TestIP4Replace(VppTestCase): for intf in self.pg_interfaces: # 172.18.x.1/24 addr = "172.18.%d.1" % intf.sw_if_index - pfxs.append(VppIpInterfaceAddress(self, intf, addr, - 24).add_vpp_config()) + pfxs.append(VppIpInterfaceAddress(self, intf, addr, 24).add_vpp_config()) self.vapi.sw_interface_address_replace_end() @@ -2734,8 +2939,7 @@ class TestIP4Replace(VppTestCase): for intf in self.pg_interfaces: # 172.18.x.1/24 addr = "172.18.%d.1" % (intf.sw_if_index + 1) - pfxs.append(VppIpInterfaceAddress(self, intf, - addr, 24).add_vpp_config()) + pfxs.append(VppIpInterfaceAddress(self, intf, addr, 24).add_vpp_config()) self.vapi.sw_interface_address_replace_end() @@ -2748,7 +2952,7 @@ class TestIP4Replace(VppTestCase): class TestIPv4PathMTU(VppTestCase): - """ IPv4 Path MTU """ + """IPv4 Path MTU""" @classmethod def setUpClass(cls): @@ -2767,7 +2971,7 @@ class TestIPv4PathMTU(VppTestCase): super(TestIPv4PathMTU, cls).tearDownClass() def test_path_mtu(self): - """ Path MTU """ + """Path MTU""" # # The goal here is not to test that fragmentation works correctly, @@ -2777,28 +2981,26 @@ class TestIPv4PathMTU(VppTestCase): self.vapi.cli("adjacency counters enable") # set the interface MTU to a reasonable value - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, - [1800, 0, 0, 0]) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1800, 0, 0, 0]) self.pg1.generate_remote_hosts(4) - p_2k = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 640)) - p_1k = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 320)) - - nbr = VppNeighbor(self, - self.pg1.sw_if_index, - self.pg1.remote_mac, - self.pg1.remote_ip4).add_vpp_config() + p_2k = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 640) + ) + p_1k = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 320) + ) + + nbr = VppNeighbor( + self, self.pg1.sw_if_index, self.pg1.remote_mac, self.pg1.remote_ip4 + ).add_vpp_config() # this is now the interface MTU frags self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=2) @@ -2832,21 +3034,18 @@ class TestIPv4PathMTU(VppTestCase): # raise the interface's MTU # should still use that of the path - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, - [2000, 0, 0, 0]) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2000, 0, 0, 0]) self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3) self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2) # set path high and interface low pmtu.modify(2000) - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, - [900, 0, 0, 0]) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [900, 0, 0, 0]) self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3) self.send_and_expect(self.pg0, [p_1k], self.pg1, n_rx=2) # remove the path MTU using the mark-n-sweep semantics - self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, - [1800, 0, 0, 0]) + self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1800, 0, 0, 0]) self.vapi.ip_path_mtu_replace_begin() self.vapi.ip_path_mtu_replace_end() @@ -2856,27 +3055,27 @@ class TestIPv4PathMTU(VppTestCase): # # set path MTU for a neighbour that doesn't exist, yet # - pmtu2 = VppIpPathMtu(self, - self.pg1.remote_hosts[2].ip4, - 900).add_vpp_config() - - p_2k = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_hosts[2].ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 640)) - p_1k = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg0.remote_ip4, - dst=self.pg1.remote_hosts[2].ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 320)) - - nbr2 = VppNeighbor(self, - self.pg1.sw_if_index, - self.pg1.remote_hosts[2].mac, - self.pg1.remote_hosts[2].ip4).add_vpp_config() + pmtu2 = VppIpPathMtu(self, self.pg1.remote_hosts[2].ip4, 900).add_vpp_config() + + p_2k = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[2].ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 640) + ) + p_1k = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_hosts[2].ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 320) + ) + + nbr2 = VppNeighbor( + self, + self.pg1.sw_if_index, + self.pg1.remote_hosts[2].mac, + self.pg1.remote_hosts[2].ip4, + ).add_vpp_config() # should frag to the path MTU self.send_and_expect(self.pg0, [p_2k], self.pg1, n_rx=3) @@ -2916,7 +3115,7 @@ class TestIPv4PathMTU(VppTestCase): class TestIPv4ItfRebind(VppTestCase): - """ IPv4 Interface Bind w/ attached routes """ + """IPv4 Interface Bind w/ attached routes""" def setUp(self): super(TestIPv4ItfRebind, self).setUp() @@ -2927,7 +3126,7 @@ class TestIPv4ItfRebind(VppTestCase): super(TestIPv4ItfRebind, self).tearDown() def test_rebind(self): - """ Import to no import """ + """Import to no import""" TABLE_ID = 1 tbl = VppIpTable(self, TABLE_ID).add_vpp_config() @@ -2940,17 +3139,20 @@ class TestIPv4ItfRebind(VppTestCase): # add an attached route via an pg0 # in a different table. this prefix should import - rt = VppIpRoute(self, self.pg0.local_ip4, 24, - [VppRoutePath("0.0.0.0", - self.pg0.sw_if_index)], - table_id=TABLE_ID).add_vpp_config() - - p = (Ether(dst=self.pg1.local_mac, - src=self.pg1.remote_mac) / - IP(src=self.pg1.remote_ip4, - dst=self.pg0.remote_ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 640)) + rt = VppIpRoute( + self, + self.pg0.local_ip4, + 24, + [VppRoutePath("0.0.0.0", self.pg0.sw_if_index)], + table_id=TABLE_ID, + ).add_vpp_config() + + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 640) + ) rx = self.send_and_expect(self.pg1, [p], self.pg0) self.assertFalse(rx[0].haslayer(ARP)) @@ -2984,7 +3186,7 @@ class TestIPv4ItfRebind(VppTestCase): tbl.remove_vpp_config() def test_delete(self): - """ Swap import tables """ + """Swap import tables""" TABLE_ID1 = 1 tbl1_4 = VppIpTable(self, TABLE_ID1).add_vpp_config() @@ -3006,19 +3208,25 @@ class TestIPv4ItfRebind(VppTestCase): # add an attached route in the default table via pg0 # this should import to table 1 - rt4 = VppIpRoute(self, self.pg1.local_ip4, 24, - [VppRoutePath("0.0.0.0", - self.pg1.sw_if_index)]).add_vpp_config() - rt6 = VppIpRoute(self, self.pg1.local_ip6, 64, - [VppRoutePath("0.0.0.0", - self.pg1.sw_if_index)]).add_vpp_config() - - p1 = (Ether(dst=self.pg0.local_mac, - src=self.pg0.remote_mac) / - IP(src=self.pg1.remote_ip4, - dst=self.pg1.remote_ip4) / - UDP(sport=1234, dport=5678) / - Raw(b'0xa' * 640)) + rt4 = VppIpRoute( + self, + self.pg1.local_ip4, + 24, + [VppRoutePath("0.0.0.0", self.pg1.sw_if_index)], + ).add_vpp_config() + rt6 = VppIpRoute( + self, + self.pg1.local_ip6, + 64, + [VppRoutePath("0.0.0.0", self.pg1.sw_if_index)], + ).add_vpp_config() + + p1 = ( + Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) + / IP(src=self.pg1.remote_ip4, dst=self.pg1.remote_ip4) + / UDP(sport=1234, dport=5678) + / Raw(b"0xa" * 640) + ) # inject into table 0 rx = self.send_and_expect(self.pg0, [p1], self.pg1) @@ -3050,5 +3258,5 @@ class TestIPv4ItfRebind(VppTestCase): i.admin_down() -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner) |