aboutsummaryrefslogtreecommitdiffstats
path: root/test/test_srv6_ad_flow.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_srv6_ad_flow.py')
-rw-r--r--test/test_srv6_ad_flow.py295
1 files changed, 168 insertions, 127 deletions
diff --git a/test/test_srv6_ad_flow.py b/test/test_srv6_ad_flow.py
index f5452089a79..f776c71ac4b 100644
--- a/test/test_srv6_ad_flow.py
+++ b/test/test_srv6_ad_flow.py
@@ -1,37 +1,35 @@
#!/usr/bin/env python3
import unittest
-import binascii
-from socket import AF_INET6
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
+from vpp_ip_route import VppIpRoute, VppRoutePath
import scapy.compat
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q
+from scapy.layers.l2 import Ether
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.layers.inet import IP, UDP
from util import ppp
-class TestSRv6(VppTestCase):
- """ SRv6 Flow-based Dynamic Proxy plugin Test Case """
+class TestSRv6AdFlow(VppTestCase):
+ """SRv6 Flow-based Dynamic Proxy plugin Test Case"""
@classmethod
def setUpClass(self):
- super(TestSRv6, self).setUpClass()
+ super(TestSRv6AdFlow, self).setUpClass()
@classmethod
def tearDownClass(cls):
- super(TestSRv6, cls).tearDownClass()
+ super(TestSRv6AdFlow, cls).tearDownClass()
def setUp(self):
- """ Perform test setup before each test case.
- """
- super(TestSRv6, self).setUp()
+ """Perform test setup before each test case."""
+ super(TestSRv6AdFlow, self).setUp()
# packet sizes, inclusive L2 overhead
self.pg_packet_sizes = [64, 512, 1518, 9018]
@@ -40,17 +38,15 @@ class TestSRv6(VppTestCase):
self.reset_packet_infos()
def tearDown(self):
- """ Clean up test setup after each test case.
- """
+ """Clean up test setup after each test case."""
self.teardown_interfaces()
- super(TestSRv6, self).tearDown()
+ super(TestSRv6AdFlow, self).tearDown()
- def configure_interface(self,
- interface,
- ipv6=False, ipv4=False,
- ipv6_table_id=0, ipv4_table_id=0):
- """ Configure interface.
+ def configure_interface(
+ self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
+ ):
+ """Configure interface.
:param ipv6: configure IPv6 on interface
:param ipv4: configure IPv4 on interface
:param ipv6_table_id: FIB table_id for IPv6
@@ -69,9 +65,8 @@ class TestSRv6(VppTestCase):
interface.resolve_arp()
interface.admin_up()
- def setup_interfaces(self, ipv6=[], ipv4=[],
- ipv6_table_id=[], ipv4_table_id=[]):
- """ Create and configure interfaces.
+ def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
+ """Create and configure interfaces.
:param ipv6: list of interface IPv6 capabilities
:param ipv4: list of interface IPv4 capabilities
@@ -106,9 +101,9 @@ class TestSRv6(VppTestCase):
# setup all interfaces
for i in range(count):
intf = self.pg_interfaces[i]
- self.configure_interface(intf,
- ipv6[i], ipv4[i],
- ipv6_table_id[i], ipv4_table_id[i])
+ self.configure_interface(
+ intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
+ )
if any(ipv6):
self.logger.debug(self.vapi.cli("show ip6 neighbors"))
@@ -120,8 +115,7 @@ class TestSRv6(VppTestCase):
return self.pg_interfaces
def teardown_interfaces(self):
- """ Unconfigure and bring down interface.
- """
+ """Unconfigure and bring down interface."""
self.logger.debug("Tearing down interfaces")
# tear down all interfaces
# AFAIK they cannot be deleted
@@ -133,10 +127,9 @@ class TestSRv6(VppTestCase):
i.set_table_ip6(0)
def test_SRv6_End_AD_IPv6(self):
- """ Test SRv6 End.AD behavior with IPv6 traffic.
- """
- self.src_addr = 'a0::'
- self.sid_list = ['a1::', 'a2::a6', 'a3::']
+ """Test SRv6 End.AD behavior with IPv6 traffic."""
+ self.src_addr = "a0::"
+ self.sid_list = ["a1::", "a2::a6", "a3::"]
self.test_sid_index = 1
# send traffic to one destination interface
@@ -144,19 +137,32 @@ class TestSRv6(VppTestCase):
self.setup_interfaces(ipv6=[True, True])
# configure route to next segment
- route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
- [VppRoutePath(self.pg0.remote_ip6,
- self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)])
+ route = VppIpRoute(
+ self,
+ self.sid_list[self.test_sid_index + 1],
+ 128,
+ [
+ VppRoutePath(
+ self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6,
+ )
+ ],
+ )
route.add_vpp_config()
# configure SRv6 localSID behavior
- cli_str = "sr localsid address " + \
- self.sid_list[self.test_sid_index] + \
- " behavior end.ad.flow" + \
- " nh " + self.pg1.remote_ip6 + \
- " oif " + self.pg1.name + \
- " iif " + self.pg1.name
+ cli_str = (
+ "sr localsid address "
+ + self.sid_list[self.test_sid_index]
+ + " behavior end.ad.flow"
+ + " nh "
+ + self.pg1.remote_ip6
+ + " oif "
+ + self.pg1.name
+ + " iif "
+ + self.pg1.name
+ )
self.vapi.cli(cli_str)
# log the localsids
@@ -169,15 +175,18 @@ class TestSRv6(VppTestCase):
packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
srcaddr=self.src_addr,
sidlist=self.sid_list[::-1],
- segleft=len(self.sid_list) - self.test_sid_index - 1)
+ segleft=len(self.sid_list) - self.test_sid_index - 1,
+ )
# generate packets (pg0->pg1)
- pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
- self.pg_packet_sizes, count)
+ pkts1 = self.create_stream(
+ self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
- self.compare_rx_tx_packet_End_AD_IPv6_out)
+ self.send_and_verify_pkts(
+ self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv6_out
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -186,26 +195,27 @@ class TestSRv6(VppTestCase):
packet_header2 = self.create_packet_header_IPv6()
# generate returning packets (pg1->pg0)
- pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
- self.pg_packet_sizes, count)
+ pkts2 = self.create_stream(
+ self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
- self.compare_rx_tx_packet_End_AD_IPv6_in)
+ self.send_and_verify_pkts(
+ self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv6_in
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# remove SRv6 localSIDs
- cli_str = "sr localsid del address " + \
- self.sid_list[self.test_sid_index]
+ cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
self.vapi.cli(cli_str)
# cleanup interfaces
self.teardown_interfaces()
def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.AD with IPv6
+ """Compare input and output packet after passing End.AD with IPv6
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -230,7 +240,7 @@ class TestSRv6(VppTestCase):
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.AD
+ """Compare input and output packet after passing End.AD
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -251,8 +261,7 @@ class TestSRv6(VppTestCase):
# rx'ed seglist should be equal to SID-list in reversed order
self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
# segleft should be equal to previous segleft value minus 1
- self.assertEqual(rx_srh.segleft,
- len(self.sid_list) - self.test_sid_index - 2)
+ self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
# lastentry should be equal to the SID-list length minus 1
self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
@@ -267,10 +276,9 @@ class TestSRv6(VppTestCase):
self.logger.debug("packet verification: SUCCESS")
def test_SRv6_End_AD_IPv4(self):
- """ Test SRv6 End.AD behavior with IPv4 traffic.
- """
- self.src_addr = 'a0::'
- self.sid_list = ['a1::', 'a2::a4', 'a3::']
+ """Test SRv6 End.AD behavior with IPv4 traffic."""
+ self.src_addr = "a0::"
+ self.sid_list = ["a1::", "a2::a4", "a3::"]
self.test_sid_index = 1
# send traffic to one destination interface
@@ -278,19 +286,32 @@ class TestSRv6(VppTestCase):
self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
# configure route to next segment
- route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
- [VppRoutePath(self.pg0.remote_ip6,
- self.pg0.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)])
+ route = VppIpRoute(
+ self,
+ self.sid_list[self.test_sid_index + 1],
+ 128,
+ [
+ VppRoutePath(
+ self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6,
+ )
+ ],
+ )
route.add_vpp_config()
# configure SRv6 localSID behavior
- cli_str = "sr localsid address " + \
- self.sid_list[self.test_sid_index] + \
- " behavior end.ad.flow" + \
- " nh " + self.pg1.remote_ip4 + \
- " oif " + self.pg1.name + \
- " iif " + self.pg1.name
+ cli_str = (
+ "sr localsid address "
+ + self.sid_list[self.test_sid_index]
+ + " behavior end.ad.flow"
+ + " nh "
+ + self.pg1.remote_ip4
+ + " oif "
+ + self.pg1.name
+ + " iif "
+ + self.pg1.name
+ )
self.vapi.cli(cli_str)
# log the localsids
@@ -303,15 +324,18 @@ class TestSRv6(VppTestCase):
packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
srcaddr=self.src_addr,
sidlist=self.sid_list[::-1],
- segleft=len(self.sid_list) - self.test_sid_index - 1)
+ segleft=len(self.sid_list) - self.test_sid_index - 1,
+ )
# generate packets (pg0->pg1)
- pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
- self.pg_packet_sizes, count)
+ pkts1 = self.create_stream(
+ self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
- self.compare_rx_tx_packet_End_AD_IPv4_out)
+ self.send_and_verify_pkts(
+ self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv4_out
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
@@ -320,26 +344,27 @@ class TestSRv6(VppTestCase):
packet_header2 = self.create_packet_header_IPv4()
# generate returning packets (pg1->pg0)
- pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
- self.pg_packet_sizes, count)
+ pkts2 = self.create_stream(
+ self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
+ )
# send packets and verify received packets
- self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
- self.compare_rx_tx_packet_End_AD_IPv4_in)
+ self.send_and_verify_pkts(
+ self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv4_in
+ )
# log the localsid counters
self.logger.info(self.vapi.cli("show sr localsid"))
# remove SRv6 localSIDs
- cli_str = "sr localsid del address " + \
- self.sid_list[self.test_sid_index]
+ cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
self.vapi.cli(cli_str)
# cleanup interfaces
self.teardown_interfaces()
def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.AD with IPv4
+ """Compare input and output packet after passing End.AD with IPv4
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -369,7 +394,7 @@ class TestSRv6(VppTestCase):
self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
- """ Compare input and output packet after passing End.AD
+ """Compare input and output packet after passing End.AD
:param tx_pkt: transmitted packet
:param rx_pkt: received packet
@@ -390,8 +415,7 @@ class TestSRv6(VppTestCase):
# rx'ed seglist should be equal to SID-list in reversed order
self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
# segleft should be equal to previous segleft value minus 1
- self.assertEqual(rx_srh.segleft,
- len(self.sid_list) - self.test_sid_index - 2)
+ self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
# lastentry should be equal to the SID-list length minus 1
self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
@@ -408,8 +432,7 @@ class TestSRv6(VppTestCase):
self.logger.debug("packet verification: SUCCESS")
- def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
- count):
+ def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
"""Create SRv6 input packet stream for defined interface.
:param VppInterface src_if: Interface to create packet stream for
@@ -426,15 +449,17 @@ class TestSRv6(VppTestCase):
pkts = []
for i in range(0, count - 1):
payload_info = self.create_packet_info(src_if, dst_if)
- self.logger.debug(
- "Creating packet with index %d" % (payload_info.index))
+ self.logger.debug("Creating packet with index %d" % (payload_info.index))
payload = self.info_to_payload(payload_info)
# add L2 header if not yet provided in packet_header
- if packet_header.getlayer(0).name == 'Ethernet':
+ if packet_header.getlayer(0).name == "Ethernet":
p = packet_header / Raw(payload)
else:
- p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
- packet_header / Raw(payload)
+ p = (
+ Ether(dst=src_if.local_mac, src=src_if.remote_mac)
+ / packet_header
+ / Raw(payload)
+ )
size = packet_sizes[i % len(packet_sizes)]
self.logger.debug("Packet size %d" % (size))
self.extend_packet(p, size)
@@ -477,8 +502,9 @@ class TestSRv6(VppTestCase):
# verify captured packets
self.verify_captured_pkts(output, capture, compare_func)
- def create_packet_header_IPv6(self, saddr='1234::1', daddr='4321::1',
- sport=1234, dport=1234):
+ def create_packet_header_IPv6(
+ self, saddr="1234::1", daddr="4321::1", sport=1234, dport=1234
+ ):
"""Create packet header: IPv6 header, UDP header
:param dst: IPv6 destination address
@@ -491,9 +517,16 @@ class TestSRv6(VppTestCase):
p = IPv6(src=saddr, dst=daddr) / UDP(sport=sport, dport=dport)
return p
- def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft,
- insrc='1234::1', indst='4321::1',
- sport=1234, dport=1234):
+ def create_packet_header_IPv6_SRH_IPv6(
+ self,
+ srcaddr,
+ sidlist,
+ segleft,
+ insrc="1234::1",
+ indst="4321::1",
+ sport=1234,
+ dport=1234,
+ ):
"""Create packet header: IPv6 encapsulated in SRv6:
IPv6 header with SRH, IPv6 header, UDP header
@@ -508,11 +541,12 @@ class TestSRv6(VppTestCase):
UDP source port and destination port are 1234
"""
- p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
- IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=41) / \
- IPv6(src=insrc, dst=indst) / \
- UDP(sport=sport, dport=dport)
+ p = (
+ IPv6(src=srcaddr, dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
+ / IPv6(src=insrc, dst=indst)
+ / UDP(sport=sport, dport=dport)
+ )
return p
def create_packet_header_IPv4(self):
@@ -525,7 +559,7 @@ class TestSRv6(VppTestCase):
UDP source port and destination port are 1234
"""
- p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
+ p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
return p
def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
@@ -543,16 +577,16 @@ class TestSRv6(VppTestCase):
UDP source port and destination port are 1234
"""
- p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
- IPv6ExtHdrSegmentRouting(addresses=sidlist,
- segleft=segleft, nh=4) / \
- IP(src='123.1.1.1', dst='124.1.1.1') / \
- UDP(sport=1234, dport=1234)
+ p = (
+ IPv6(src=srcaddr, dst=sidlist[segleft])
+ / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
+ / IP(src="123.1.1.1", dst="124.1.1.1")
+ / UDP(sport=1234, dport=1234)
+ )
return p
def get_payload_info(self, packet):
- """ Extract the payload_info from the packet
- """
+ """Extract the payload_info from the packet"""
# in most cases, payload_info is in packet[Raw]
# but packet[Raw] gives the complete payload
# (incl L2 header) for the T.Encaps L2 case
@@ -564,7 +598,8 @@ class TestSRv6(VppTestCase):
# take packet[Raw], convert it to an Ether layer
# and then extract Raw from it
payload_info = self.payload_to_info(
- Ether(scapy.compat.raw(packet[Raw]))[Raw])
+ Ether(scapy.compat.raw(packet[Raw]))[Raw]
+ )
return payload_info
@@ -577,8 +612,10 @@ class TestSRv6(VppTestCase):
:param capture: captured packets
:param compare_func: function to compare in and out packet
"""
- self.logger.info("Verifying capture on interface %s using function %s"
- % (dst_if.name, compare_func.__name__))
+ self.logger.info(
+ "Verifying capture on interface %s using function %s"
+ % (dst_if.name, compare_func.__name__)
+ )
last_info = dict()
for i in self.pg_interfaces:
@@ -591,19 +628,19 @@ class TestSRv6(VppTestCase):
payload_info = self.get_payload_info(packet)
packet_index = payload_info.index
- self.logger.debug("Verifying packet with index %d"
- % (packet_index))
+ self.logger.debug("Verifying packet with index %d" % (packet_index))
# packet should have arrived on the expected interface
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
- "Got packet on interface %s: src=%u (idx=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ "Got packet on interface %s: src=%u (idx=%u)"
+ % (dst_if.name, payload_info.src, packet_index)
+ )
# search for payload_info with same src and dst if_index
# this will give us the transmitted packet
next_info = self.get_next_packet_info_for_interface2(
- payload_info.src, dst_sw_if_index,
- last_info[payload_info.src])
+ payload_info.src, dst_sw_if_index, last_info[payload_info.src]
+ )
last_info[payload_info.src] = next_info
# next_info should not be None
self.assertTrue(next_info is not None)
@@ -612,8 +649,9 @@ class TestSRv6(VppTestCase):
# data field of next_info contains the tx packet
txed_packet = next_info.data
- self.logger.debug(ppp("Transmitted packet:",
- txed_packet)) # ppp=Pretty Print Packet
+ self.logger.debug(
+ ppp("Transmitted packet:", txed_packet)
+ ) # ppp=Pretty Print Packet
self.logger.debug(ppp("Received packet:", packet))
@@ -627,11 +665,14 @@ class TestSRv6(VppTestCase):
# have all expected packets arrived?
for i in self.pg_interfaces:
remaining_packet = self.get_next_packet_info_for_interface2(
- i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
- self.assertTrue(remaining_packet is None,
- "Interface %s: Packet expected from interface %s "
- "didn't arrive" % (dst_if.name, i.name))
+ i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
+ )
+ self.assertTrue(
+ remaining_packet is None,
+ "Interface %s: Packet expected from interface %s "
+ "didn't arrive" % (dst_if.name, i.name),
+ )
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)