summaryrefslogtreecommitdiffstats
path: root/test/test_ip6_vrf_multi_instance.py
diff options
context:
space:
mode:
authorKlement Sekera <klement.sekera@gmail.com>2022-04-26 19:02:15 +0200
committerOle Tr�an <otroan@employees.org>2022-05-10 18:52:08 +0000
commitd9b0c6fbf7aa5bd9af84264105b39c82028a4a29 (patch)
tree4f786cfd8ebc2443cb11e11b74c8657204068898 /test/test_ip6_vrf_multi_instance.py
parentf90348bcb4afd0af2611cefc43b17ef3042b511c (diff)
tests: replace pycodestyle with black
Drop pycodestyle for code style checking in favor of black. Black is much faster, stable PEP8 compliant code style checker offering also automatic formatting. It aims to be very stable and produce smallest diffs. It's used by many small and big projects. Running checkstyle with black takes a few seconds with a terse output. Thus, test-checkstyle-diff is no longer necessary. Expand scope of checkstyle to all python files in the repo, replacing test-checkstyle with checkstyle-python. Also, fixstyle-python is now available for automatic style formatting. Note: python virtualenv has been consolidated in test/Makefile, test/requirements*.txt which will eventually be moved to a central location. This is required to simply the automated generation of docker executor images in the CI. Type: improvement Change-Id: I022a326603485f58585e879ac0f697fceefbc9c8 Signed-off-by: Klement Sekera <klement.sekera@gmail.com> Signed-off-by: Dave Wallace <dwallacelf@gmail.com>
Diffstat (limited to 'test/test_ip6_vrf_multi_instance.py')
-rw-r--r--test/test_ip6_vrf_multi_instance.py184
1 files changed, 98 insertions, 86 deletions
diff --git a/test/test_ip6_vrf_multi_instance.py b/test/test_ip6_vrf_multi_instance.py
index d95e7927f98..73df30d77f2 100644
--- a/test/test_ip6_vrf_multi_instance.py
+++ b/test/test_ip6_vrf_multi_instance.py
@@ -69,8 +69,14 @@ import socket
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
- RouterAlert, IPv6ExtHdrHopByHop
+from scapy.layers.inet6 import (
+ UDP,
+ IPv6,
+ ICMPv6ND_NS,
+ ICMPv6ND_RA,
+ RouterAlert,
+ IPv6ExtHdrHopByHop,
+)
from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
from scapy.pton_ntop import inet_ntop
@@ -80,8 +86,8 @@ from vrf import VRFState
def is_ipv6_misc_ext(p):
- """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out
- ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """
+ """Is packet one of uninteresting IPv6 broadcasts (extended to filter out
+ ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)?"""
if p.haslayer(ICMPv6ND_RA):
if in6_ismaddr(p[IPv6].dst):
return True
@@ -96,7 +102,7 @@ def is_ipv6_misc_ext(p):
class TestIP6VrfMultiInst(VppTestCase):
- """ IP6 VRF Multi-instance Test Case """
+ """IP6 VRF Multi-instance Test Case"""
@classmethod
def setUpClass(cls):
@@ -114,8 +120,7 @@ class TestIP6VrfMultiInst(VppTestCase):
try:
# Create pg interfaces
- cls.create_pg_interfaces(
- range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
+ cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
@@ -124,7 +129,8 @@ class TestIP6VrfMultiInst(VppTestCase):
pg_list = [
cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
for j in range(cls.pg_ifs_per_vrf)
- if (multiplicand * cls.pg_ifs_per_vrf + j) != i]
+ if (multiplicand * cls.pg_ifs_per_vrf + j) != i
+ ]
cls.flows[cls.pg_interfaces[i]] = pg_list
# Packet sizes - jumbo packet (9018 bytes) skipped
@@ -153,7 +159,8 @@ class TestIP6VrfMultiInst(VppTestCase):
set_id = i + 1
pg_list = [
cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
- for j in range(cls.pg_ifs_per_vrf)]
+ for j in range(cls.pg_ifs_per_vrf)
+ ]
cls.pg_if_sets[set_id] = pg_list
except Exception:
@@ -185,8 +192,9 @@ class TestIP6VrfMultiInst(VppTestCase):
for i in range(self.pg_ifs_per_vrf):
pg_if = self.pg_if_sets[if_set_id][i]
pg_if.set_table_ip6(vrf_id)
- self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
- % (pg_if.name, vrf_id))
+ self.logger.info(
+ "pg-interface %s added to IPv6 VRF ID %d" % (pg_if.name, vrf_id)
+ )
if pg_if not in self.pg_in_vrf:
self.pg_in_vrf.append(pg_if)
if pg_if in self.pg_not_in_vrf:
@@ -206,8 +214,9 @@ class TestIP6VrfMultiInst(VppTestCase):
"""
for i in range(count):
vrf_id = i + start
- self.vapi.ip_table_add_del(is_add=1,
- table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_add_del(
+ is_add=1, table={"table_id": vrf_id, "is_ip6": 1}
+ )
self.logger.info("IPv6 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
self.vrf_list.append(vrf_id)
@@ -217,8 +226,7 @@ class TestIP6VrfMultiInst(VppTestCase):
self.logger.debug(self.vapi.ppcli("show ip6 fib"))
self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
- def create_vrf_by_id_and_assign_interfaces(self, set_id,
- vrf_id=0xffffffff):
+ def create_vrf_by_id_and_assign_interfaces(self, set_id, vrf_id=0xFFFFFFFF):
"""
Create a FIB table / VRF by vrf_id, put 3 pg-ip6 interfaces
to FIB table / VRF.
@@ -226,8 +234,7 @@ class TestIP6VrfMultiInst(VppTestCase):
:param int vrf_id: Required table ID / VRF ID. \
(Default value = 0xffffffff, ID will be selected automatically)
"""
- ret = self.vapi.ip_table_allocate(table={'table_id': vrf_id,
- 'is_ip6': 1})
+ ret = self.vapi.ip_table_allocate(table={"table_id": vrf_id, "is_ip6": 1})
vrf_id = ret.table.table_id
self.logger.info("IPv6 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
@@ -248,7 +255,7 @@ class TestIP6VrfMultiInst(VppTestCase):
"""
if if_set_id is None:
if_set_id = vrf_id
- self.vapi.ip_table_flush(table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_flush(table={"table_id": vrf_id, "is_ip6": 1})
if vrf_id in self.vrf_list:
self.vrf_list.remove(vrf_id)
if vrf_id not in self.vrf_reset_list:
@@ -270,8 +277,7 @@ class TestIP6VrfMultiInst(VppTestCase):
self.vrf_list.remove(vrf_id)
if vrf_id in self.vrf_reset_list:
self.vrf_reset_list.remove(vrf_id)
- self.vapi.ip_table_add_del(is_add=0,
- table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id, "is_ip6": 1})
def create_stream(self, src_if, packet_sizes):
"""
@@ -288,16 +294,20 @@ class TestIP6VrfMultiInst(VppTestCase):
src_host = random.choice(src_hosts)
pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
- p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
- IPv6(src=src_host.ip6, dst=dst_host.ip6) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_host.mac)
+ / IPv6(src=src_host.ip6, dst=dst_host.ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw(payload)
+ )
pkt_info.data = p.copy()
size = random.choice(packet_sizes)
self.extend_packet(p, size)
pkts.append(p)
- self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
- % (src_if.name, len(pkts)))
+ self.logger.debug(
+ "Input stream created for port %s. Length: %u pkt(s)"
+ % (src_if.name, len(pkts))
+ )
return pkts
def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
@@ -320,16 +330,20 @@ class TestIP6VrfMultiInst(VppTestCase):
src_host = random.choice(src_hosts)
pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
- p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
- IPv6(src=src_host.ip6, dst=dst_host.ip6) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_host.mac)
+ / IPv6(src=src_host.ip6, dst=dst_host.ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw(payload)
+ )
pkt_info.data = p.copy()
size = random.choice(packet_sizes)
self.extend_packet(p, size)
pkts.append(p)
- self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
- % (src_if.name, len(pkts)))
+ self.logger.debug(
+ "Input stream created for port %s. Length: %u pkt(s)"
+ % (src_if.name, len(pkts))
+ )
return pkts
def verify_capture(self, pg_if, capture):
@@ -350,11 +364,13 @@ class TestIP6VrfMultiInst(VppTestCase):
payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)"
+ % (pg_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertIsNotNone(next_info)
self.assertEqual(packet_index, next_info.index)
@@ -369,11 +385,13 @@ class TestIP6VrfMultiInst(VppTestCase):
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
+ i, dst_sw_if_index, last_info[i.sw_if_index]
+ )
self.assertIsNone(
remaining_packet,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
+ "Port %u: Packet expected from source %u didn't arrive"
+ % (dst_sw_if_index, i.sw_if_index),
+ )
def verify_vrf(self, vrf_id, if_set_id=None):
"""
@@ -437,8 +455,9 @@ class TestIP6VrfMultiInst(VppTestCase):
capture = pg_if.get_capture(remark="interface is in VRF")
self.verify_capture(pg_if, capture)
elif pg_if in self.pg_not_in_vrf:
- pg_if.assert_nothing_captured(remark="interface is not in VRF",
- filter_out_fn=is_ipv6_misc_ext)
+ pg_if.assert_nothing_captured(
+ remark="interface is not in VRF", filter_out_fn=is_ipv6_misc_ext
+ )
self.logger.debug("No capture for interface %s" % pg_if.name)
else:
raise Exception("Unknown interface: %s" % pg_if.name)
@@ -458,7 +477,8 @@ class TestIP6VrfMultiInst(VppTestCase):
for vrf_id in self.vrf_list:
for pg_if in self.pg_if_sets[vrf_id]:
pkts = self.create_stream_crosswise_vrf(
- pg_if, vrf_id, self.pg_if_packet_sizes)
+ pg_if, vrf_id, self.pg_if_packet_sizes
+ )
pg_if.add_stream(pkts)
# Enable packet capture and start packet sending
@@ -468,29 +488,27 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify
# Verify outgoing packet streams per packet-generator interface
for pg_if in self.pg_interfaces:
- pg_if.assert_nothing_captured(remark="interface is in other VRF",
- filter_out_fn=is_ipv6_misc_ext)
+ pg_if.assert_nothing_captured(
+ remark="interface is in other VRF", filter_out_fn=is_ipv6_misc_ext
+ )
self.logger.debug("No capture for interface %s" % pg_if.name)
def test_ip6_vrf_01(self):
- """ IP6 VRF Multi-instance test 1 - create 4 VRFs
- """
+ """IP6 VRF Multi-instance test 1 - create 4 VRFs"""
# Config 1
# Create 4 VRFs
self.create_vrf_and_assign_interfaces(4)
# Verify 1
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 1
self.run_verify_test()
self.run_crosswise_vrf_test()
def test_ip6_vrf_02(self):
- """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
- """
+ """IP6 VRF Multi-instance test 2 - reset 2 VRFs"""
# Config 2
# Delete 2 VRFs
self.reset_vrf_and_remove_from_vrf_list(1)
@@ -498,11 +516,9 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 2
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 2
self.run_verify_test()
@@ -513,8 +529,7 @@ class TestIP6VrfMultiInst(VppTestCase):
# self.reset_vrf_and_remove_from_vrf_list(vrf_id)
def test_ip6_vrf_03(self):
- """ IP6 VRF Multi-instance 3 - add 2 VRFs
- """
+ """IP6 VRF Multi-instance 3 - add 2 VRFs"""
# Config 3
# Add 1 of reset VRFs and 1 new VRF
self.create_vrf_and_assign_interfaces(1)
@@ -522,11 +537,9 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 3
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 3
self.run_verify_test()
@@ -537,8 +550,7 @@ class TestIP6VrfMultiInst(VppTestCase):
# self.reset_vrf_and_remove_from_vrf_list(vrf_id)
def test_ip6_vrf_04(self):
- """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
- """
+ """IP6 VRF Multi-instance test 4 - reset 4 VRFs"""
# Config 4
# Reset all VRFs (i.e. no VRF except VRF=0 configured)
for i in range(len(self.vrf_list)):
@@ -547,20 +559,20 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 4
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Test 4
self.run_verify_test()
self.run_crosswise_vrf_test()
def test_ip6_vrf_05(self):
- """ IP6 VRF Multi-instance test 5 - auto allocate vrf id
- """
+ """IP6 VRF Multi-instance test 5 - auto allocate vrf id"""
# Config 5
# Create several VRFs
# Set vrf_id manually first
@@ -571,11 +583,11 @@ class TestIP6VrfMultiInst(VppTestCase):
]
# Verify 5
- self.assert_equal(self.verify_vrf(10, 1), VRFState.configured,
- VRFState)
+ self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, VRFState)
for i, vrf in enumerate(auto_vrf_id):
- self.assert_equal(self.verify_vrf(vrf, i+2),
- VRFState.configured, VRFState)
+ self.assert_equal(
+ self.verify_vrf(vrf, i + 2), VRFState.configured, VRFState
+ )
# Test 5
self.run_verify_test()
@@ -584,18 +596,19 @@ class TestIP6VrfMultiInst(VppTestCase):
# Reset VRFs
self.reset_vrf_and_remove_from_vrf_list(10, 1)
for i, vrf in enumerate(auto_vrf_id):
- self.reset_vrf_and_remove_from_vrf_list(vrf, i+2)
+ self.reset_vrf_and_remove_from_vrf_list(vrf, i + 2)
# Verify 5.1
self.assert_equal(self.verify_vrf(10, 1), VRFState.reset, VRFState)
for i, vrf in enumerate(auto_vrf_id):
- self.assert_equal(self.verify_vrf(vrf, i+2),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf, i + 2), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Cleanup our extra created VRFs
for vrf in auto_vrf_id:
@@ -604,14 +617,12 @@ class TestIP6VrfMultiInst(VppTestCase):
self.delete_vrf(10)
def test_ip6_vrf_06(self):
- """ IP6 VRF Multi-instance test 6 - recreate 4 VRFs
- """
+ """IP6 VRF Multi-instance test 6 - recreate 4 VRFs"""
# Reconfigure all the VRFs
self.create_vrf_and_assign_interfaces(4)
# Verify
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test
self.run_verify_test()
self.run_crosswise_vrf_test()
@@ -620,16 +631,17 @@ class TestIP6VrfMultiInst(VppTestCase):
self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0])
# Verify
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Test
self.run_verify_test()
self.run_crosswise_vrf_test()
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)