summaryrefslogtreecommitdiffstats
path: root/test/test_ip6_vrf_multi_instance.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_ip6_vrf_multi_instance.py')
-rw-r--r--test/test_ip6_vrf_multi_instance.py184
1 files changed, 98 insertions, 86 deletions
diff --git a/test/test_ip6_vrf_multi_instance.py b/test/test_ip6_vrf_multi_instance.py
index d95e7927f98..73df30d77f2 100644
--- a/test/test_ip6_vrf_multi_instance.py
+++ b/test/test_ip6_vrf_multi_instance.py
@@ -69,8 +69,14 @@ import socket
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import UDP, IPv6, ICMPv6ND_NS, ICMPv6ND_RA, \
- RouterAlert, IPv6ExtHdrHopByHop
+from scapy.layers.inet6 import (
+ UDP,
+ IPv6,
+ ICMPv6ND_NS,
+ ICMPv6ND_RA,
+ RouterAlert,
+ IPv6ExtHdrHopByHop,
+)
from scapy.utils6 import in6_ismaddr, in6_isllsnmaddr, in6_getAddrType
from scapy.pton_ntop import inet_ntop
@@ -80,8 +86,8 @@ from vrf import VRFState
def is_ipv6_misc_ext(p):
- """ Is packet one of uninteresting IPv6 broadcasts (extended to filter out
- ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)? """
+ """Is packet one of uninteresting IPv6 broadcasts (extended to filter out
+ ICMPv6 Neighbor Discovery - Neighbor Advertisement packets too)?"""
if p.haslayer(ICMPv6ND_RA):
if in6_ismaddr(p[IPv6].dst):
return True
@@ -96,7 +102,7 @@ def is_ipv6_misc_ext(p):
class TestIP6VrfMultiInst(VppTestCase):
- """ IP6 VRF Multi-instance Test Case """
+ """IP6 VRF Multi-instance Test Case"""
@classmethod
def setUpClass(cls):
@@ -114,8 +120,7 @@ class TestIP6VrfMultiInst(VppTestCase):
try:
# Create pg interfaces
- cls.create_pg_interfaces(
- range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
+ cls.create_pg_interfaces(range(cls.nr_of_vrfs * cls.pg_ifs_per_vrf))
# Packet flows mapping pg0 -> pg1, pg2 etc.
cls.flows = dict()
@@ -124,7 +129,8 @@ class TestIP6VrfMultiInst(VppTestCase):
pg_list = [
cls.pg_interfaces[multiplicand * cls.pg_ifs_per_vrf + j]
for j in range(cls.pg_ifs_per_vrf)
- if (multiplicand * cls.pg_ifs_per_vrf + j) != i]
+ if (multiplicand * cls.pg_ifs_per_vrf + j) != i
+ ]
cls.flows[cls.pg_interfaces[i]] = pg_list
# Packet sizes - jumbo packet (9018 bytes) skipped
@@ -153,7 +159,8 @@ class TestIP6VrfMultiInst(VppTestCase):
set_id = i + 1
pg_list = [
cls.pg_interfaces[i * cls.pg_ifs_per_vrf + j]
- for j in range(cls.pg_ifs_per_vrf)]
+ for j in range(cls.pg_ifs_per_vrf)
+ ]
cls.pg_if_sets[set_id] = pg_list
except Exception:
@@ -185,8 +192,9 @@ class TestIP6VrfMultiInst(VppTestCase):
for i in range(self.pg_ifs_per_vrf):
pg_if = self.pg_if_sets[if_set_id][i]
pg_if.set_table_ip6(vrf_id)
- self.logger.info("pg-interface %s added to IPv6 VRF ID %d"
- % (pg_if.name, vrf_id))
+ self.logger.info(
+ "pg-interface %s added to IPv6 VRF ID %d" % (pg_if.name, vrf_id)
+ )
if pg_if not in self.pg_in_vrf:
self.pg_in_vrf.append(pg_if)
if pg_if in self.pg_not_in_vrf:
@@ -206,8 +214,9 @@ class TestIP6VrfMultiInst(VppTestCase):
"""
for i in range(count):
vrf_id = i + start
- self.vapi.ip_table_add_del(is_add=1,
- table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_add_del(
+ is_add=1, table={"table_id": vrf_id, "is_ip6": 1}
+ )
self.logger.info("IPv6 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
self.vrf_list.append(vrf_id)
@@ -217,8 +226,7 @@ class TestIP6VrfMultiInst(VppTestCase):
self.logger.debug(self.vapi.ppcli("show ip6 fib"))
self.logger.debug(self.vapi.ppcli("show ip6 neighbors"))
- def create_vrf_by_id_and_assign_interfaces(self, set_id,
- vrf_id=0xffffffff):
+ def create_vrf_by_id_and_assign_interfaces(self, set_id, vrf_id=0xFFFFFFFF):
"""
Create a FIB table / VRF by vrf_id, put 3 pg-ip6 interfaces
to FIB table / VRF.
@@ -226,8 +234,7 @@ class TestIP6VrfMultiInst(VppTestCase):
:param int vrf_id: Required table ID / VRF ID. \
(Default value = 0xffffffff, ID will be selected automatically)
"""
- ret = self.vapi.ip_table_allocate(table={'table_id': vrf_id,
- 'is_ip6': 1})
+ ret = self.vapi.ip_table_allocate(table={"table_id": vrf_id, "is_ip6": 1})
vrf_id = ret.table.table_id
self.logger.info("IPv6 VRF ID %d created" % vrf_id)
if vrf_id not in self.vrf_list:
@@ -248,7 +255,7 @@ class TestIP6VrfMultiInst(VppTestCase):
"""
if if_set_id is None:
if_set_id = vrf_id
- self.vapi.ip_table_flush(table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_flush(table={"table_id": vrf_id, "is_ip6": 1})
if vrf_id in self.vrf_list:
self.vrf_list.remove(vrf_id)
if vrf_id not in self.vrf_reset_list:
@@ -270,8 +277,7 @@ class TestIP6VrfMultiInst(VppTestCase):
self.vrf_list.remove(vrf_id)
if vrf_id in self.vrf_reset_list:
self.vrf_reset_list.remove(vrf_id)
- self.vapi.ip_table_add_del(is_add=0,
- table={'table_id': vrf_id, 'is_ip6': 1})
+ self.vapi.ip_table_add_del(is_add=0, table={"table_id": vrf_id, "is_ip6": 1})
def create_stream(self, src_if, packet_sizes):
"""
@@ -288,16 +294,20 @@ class TestIP6VrfMultiInst(VppTestCase):
src_host = random.choice(src_hosts)
pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
- p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
- IPv6(src=src_host.ip6, dst=dst_host.ip6) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_host.mac)
+ / IPv6(src=src_host.ip6, dst=dst_host.ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw(payload)
+ )
pkt_info.data = p.copy()
size = random.choice(packet_sizes)
self.extend_packet(p, size)
pkts.append(p)
- self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
- % (src_if.name, len(pkts)))
+ self.logger.debug(
+ "Input stream created for port %s. Length: %u pkt(s)"
+ % (src_if.name, len(pkts))
+ )
return pkts
def create_stream_crosswise_vrf(self, src_if, vrf_id, packet_sizes):
@@ -320,16 +330,20 @@ class TestIP6VrfMultiInst(VppTestCase):
src_host = random.choice(src_hosts)
pkt_info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(pkt_info)
- p = (Ether(dst=src_if.local_mac, src=src_host.mac) /
- IPv6(src=src_host.ip6, dst=dst_host.ip6) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ p = (
+ Ether(dst=src_if.local_mac, src=src_host.mac)
+ / IPv6(src=src_host.ip6, dst=dst_host.ip6)
+ / UDP(sport=1234, dport=1234)
+ / Raw(payload)
+ )
pkt_info.data = p.copy()
size = random.choice(packet_sizes)
self.extend_packet(p, size)
pkts.append(p)
- self.logger.debug("Input stream created for port %s. Length: %u pkt(s)"
- % (src_if.name, len(pkts)))
+ self.logger.debug(
+ "Input stream created for port %s. Length: %u pkt(s)"
+ % (src_if.name, len(pkts))
+ )
return pkts
def verify_capture(self, pg_if, capture):
@@ -350,11 +364,13 @@ class TestIP6VrfMultiInst(VppTestCase):
payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (pg_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)"
+ % (pg_if.name, payload_info.src, packet_index)
+ )
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
self.assertIsNotNone(next_info)
self.assertEqual(packet_index, next_info.index)
@@ -369,11 +385,13 @@ class TestIP6VrfMultiInst(VppTestCase):
raise
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i, dst_sw_if_index, last_info[i.sw_if_index])
+ i, dst_sw_if_index, last_info[i.sw_if_index]
+ )
self.assertIsNone(
remaining_packet,
- "Port %u: Packet expected from source %u didn't arrive" %
- (dst_sw_if_index, i.sw_if_index))
+ "Port %u: Packet expected from source %u didn't arrive"
+ % (dst_sw_if_index, i.sw_if_index),
+ )
def verify_vrf(self, vrf_id, if_set_id=None):
"""
@@ -437,8 +455,9 @@ class TestIP6VrfMultiInst(VppTestCase):
capture = pg_if.get_capture(remark="interface is in VRF")
self.verify_capture(pg_if, capture)
elif pg_if in self.pg_not_in_vrf:
- pg_if.assert_nothing_captured(remark="interface is not in VRF",
- filter_out_fn=is_ipv6_misc_ext)
+ pg_if.assert_nothing_captured(
+ remark="interface is not in VRF", filter_out_fn=is_ipv6_misc_ext
+ )
self.logger.debug("No capture for interface %s" % pg_if.name)
else:
raise Exception("Unknown interface: %s" % pg_if.name)
@@ -458,7 +477,8 @@ class TestIP6VrfMultiInst(VppTestCase):
for vrf_id in self.vrf_list:
for pg_if in self.pg_if_sets[vrf_id]:
pkts = self.create_stream_crosswise_vrf(
- pg_if, vrf_id, self.pg_if_packet_sizes)
+ pg_if, vrf_id, self.pg_if_packet_sizes
+ )
pg_if.add_stream(pkts)
# Enable packet capture and start packet sending
@@ -468,29 +488,27 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify
# Verify outgoing packet streams per packet-generator interface
for pg_if in self.pg_interfaces:
- pg_if.assert_nothing_captured(remark="interface is in other VRF",
- filter_out_fn=is_ipv6_misc_ext)
+ pg_if.assert_nothing_captured(
+ remark="interface is in other VRF", filter_out_fn=is_ipv6_misc_ext
+ )
self.logger.debug("No capture for interface %s" % pg_if.name)
def test_ip6_vrf_01(self):
- """ IP6 VRF Multi-instance test 1 - create 4 VRFs
- """
+ """IP6 VRF Multi-instance test 1 - create 4 VRFs"""
# Config 1
# Create 4 VRFs
self.create_vrf_and_assign_interfaces(4)
# Verify 1
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 1
self.run_verify_test()
self.run_crosswise_vrf_test()
def test_ip6_vrf_02(self):
- """ IP6 VRF Multi-instance test 2 - reset 2 VRFs
- """
+ """IP6 VRF Multi-instance test 2 - reset 2 VRFs"""
# Config 2
# Delete 2 VRFs
self.reset_vrf_and_remove_from_vrf_list(1)
@@ -498,11 +516,9 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 2
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 2
self.run_verify_test()
@@ -513,8 +529,7 @@ class TestIP6VrfMultiInst(VppTestCase):
# self.reset_vrf_and_remove_from_vrf_list(vrf_id)
def test_ip6_vrf_03(self):
- """ IP6 VRF Multi-instance 3 - add 2 VRFs
- """
+ """IP6 VRF Multi-instance 3 - add 2 VRFs"""
# Config 3
# Add 1 of reset VRFs and 1 new VRF
self.create_vrf_and_assign_interfaces(1)
@@ -522,11 +537,9 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 3
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test 3
self.run_verify_test()
@@ -537,8 +550,7 @@ class TestIP6VrfMultiInst(VppTestCase):
# self.reset_vrf_and_remove_from_vrf_list(vrf_id)
def test_ip6_vrf_04(self):
- """ IP6 VRF Multi-instance test 4 - reset 4 VRFs
- """
+ """IP6 VRF Multi-instance test 4 - reset 4 VRFs"""
# Config 4
# Reset all VRFs (i.e. no VRF except VRF=0 configured)
for i in range(len(self.vrf_list)):
@@ -547,20 +559,20 @@ class TestIP6VrfMultiInst(VppTestCase):
# Verify 4
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Test 4
self.run_verify_test()
self.run_crosswise_vrf_test()
def test_ip6_vrf_05(self):
- """ IP6 VRF Multi-instance test 5 - auto allocate vrf id
- """
+ """IP6 VRF Multi-instance test 5 - auto allocate vrf id"""
# Config 5
# Create several VRFs
# Set vrf_id manually first
@@ -571,11 +583,11 @@ class TestIP6VrfMultiInst(VppTestCase):
]
# Verify 5
- self.assert_equal(self.verify_vrf(10, 1), VRFState.configured,
- VRFState)
+ self.assert_equal(self.verify_vrf(10, 1), VRFState.configured, VRFState)
for i, vrf in enumerate(auto_vrf_id):
- self.assert_equal(self.verify_vrf(vrf, i+2),
- VRFState.configured, VRFState)
+ self.assert_equal(
+ self.verify_vrf(vrf, i + 2), VRFState.configured, VRFState
+ )
# Test 5
self.run_verify_test()
@@ -584,18 +596,19 @@ class TestIP6VrfMultiInst(VppTestCase):
# Reset VRFs
self.reset_vrf_and_remove_from_vrf_list(10, 1)
for i, vrf in enumerate(auto_vrf_id):
- self.reset_vrf_and_remove_from_vrf_list(vrf, i+2)
+ self.reset_vrf_and_remove_from_vrf_list(vrf, i + 2)
# Verify 5.1
self.assert_equal(self.verify_vrf(10, 1), VRFState.reset, VRFState)
for i, vrf in enumerate(auto_vrf_id):
- self.assert_equal(self.verify_vrf(vrf, i+2),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf, i + 2), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Cleanup our extra created VRFs
for vrf in auto_vrf_id:
@@ -604,14 +617,12 @@ class TestIP6VrfMultiInst(VppTestCase):
self.delete_vrf(10)
def test_ip6_vrf_06(self):
- """ IP6 VRF Multi-instance test 6 - recreate 4 VRFs
- """
+ """IP6 VRF Multi-instance test 6 - recreate 4 VRFs"""
# Reconfigure all the VRFs
self.create_vrf_and_assign_interfaces(4)
# Verify
for vrf_id in self.vrf_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.configured, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.configured, VRFState)
# Test
self.run_verify_test()
self.run_crosswise_vrf_test()
@@ -620,16 +631,17 @@ class TestIP6VrfMultiInst(VppTestCase):
self.reset_vrf_and_remove_from_vrf_list(self.vrf_list[0])
# Verify
for vrf_id in self.vrf_reset_list:
- self.assert_equal(self.verify_vrf(vrf_id),
- VRFState.reset, VRFState)
+ self.assert_equal(self.verify_vrf(vrf_id), VRFState.reset, VRFState)
vrf_list_length = len(self.vrf_list)
self.assertEqual(
- vrf_list_length, 0,
- "List of configured VRFs is not empty: %s != 0" % vrf_list_length)
+ vrf_list_length,
+ 0,
+ "List of configured VRFs is not empty: %s != 0" % vrf_list_length,
+ )
# Test
self.run_verify_test()
self.run_crosswise_vrf_test()
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)